diff --git a/deployment/nbeats/env b/deployment/nbeats/env
index bfd0ff1fdfe83736bde5380088ebe6e410e07dda..a83d6aec69c8f0a1d106dee9a64b527c6e667e77 100644
--- a/deployment/nbeats/env
+++ b/deployment/nbeats/env
@@ -11,5 +11,5 @@
 INFLUXDB_PORT=8086
 INFLUXDB_USERNAME=morphemic
 INFLUXDB_PASSWORD=password
 INFLUXDB_DBNAME=morphemic
-
+TIME_ZONE=Europe/Vienna
diff --git a/deployment/nbeats/loggers_config.ini b/deployment/nbeats/loggers_config.ini
new file mode 100644
index 0000000000000000000000000000000000000000..2cfdc2d03ad16097454d40a2de76511248409c6b
--- /dev/null
+++ b/deployment/nbeats/loggers_config.ini
@@ -0,0 +1,31 @@
+[loggers]
+keys=root,retrain,main
+
+[handlers]
+keys=consoleHandler
+
+[formatters]
+keys=defaultFormatter
+
+[logger_root]
+handlers=consoleHandler
+
+[logger_retrain]
+handlers=consoleHandler
+level=INFO
+qualname=core
+propagate=0
+
+[logger_main]
+handlers=consoleHandler
+level=DEBUG
+qualname=__main__
+propagate=0
+
+[handler_consoleHandler]
+class=logging.StreamHandler
+formatter=defaultFormatter
+args=(sys.stdout,)
+
+[formatter_defaultFormatter]
+format=%(asctime)s %(levelname)s %(filename)s - %(message)s
\ No newline at end of file
diff --git a/deployment/nbeats/main.py b/deployment/nbeats/main.py
index ec34ebe24b73deaed558f21bf58ad0e988b21dfd..5e341e7c39558e09b25a319337b759c72fe53c68 100644
--- a/deployment/nbeats/main.py
+++ b/deployment/nbeats/main.py
@@ -6,7 +6,10 @@ from amq_message_python_library import *  # python amq-message-python-library
 import logging
 import time
 from datetime import datetime
-import pytz
+from pytz import timezone
+from datetime import datetime
+
+# from src.log import logger
 
 AMQ_USER = os.environ.get("AMQ_USER", "admin")
 AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
@@ -17,9 +20,16 @@ METHOD = os.environ.get("METHOD", "nbeats")
 START_TOPIC = f"start_forecasting.{METHOD}"
 TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
-logging.basicConfig(
-    filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
-)
+# logging.basicConfig(
+#     filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+#     level=logging.INFO,
+#     datefmt="%Y-%m-%d %H:%M:%S",
+#     format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
+# )
+
+# logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
+
+# import logging.config
 
 
 def run_process(args):
@@ -47,8 +57,7 @@
         print('received an error "%s"' % frame.body)
 
     def on_message(self, frame):
-        print(self.topic_name)
-        logging.debug(f" Body: {frame.body}")
+        logging.info(f" Body: {frame.body}")
         message = json.loads(frame.body)
 
         global publish_rate, all_metrics
@@ -83,12 +92,19 @@ class Msg(object):
 
 
 def main():
-    logging.getLogger().setLevel(logging.DEBUG)
-    logging.info(
-        f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
+    logging.basicConfig(
+        filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+        level=logging.INFO,
+        datefmt="%Y-%m-%d %H:%M:%S",
+        format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
     )
-    logging.info(
-        f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+
+    logging.Formatter.converter = lambda *args: datetime.now(
+        tz=timezone(TZ)
+    ).timetuple()
+    log = logging.getLogger()
+    log.info(
+        f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
     )
 
     start_app_conn = morphemic.Connection(
diff --git a/deployment/nbeats/model.yaml b/deployment/nbeats/model.yaml
index 8c1ea3ae226e656407db68e22c8ba8cf295dedab..8563e936a4850f1493389204dd34df93c2a28429 100644
--- a/deployment/nbeats/model.yaml
+++ b/deployment/nbeats/model.yaml
@@ -2,7 +2,7 @@ data:
   csv_path: demo.csv
 training:
   bs: 8
-  max_epochs: 5
+  max_epochs: 8
   loss: rmse
 dataset:
   tv_unknown_reals: []
@@ -22,3 +22,5 @@ prediction:
   bs: 8
 
 save_path: models
+dataloader_path:
+  dataloader
diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py
index 9552ed52ec72ff73bf9ec4250745d7e67c1901f2..e8b4c7dfd761f25787b20ab7860f38686268d2d9 100644
--- a/deployment/nbeats/predict.py
+++ b/deployment/nbeats/predict.py
@@ -11,12 +11,9 @@ import pandas as pd
 import logging
 from datetime import datetime
 from src.dataset_maker import CSVData
+from pytz import timezone
 import pytz
-
-logging.basicConfig(
-    filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
-)
-
+from datetime import datetime
 
 METHOD = os.environ.get("METHOD", "nbeats")
 START_TOPIC = f"start_forecasting.{METHOD}"
@@ -92,13 +89,17 @@ def main():
     dataset_preprocessor = CSVData(APP_NAME)
     dataset_preprocessor.prepare_csv()
 
-    logging.debug("dataset downloaded")
-    logging.info(f"Dataset downloaded")
+    logging.info(
+        f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+    )
+    # logging.info(
+    #     f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+    # )
 
     influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
 
-    logging.debug(
-        f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
+    logging.info(
+        f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
     )
 
     dataset_preprocessor.prepare_csv()
@@ -113,8 +114,12 @@
     while True:
         start_time = int(time.time())
 
-        logging.debug("prediction")
-        logging.info(f"prediction loop started")
+        log.info(
+            f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        log.info(
+            f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
         dataset_preprocessor.prepare_csv()
         global time_0
         for metric in predicted_metrics:
@@ -140,30 +145,50 @@
                 predictions = prediction
 
             if prediction_msgs:
-                logging.info(f"Sending predictions for {metric} metric")
                 logging.info(
-                    f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+                    f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
                 )
+
                 dest = f"{PRED_TOPIC_PREF}.{metric}"
                 print(
                     f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds'
                 )
-                print(
-                    f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds'
+                logging.info(
+                    f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}'
                 )
                 logging.info(
-                    f"Message: {prediction_msgs[metric]}, destination: {dest}"
+                    f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
                 )
                 start_conn.send_to_topic(dest, prediction_msgs[metric])
                 influxdb_conn.send_to_influxdb(metric, prediction_msgs)
 
         end_time = int(time.time())
print(f"sleeping {prediction_cycle - (end_time - start_time)} seconds") + time_0 = time_0 + prediction_cycle - time.sleep(prediction_cycle - (end_time - start_time)) + time_to_wait = prediction_cycle - (end_time - start_time) + if time_to_wait < 0: + time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle) + logging.info( + f"Prediction time is too slow (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + time_0 = time_0 + prediction_cycle + + time.sleep(time_to_wait) if __name__ == "__main__": + logging.basicConfig( + filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", + level=logging.INFO, + datefmt="%Y-%m-%d %H:%M:%S", + format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", + ) + + logging.Formatter.converter = lambda *args: datetime.now( + tz=timezone(TZ) + ).timetuple() + log = logging.getLogger() msg = json.loads(sys.argv[1]) metrics_info = { m["metric"]: { @@ -178,7 +203,7 @@ if __name__ == "__main__": predicted_metrics = set(msg["metrics"]) prediction_cycle = msg["prediction_horizon"] - logging.debug(f"Predicted metrics: {predicted_metrics}") + log.info(f"Predicted metrics: {predicted_metrics}") number_of_forward_predictions = { metric: msg["number_of_forward_predictions"] for metric in predicted_metrics } # deafult number of forward predictions diff --git a/deployment/nbeats/retrain.py b/deployment/nbeats/retrain.py index 0bcc644b73cc967e7fb49494984e86148427655f..1fccaa2c6991a2eba752d51c01180cff7359fb11 100644 --- a/deployment/nbeats/retrain.py +++ b/deployment/nbeats/retrain.py @@ -6,6 +6,9 @@ import time from src.model_train import train from amq_message_python_library import * from src.dataset_maker import CSVData +import pytz +import time +from datetime import datetime TOPIC_NAME = "training_models" RETRAIN_CYCLE = 10 # minutes @@ -14,12 +17,7 @@ AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin") AMQ_HOST = os.environ.get("AMQ_HOST", "localhost") AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613") APP_NAME = os.environ.get("APP_NAME", "demo") - -logging.basicConfig( - filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO -) - -print(os.listdir("./logs"), "files") +TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") def main(predicted_metrics, prediction_horizon): @@ -36,15 +34,25 @@ def main(predicted_metrics, prediction_horizon): for metric in predicted_metrics: retrain_msg = train(metric, prediction_horizon) if retrain_msg: - logging.info(f"Training completed for {metric} metric") + logging.info( + f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) start_conn.send_to_topic(TOPIC_NAME, retrain_msg) + else: print("Not enough data for model training, waiting ...") - logging.info("Not enough data for model training, waiting ...") - start_conn.send_to_topic(TOPIC_NAME, retrain_msg) + logging.info( + f"Not enough data for model training, waiting ... 
+                    f"Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+                )
 
         end_time = int(time.time())
-        time.sleep(60 * RETRAIN_CYCLE - (end_time - start_time))
+        time_to_wait = 60 * RETRAIN_CYCLE - (end_time - start_time)
+        if time_to_wait < 0:
+            time_to_wait = 60 * RETRAIN_CYCLE - (time_to_wait % (60 * RETRAIN_CYCLE))
+        logging.info(
+            f"Waiting for the next training: {time_to_wait} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        time.sleep(time_to_wait)
 
 
 if __name__ == "__main__":
@@ -59,5 +67,7 @@
     }
     prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"]
     predicted_metrics = set(metrics_info.keys())
-    logging.debug(f"Predicted metrics: {predicted_metrics}")
+    logging.info(
+        f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+    )
     main(predicted_metrics, prediction_horizon)
diff --git a/deployment/nbeats/src/log.py b/deployment/nbeats/src/log.py
new file mode 100644
index 0000000000000000000000000000000000000000..153230ec443ee361c7bcd510735890cef9e3cc92
--- /dev/null
+++ b/deployment/nbeats/src/log.py
@@ -0,0 +1,17 @@
+import logging
+from pytz import timezone
+from datetime import datetime
+import os
+
+TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
+
+logging.basicConfig(
+    filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+    level=logging.INFO,
+    datefmt="%Y-%m-%d %H:%M:%S",
+    format="BBB %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
+)
+
+logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
+
+logger = logging
diff --git a/deployment/nbeats/src/model_predict.py b/deployment/nbeats/src/model_predict.py
index 30b16397fb57dd08c7e7f0df3ce09ca18549fc94..b382da4f1539ac471c023ce93870b411b480b3a0 100644
--- a/deployment/nbeats/src/model_predict.py
+++ b/deployment/nbeats/src/model_predict.py
@@ -1,4 +1,4 @@
-from pytorch_forecasting.metrics import RMSE
+from pytorch_forecasting.metrics import RMSE, MAE
 import yaml
 import pandas as pd
 import numpy as np
@@ -10,6 +10,11 @@ from src.preprocess_dataset import Dataset
 from pytorch_forecasting import NBeats
 import scipy.stats as st
 import logging
+import pickle
+import pytz
+from datetime import datetime
+
+TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
 """Script for nbeats fusion transformer prediction"""
 logging.basicConfig(
@@ -26,23 +31,46 @@ def predict(
     prediction_hor=60,
     timestamp=0,
 ):
+
     with open(yaml_file) as file:
         params = yaml.load(file, Loader=yaml.FullLoader)
 
     params["dataset"]["prediction_length"] = prediction_length
     params["dataset"]["context_length"] = prediction_length * 10
 
+    print(prediction_length, "prediction length")
+
+    model_path = os.path.join(params["save_path"], f"{target_column}.pth")
+
+    if not os.path.isfile(model_path):  # no pretrained model, unable to predict
+        logging.info(
+            f"no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        print("no pretrained model, unable to predict")
+        return (None, None)
+
     data_path = os.path.join(
         os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
     )
 
-    dataset = pd.read_csv(data_path).tail(1000)
+    dataset = pd.read_csv(data_path)
+    # dataset[target_column] = range(dataset.shape[0])
 
     if extra_data is not None:
         dataset = pd.concat([dataset, extra_data], ignore_index=True)
 
-    ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
params["dataloader_path"] + ".pickle" + lock = FileLock(lockfile + ".lock") - ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + if os.path.isfile(lockfile): + with lock: + with open(lockfile, "rb") as handle: + ts_dataset = pickle.load(handle) + ts_dataset.prepare_dataset(dataset) + ts_dataset.dataset["split"] = "train" + + # ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) pred_len = params["dataset"]["prediction_length"] @@ -52,8 +78,9 @@ def predict( future_df["split"] = "future" ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index() + # print(ts_dataset.dataset[target_column]) - prediction_input = ts_dataset.inherited_dataset("val", "future") + prediction_input = ts_dataset.inherited_dataset("train", "future") model = NBeats.from_dataset( ts_dataset.ts_dataset, @@ -65,13 +92,6 @@ def predict( lockfile = params["save_path"] lock = FileLock(lockfile + ".lock") - model_path = os.path.join(params["save_path"], f"{target_column}.pth") - - if not os.path.isfile(model_path): # no pretrained model, unable to predict - logging.info("no pretrained model, unable to predict") - print("no pretrained model, unable to predict") - return (None, None) - if os.path.isfile(lockfile): with lock: model.load_state_dict(torch.load(model_path)) @@ -82,7 +102,7 @@ def predict( predictions_with_dropout = [] model.train() - model.loss = RMSE() + model.loss = MAE() with torch.no_grad(): for _ in range(20): for x, _ in prediction_input: diff --git a/deployment/nbeats/src/model_train.py b/deployment/nbeats/src/model_train.py index a78a6ee192e3975f87280ed49388e1b3bc1ee6ba..9dc7eb0de609247fd636519f046779b7a5aab590 100644 --- a/deployment/nbeats/src/model_train.py +++ b/deployment/nbeats/src/model_train.py @@ -10,10 +10,7 @@ from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor from pytorch_forecasting.metrics import QuantileLoss, MAE, RMSE, CrossEntropy from pytorch_forecasting import NBeats from src.preprocess_dataset import Dataset - -logging.basicConfig( - filename=f"/logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO -) +import pickle """Script for temporal fusion transformer training""" @@ -37,7 +34,8 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) - dataset = pd.read_csv(data_path).tail(1000) + dataset = pd.read_csv(data_path) + # dataset[target_column] = range(dataset.shape[0]) if dataset.shape[0] < 12 * prediction_length: logging.info( @@ -47,6 +45,14 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + lockfile = params["dataloader_path"] + ".pickle" + lock = FileLock(lockfile + ".lock") + + with lock: + with open(lockfile, "wb") as handle: + pickle.dump(ts_dataset, handle) + print(f"train dataset saved: {lockfile}") + training = ts_dataset.ts_dataset validation = ts_dataset.inherited_dataset( "train", "val" @@ -67,8 +73,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): max_epochs=params["training"]["max_epochs"], gpus=0, gradient_clip_val=0.5, - callbacks=[lr_logger, early_stop_callback], - checkpoint_callback=False, # TODO: is pl checkpoint_callback thread safe? 
+        # callbacks=[lr_logger, early_stop_callback],
+        # checkpoint_callback=False,
+        logger=None,
     )
 
     model = NBeats.from_dataset(
diff --git a/deployment/nbeats/src/preprocess_dataset.py b/deployment/nbeats/src/preprocess_dataset.py
index ce31583d59fce6899b286e2ad8876cab66eefbbe..39d433e09cb3da331f0064b00dd597ac93382a04 100644
--- a/deployment/nbeats/src/preprocess_dataset.py
+++ b/deployment/nbeats/src/preprocess_dataset.py
@@ -31,12 +31,20 @@ class Dataset(object):
         self.classification = classification
         self.context_length = context_length
         self.prediction_length = prediction_length
+        self.prepare_dataset(dataset)
+        # self.dataset = self.cut_nan_start(dataset)
+        # self.fill_na()
+        # self.dataset = self.add_obligatory_columns(self.dataset)
+        # self.dataset = self.convert_formats(self.dataset)
+        # self.n = dataset.shape[0]
+        self.ts_dataset = self.create_time_series_dataset()
+
+    def prepare_dataset(self, dataset):
         self.dataset = self.cut_nan_start(dataset)
         self.fill_na()
         self.dataset = self.add_obligatory_columns(self.dataset)
         self.dataset = self.convert_formats(self.dataset)
         self.n = dataset.shape[0]
-        self.ts_dataset = self.create_time_series_dataset()
 
     def cut_nan_start(self, dataset):
         first_not_nan_index = dataset[self.target_column][
@@ -64,7 +72,7 @@
         dataset["series"] = 0
         dataset["split"] = "train"
         n = dataset.shape[0]
-        dataset["split"][int(n * 0.8) :] = "val"
+        dataset["split"][int(n * 0.9) :] = "val"
         dataset["time_idx"] = range(n)  # TODO check time gaps
         return dataset
 
@@ -109,3 +117,8 @@
             self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True
         )
         return inh_dataset
+
+    def get_from_dataset(self, dataset):
+        return TimeSeriesDataSet.from_dataset(
+            self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True
+        )
diff --git a/deployment/tft/main.py b/deployment/tft/main.py
index ec34ebe24b73deaed558f21bf58ad0e988b21dfd..e70f263a85db2a365e8077d532ec0ad76d8ae7e3 100644
--- a/deployment/tft/main.py
+++ b/deployment/tft/main.py
@@ -6,7 +6,10 @@ from amq_message_python_library import *  # python amq-message-python-library
 import logging
 import time
 from datetime import datetime
-import pytz
+from pytz import timezone
+from datetime import datetime
+
+# from src.log import logger
 
 AMQ_USER = os.environ.get("AMQ_USER", "admin")
 AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
@@ -17,9 +20,16 @@ METHOD = os.environ.get("METHOD", "nbeats")
 START_TOPIC = f"start_forecasting.{METHOD}"
 TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
-logging.basicConfig(
-    filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
-)
+# logging.basicConfig(
+#     filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+#     level=logging.INFO,
+#     datefmt="%Y-%m-%d %H:%M:%S",
+#     format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
+# )
+
+# logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()
+
+# import logging.config
 
 
 def run_process(args):
@@ -47,8 +57,7 @@
         print('received an error "%s"' % frame.body)
 
     def on_message(self, frame):
-        print(self.topic_name)
-        logging.debug(f" Body: {frame.body}")
+        logging.info(f" Body: {frame.body}")
         message = json.loads(frame.body)
 
         global publish_rate, all_metrics
@@ -83,12 +92,19 @@ class Msg(object):
 
 
 def main():
-    logging.getLogger().setLevel(logging.DEBUG)
-    logging.info(
-        f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
+    logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", + level=logging.INFO, + datefmt="%Y-%m-%d %H:%M:%S", + format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", ) - logging.info( - f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + + logging.Formatter.converter = lambda *args: datetime.now( + tz=timezone(TZ) + ).timetuple() + log = logging.getLogger() + log.info( + f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}" ) start_app_conn = morphemic.Connection( diff --git a/deployment/tft/model.yaml b/deployment/tft/model.yaml index 1984863b6867c7735185932a551cac55f9ecbbd4..24235c2d2d14934d36e045f7384dfefba5c1e91b 100644 --- a/deployment/tft/model.yaml +++ b/deployment/tft/model.yaml @@ -2,7 +2,7 @@ data: csv_path: demo.csv training: bs: 8 - max_epochs: 1 + max_epochs: 10 loss: quantile dataset: tv_unknown_reals: [] diff --git a/deployment/tft/predict.py b/deployment/tft/predict.py index 9552ed52ec72ff73bf9ec4250745d7e67c1901f2..e8b4c7dfd761f25787b20ab7860f38686268d2d9 100644 --- a/deployment/tft/predict.py +++ b/deployment/tft/predict.py @@ -11,12 +11,9 @@ import pandas as pd import logging from datetime import datetime from src.dataset_maker import CSVData +from pytz import timezone import pytz - -logging.basicConfig( - filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO -) - +from datetime import datetime METHOD = os.environ.get("METHOD", "nbeats") START_TOPIC = f"start_forecasting.{METHOD}" @@ -92,13 +89,17 @@ def main(): dataset_preprocessor = CSVData(APP_NAME) dataset_preprocessor.prepare_csv() - logging.debug("dataset downloaded") - logging.info(f"Dataset downloaded") + logging.info( + f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + # logging.info( + # f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + # ) influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME) - logging.debug( - f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds" + logging.info( + f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) dataset_preprocessor.prepare_csv() @@ -113,8 +114,12 @@ def main(): while True: start_time = int(time.time()) - logging.debug("prediction") - logging.info(f"prediction loop started") + log.info( + f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + log.info( + f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) dataset_preprocessor.prepare_csv() global time_0 for metric in predicted_metrics: @@ -140,30 +145,50 @@ def main(): predictions = prediction if prediction_msgs: - logging.info(f"Sending predictions for {metric} metric") logging.info( - f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) + dest = f"{PRED_TOPIC_PREF}.{metric}" print( f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds' ) - print( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds' + logging.info( + f'{int(prediction_msgs[metric]["predictionTime"]) - 
+                    f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}'
                 )
                 logging.info(
-                    f"Message: {prediction_msgs[metric]}, destination: {dest}"
+                    f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
                 )
                 start_conn.send_to_topic(dest, prediction_msgs[metric])
                 influxdb_conn.send_to_influxdb(metric, prediction_msgs)
 
         end_time = int(time.time())
         print(f"sleeping {prediction_cycle - (end_time - start_time)} seconds")
+        time_0 = time_0 + prediction_cycle
 
-        time.sleep(prediction_cycle - (end_time - start_time))
+        time_to_wait = prediction_cycle - (end_time - start_time)
+        if time_to_wait < 0:
+            time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle)
+            logging.info(
+                f"Prediction is slower than the prediction cycle (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+            )
+            time_0 = time_0 + prediction_cycle
+
+        time.sleep(time_to_wait)
 
 
 if __name__ == "__main__":
+    logging.basicConfig(
+        filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+        level=logging.INFO,
+        datefmt="%Y-%m-%d %H:%M:%S",
+        format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
+    )
+
+    logging.Formatter.converter = lambda *args: datetime.now(
+        tz=timezone(TZ)
+    ).timetuple()
+    log = logging.getLogger()
     msg = json.loads(sys.argv[1])
     metrics_info = {
         m["metric"]: {
@@ -178,7 +203,7 @@ if __name__ == "__main__":
 
     predicted_metrics = set(msg["metrics"])
     prediction_cycle = msg["prediction_horizon"]
-    logging.debug(f"Predicted metrics: {predicted_metrics}")
+    log.info(f"Predicted metrics: {predicted_metrics}")
     number_of_forward_predictions = {
         metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
     }  # deafult number of forward predictions
diff --git a/deployment/tft/retrain.py b/deployment/tft/retrain.py
index 202cf97db1d4d11e0f055485779f0bbd5e961494..9520486456642efd29d52c81a9e11d99f0c923f3 100644
--- a/deployment/tft/retrain.py
+++ b/deployment/tft/retrain.py
@@ -6,6 +6,9 @@ import time
 from src.model_train import train
 from amq_message_python_library import *
 from src.dataset_maker import CSVData
+import pytz
+import time
+from datetime import datetime
 
 TOPIC_NAME = "training_models"
 RETRAIN_CYCLE = 10  # minutes
@@ -14,10 +17,7 @@ AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
 AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
 AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
 APP_NAME = os.environ.get("APP_NAME", "demo")
-
-logging.basicConfig(
-    filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
-)
+TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
 
 def main(predicted_metrics, prediction_horizon):
@@ -34,19 +34,30 @@
         for metric in predicted_metrics:
            retrain_msg = train(metric, prediction_horizon)
             if retrain_msg:
-                logging.info(f"Training completed for {metric} metric")
+                logger.info(
+                    f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+                )
                 start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
+
             else:
                 print("Not enough data for model training, waiting ...")
-                logging.info("Not enough data for model training, waiting ...")
-                start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
+                logger.info(
+                    f"Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+                )
 
         end_time = int(time.time())
-        time.sleep(60 * RETRAIN_CYCLE - (end_time - start_time))
+        time_to_wait = 60 * RETRAIN_CYCLE - (end_time - start_time)
+        if time_to_wait < 0:
+            time_to_wait = 60 * RETRAIN_CYCLE - (time_to_wait % (60 * RETRAIN_CYCLE))
+        logger.info(
+            f"Waiting for the next training: {time_to_wait} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        time.sleep(time_to_wait)
 
 
 if __name__ == "__main__":
-    logging.info(f"Training loop started")
+    logger = logging.getLogger()
+    logger.info(f"Training loop started")
     msg = json.loads(sys.argv[1])
     metrics_info = {
         m["metric"]: {
@@ -57,5 +68,7 @@
     }
     prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"]
     predicted_metrics = set(metrics_info.keys())
-    logging.debug(f"Predicted metrics: {predicted_metrics}")
+    logger.info(
+        f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+    )
     main(predicted_metrics, prediction_horizon)
diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py
index 1013a5fbdd863c99d5d7f921f4b8a658d53fc698..b7eff82634fad7d5065ede8d74afc24459f8f7a7 100644
--- a/deployment/tft/src/model_predict.py
+++ b/deployment/tft/src/model_predict.py
@@ -9,6 +9,10 @@ from src.preprocess_dataset import Dataset
 from pytorch_forecasting import TemporalFusionTransformer
 import time
 import logging
+import pytz
+from datetime import datetime
+
+TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
 pd.options.mode.chained_assignment = None
 
@@ -29,12 +33,19 @@
     params["dataset"]["prediction_length"] = prediction_length
     params["dataset"]["context_length"] = prediction_length * 10
 
+    model_path = os.path.join(params["save_path"], f"{target_column}.pth")
+
+    if not os.path.isfile(model_path):  # no pretrained model, unable to predict
+        logging.info(
+            f"no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        return (None, None)
+
     data_path = os.path.join(
         os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
     )
 
     dataset = pd.read_csv(data_path)
-    print(dataset)
 
     if extra_data is not None:
         dataset = pd.concat([dataset, extra_data], ignore_index=True)
diff --git a/deployment/tft/src/model_train.py b/deployment/tft/src/model_train.py
index a417705f1e612cf7d9de42a76336211799f3382b..51191ef9edcfe5160563bf63a69479f28bf56864 100644
--- a/deployment/tft/src/model_train.py
+++ b/deployment/tft/src/model_train.py
@@ -12,7 +12,7 @@ from pytorch_forecasting import TemporalFusionTransformer
 from src.preprocess_dataset import Dataset
 
 logging.basicConfig(
-    filename=f"/logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
+    filename=f"/wd/logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
 )
 
 """Script for temporal fusion transformer training"""
@@ -38,7 +38,7 @@
         os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
     )
 
-    dataset = pd.read_csv(data_path).tail(1000)
+    dataset = pd.read_csv(data_path)
 
     if dataset.shape[0] < 12 * prediction_length:
         logging.info(
@@ -75,8 +75,9 @@
         max_epochs=params["training"]["max_epochs"],
         gpus=0,
         gradient_clip_val=0.5,
-        callbacks=[lr_logger, early_stop_callback],
-        checkpoint_callback=False,  # TODO: is pl checkpoint_callback thread safe?
+        # callbacks=[lr_logger, early_stop_callback],
+        # checkpoint_callback=False,  # TODO: is pl checkpoint_callback thread safe?
+        logger=None,
     )
 
     tft = TemporalFusionTransformer.from_dataset(
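
Note: the recurring change in this patch is the timezone-aware logging setup repeated in main.py, predict.py, and src/log.py. A minimal standalone sketch of that pattern follows, assuming only that pytz is installed; the METHOD/TIME_ZONE defaults mirror the env file, and the logs/ directory is created here for safety (the deployment image normally provides it):

import logging
import os
from datetime import datetime

from pytz import timezone

# Illustrative defaults, mirroring deployment/nbeats/env.
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
METHOD = os.environ.get("METHOD", "nbeats")

os.makedirs("logs", exist_ok=True)  # the deployment normally ships this directory

logging.basicConfig(
    filename=f"logs/{METHOD}.out",
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)

# Render every %(asctime)s in TZ rather than the container's local time.
# %(msecs)03d is unaffected: it is derived from record.created, not the converter.
logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()

logging.getLogger().info("timezone-aware logging configured")

One caveat of this approach: the converter ignores the record's own timestamp and calls datetime.now() at format time, which is harmless here because records are formatted as soon as they are emitted.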