From 35cfdc647d9e2bb1f5382ba43ce6f0ee4dadf17e Mon Sep 17 00:00:00 2001
From: Anna Warno
Date: Wed, 15 Sep 2021 09:24:03 +0200
Subject: [PATCH 1/2] arima model corrected

---
 deployment/arima/docker_run.sh             |  1 -
 deployment/arima/main.py                   | 51 ++++++------
 deployment/arima/predict.py                | 92 ++++++++++++----
 deployment/arima/retrain.py                | 37 ++++++---
 deployment/arima/run.sh                    |  5 --
 deployment/arima/src/model_predict.py      | 75 +++++++++---------
 deployment/arima/src/model_train.py        | 15 ++--
 deployment/arima/src/preprocess_dataset.py | 80 +++++++++++++------
 8 files changed, 207 insertions(+), 149 deletions(-)
 delete mode 100755 deployment/arima/docker_run.sh
 delete mode 100644 deployment/arima/run.sh

diff --git a/deployment/arima/docker_run.sh b/deployment/arima/docker_run.sh
deleted file mode 100755
index cff33ad4..00000000
--- a/deployment/arima/docker_run.sh
+++ /dev/null
@@ -1 +0,0 @@
-docker run -t --env-file=env --network=host stomp_app
diff --git a/deployment/arima/main.py b/deployment/arima/main.py
index ec34ebe2..e53670df 100644
--- a/deployment/arima/main.py
+++ b/deployment/arima/main.py
@@ -6,7 +6,8 @@ from amq_message_python_library import *  # python amq-message-python-library
 import logging
 import time
 from datetime import datetime
-import pytz
+from pytz import timezone
+from datetime import datetime
 
 AMQ_USER = os.environ.get("AMQ_USER", "admin")
 AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
@@ -17,10 +18,6 @@ METHOD = os.environ.get("METHOD", "nbeats")
 START_TOPIC = f"start_forecasting.{METHOD}"
 TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
-logging.basicConfig(
-    filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
-)
-
 
 def run_process(args):
     print("running")
@@ -47,8 +44,7 @@ class StartListener(stomp.ConnectionListener):
         print('received an error "%s"' % frame.body)
 
     def on_message(self, frame):
-        print(self.topic_name)
-        logging.debug(f" Body: {frame.body}")
+        logging.info(f" Body: {frame.body}")
         message = json.loads(frame.body)
 
         global publish_rate, all_metrics
@@ -83,12 +79,19 @@ class Msg(object):
 
 
 def main():
-    logging.getLogger().setLevel(logging.DEBUG)
-    logging.info(
-        f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
+    logging.basicConfig(
+        filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+        level=logging.INFO,
+        datefmt="%Y-%m-%d %H:%M:%S",
+        format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
     )
-    logging.info(
-        f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+
+    logging.Formatter.converter = lambda *args: datetime.now(
+        tz=timezone(TZ)
+    ).timetuple()
+    log = logging.getLogger()
+    log.info(
+        f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
     )
 
     start_app_conn = morphemic.Connection(
@@ -108,18 +111,18 @@ def main():
         "2", StartForecastingListener(start_conn.conn, START_TOPIC)
     )
 
-    # msg1 = Msg()
-    # msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}]'
-    # msg2 = Msg()
-    # msg2.body = (
-    #     "{"
-    #     + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 120'
-    #     + "}"
-    # )
-
-    # print(msg1)
-    # StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
-    # StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
+    msg1 = Msg()
+    msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 30000}]'
+    msg2 = Msg()
+    msg2.body = (
+        "{"
+        + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
+        + "}"
+    )
+
+    print(msg1)
+    StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
+    StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
 
     while True:
         pass
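Note: the logging setup added to main() renders log timestamps in the configured
time zone by overriding logging.Formatter.converter, which must return a
time.struct_time. A minimal standalone sketch of the same pattern (the hard-coded
TZ value is illustrative; the services read it from the TIME_ZONE environment
variable):

    import logging
    from datetime import datetime
    from pytz import timezone

    TZ = "Europe/Vienna"  # illustrative; taken from the TIME_ZONE env var in the services

    logging.basicConfig(
        level=logging.INFO,
        datefmt="%Y-%m-%d %H:%M:%S",
        format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
    )
    # Formatter.converter is called with the record's epoch timestamp; returning
    # a localized struct_time makes %(asctime)s print wall-clock time in TZ.
    logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(TZ)).timetuple()

    logging.info("this timestamp is rendered in Europe/Vienna local time")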
["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' + + "}" + ) + + print(msg1) + StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1) + StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2) while True: pass diff --git a/deployment/arima/predict.py b/deployment/arima/predict.py index 9552ed52..ec97dc9a 100644 --- a/deployment/arima/predict.py +++ b/deployment/arima/predict.py @@ -11,12 +11,9 @@ import pandas as pd import logging from datetime import datetime from src.dataset_maker import CSVData +from pytz import timezone import pytz - -logging.basicConfig( - filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO -) - +from datetime import datetime METHOD = os.environ.get("METHOD", "nbeats") START_TOPIC = f"start_forecasting.{METHOD}" @@ -92,13 +89,14 @@ def main(): dataset_preprocessor = CSVData(APP_NAME) dataset_preprocessor.prepare_csv() - logging.debug("dataset downloaded") - logging.info(f"Dataset downloaded") + logging.info( + f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME) - logging.debug( - f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds" + logging.info( + f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) dataset_preprocessor.prepare_csv() @@ -113,57 +111,63 @@ def main(): while True: start_time = int(time.time()) - logging.debug("prediction") - logging.info(f"prediction loop started") + logging.info( + f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + logging.info( + f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) dataset_preprocessor.prepare_csv() global time_0 for metric in predicted_metrics: - predictions = None for i in range(number_of_forward_predictions[metric]): - prediction_msgs, prediction = predict( + prediction_msgs = predict( metric, - (prediction_cycle * 1000) // msg["publish_rate"], - extra_data=predictions, - m=i + 1, + prediction_length, prediction_hor=prediction_horizon, - timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), + timestamp=time_0, ) if i == (number_of_forward_predictions[metric] - 1): print( f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" ) - if predictions is not None: - predictions = pd.concat( - [predictions, prediction], ignore_index=True - ) - else: - predictions = prediction if prediction_msgs: - logging.info(f"Sending predictions for {metric} metric") - logging.info( - f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - ) dest = f"{PRED_TOPIC_PREF}.{metric}" - print( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds' - ) - print( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds' - ) - logging.info( - f"Message: {prediction_msgs[metric]}, destination: {dest}" - ) - start_conn.send_to_topic(dest, prediction_msgs[metric]) - influxdb_conn.send_to_influxdb(metric, prediction_msgs) + + for message in prediction_msgs: + logging.info( + f"Sending predictions for 
diff --git a/deployment/arima/retrain.py b/deployment/arima/retrain.py
index 4ff0c379..2a6e654d 100644
--- a/deployment/arima/retrain.py
+++ b/deployment/arima/retrain.py
@@ -6,6 +6,9 @@ import time
 from src.model_train import train
 from amq_message_python_library import *
 from src.dataset_maker import CSVData
+import pytz
+import time
+from datetime import datetime
 
 TOPIC_NAME = "training_models"
 RETRAIN_CYCLE = 10  # minutes
@@ -14,13 +17,12 @@ AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
 AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
 AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
 APP_NAME = os.environ.get("APP_NAME", "demo")
+TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
 
 logging.basicConfig(
-    filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
+    filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
 )
 
-print(os.listdir("./"), "files")
-
 
 def main(predicted_metrics, prediction_horizon):
     start_conn = morphemic.Connection(
@@ -36,15 +38,25 @@ def main(predicted_metrics, prediction_horizon):
         for metric in predicted_metrics:
             retrain_msg = train(metric, prediction_horizon)
             if retrain_msg:
-                logging.info(f"Training completed for {metric} metric")
+                logging.info(
+                    f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+                )
                 start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
+
             else:
                 print("Not enough data for model training, waiting ...")
-                logging.info("Not enough data for model training, waiting ...")
-                start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
+                logging.info(
+                    f"Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+                )
 
         end_time = int(time.time())
-        time.sleep(60 * RETRAIN_CYCLE - (end_time - start_time))
+        time_to_wait = 60 * RETRAIN_CYCLE - (end_time - start_time)
+        if time_to_wait < 0:
+            time_to_wait = 60 * RETRAIN_CYCLE - (time_to_wait % (60 * RETRAIN_CYCLE))
+        logging.info(
+            f"Waiting for the next training: {time_to_wait} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        time.sleep(time_to_wait)
 
 
 if __name__ == "__main__":
@@ -57,7 +69,14 @@ if __name__ == "__main__":
         }
         for m in msg["all_metrics"]
     }
-    prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"]
+    prediction_horizon = (
+        msg["prediction_horizon"]
+        * 1000
+        // msg["publish_rate"]
+        * msg["number_of_forward_predictions"]
+    )
     predicted_metrics = set(metrics_info.keys())
-    logging.debug(f"Predicted metrics: {predicted_metrics}")
+    logging.info(
+        f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+    )
     main(predicted_metrics, prediction_horizon)
diff --git a/deployment/arima/run.sh b/deployment/arima/run.sh
deleted file mode 100644
index 917126cf..00000000
--- a/deployment/arima/run.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/sh
-python3 morphemic-datasetmaker/setup.py install
-# rm -r morphemic-datasetmaker
-# python3 main.py
-# mv amq-message-python-library amq_message_python_library
diff --git a/deployment/arima/src/model_predict.py b/deployment/arima/src/model_predict.py
index f7b6ec01..e4f0bdaa 100644
--- a/deployment/arima/src/model_predict.py
+++ b/deployment/arima/src/model_predict.py
@@ -18,60 +18,59 @@ def predict(
     target_column,
     prediction_length,
     yaml_file="model.yaml",
-    extra_data=None,
-    m=1,
     prediction_hor=60,
     timestamp=0,
 ):
     with open(yaml_file) as file:
         params = yaml.load(file, Loader=yaml.FullLoader)
     params["dataset"]["prediction_length"] = prediction_length
-    params["dataset"]["context_length"] = prediction_length * 10
+    params["dataset"]["context_length"] = prediction_length * 5
 
     data_path = os.path.join(
         os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
     )
 
-    dataset = pd.read_csv(data_path).head(500)
-
-    if extra_data is not None:
-        dataset = pd.concat([dataset, extra_data], ignore_index=True)
+    dataset = pd.read_csv(data_path)
 
     ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
+    ts_dataset = ts_dataset.dataset[
+        ts_dataset.dataset.series == ts_dataset.dataset.series.max()
+    ]
 
     pred_len = params["dataset"]["prediction_length"]
+    best_model_aic = None
 
-    sarima = sm.tsa.statespace.SARIMAX(ts_dataset.dataset[target_column])
-    model = sarima.fit()
-    predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
-
-    future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]]
+    for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0)]:
+        sarima = sm.tsa.statespace.SARIMAX(ts_dataset[target_column], order=order)
+        model = sarima.fit()
+        if best_model_aic is not None:
+            if model.aic < best_model_aic:
+                predictions = model.get_forecast(
+                    pred_len, return_conf_int=True, alpha=0.05
+                )
+                best_model_aic = model.aic
+        else:
+            predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
+            best_model_aic = model.aic
 
-    future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len))
-    future_df["time_idx"] = future_time_idx
-    future_df["split"] = "future"
-
-    ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index(
-        drop=True
-    )
+    msgs = []
 
-    msg = {
-        target_column: {
-            "metricValue": predictions.predicted_mean.values[-1],
-            "level": 1,  # TODO
-            "timestamp": int(time.time()),
-            "probability": 0.95,
-            "confidence_interval": [
-                predictions.conf_int(alpha=0.05).iloc[-1].values[0],
-                predictions.conf_int(alpha=0.05).iloc[-1].values[1],
-            ],  # quantiles difference
-            "predictionTime": timestamp,
-            "refersTo": "TODO",
-            "cloud": "TODO",
-            "provider": "TODO",
+    for i in range(pred_len):
+        msg = {
+            target_column: {
+                "metricValue": predictions.predicted_mean.values[i],
+                "level": 1,  # TODO
+                "timestamp": int(time.time()),
+                "probability": 0.95,
+                "confidence_interval": [
+                    predictions.conf_int(alpha=0.05).iloc[i].values[0],
+                    predictions.conf_int(alpha=0.05).iloc[i].values[1],
+                ],  # quantiles difference
+                "predictionTime": timestamp + (i + 1) * (prediction_hor // 1000),
+                "refersTo": "TODO",
+                "cloud": "TODO",
+                "provider": "TODO",
+            }
         }
-    }
-    logging.debug(f"prediction msg: {msg}")
+        msgs.append(msg)
 
-    future_df["split"] = "val"
-    future_df[target_column] = predictions.predicted_mean.values
-
-    return (msg, future_df)
+    return msgs
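Note: the rewritten predict() fits a small grid of SARIMAX orders on the freshest
sub-series and keeps the forecast from the lowest-AIC fit. A condensed sketch of
that selection loop, assuming statsmodels is available; the synthetic series y and
pred_len are illustrative:

    import numpy as np
    import pandas as pd
    import statsmodels.api as sm

    y = pd.Series(np.random.randn(200).cumsum())  # illustrative random-walk series
    pred_len = 8

    best_aic, best_forecast = None, None
    for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0)]:
        model = sm.tsa.statespace.SARIMAX(y, order=order).fit(disp=False)
        if best_aic is None or model.aic < best_aic:
            best_aic = model.aic
            best_forecast = model.get_forecast(pred_len)

    mean = best_forecast.predicted_mean      # one point forecast per step
    ci = best_forecast.conf_int(alpha=0.05)  # 95% interval, one row per step
    print(best_aic, mean.iloc[0], ci.iloc[0].values)

Comparing against None rather than truthiness matters here: an AIC of zero or
below is a legitimate (good) score, not a missing value.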
diff --git a/deployment/arima/src/model_train.py b/deployment/arima/src/model_train.py
index 83ed8e87..c6f0fa02 100644
--- a/deployment/arima/src/model_train.py
+++ b/deployment/arima/src/model_train.py
@@ -1,12 +1,11 @@
-import yaml
-import pandas as pd
-from filelock import FileLock
-from src.preprocess_dataset import Dataset
-
-
-"""Script for temporal fusion transformer training"""
+import time
+import os
 
 
 def train(target_column, prediction_length, yaml_file="model.yaml"):
-    msg = None
+    msg = {
+        "metrics": [target_column],
+        "forecasting_method": os.environ.get("METHOD", "tft"),
+        "timestamp": int(time.time()),
+    }
     return msg
diff --git a/deployment/arima/src/preprocess_dataset.py b/deployment/arima/src/preprocess_dataset.py
index 3faf2789..a39aedeb 100644
--- a/deployment/arima/src/preprocess_dataset.py
+++ b/deployment/arima/src/preprocess_dataset.py
@@ -1,5 +1,6 @@
 import pandas as pd
 import numpy as np
+import logging
 
 pd.options.mode.chained_assignment = None
 
@@ -12,15 +13,18 @@ class Dataset(object):
         self,
         dataset,
         target_column="value",
-        tv_unknown_reals="[]",
-        known_reals="[]",
-        tv_unknown_cat="[]",
-        static_reals="[]",
+        tv_unknown_reals=[],
+        known_reals=[],
+        tv_unknown_cat=[],
+        static_reals=[],
         classification=0,
         context_length=40,
         prediction_length=5,
     ):
+        self.max_missing_values = (
+            20  # max consecutive missing values allowed per series
+        )
         self.target_column = target_column
         self.tv_unknown_cat = tv_unknown_cat
         self.known_reals = known_reals
@@ -29,22 +33,19 @@ class Dataset(object):
         self.classification = classification
         self.context_length = context_length
         self.prediction_length = prediction_length
-        self.dataset = self.cut_nan_start(dataset)
-        self.fill_na()
-        self.dataset = self.add_obligatory_columns(self.dataset)
-        self.dataset = self.convert_formats(self.dataset)
+        self.dataset = dataset
+        self.check_gap()
         self.n = dataset.shape[0]
-        self.ts_dataset = self.create_time_series_dataset()
 
     def cut_nan_start(self, dataset):
-        first_not_nan_index = dataset[self.target_column][
-            dataset[self.target_column] != "None"
-        ].index[0]
+        dataset.index = range(dataset.shape[0])
+        first_not_nan_index = dataset[self.target_column].first_valid_index()
         return dataset[dataset.index > first_not_nan_index]
 
-    def fill_na(self):
-        self.dataset = self.dataset.replace("None", np.nan)
-        self.dataset = self.dataset.ffill(axis="rows")
+    def fill_na(self, dataset):
+        dataset = dataset.replace("None", np.nan)
+        dataset = dataset.ffill(axis="rows")
+        return dataset
 
     def convert_formats(self, dataset):
         if not self.classification:
@@ -63,14 +64,47 @@ class Dataset(object):
         dataset["split"] = "train"
         n = dataset.shape[0]
         dataset["split"][int(n * 0.8) :] = "val"
-        dataset["time_idx"] = range(n)  # TODO check time gaps
+        dataset["time_idx"] = range(n)
         return dataset
 
-    def create_time_series_dataset(self):
-        return self.dataset
+    def check_gap(self):
+        max_gap = self.dataset["time"].diff().abs().max()
+        logging.info(f"Max time gap in series {max_gap}")
+        print(f"Max time gap in series {max_gap}")
+        series_freq = self.dataset["time"].diff().value_counts().index.values[0]
+        logging.info(f"Detected series with {series_freq} frequency")
+        print(f"Detected series with {series_freq} frequency")
+        # check series length
+        series = np.split(
+            self.dataset,
+            *np.where(
+                self.dataset["time"].diff().abs().fillna(0).astype(int)
+                >= np.abs(self.max_missing_values * series_freq)
+            ),
+        )
+        logging.info(f"{len(series)} series found")
+        print(f"{len(series)} series found")
+        preprocessed_series = []
+        for i, s in enumerate(series):
+            s = self.fill_na(s)
+            s = self.cut_nan_start(s)
+            s = self.add_obligatory_columns(s)
+            s = self.convert_formats(s)
+            s["split"] = "train"
+            if s.shape[0] > self.prediction_length * 2 + self.context_length:
+                s["series"] = i
+                preprocessed_series.append(s)
+            if i == len(series) - 1:
+                logging.info(
+                    f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
+                )
 
-    def inherited_dataset(self, split1, split2):
-        df1 = self.dataset[lambda x: x.split == split1]  # previous split fragment
-        df2 = self.dataset[lambda x: x.split == split2]  # split part
-        inh_dataset = pd.concat([df1, df2])
-        return inh_dataset
+        logging.info(f"{len(preprocessed_series)} long enough series found")
+        print(f"{len(preprocessed_series)} long enough series found")
+        self.dataset = pd.concat(preprocessed_series)
+        if self.dataset["series"].max() != len(series) - 1:
+            self.dropped_recent_series = True
+        else:
+            self.dropped_recent_series = False
+        self.dataset.index = range(self.dataset.shape[0])
--
GitLab
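Note: check_gap() above segments the metric history wherever consecutive samples
are further apart than max_missing_values times the detected publish frequency,
then preprocesses each segment and keeps only those long enough to fit. A minimal
sketch of the splitting step, assuming an integer time column in milliseconds;
np.split on a DataFrame returns DataFrame pieces because NumPy dispatches to
DataFrame.swapaxes (a behaviour pandas has deprecated but the patch relies on):

    import numpy as np
    import pandas as pd

    max_missing_values = 20  # max consecutive missing samples tolerated

    df = pd.DataFrame({"time": [0, 30, 60, 90, 5000, 5030, 5060]})  # one large gap
    freq = df["time"].diff().value_counts().index[0]  # most common step: 30

    # Row positions where the gap exceeds the tolerance become split points.
    split_points = np.where(
        df["time"].diff().abs().fillna(0).astype(int) >= abs(max_missing_values * freq)
    )[0]
    series = np.split(df, split_points)
    print([len(s) for s in series])  # [4, 3]: two sub-series around the gap
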
From 0669d5460db883179075e55804f870754121c856 Mon Sep 17 00:00:00 2001
From: Anna Warno
Date: Wed, 15 Sep 2021 09:27:04 +0200
Subject: [PATCH 2/2] test removed

---
 deployment/arima/main.py | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/deployment/arima/main.py b/deployment/arima/main.py
index e53670df..361b146d 100644
--- a/deployment/arima/main.py
+++ b/deployment/arima/main.py
@@ -111,18 +111,18 @@ def main():
         "2", StartForecastingListener(start_conn.conn, START_TOPIC)
     )
 
-    msg1 = Msg()
-    msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 30000}]'
-    msg2 = Msg()
-    msg2.body = (
-        "{"
-        + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
-        + "}"
-    )
-
-    print(msg1)
-    StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
-    StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
+    # msg1 = Msg()
+    # msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 30000}]'
+    # msg2 = Msg()
+    # msg2.body = (
+    #     "{"
+    #     + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
+    #     + "}"
+    # )
+
+    # print(msg1)
+    # StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
+    # StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
 
     while True:
         pass
--
GitLab
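Note: after patch 2/2 the services are driven only by real broker traffic again.
For reference, the start_forecasting payload that predict.py parses from
sys.argv[1] (presumably handed over by main.py's run_process) has the shape
below; the field values are illustrative and mirror the removed test message:

    import json
    import time

    start_forecasting_msg = {
        "metrics": ["memory"],
        "timestamp": int(time.time()),
        "epoch_start": int(time.time()) + 30,   # first prediction epoch
        "number_of_forward_predictions": 8,
        "prediction_horizon": 30,               # seconds between forecast points
    }
    # predict.py entry point: msg = json.loads(sys.argv[1])
    print(json.dumps(start_forecasting_msg))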