Commit d39181eb authored by Sebastian Miller

Properties refactor; Filled properties with target hyperparams; Predictor refactors

parent f0a2eb51
@@ -18,49 +18,98 @@ components:
username: admin
password: admin
destination: /topic/start_ensembler
id: ensemble_settings # Internal id of the topic subscription. Insignificant
reconnect_attempts_max: -1 # -1 means that stomp.py will attempt to reconnect indefinitely
reconnect_sleep_initial: 10 # Time between consecutive reconnect attempts
# Internal id of the topic subscription. Insignificant
id: ensemble_settings
# -1 means that stomp.py will attempt to reconnect indefinitely
reconnect_attempts_max: -1
# Time between consecutive reconnect attempts
reconnect_sleep_initial: 10
# Model training hyperparameters
hyperparameters:
n_steps: 100 # S-B3 n_steps PPO hyperparam
batch_size: 25 # S-B3 batch_size PPO hyperparam
n_epochs: 30 # S-B3 n_epochs PPO hyperparam
gamma: 0.9 # S-B3 gamma PPO hyperparam
learning_rate: 0.003 # S-B3 learning_rate PPO hyperparam
dataset_traversals: 6 # How many times should the whole dataset be stepped through during training
window_size: 10 # Length of the past predictions window (the state passed to the model)
episode_length: 100 # Length of an episode - steps between artificial state resets
mode: "continuous" # Discrete or continuous (enhanced) reward
extend_predictions_by: 0.0 # Mocked forecasters prediction value extension (0.0 means no mocked forecasters)
reward_diff_factor: 0.05 # Factor for the continuous reward
extended_state: False # Whether or not the extended state should be used
# S-B3 n_steps PPO hyperparam
n_steps: 5000
# S-B3 batch_size PPO hyperparam
batch_size: 1250
# S-B3 n_epochs PPO hyperparam
n_epochs: 15
# S-B3 gamma PPO hyperparam
gamma: 0.99
# S-B3 learning_rate PPO hyperparam
learning_rate: 0.003
# How many times should the whole dataset be stepped through during training
dataset_traversals: 200
# Length of the past predictions window (the state passed to the model)
window_size: 10
# Length of an episode - steps between artificial state resets
episode_length: 5000
# Discrete or continuous (enhanced) reward
mode: "continuous"
# Factor for the continuous reward
reward_diff_factor: 0.05
# WARNING: the `extended_state` and `extend_predictions_by` params are INCOMPATIBLE with the component.
# Setting them to something other than the default values breaks the component.
# Mocked forecasters' prediction value extension (0.0 means no mocked forecasters)
extend_predictions_by: 0.0
# Whether or not the extended state should be used
extended_state: False
# Parameters of the data used for training
training-data-params:
training_set_size: 500 # Target training data set size (how many data points should be downloaded from InfluxDB)
continual_training_set_size: 300 # Target continual training data set size
training_set_size_eps: 0.1 # How much more data should be downloaded from InfluxDB (to account for data trimming)
run_break_len_factor: 20 # If the difference between the timestamps of two consecutive downloaded data points
# is greater than 20 * publish_rate, then the data is split into separate runs.
max_prediction_value: 100.0 # Arbitrary value - used only for generating the initial random state
first_prediction_column: 2 # First column in downloaded data that contains predictions
grand_truth_column: 1 # The column in downloaded data that contains the real values
fresh_data_ratio: 0.5 # The proportion: fresh_data_points_number/continual_training_set_size
# (How much fresh data should be used in a single continual training)
minimum_cohesive_run_len: 50 # Minimum number of data points of a single cohesive run
# Target training data set size (how many data points should be downloaded from InfluxDB)
training_set_size: 5000
# Target continual training data set size
continual_training_set_size: 5000
# How much more data should be downloaded from InfluxDB (to account for data trimming)
training_set_size_eps: 0.1
# If the difference between the timestamps of two consecutive downloaded data points
# is greater than run_break_len_factor * publish_rate, then the data is split into separate runs.
run_break_len_factor: 20
# Arbitrary value - used only for generating the initial random state
max_prediction_value: 100.0
# Number of the first column in downloaded data that contains predictions
first_prediction_column: 2
# Number of the column in downloaded data that contains the real values
grand_truth_column: 1
# The proportion: fresh_data_points_number/continual_training_set_size
# (How much fresh data should be used in a single continual training)
fresh_data_ratio: 0.2
real_values_db_name: melodic_ui
predictions_db_name: morphemic
misc:
training_thread_pool_size: 20 # The number of threads involved in training.
# The number of threads involved in training.
training_thread_pool_size: 20
logging:
filename: ensembler.out # Logs filename
level: DEBUG # Logging level
# Logs filename
filename: ensembler.out
# Logging level
level: DEBUG
date-format: "%Y-%m-%d %H:%M:%S"
format: "%(asctime)s.%(msecs)03d %(levelname)s - %(threadName)s - %(module)s - %(funcName)s: %(message)s"
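For orientation, a minimal sketch of how the PPO-related hyperparameters above feed into a Stable-Baselines3 model. The config path, the top-level key lookup, the MlpPolicy choice and the stand-in environment are assumptions for this sketch only, not part of the component:
import yaml
from stable_baselines3 import PPO
with open("ensembler-config.yaml") as f:  # hypothetical path, adjust to the deployment layout
    cfg = yaml.safe_load(f)
hp = cfg["hyperparameters"]  # the exact nesting is not shown in this diff
model = PPO(
    policy="MlpPolicy",   # assumed; the component's policy choice is not visible here
    env="CartPole-v1",    # stand-in env; the component uses its own EnsemblerEnv
    n_steps=hp["n_steps"],
    batch_size=hp["batch_size"],
    n_epochs=hp["n_epochs"],
    gamma=hp["gamma"],
    learning_rate=hp["learning_rate"],
)
model.learn(total_timesteps=hp["dataset_traversals"] * 25)  # illustrative run length of 25, cf. the Predictor code below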
@@ -232,14 +232,120 @@ class Predictor:
"""
return [
df for df in runs_data
if df.shape[0] >= training_data_params['minimum_cohesive_run_len']
if df.shape[0] * hyperparameters['dataset_traversals'] >= hyperparameters['n_steps']
# The default value of 'minimum_cohesive_run_len' is 50, since
# training won't fail even for data size 1. However, training the model
# for too many timesteps on too few data points may ruin the policy,
# so another reasonable heuristic may be applied here.
# There is no point in training on the data from a given run if not even
# a single policy update would be performed during that training
# (a policy update is performed after n_steps steps).
]
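# Worked example (values from the target configuration above, added here for illustration only):
# with dataset_traversals = 200 and n_steps = 5000, a cohesive run of 25 data points yields
# 200 * 25 = 5000 training steps - just enough for a single policy update - and is kept,
# while a run of 24 points (4800 steps) would be dropped by the filter above.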
def _obtain_real_values_(self, metric: str, extended_set_size: int) -> pd.DataFrame:
"""
Makes a query for appropriate real metric values to InfluxDB
and returns a properly ordered DataFrame with the results.
Parameters
----------
metric: str
The name of the metric whose real values are queried
extended_set_size: int
The max number of queried datapoints
Returns
-------
A DataFrame with the real metric values
"""
logging.debug("Obtaining real values from InfluxDB.")
real_values_data_raw = self.influxdb_client.query(
query='SELECT time,value '
f'FROM {metric} '
'ORDER BY time DESC '
f'LIMIT {extended_set_size};',
database=training_data_params['real_values_db_name'],
epoch='ns'
).get_points()
# Dataframe in ascending timestamp order.
real_values_data_raw = pd.DataFrame(real_values_data_raw)[::-1].reset_index(drop=True)
logging.debug("Obtained real values from InfluxDB:")
logging.debug(real_values_data_raw)
return real_values_data_raw
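# Illustration only (hypothetical metric name): with metric = "cpu_usage" and
# extended_set_size = 5500, the query built above is effectively
#   SELECT time,value FROM cpu_usage ORDER BY time DESC LIMIT 5500;
# InfluxDB returns the rows newest-first, hence the [::-1] reversal into
# ascending timestamp order before the DataFrame is returned.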
def _obtain_prediction_values(
self,
metric: str,
extended_set_size: int,
forecaster_models: list,
latest_real_value_ts: int
) -> pd.DataFrame:
"""
Makes a query for appropriate metric value predictions to InfluxDB
and returns a properly ordered DataFrame with the results.
Parameters
----------
metric: str
The name of the metric whose predictions are queried
extended_set_size: int
The max number of queried datapoints
forecaster_models: list
List of input forecaster names
latest_real_value_ts: int
The timestamp of the latest real metric value downloaded
earlier - there is no point in downloading predictions for later
timestamps.
Returns
-------
A DataFrame with the metric value predictions
"""
logging.debug("Obtaining prediction values from InfluxDB.")
predictions_data_raw = self.influxdb_client.query(
query=f'SELECT time,{self._prepare_forecaster_field_names(forecaster_models)} '
f'FROM {metric}Predictions '
f'WHERE time <= {latest_real_value_ts} '
'ORDER BY time DESC '
f'LIMIT {extended_set_size};',
database=training_data_params['predictions_db_name'],
epoch='ns'
).get_points()
predictions_data_raw = pd.DataFrame(predictions_data_raw)[::-1].reset_index(drop=True)
logging.debug("Obtained prediction values from InfluxDB:")
logging.debug(predictions_data_raw)
return predictions_data_raw
def _merge_real_and_prediction_values(
self,
real_values_data_raw: pd.DataFrame,
predictions_data_raw: pd.DataFrame,
publish_rate: int
) -> pd.DataFrame:
"""
Merges the downloaded DataFrames with historical real metric values
and predictions based on their timestamps - in order to perform training,
each real metric value needs to be matched with the predictions for the
same (or similar) timestamp, so that accuracy can be judged.
Parameters
----------
real_values_data_raw: pd.DataFrame
DataFrame with real metric values
predictions_data_raw: pd.DataFrame
DataFrame with predictions (matching the real metric values)
publish_rate: int
The publish rate of the considered metric
Returns
-------
A DataFrame with the real metric values merged with their matched predictions
"""
data = pd.merge_asof(real_values_data_raw, predictions_data_raw,
on='time', direction='nearest', tolerance=publish_rate * NS_IN_MS)
logging.debug("Merged real values and predictions:")
logging.debug(data)
return data
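# Worked example (toy numbers, not taken from the component):
#   real_values_data_raw 'time':  1000, 2000, 3000
#   predictions_data_raw 'time':  1004, 1998, 2500
# With direction='nearest' and a tolerance of 10, merge_asof pairs 1000<->1004 and
# 2000<->1998, while the real value at time 3000 keeps NaN prediction columns because
# its nearest prediction (2500) lies outside the tolerance. In the call above the
# tolerance is publish_rate * NS_IN_MS, i.e. one publish interval expressed in nanoseconds.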
def _get_training_data(
self,
metric: str,
@@ -281,20 +387,7 @@ class Predictor:
# of losing too much data (and thus waiting) in the process of data pruning.
extended_set_size = int((1 + training_data_params['training_set_size_eps']) * training_set_size)
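# For example, with the target configuration values training_set_size = 5000 and
# training_set_size_eps = 0.1, extended_set_size = int(1.1 * 5000) = 5500.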
logging.debug("Obtaining real values from InfluxDB.")
real_values_data_raw = self.influxdb_client.query(
query='SELECT time,value '
f'FROM {metric} '
'ORDER BY time DESC '
f'LIMIT {extended_set_size};',
database=training_data_params['real_values_db_name'],
epoch='ns'
).get_points()
# Dataframe in ascending timestamp order.
real_values_data_raw = pd.DataFrame(real_values_data_raw)[::-1].reset_index(drop=True)
logging.debug("Obtained real values from InfluxDB:")
logging.debug(real_values_data_raw)
real_values_data_raw = self._obtain_real_values_(metric, extended_set_size)
# If too little data has been obtained here, there is no point in its further processing.
real_value_num = real_values_data_raw.shape[0]
@@ -304,28 +397,20 @@ class Predictor:
latest_real_value_ts = int(real_values_data_raw.iloc[-1]["time"])
logging.debug("Obtaining prediction values from InfluxDB.")
predictions_data_raw = self.influxdb_client.query(
query=f'SELECT time,{self._prepare_forecaster_field_names(forecaster_models)} '
f'FROM {metric}Predictions '
f'WHERE time <= {latest_real_value_ts} '
'ORDER BY time DESC '
f'LIMIT {extended_set_size};',
database=training_data_params['predictions_db_name'],
epoch='ns'
).get_points()
predictions_data_raw = pd.DataFrame(predictions_data_raw)[::-1].reset_index(drop=True)
logging.debug("Obtained prediction values from InfluxDB:")
logging.debug(predictions_data_raw)
predictions_data_raw = self._obtain_prediction_values(
metric,
extended_set_size,
forecaster_models,
latest_real_value_ts
)
data = pd.merge_asof(real_values_data_raw, predictions_data_raw,
on='time', direction='nearest', tolerance=publish_rate * NS_IN_MS)
logging.debug("Merged real values and predictions:")
logging.debug(data)
data = self._merge_real_and_prediction_values(
real_values_data_raw,
predictions_data_raw,
publish_rate
)
cohesive_runs_data = self._split_data_into_cohesive_runs(data, publish_rate)
self._trim_all_runs_data(cohesive_runs_data)
cohesive_runs_data = self._eliminate_too_short_runs_data(cohesive_runs_data)
logging.debug("Pruned the obtained data:")
@@ -361,6 +446,41 @@ class Predictor:
#TODO maybe wait some epsilon longer here to account for possible delays
)
def _prepare_envs(self, input_forecaster_num: int, training_data: list) -> list:
"""
Creates a separate env for each cohesive run in the training data.
Parameters
----------
input_forecaster_num: int
The number of input forecasters (on which the new model will be based)
training_data: list
A list of tuples (data_from_cohesive_run, cohesive_run_len)
Returns
-------
A list of envs and lengths of the runs with which each env is associated
"""
config = EnsemblerEnvConfig(input_size=input_forecaster_num)
return [(EnsemblerEnv(run_data, config), run_len) for (run_data, run_len) in training_data]
def _train_on_runs(self, model: PPO, envs_and_lens: list):
"""
Performs training on each env from envs_and_lens. Each of the envs is
associated with a particular cohesive run.
Parameters
----------
model: PPO
The SB3 model object to be trained
envs_and_lens: list
The envs on which training is to be performed
"""
for env, run_len in envs_and_lens:
model.set_env(env)
model.learn(total_timesteps=hyperparameters['dataset_traversals']*run_len)
def _train_new_model(self, metric: str, forecaster_models: list, publish_rate: int) -> PPO:
"""
Creates a new model, suited for predicting a specified metric
@@ -395,10 +515,8 @@ class Predictor:
"Waiting for more data.")
self._wait_for_enough_data(training_data_params['training_set_size'], total_example_num, publish_rate)
config = EnsemblerEnvConfig(input_size=len(forecaster_models))
# Separate env for data from each run.
envs_and_lens = [(EnsemblerEnv(run_data, config), run_len) for (run_data, run_len) in data]
envs_and_lens = self._prepare_envs(len(forecaster_models), data)
# Create the model with the first env.
new_model = PPO(
@@ -415,9 +533,7 @@ class Predictor:
# Train the model on data from all runs.
logging.info(f"Starting the training of a model for: {metric}.")
new_model.learn(total_timesteps=hyperparameters['dataset_traversals']*envs_and_lens[0][1])
for env, run_len in envs_and_lens[1:]:
new_model.set_env(env)
new_model.learn(total_timesteps=hyperparameters['dataset_traversals']*run_len)
self._train_on_runs(new_model, envs_and_lens[1:])
logging.info(f"Training finished.")
return new_model
@@ -492,16 +608,12 @@ class Predictor:
#TODO there will probably also be something to do with the mutex here
self.scheduler.resume_job(metric_name, 'default')
config = EnsemblerEnvConfig(input_size=len(forecaster_models))
# Separate env for data from each run.
envs_and_lens = [(EnsemblerEnv(run_data, config), run_len) for (run_data, run_len) in data]
envs_and_lens = self._prepare_envs(len(forecaster_models), data)
# Perform additional training of the model on fresh data from all runs.
logging.info(f"Starting continual training of the model for: {metric_name}.")
for env, run_len in envs_and_lens:
updated_model.set_env(env)
updated_model.learn(total_timesteps=hyperparameters['dataset_traversals']*run_len)
self._train_on_runs(updated_model, envs_and_lens)
logging.info("Continual training finished.")
# We only update the Stable-Baselines3 model object to preserve the
@@ -532,10 +644,10 @@ class Predictor:
Model update interval (in seconds)
"""
#return (horizon // MS_IN_S) * int(training_data_params['continual_training_set_size'] * training_data_params['fresh_data_ratio'])
# More frequent version for manual tests:
# return (horizon // MS_IN_S)
# TODO remove this and uncomment the above - more frequent for demonstration purposes
return (horizon // MS_IN_S)
return (horizon // MS_IN_S) * int(training_data_params['continual_training_set_size'] * training_data_params['fresh_data_ratio'])
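# Worked example (assuming MS_IN_S = 1000, a horizon of 10000 ms, and the target
# configuration values continual_training_set_size = 5000, fresh_data_ratio = 0.2):
# update interval = (10000 // 1000) * int(5000 * 0.2) = 10 * 1000 = 10000 seconds.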
def _construct_model(self, metric_info: dict, forecaster_models: list, forecaster_positions: dict):
"""
@@ -18,49 +18,98 @@ components:
username: admin
password: admin
destination: /topic/start_ensembler
id: ensemble_settings # Internal id of the topic subscription. Insignificant
reconnect_attempts_max: -1 # -1 means that stomp.py will attempt to reconnect indefinitely
reconnect_sleep_initial: 10 # Time between consecutive reconnect attempts
# Internal id of the topic subscription. Insignificant
id: ensemble_settings
# -1 means that stomp.py will attempt to reconnect indefinitely
reconnect_attempts_max: -1
# Time between consecutive reconnect attempts
reconnect_sleep_initial: 10
# Model training hyperparameters
hyperparameters:
n_steps: 100 # S-B3 n_steps PPO hyperparam
batch_size: 25 # S-B3 batch_size PPO hyperparam
n_epochs: 30 # S-B3 n_epochs PPO hyperparam
gamma: 0.9 # S-B3 gamma PPO hyperparam
learning_rate: 0.003 # S-B3 learning_rate PPO hyperparam
dataset_traversals: 6 # How many times should the whole dataset be stepped through during training
window_size: 10 # Length of the past predictions window (the state passed to the model)
episode_length: 100 # Length of an episode - steps between artificial state resets
mode: "continuous" # Discrete or continuous (enhanced) reward
extend_predictions_by: 0.0 # Mocked forecasters prediction value extension (0.0 means no mocked forecasters)
reward_diff_factor: 0.05 # Factor for the continuous reward
extended_state: False # Whether or not the extended state should be used
# S-B3 n_steps PPO hyperparam
n_steps: 100
# S-B3 batch_size PPO hyperparam
batch_size: 25
# S-B3 n_epochs PPO hyperparam
n_epochs: 30
# S-B3 gamma PPO hyperparam
gamma: 0.9
# S-B3 learning_rate PPO hyperparam
learning_rate: 0.003
# How many times should the whole dataset be stepped through during training
dataset_traversals: 6
# Length of the past predictions window (the state passed to the model)
window_size: 10
# Length of an episode - steps between artificial state resets
episode_length: 100
# Discrete or continuous (enhanced) reward
mode: "continuous"
# Factor for the continuous reward
reward_diff_factor: 0.05
# WARNING: the `extended_state` and `extend_predictions_by` params are INCOMPATIBLE with the component.
# Setting them to something other than the default values breaks the component.
# Mocked forecasters' prediction value extension (0.0 means no mocked forecasters)
extend_predictions_by: 0.0
# Whether or not the extended state should be used
extended_state: False
# Parameters of the data used for training
training-data-params:
training_set_size: 500 # Target training data set size (how many data points should be downloaded from InfluxDB)
continual_training_set_size: 300 # Target continual training data set size
training_set_size_eps: 0.1 # How much more data should be downloaded from InfluxDB (to account for data trimming)
run_break_len_factor: 20 # If the difference between the timestamps of two consecutive downloaded data points
# is greater than 20 * publish_rate, then the data is split into separate runs.
max_prediction_value: 100.0 # Arbitrary value - used only for generating the initial random state
first_prediction_column: 2 # First column in downloaded data that contains predictions
grand_truth_column: 1 # The column in downloaded data that contains the real values
fresh_data_ratio: 0.5 # The proportion: fresh_data_points_number/continual_training_set_size
# (How much fresh data should be used in a single continual training)
minimum_cohesive_run_len: 50 # Minimum number of data points of a single cohesive run
# Target training data set size (how many data points should be downloaded from InfluxDB)
training_set_size: 500
# Target continual training data set size
continual_training_set_size: 300
# How much more data should be downloaded from InfluxDB (to account for data trimming)
training_set_size_eps: 0.1
# If the difference between the timestamps of two consecutive downloaded data points
# is greater than run_break_len_factor * publish_rate, then the data is split into separate runs.
run_break_len_factor: 20
# Arbitrary value - used only for generating the initial random state
max_prediction_value: 100.0
# Number of the first column in downloaded data that contains predictions
first_prediction_column: 2
# Number of the column in downloaded data that contains the real values
grand_truth_column: 1
# The proportion: fresh_data_points_number/continual_training_set_size
# (How much fresh data should be used in a single continual training)
fresh_data_ratio: 0.5
real_values_db_name: melodic_ui
predictions_db_name: morphemic
misc:
training_thread_pool_size: 20 # The number of threads involved in training.
# The number of threads involved in training.
training_thread_pool_size: 20
logging:
filename: ensembler.out # Logs filename
level: DEBUG # Logging level
# Logs filename
filename: ensembler.out
# Logging level
level: DEBUG
date-format: "%Y-%m-%d %H:%M:%S"
format: "%(asctime)s.%(msecs)03d %(levelname)s - %(threadName)s - %(module)s - %(funcName)s: %(message)s"