diff --git a/deployment/nbeats/main.py b/deployment/nbeats/main.py
index 3e63599d873edee1e1f7f803e3fb11f3a5998c3d..9b9bea1fa2412cd8efab57c74824ca90456bbedc 100644
--- a/deployment/nbeats/main.py
+++ b/deployment/nbeats/main.py
@@ -20,17 +20,6 @@ METHOD = os.environ.get("METHOD", "nbeats")
 START_TOPIC = f"start_forecasting.{METHOD}"
 TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
-# logging.basicConfig(
-#     filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
-#     level=logging.INFO,
-#     datefmt="%Y-%m-%d %H:%M:%S",
-#     format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
-# )
-
-# logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
-
-# import logging.config
-
 
 def run_process(args):
     print("running")
diff --git a/deployment/nbeats/model.yaml b/deployment/nbeats/model.yaml
index b16e9771dfe18b42532a531cd333fc88e6cc8837..95b13d0c9f232d90c6a8fc1c3e8edf6dddb3d335 100644
--- a/deployment/nbeats/model.yaml
+++ b/deployment/nbeats/model.yaml
@@ -2,7 +2,7 @@ data:
   csv_path: demo.csv
 training:
   bs: 64
-  max_epochs: 20
+  max_epochs: 40
   loss: rmse
 dataset:
   tv_unknown_reals: []
@@ -13,7 +13,6 @@ dataset:
   prediction_length: 5
   classification: 0
 model:
-  learning_rate: 0.05
   hidden_size: 32
   attention_head_size: 1
   hidden_continuous_size: 16
diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py
index c56c10c8eb6bb373523d1fe2ffc668eee3e07aaa..5fa7410f9dca5319c2d09d894913cf0e22cdb0ce 100644
--- a/deployment/nbeats/predict.py
+++ b/deployment/nbeats/predict.py
@@ -179,14 +179,8 @@ def main():
 
 if __name__ == "__main__":
     logging.basicConfig(
         filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
-        level=logging.INFO,
-        datefmt="%Y-%m-%d %H:%M:%S",
-        format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
     )
-    logging.Formatter.converter = lambda *args: datetime.now(
-        tz=timezone(TZ)
-    ).timetuple()
     msg = json.loads(sys.argv[1])
     metrics_info = {
         m["metric"]: {
diff --git a/deployment/nbeats/requirements.txt b/deployment/nbeats/requirements.txt
index fc3b095358ac6b401c1754c46fba400b42e7b8dc..1940fae2ffeb2ab99665dc424196659443108911 100644
--- a/deployment/nbeats/requirements.txt
+++ b/deployment/nbeats/requirements.txt
@@ -5,5 +5,6 @@ pytorch-forecasting==0.8.4
 filelock==3.0.12
 influxdb
 python-slugify
 
 
+torchmetrics==0.5.0
diff --git a/deployment/nbeats/retrain.py b/deployment/nbeats/retrain.py
index b3b9340601756737476cd0a571144b1ae40e98a0..55eae32bcb58f7b1e10993d3a62cf1885b390e5a 100644
--- a/deployment/nbeats/retrain.py
+++ b/deployment/nbeats/retrain.py
@@ -8,6 +8,7 @@ from amq_message_python_library import *
 from src.dataset_maker import CSVData
 import pytz
 import time
+from pytz import timezone
 from datetime import datetime
 
 TOPIC_NAME = "training_models"
@@ -56,6 +57,10 @@ def main(predicted_metrics, prediction_horizon):
 
 
 if __name__ == "__main__":
+    logging.basicConfig(
+        filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+    )
+    logging.info(f"Training loop started")
     msg = json.loads(sys.argv[1])
     metrics_info = {
         m["metric"]: {
diff --git a/deployment/nbeats/src/model_train.py b/deployment/nbeats/src/model_train.py
index dd3be7060eed80329650000e55a7295e777dbb3f..5b437e875660ba4d28b56e45644aeacc1518de2e 100644
--- a/deployment/nbeats/src/model_train.py
+++ b/deployment/nbeats/src/model_train.py
@@ -24,6 +24,10 @@ LOSSES_DICT = {
     "crossentropy": CrossEntropy(),
 }
 
+logging.basicConfig(
+    filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
+)
+
 
 def train(target_column, prediction_length, yaml_file="model.yaml"):
     torch.manual_seed(12345)
@@ -82,9 +86,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
         max_epochs=params["training"]["max_epochs"],
         gpus=0,
         gradient_clip_val=0.5,
-        # callbacks=[lr_logger, early_stop_callback],
+        callbacks=[lr_logger, early_stop_callback],
         checkpoint_callback=False,
-        logger=None,
+        # logger=None,
     )
 
     model = NBeats.from_dataset(
@@ -94,6 +98,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
         weight_decay=1e-2,
         widths=[32, 512],
         backcast_loss_ratio=1.0,
+        reduce_on_plateau_patience=5,
     )
 
     model_path = os.path.join(params["save_path"], f"{target_column}.pth")
diff --git a/deployment/nbeats/src/preprocess_dataset.py b/deployment/nbeats/src/preprocess_dataset.py
index 3affc9505d9391d775adbd0ca33452487a2967de..9383d3d352c04a4da870943570c6d96eb749d556 100644
--- a/deployment/nbeats/src/preprocess_dataset.py
+++ b/deployment/nbeats/src/preprocess_dataset.py
@@ -37,10 +37,6 @@ class Dataset(object):
         self.prediction_length = prediction_length
         self.dataset = dataset
         self.check_gap()
-        # self.dataset = self.cut_nan_start(dataset)
-        # self.fill_na()
-        # self.dataset = self.add_obligatory_columns(self.dataset)
-        # self.dataset = self.convert_formats(self.dataset)
         self.n = dataset.shape[0]
         self.ts_dataset = self.create_time_series_dataset()
@@ -129,6 +125,10 @@ class Dataset(object):
             if s.shape[0] > self.prediction_length * 2 + self.context_length:
                 s["series"] = i
                 preprocessed_series.append(s)
+            if i == len(series) - 1:
+                logging.info(
+                    f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
+                )
 
         logging.info(f"{len(preprocessed_series)} long enough series found")
         print(f"{len(preprocessed_series)} long enough series found")
@@ -158,119 +158,3 @@ class Dataset(object):
         return TimeSeriesDataSet.from_dataset(
             self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True
         )
-
-
-# class Dataset(object):
-#     def __init__(
-#         self,
-#         dataset,
-#         target_column="value",
-#         tv_unknown_reals="[]",
-#         known_reals="[]",
-#         tv_unknown_cat="[]",
-#         static_reals="[]",
-#         classification=0,
-#         context_length=40,
-#         prediction_length=5,
-#     ):
-
-#         self.target_column = target_column
-#         self.tv_unknown_cat = tv_unknown_cat
-#         self.known_reals = known_reals
-#         self.tv_unknown_reals = tv_unknown_reals
-#         self.static_reals = static_reals
-#         self.classification = classification
-#         self.context_length = context_length
-#         self.prediction_length = prediction_length
-#         self.prepare_dataset(dataset)
-#         # self.dataset = self.cut_nan_start(dataset)
-#         # self.fill_na()
-#         # self.dataset = self.add_obligatory_columns(self.dataset)
-#         # self.dataset = self.convert_formats(self.dataset)
-#         # self.n = dataset.shape[0]
-#         self.ts_dataset = self.create_time_series_dataset()
-
-#     def prepare_dataset(self, dataset):
-#         self.dataset = self.cut_nan_start(dataset)
-#         self.fill_na()
-#         self.dataset = self.add_obligatory_columns(self.dataset)
-#         self.dataset = self.convert_formats(self.dataset)
-#         self.n = dataset.shape[0]
-
-#     def cut_nan_start(self, dataset):
-#         first_not_nan_index = dataset[self.target_column][
-#             dataset[self.target_column] != "None"
-#         ].index[0]
-#         return dataset[dataset.index > first_not_nan_index]
-
-#     def fill_na(self):
-#         self.dataset = self.dataset.replace("None", np.nan)
-#         self.dataset = self.dataset.ffill(axis="rows")
-
-#     def convert_formats(self, dataset):
-#         if not self.classification:
-#             dataset[self.target_column] = dataset[self.target_column].astype(float)
-#         else:
-#             dataset[self.target_column] = dataset[self.target_column].astype(int)
-
-#         for name in self.tv_unknown_cat:
-#             dataset[name] = dataset[name].astype(str)
-
-#         dataset["series"] = dataset["series"].astype(str)
-#         return dataset
-
-#     def add_obligatory_columns(self, dataset):
-#         dataset["series"] = 0
-#         dataset["split"] = "train"
-#         n = dataset.shape[0]
-#         dataset["split"][int(n * 0.9) :] = "val"
-#         dataset["time_idx"] = range(n)  # TODO check time gaps
-#         return dataset
-
-#     def create_time_series_dataset(self):
-#         if not self.classification:
-#             self.time_varying_unknown_reals = [
-#                 self.target_column
-#             ] + self.tv_unknown_reals
-#             self.time_varying_unknown_categoricals = self.tv_unknown_cat
-#         else:
-#             self.time_varying_unknown_reals = self.tv_unknown_reals
-#             self.time_varying_unknown_categoricals = [
-#                 self.target_column
-#             ] + self.tv_unknown_cat
-
-#         ts_dataset = TimeSeriesDataSet(
-#             self.dataset[lambda x: x.split == "train"],
-#             time_idx="time_idx",
-#             target=self.target_column,
-#             categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)},
-#             group_ids=["series"],
-#             time_varying_unknown_reals=[self.target_column],
-#             min_encoder_length=self.context_length,
-#             max_encoder_length=self.context_length,
-#             max_prediction_length=self.prediction_length,
-#             min_prediction_length=self.prediction_length,
-#             add_relative_time_idx=False,
-#         )
-#         return ts_dataset
-
-#     def inherited_dataset(self, split1, split2):
-#         df1 = (
-#             self.dataset[lambda x: x.split == split1]
-#             .groupby("series", as_index=False)
-#             .apply(lambda x: x.iloc[-self.context_length :])
-#         )  # previous split fragment
-#         df2 = self.dataset[lambda x: x.split == split2]  # split part
-#         inh_dataset = pd.concat([df1, df2])
-#         inh_dataset = inh_dataset.sort_values(by=["series", "time_idx"])
-#         inh_dataset = TimeSeriesDataSet.from_dataset(
-#             self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True
-#         )
-#         return inh_dataset
-
-#     def get_from_dataset(self, dataset):
-#         print(self.context_length + self.prediction_length)
-#         print(dataset)
-#         return TimeSeriesDataSet.from_dataset(
-#             self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True
-#         )
diff --git a/deployment/tft/main.py b/deployment/tft/main.py
index 3e63599d873edee1e1f7f803e3fb11f3a5998c3d..361b146d36f09411874b42157a5f166b5c8a0c77 100644
--- a/deployment/tft/main.py
+++ b/deployment/tft/main.py
@@ -9,8 +9,6 @@ from datetime import datetime
 from pytz import timezone
 from datetime import datetime
-# from src.log import logger
-
 
 AMQ_USER = os.environ.get("AMQ_USER", "admin")
 AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
 AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
@@ -20,17 +18,6 @@ METHOD = os.environ.get("METHOD", "nbeats")
 START_TOPIC = f"start_forecasting.{METHOD}"
 TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
-# logging.basicConfig(
-#     filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
-#     level=logging.INFO,
-#     datefmt="%Y-%m-%d %H:%M:%S",
-#     format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
-# )
-
-# logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
-
-# import logging.config
-
 
 def run_process(args):
     print("running")
@@ -125,11 +112,11 @@ def main():
     )
 
     # msg1 = Msg()
-    # msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}]'
+    # msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 30000}]'
 
     # msg2 = Msg()
     # msg2.body = (
     #     "{"
-    #     + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 20'
+    #     + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
     #     + "}"
     # )
diff --git a/deployment/tft/model.yaml b/deployment/tft/model.yaml
index 53f9b6b67ee2f11980e6eb565d4e0c4900885846..ea58a201f41e15e0e329c736cb0cb25db351f9c5 100644
--- a/deployment/tft/model.yaml
+++ b/deployment/tft/model.yaml
@@ -2,7 +2,7 @@ data:
   csv_path: demo.csv
 training:
   bs: 64
-  max_epochs: 1
+  max_epochs: 40
   loss: quantile
 dataset:
   tv_unknown_reals: []
diff --git a/deployment/tft/requirements.txt b/deployment/tft/requirements.txt
index fc3b095358ac6b401c1754c46fba400b42e7b8dc..c2e88571a4afef8440f44b1751deaa0e26bb8330 100644
--- a/deployment/tft/requirements.txt
+++ b/deployment/tft/requirements.txt
@@ -5,5 +5,4 @@ pytorch-forecasting==0.8.4
 filelock==3.0.12
 influxdb
 python-slugify
-
-
+torchmetrics==0.5.0
\ No newline at end of file
diff --git a/deployment/tft/retrain.py b/deployment/tft/retrain.py
index b3b9340601756737476cd0a571144b1ae40e98a0..2a6e654d17e71abb3ec2aac8cdc632634c556200 100644
--- a/deployment/tft/retrain.py
+++ b/deployment/tft/retrain.py
@@ -19,6 +19,10 @@ AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
 APP_NAME = os.environ.get("APP_NAME", "demo")
 TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
+logging.basicConfig(
+    filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
+)
+
 
 def main(predicted_metrics, prediction_horizon):
     start_conn = morphemic.Connection(
diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py
index 1d984b23c547e1269dcdd98f51a825db9f7be1bd..2f7f9b5c073da5907ac44d254ce2e864b497eba1 100644
--- a/deployment/tft/src/model_predict.py
+++ b/deployment/tft/src/model_predict.py
@@ -15,6 +15,10 @@ from datetime import datetime
 
 TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
+logging.basicConfig(
+    filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
+)
+
 pd.options.mode.chained_assignment = None
 
 """Script for temporal fusion transformer prediction"""
diff --git a/deployment/tft/src/model_train.py b/deployment/tft/src/model_train.py
index 795f1f114500912d2074ffb8eddee45e782e680a..b493b4fdc3a35e30505a963daca0a3b78e2ba97f 100644
--- a/deployment/tft/src/model_train.py
+++ b/deployment/tft/src/model_train.py
@@ -19,6 +19,10 @@ TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
 """Script for temporal fusion transformer training"""
 
+logging.basicConfig(
+    filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
+)
+
 LOSSES_DICT = {
     "quantile": QuantileLoss(),
     "mae": MAE(),
@@ -83,9 +87,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
         max_epochs=params["training"]["max_epochs"],
         gpus=0,
         gradient_clip_val=0.5,
-        # callbacks=[lr_logger, early_stop_callback],
+        callbacks=[lr_logger, early_stop_callback],
         checkpoint_callback=False,  # TODO: is pl checkpoint_callback thread safe?
-        logger=None,
+        # logger=None,
     )
 
     tft = TemporalFusionTransformer.from_dataset(
diff --git a/deployment/tft/src/preprocess_dataset.py b/deployment/tft/src/preprocess_dataset.py
index 1c9561d627b8cc0392519c784aecced12ca3091b..1880094eb7955e8062377ca74f91928e27f02317 100644
--- a/deployment/tft/src/preprocess_dataset.py
+++ b/deployment/tft/src/preprocess_dataset.py
@@ -37,10 +37,6 @@ class Dataset(object):
         self.prediction_length = prediction_length
         self.dataset = dataset
         self.check_gap()
-        # self.dataset = self.cut_nan_start(dataset)
-        # self.fill_na()
-        # self.dataset = self.add_obligatory_columns(self.dataset)
-        # self.dataset = self.convert_formats(self.dataset)
         self.n = dataset.shape[0]
         self.ts_dataset = self.create_time_series_dataset()
@@ -137,6 +133,10 @@ class Dataset(object):
             if s.shape[0] > self.prediction_length * 2 + self.context_length:
                 s["series"] = i
                 preprocessed_series.append(s)
+            if i == len(series) - 1:
+                logging.info(
+                    f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
+                )
 
         logging.info(f"{len(preprocessed_series)} long enough series found")
         print(f"{len(preprocessed_series)} long enough series found")
@@ -166,119 +166,3 @@ class Dataset(object):
         return TimeSeriesDataSet.from_dataset(
             self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True
         )
-
-
-# class Dataset(object):
-#     def __init__(
-#         self,
-#         dataset,
-#         target_column="value",
-#         tv_unknown_reals="[]",
-#         known_reals="[]",
-#         tv_unknown_cat="[]",
-#         static_reals="[]",
-#         classification=0,
-#         context_length=40,
-#         prediction_length=5,
-#     ):
-
-#         self.target_column = target_column
-#         self.tv_unknown_cat = tv_unknown_cat
-#         self.known_reals = known_reals
-#         self.tv_unknown_reals = tv_unknown_reals
-#         self.static_reals = static_reals
-#         self.classification = classification
-#         self.context_length = context_length
-#         self.prediction_length = prediction_length
-#         self.prepare_dataset(dataset)
-#         # self.dataset = self.cut_nan_start(dataset)
-#         # self.fill_na()
-#         # self.dataset = self.add_obligatory_columns(self.dataset)
-#         # self.dataset = self.convert_formats(self.dataset)
-#         # self.n = dataset.shape[0]
-#         self.ts_dataset = self.create_time_series_dataset()
-
-#     def prepare_dataset(self, dataset):
-#         self.dataset = self.cut_nan_start(dataset)
-#         self.fill_na()
-#         self.dataset = self.add_obligatory_columns(self.dataset)
-#         self.dataset = self.convert_formats(self.dataset)
-#         self.n = dataset.shape[0]
-
-#     def cut_nan_start(self, dataset):
-#         first_not_nan_index = dataset[self.target_column][
-#             dataset[self.target_column] != "None"
-#         ].index[0]
-#         return dataset[dataset.index > first_not_nan_index]
-
-#     def fill_na(self):
-#         self.dataset = self.dataset.replace("None", np.nan)
-#         self.dataset = self.dataset.ffill(axis="rows")
-
-#     def convert_formats(self, dataset):
-#         if not self.classification:
-#             dataset[self.target_column] = dataset[self.target_column].astype(float)
-#         else:
-#             dataset[self.target_column] = dataset[self.target_column].astype(int)
-
-#         for name in self.tv_unknown_cat:
-#             dataset[name] = dataset[name].astype(str)
-
-#         dataset["series"] = dataset["series"].astype(str)
-#         return dataset
-
-#     def add_obligatory_columns(self, dataset):
-#         dataset["series"] = 0
-#         dataset["split"] = "train"
-#         n = dataset.shape[0]
-#         dataset["split"][int(n * 0.9) :] = "val"
-#         dataset["time_idx"] = range(n)  # TODO check time gaps
-#         return dataset
-
-#     def create_time_series_dataset(self):
-#         if not self.classification:
-#             self.time_varying_unknown_reals = [
-#                 self.target_column
-#             ] + self.tv_unknown_reals
-#             self.time_varying_unknown_categoricals = self.tv_unknown_cat
-#         else:
-#             self.time_varying_unknown_reals = self.tv_unknown_reals
-#             self.time_varying_unknown_categoricals = [
-#                 self.target_column
-#             ] + self.tv_unknown_cat
-
-#         ts_dataset = TimeSeriesDataSet(
-#             self.dataset[lambda x: x.split == "train"],
-#             time_idx="time_idx",
-#             target=self.target_column,
-#             categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)},
-#             group_ids=["series"],
-#             time_varying_unknown_reals=[self.target_column],
-#             min_encoder_length=self.context_length,
-#             max_encoder_length=self.context_length,
-#             max_prediction_length=self.prediction_length,
-#             min_prediction_length=self.prediction_length,
-#             add_relative_time_idx=False,
-#         )
-#         return ts_dataset
-
-#     def inherited_dataset(self, split1, split2):
-#         df1 = (
-#             self.dataset[lambda x: x.split == split1]
-#             .groupby("series", as_index=False)
-#             .apply(lambda x: x.iloc[-self.context_length :])
-#         )  # previous split fragment
-#         df2 = self.dataset[lambda x: x.split == split2]  # split part
-#         inh_dataset = pd.concat([df1, df2])
-#         inh_dataset = inh_dataset.sort_values(by=["series", "time_idx"])
-#         inh_dataset = TimeSeriesDataSet.from_dataset(
-#             self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True
-#         )
-#         return inh_dataset
-
-#     def get_from_dataset(self, dataset):
-#         print(self.context_length + self.prediction_length)
-#         print(dataset)
-#         return TimeSeriesDataSet.from_dataset(
-#             self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True
-#         )
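For reference, the logging setup this patch adds to predict.py, retrain.py, and the model_train/model_predict modules reduces to one pattern: each process appends to logs/<METHOD>.out, with METHOD read from the environment and defaulting to "nbeats". The sketch below is an illustrative consolidation, not code from this patch; the configure_method_logging helper name and the logs/ directory creation are assumptions.

```python
import logging
import os


def configure_method_logging(default_method: str = "nbeats") -> None:
    """Hypothetical helper mirroring the logging.basicConfig calls added in this patch."""
    method = os.environ.get("METHOD", default_method)
    os.makedirs("logs", exist_ok=True)  # assumption: ensure the log directory exists
    logging.basicConfig(
        filename=f"logs/{method}.out",
        level=logging.INFO,
    )


if __name__ == "__main__":
    configure_method_logging()
    logging.info("Training loop started")  # mirrors the message added in retrain.py
```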