Commit 74a765a0 authored by Sebastian Miller's avatar Sebastian Miller
Browse files

concurrent initial training

parent f27e227e
......@@ -537,6 +537,43 @@ class Predictor:
# TODO usunąć to i odkomentować powyższe - częstsze w celu prezentacji
return (horizon // MS_IN_S)
def _construct_model(self, metric_info: dict, forecaster_models: list, forecaster_positions: dict):
"""
Trains a new model for the metric from metric_info and saves it.
Parameters
----------
metric_info
Info about the metric from start_ensembler order
forecaster_models: list
The originally ordered list of input forecasters,
supplied in the start_ensembler message
forecaster_positions: dict
A mapping of forecaster names to proper positions
"""
metric_name = metric_info["metric"]
# We prevent the continual training job to be run in the middle
# of new training.
self._remove_continual_training_job(metric_name)
publish_rate = metric_info["publish_rate"]
model = self._prepare_extended_model(
self._train_new_model(metric_name, forecaster_models, publish_rate),
forecaster_models,
forecaster_positions,
publish_rate
)
self.models[metric_name] = model #TODO lock na models tu i przy updacie (i przy predykowaniu)?
save_ext_model(metric_name, model)
# A scheduler job responsible for continual training of the
# newly created model is added.
# The job is scheduled to run periodically every N seconds,
# where N is the time during which CONTINUAL_TRAINING_SET_SIZE
# new datapoints are expected to be produced.
self._add_continual_training_job(metric_name, publish_rate)
def train_new_models(self, start_ensembler: dict):
"""
Trains new models, compliant with the order
......@@ -557,29 +594,12 @@ class Predictor:
forecaster_positions = self._prepare_forecaster_positions(forecaster_models)
for metric_info in metrics_info:
# TODO to można zeschedulować, żeby kilka trenowało się na raz
metric_name = metric_info["metric"]
# We prevent the continual training job to be run in the middle
# of new training.
self._remove_continual_training_job(metric_name)
publish_rate = metric_info["publish_rate"]
model = self._prepare_extended_model(
self._train_new_model(metric_name, forecaster_models, publish_rate),
forecaster_models,
forecaster_positions,
publish_rate
# Perform the model trainings concurrently.
# A job scheduled this way is run once, immediately.
self.scheduler.add_job(
func=self._construct_model,
args=[metric_info, forecaster_models, forecaster_positions]
)
self.models[metric_name] = model #TODO lock na models tu i przy updacie (i przy predykowaniu)?
save_ext_model(metric_name, model)
# A scheduler job responsible for continual training of the
# newly created model is added.
# The job is scheduled to run periodically every N seconds,
# where N is the time during which CONTINUAL_TRAINING_SET_SIZE
# new datapoints are expected to be produced.
self._add_continual_training_job(metric_name, publish_rate)
def _calculate_prediction(self, weights, forecasts: np.ndarray) -> float:
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment