diff --git a/deployment/nbeats/loggers_config.ini b/deployment/nbeats/loggers_config.ini deleted file mode 100644 index 2cfdc2d03ad16097454d40a2de76511248409c6b..0000000000000000000000000000000000000000 --- a/deployment/nbeats/loggers_config.ini +++ /dev/null @@ -1,31 +0,0 @@ -[loggers] -keys=root,retrain, main - -[handlers] -keys=consoleHandler - -[formatters] -keys=defaultFormatter - -[logger_root] -handlers=consoleHandler - -[logger_retrain] -handlers=consoleHandler -level=INFO -qualname=core -propagate=0 - -[logger_main] -handlers=consoleHandler -level=DEBUG -qualname=__main__ -propagate=0 - -[handler_consoleHandler] -class=logging.StreamHandler -formatter=defaultFormatter -args=(sys.stdout,) - -[formatter_defaultFormatter] -format=%(asctime)s %(levelname)s %(asctime)s %(filename)s - %(message)s \ No newline at end of file diff --git a/deployment/nbeats/main.py b/deployment/nbeats/main.py index 5e341e7c39558e09b25a319337b759c72fe53c68..3e63599d873edee1e1f7f803e3fb11f3a5998c3d 100644 --- a/deployment/nbeats/main.py +++ b/deployment/nbeats/main.py @@ -129,7 +129,7 @@ def main(): # msg2 = Msg() # msg2.body = ( # "{" - # + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 120' + # + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 20' # + "}" # ) diff --git a/deployment/nbeats/model.yaml b/deployment/nbeats/model.yaml index 8563e936a4850f1493389204dd34df93c2a28429..b16e9771dfe18b42532a531cd333fc88e6cc8837 100644 --- a/deployment/nbeats/model.yaml +++ b/deployment/nbeats/model.yaml @@ -1,8 +1,8 @@ data: csv_path: demo.csv training: - bs: 8 - max_epochs: 8 + bs: 64 + max_epochs: 20 loss: rmse dataset: tv_unknown_reals: [] diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py index e8b4c7dfd761f25787b20ab7860f38686268d2d9..c56c10c8eb6bb373523d1fe2ffc668eee3e07aaa 100644 --- a/deployment/nbeats/predict.py +++ b/deployment/nbeats/predict.py @@ -92,9 +92,6 @@ def main(): logging.info( f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - # logging.info( - # f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - # ) influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME) @@ -114,10 +111,10 @@ def main(): while True: start_time = int(time.time()) - log.info( + logging.info( f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - log.info( + logging.info( f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) dataset_preprocessor.prepare_csv() @@ -125,22 +122,24 @@ def main(): for metric in predicted_metrics: predictions = None for i in range(number_of_forward_predictions[metric]): + print(int((i + 1) * prediction_points_horizon), "point idx") prediction_msgs, prediction = predict( metric, - (prediction_cycle * 1000) // msg["publish_rate"], + prediction_length, extra_data=predictions, m=i + 1, prediction_hor=prediction_horizon, timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), + predicted_point_idx=int((i + 1) * prediction_points_horizon - 1), ) if i == (number_of_forward_predictions[metric] - 1): print( f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" ) - if predictions is not None: - predictions = pd.concat( - [predictions, prediction], ignore_index=True - ) + # if 
predictions is not None: + # predictions = pd.concat( + # [predictions, prediction], ignore_index=True + # ) else: predictions = prediction @@ -188,7 +187,6 @@ if __name__ == "__main__": logging.Formatter.converter = lambda *args: datetime.now( tz=timezone(TZ) ).timetuple() - log = logging.getLogger() msg = json.loads(sys.argv[1]) metrics_info = { m["metric"]: { @@ -200,10 +198,16 @@ if __name__ == "__main__": time_0 = msg["epoch_start"] prediction_horizon = msg["prediction_horizon"] * 1000 + prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"] predicted_metrics = set(msg["metrics"]) prediction_cycle = msg["prediction_horizon"] - - log.info(f"Predicted metrics: {predicted_metrics}") + prediction_length = ( + msg["prediction_horizon"] + * 1000 + // msg["publish_rate"] + * msg["number_of_forward_predictions"] + ) + logging.info(f"Predicted metrics: {predicted_metrics}") number_of_forward_predictions = { metric: msg["number_of_forward_predictions"] for metric in predicted_metrics } # deafult number of forward predictions diff --git a/deployment/nbeats/retrain.py b/deployment/nbeats/retrain.py index 1fccaa2c6991a2eba752d51c01180cff7359fb11..b3b9340601756737476cd0a571144b1ae40e98a0 100644 --- a/deployment/nbeats/retrain.py +++ b/deployment/nbeats/retrain.py @@ -65,7 +65,12 @@ if __name__ == "__main__": } for m in msg["all_metrics"] } - prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"] + prediction_horizon = ( + msg["prediction_horizon"] + * 1000 + // msg["publish_rate"] + * msg["number_of_forward_predictions"] + ) predicted_metrics = set(metrics_info.keys()) logging.info( f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" diff --git a/deployment/nbeats/src/model_predict.py b/deployment/nbeats/src/model_predict.py index b382da4f1539ac471c023ce93870b411b480b3a0..21f7fa2118a9ee073b02e9ef6fc612056a17110f 100644 --- a/deployment/nbeats/src/model_predict.py +++ b/deployment/nbeats/src/model_predict.py @@ -30,14 +30,13 @@ def predict( m=1, prediction_hor=60, timestamp=0, + predicted_point_idx=0, ): with open(yaml_file) as file: params = yaml.load(file, Loader=yaml.FullLoader) params["dataset"]["prediction_length"] = prediction_length - params["dataset"]["context_length"] = prediction_length * 10 - - print(prediction_length, "prediction length") + params["dataset"]["context_length"] = prediction_length * 12 model_path = os.path.join(params["save_path"], f"{target_column}.pth") @@ -52,35 +51,49 @@ def predict( os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) + if not os.path.isfile(data_path): + return (None, None) + dataset = pd.read_csv(data_path) - # dataset[target_column] = range(dataset.shape[0]) + new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + if new_ts_dataset.dropped_recent_series: # series with recent data was too short + logging.info( + f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + print("Not enough fresh data, unable to predict TIME:") + return (None, None) + + dataset = new_ts_dataset.dataset if extra_data is not None: - dataset = pd.concat([dataset, extra_data], ignore_index=True) + dataset = pd.concat([dataset, extra_data[dataset.columns]], ignore_index=True) lockfile = params["dataloader_path"] + ".pickle" lock = FileLock(lockfile + ".lock") - if os.path.isfile(lockfile): - with lock: - with open(lockfile, "rb") as handle: - 
ts_dataset = pickle.load(handle) - ts_dataset.prepare_dataset(dataset) - ts_dataset.dataset["split"] = "train" - - # ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + with lock: + with open(lockfile, "rb") as handle: + ts_dataset = pickle.load(handle) + ts_dataset.dataset = dataset + ts_dataset.check_gap() + ts_dataset.dataset["split"] = "train" + print("dataset downloaded from checkpoint") pred_len = params["dataset"]["prediction_length"] - - future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]] - future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len)) - future_df["time_idx"] = future_time_idx + future_df = dataset.tail(pred_len).copy() + future_df[target_column] = 0 + future_df = pd.concat( + [dataset.tail(params["dataset"]["context_length"]), future_df] + ).reset_index() + last_series_length = new_ts_dataset.dataset[ + new_ts_dataset.dataset["series"] == new_ts_dataset.dataset["series"].max() + ].shape[0] + future_df["time_idx"] = range( + last_series_length - params["dataset"]["context_length"], + last_series_length + pred_len, + ) future_df["split"] = "future" - - ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index() - # print(ts_dataset.dataset[target_column]) - - prediction_input = ts_dataset.inherited_dataset("train", "future") + future_df["series"] = str(new_ts_dataset.dataset.series.max()) model = NBeats.from_dataset( ts_dataset.ts_dataset, @@ -92,13 +105,13 @@ def predict( lockfile = params["save_path"] lock = FileLock(lockfile + ".lock") - if os.path.isfile(lockfile): - with lock: - model.load_state_dict(torch.load(model_path)) + with lock: + model.load_state_dict(torch.load(model_path)) + prediction_input = ts_dataset.get_from_dataset(future_df) prediction_input = prediction_input.to_dataloader(train=False) - prediction = model.predict(prediction_input, mode="raw")["prediction"] + prediction = model.predict(future_df) predictions_with_dropout = [] model.train() @@ -107,12 +120,11 @@ def predict( for _ in range(20): for x, _ in prediction_input: # make prediction - out = model(x) # raw output is dictionary - out = torch.flatten(model.transform_output(out))[-1] + out = model(x) + out = torch.flatten(model.transform_output(out))[predicted_point_idx] out = out.item() predictions_with_dropout.append(out) - # print(model.to_prediction(model.forward(first[0])), "TRANSFORMED") conf_intervals = st.t.interval( alpha=0.95, df=len(predictions_with_dropout) - 1, @@ -120,7 +132,9 @@ def predict( scale=st.sem(predictions_with_dropout), ) - predicted_values = list(conf_intervals) + [torch.flatten(prediction)[-1].item()] + predicted_values = list(conf_intervals) + [ + torch.flatten(prediction)[predicted_point_idx].item() + ] predicted_values.sort() # ensure that predictions and confidence intervals are in correct order msg = { @@ -139,8 +153,11 @@ def predict( "provider": "TODO", } } - logging.debug(f"prediction msg: {msg}") + logging.info( + f"prediction msg: {msg} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + future_df = future_df.tail(pred_len) future_df["split"] = "val" - future_df[target_column] = torch.flatten(torch.mean(prediction, axis=0)).numpy() + future_df[target_column] = prediction[0] return (msg, future_df) diff --git a/deployment/nbeats/src/model_train.py b/deployment/nbeats/src/model_train.py index 9dc7eb0de609247fd636519f046779b7a5aab590..dd3be7060eed80329650000e55a7295e777dbb3f 100644 --- a/deployment/nbeats/src/model_train.py +++ 
b/deployment/nbeats/src/model_train.py @@ -11,9 +11,12 @@ from pytorch_forecasting.metrics import QuantileLoss, MAE, RMSE, CrossEntropy from pytorch_forecasting import NBeats from src.preprocess_dataset import Dataset import pickle +import pytz +from datetime import datetime """Script for temporal fusion transformer training""" +TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") LOSSES_DICT = { "mae": MAE(), @@ -28,18 +31,20 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): with open(yaml_file) as file: params = yaml.load(file, Loader=yaml.FullLoader) params["dataset"]["prediction_length"] = prediction_length - params["dataset"]["context_length"] = prediction_length * 10 + params["dataset"]["context_length"] = prediction_length * 12 data_path = os.path.join( os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) + if not os.path.isfile(data_path): + return None + dataset = pd.read_csv(data_path) - # dataset[target_column] = range(dataset.shape[0]) - if dataset.shape[0] < 12 * prediction_length: + if dataset.shape[0] < 14 * prediction_length: logging.info( - f"dataset len: {dataset.shape[0]}, minimum points required: {12 * prediction_length}" + f"dataset len: {dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) return None @@ -51,12 +56,16 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): with lock: with open(lockfile, "wb") as handle: pickle.dump(ts_dataset, handle) - print(f"train dataset saved: {lockfile}") + logging.info( + f"train dataset saved: {lockfile} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) training = ts_dataset.ts_dataset - validation = ts_dataset.inherited_dataset( - "train", "val" - ) # only train and val splits will be used + validation = ts_dataset.get_from_dataset( + ts_dataset.dataset[ + ts_dataset.dataset["series"] == ts_dataset.dataset["series"].max() + ].tail(14 * prediction_length) + ) bs = params["training"]["bs"] @@ -66,7 +75,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): val_dataloader = validation.to_dataloader(train=False, batch_size=bs, num_workers=6) early_stop_callback = EarlyStopping( - monitor="val_loss", min_delta=1e-5, patience=8, verbose=False, mode="min" + monitor="train_loss", min_delta=1e-5, patience=8, verbose=False, mode="min" ) lr_logger = LearningRateMonitor() trainer = pl.Trainer( @@ -74,16 +83,17 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): gpus=0, gradient_clip_val=0.5, # callbacks=[lr_logger, early_stop_callback], - # checkpoint_callback=False, + checkpoint_callback=False, logger=None, ) model = NBeats.from_dataset( training, - dropout=0.1, - loss=LOSSES_DICT[params["training"]["loss"]], + learning_rate=4e-3, log_interval=-1, - reduce_on_plateau_patience=5, + weight_decay=1e-2, + widths=[32, 512], + backcast_loss_ratio=1.0, ) model_path = os.path.join(params["save_path"], f"{target_column}.pth") @@ -112,5 +122,4 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): "forecasting_method": os.environ.get("METHOD", "nbetas"), "timestamp": int(time.time()), } - print(msg) return msg diff --git a/deployment/nbeats/src/preprocess_dataset.py b/deployment/nbeats/src/preprocess_dataset.py index 39d433e09cb3da331f0064b00dd597ac93382a04..3affc9505d9391d775adbd0ca33452487a2967de 100644 --- a/deployment/nbeats/src/preprocess_dataset.py +++ b/deployment/nbeats/src/preprocess_dataset.py @@ -2,6 +2,7 
@@ import pandas as pd from pytorch_forecasting import TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder import numpy as np +import logging pd.options.mode.chained_assignment = None @@ -14,15 +15,18 @@ class Dataset(object): self, dataset, target_column="value", - tv_unknown_reals="[]", - known_reals="[]", - tv_unknown_cat="[]", - static_reals="[]", + tv_unknown_reals=[], + known_reals=[], + tv_unknown_cat=[], + static_reals=[], classification=0, context_length=40, prediction_length=5, ): + self.max_missing_values = ( + 20 # max consecutive missing values allowed per series + ) self.target_column = target_column self.tv_unknown_cat = tv_unknown_cat self.known_reals = known_reals @@ -31,30 +35,24 @@ class Dataset(object): self.classification = classification self.context_length = context_length self.prediction_length = prediction_length - self.prepare_dataset(dataset) + self.dataset = dataset + self.check_gap() # self.dataset = self.cut_nan_start(dataset) # self.fill_na() # self.dataset = self.add_obligatory_columns(self.dataset) # self.dataset = self.convert_formats(self.dataset) - # self.n = dataset.shape[0] - self.ts_dataset = self.create_time_series_dataset() - - def prepare_dataset(self, dataset): - self.dataset = self.cut_nan_start(dataset) - self.fill_na() - self.dataset = self.add_obligatory_columns(self.dataset) - self.dataset = self.convert_formats(self.dataset) self.n = dataset.shape[0] + self.ts_dataset = self.create_time_series_dataset() def cut_nan_start(self, dataset): - first_not_nan_index = dataset[self.target_column][ - dataset[self.target_column] != "None" - ].index[0] + dataset.index = range(dataset.shape[0]) + first_not_nan_index = dataset[self.target_column].first_valid_index() return dataset[dataset.index > first_not_nan_index] - def fill_na(self): - self.dataset = self.dataset.replace("None", np.nan) - self.dataset = self.dataset.ffill(axis="rows") + def fill_na(self, dataset): + dataset = dataset.replace("None", np.nan) + dataset = dataset.ffill(axis="rows") + return dataset def convert_formats(self, dataset): if not self.classification: @@ -72,7 +70,7 @@ class Dataset(object): dataset["series"] = 0 dataset["split"] = "train" n = dataset.shape[0] - dataset["split"][int(n * 0.9) :] = "val" + dataset["split"][int(n * 0.8) :] = "val" dataset["time_idx"] = range(n) # TODO check time gaps return dataset @@ -100,10 +98,48 @@ class Dataset(object): max_prediction_length=self.prediction_length, min_prediction_length=self.prediction_length, add_relative_time_idx=False, - allow_missings=True, + allow_missings=False, ) return ts_dataset + def check_gap(self): + max_gap = self.dataset["time"].diff().abs().max() + logging.info(f"Max time gap in series {max_gap}") + print(f"Max time gap in series {max_gap}") + series_freq = self.dataset["time"].diff().value_counts().index.values[0] + logging.info(f"Detected series with {series_freq} frequency") + print(f"Detected series with {series_freq} frequency") + # check series length + series = np.split( + self.dataset, + *np.where( + self.dataset["time"].diff().abs().fillna(0).astype(int) + >= np.abs(self.max_missing_values * series_freq) + ), + ) + logging.info(f"{len(series)} series found") + print(f"{len(series)} series found") + preprocessed_series = [] + for i, s in enumerate(series): + s = self.fill_na(s) + s = self.cut_nan_start(s) + s = self.add_obligatory_columns(s) + s = self.convert_formats(s) + s["split"] = "train" + if s.shape[0] > self.prediction_length * 2 + self.context_length: + s["series"] = i + 
preprocessed_series.append(s) + + logging.info(f"{len(preprocessed_series)} long enough series found") + print(f"{len(preprocessed_series)} long enough series found") + # logging.info(f"") + self.dataset = pd.concat(preprocessed_series) + if self.dataset["series"].max() != len(series) - 1: + self.dropped_recent_series = True + else: + self.dropped_recent_series = False + self.dataset.index = range(self.dataset.shape[0]) + def inherited_dataset(self, split1, split2): df1 = ( self.dataset[lambda x: x.split == split1] @@ -122,3 +158,119 @@ class Dataset(object): return TimeSeriesDataSet.from_dataset( self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True ) + + +# class Dataset(object): +# def __init__( +# self, +# dataset, +# target_column="value", +# tv_unknown_reals="[]", +# known_reals="[]", +# tv_unknown_cat="[]", +# static_reals="[]", +# classification=0, +# context_length=40, +# prediction_length=5, +# ): + +# self.target_column = target_column +# self.tv_unknown_cat = tv_unknown_cat +# self.known_reals = known_reals +# self.tv_unknown_reals = tv_unknown_reals +# self.static_reals = static_reals +# self.classification = classification +# self.context_length = context_length +# self.prediction_length = prediction_length +# self.prepare_dataset(dataset) +# # self.dataset = self.cut_nan_start(dataset) +# # self.fill_na() +# # self.dataset = self.add_obligatory_columns(self.dataset) +# # self.dataset = self.convert_formats(self.dataset) +# # self.n = dataset.shape[0] +# self.ts_dataset = self.create_time_series_dataset() + +# def prepare_dataset(self, dataset): +# self.dataset = self.cut_nan_start(dataset) +# self.fill_na() +# self.dataset = self.add_obligatory_columns(self.dataset) +# self.dataset = self.convert_formats(self.dataset) +# self.n = dataset.shape[0] + +# def cut_nan_start(self, dataset): +# first_not_nan_index = dataset[self.target_column][ +# dataset[self.target_column] != "None" +# ].index[0] +# return dataset[dataset.index > first_not_nan_index] + +# def fill_na(self): +# self.dataset = self.dataset.replace("None", np.nan) +# self.dataset = self.dataset.ffill(axis="rows") + +# def convert_formats(self, dataset): +# if not self.classification: +# dataset[self.target_column] = dataset[self.target_column].astype(float) +# else: +# dataset[self.target_column] = dataset[self.target_column].astype(int) + +# for name in self.tv_unknown_cat: +# dataset[name] = dataset[name].astype(str) + +# dataset["series"] = dataset["series"].astype(str) +# return dataset + +# def add_obligatory_columns(self, dataset): +# dataset["series"] = 0 +# dataset["split"] = "train" +# n = dataset.shape[0] +# dataset["split"][int(n * 0.9) :] = "val" +# dataset["time_idx"] = range(n) # TODO check time gaps +# return dataset + +# def create_time_series_dataset(self): +# if not self.classification: +# self.time_varying_unknown_reals = [ +# self.target_column +# ] + self.tv_unknown_reals +# self.time_varying_unknown_categoricals = self.tv_unknown_cat +# else: +# self.time_varying_unknown_reals = self.tv_unknown_reals +# self.time_varying_unknown_categoricals = [ +# self.target_column +# ] + self.tv_unknown_cat + +# ts_dataset = TimeSeriesDataSet( +# self.dataset[lambda x: x.split == "train"], +# time_idx="time_idx", +# target=self.target_column, +# categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)}, +# group_ids=["series"], +# time_varying_unknown_reals=[self.target_column], +# min_encoder_length=self.context_length, +# max_encoder_length=self.context_length, +# 
max_prediction_length=self.prediction_length, +# min_prediction_length=self.prediction_length, +# add_relative_time_idx=False, +# ) +# return ts_dataset + +# def inherited_dataset(self, split1, split2): +# df1 = ( +# self.dataset[lambda x: x.split == split1] +# .groupby("series", as_index=False) +# .apply(lambda x: x.iloc[-self.context_length :]) +# ) # previous split fragment +# df2 = self.dataset[lambda x: x.split == split2] # split part +# inh_dataset = pd.concat([df1, df2]) +# inh_dataset = inh_dataset.sort_values(by=["series", "time_idx"]) +# inh_dataset = TimeSeriesDataSet.from_dataset( +# self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True +# ) +# return inh_dataset + +# def get_from_dataset(self, dataset): +# print(self.context_length + self.prediction_length) +# print(dataset) +# return TimeSeriesDataSet.from_dataset( +# self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True +# ) diff --git a/deployment/tft/main.py b/deployment/tft/main.py index e70f263a85db2a365e8077d532ec0ad76d8ae7e3..3e63599d873edee1e1f7f803e3fb11f3a5998c3d 100644 --- a/deployment/tft/main.py +++ b/deployment/tft/main.py @@ -96,7 +96,7 @@ def main(): filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO, datefmt="%Y-%m-%d %H:%M:%S", - format="AAA %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", + format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", ) logging.Formatter.converter = lambda *args: datetime.now( @@ -129,7 +129,7 @@ def main(): # msg2 = Msg() # msg2.body = ( # "{" - # + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 120' + # + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 20' # + "}" # ) diff --git a/deployment/tft/model.yaml b/deployment/tft/model.yaml index 24235c2d2d14934d36e045f7384dfefba5c1e91b..53f9b6b67ee2f11980e6eb565d4e0c4900885846 100644 --- a/deployment/tft/model.yaml +++ b/deployment/tft/model.yaml @@ -1,8 +1,8 @@ data: csv_path: demo.csv training: - bs: 8 - max_epochs: 10 + bs: 64 + max_epochs: 1 loss: quantile dataset: tv_unknown_reals: [] @@ -22,3 +22,5 @@ prediction: bs: 32 save_path: models +dataloader_path: + dataloader diff --git a/deployment/tft/predict.py b/deployment/tft/predict.py index e8b4c7dfd761f25787b20ab7860f38686268d2d9..c56c10c8eb6bb373523d1fe2ffc668eee3e07aaa 100644 --- a/deployment/tft/predict.py +++ b/deployment/tft/predict.py @@ -92,9 +92,6 @@ def main(): logging.info( f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - # logging.info( - # f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - # ) influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME) @@ -114,10 +111,10 @@ def main(): while True: start_time = int(time.time()) - log.info( + logging.info( f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - log.info( + logging.info( f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) dataset_preprocessor.prepare_csv() @@ -125,22 +122,24 @@ def main(): for metric in predicted_metrics: predictions = None for i in range(number_of_forward_predictions[metric]): + print(int((i + 1) * prediction_points_horizon), "point idx") prediction_msgs, prediction = predict( metric, - 
(prediction_cycle * 1000) // msg["publish_rate"], + prediction_length, extra_data=predictions, m=i + 1, prediction_hor=prediction_horizon, timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), + predicted_point_idx=int((i + 1) * prediction_points_horizon - 1), ) if i == (number_of_forward_predictions[metric] - 1): print( f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" ) - if predictions is not None: - predictions = pd.concat( - [predictions, prediction], ignore_index=True - ) + # if predictions is not None: + # predictions = pd.concat( + # [predictions, prediction], ignore_index=True + # ) else: predictions = prediction @@ -188,7 +187,6 @@ if __name__ == "__main__": logging.Formatter.converter = lambda *args: datetime.now( tz=timezone(TZ) ).timetuple() - log = logging.getLogger() msg = json.loads(sys.argv[1]) metrics_info = { m["metric"]: { @@ -200,10 +198,16 @@ if __name__ == "__main__": time_0 = msg["epoch_start"] prediction_horizon = msg["prediction_horizon"] * 1000 + prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"] predicted_metrics = set(msg["metrics"]) prediction_cycle = msg["prediction_horizon"] - - log.info(f"Predicted metrics: {predicted_metrics}") + prediction_length = ( + msg["prediction_horizon"] + * 1000 + // msg["publish_rate"] + * msg["number_of_forward_predictions"] + ) + logging.info(f"Predicted metrics: {predicted_metrics}") number_of_forward_predictions = { metric: msg["number_of_forward_predictions"] for metric in predicted_metrics } # deafult number of forward predictions diff --git a/deployment/tft/retrain.py b/deployment/tft/retrain.py index 9520486456642efd29d52c81a9e11d99f0c923f3..b3b9340601756737476cd0a571144b1ae40e98a0 100644 --- a/deployment/tft/retrain.py +++ b/deployment/tft/retrain.py @@ -34,14 +34,14 @@ def main(predicted_metrics, prediction_horizon): for metric in predicted_metrics: retrain_msg = train(metric, prediction_horizon) if retrain_msg: - logger.info( + logging.info( f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) start_conn.send_to_topic(TOPIC_NAME, retrain_msg) else: print("Not enough data for model training, waiting ...") - logger.info( + logging.info( f"Not enough data for model training, waiting ... 
TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) @@ -49,15 +49,14 @@ def main(predicted_metrics, prediction_horizon): time_to_wait = 60 * RETRAIN_CYCLE - (end_time - start_time) if time_to_wait < 0: time_to_wait = 60 * RETRAIN_CYCLE - (time_to_wait % 60 * RETRAIN_CYCLE) - logger.info( + logging.info( f"Waiting for the next training: {time_to_wait} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) time.sleep(time_to_wait) if __name__ == "__main__": - logger = logging.getLogger() - logger.info(f"Training loop started") + logging.info(f"Training loop started") msg = json.loads(sys.argv[1]) metrics_info = { m["metric"]: { @@ -66,9 +65,14 @@ if __name__ == "__main__": } for m in msg["all_metrics"] } - prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"] + prediction_horizon = ( + msg["prediction_horizon"] + * 1000 + // msg["publish_rate"] + * msg["number_of_forward_predictions"] + ) predicted_metrics = set(metrics_info.keys()) - logger.info( + logging.info( f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) main(predicted_metrics, prediction_horizon) diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py index b7eff82634fad7d5065ede8d74afc24459f8f7a7..1d984b23c547e1269dcdd98f51a825db9f7be1bd 100644 --- a/deployment/tft/src/model_predict.py +++ b/deployment/tft/src/model_predict.py @@ -10,6 +10,7 @@ from pytorch_forecasting import TemporalFusionTransformer import time import logging import pytz +import pickle from datetime import datetime TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") @@ -27,11 +28,12 @@ def predict( m=1, prediction_hor=60, timestamp=0, + predicted_point_idx=0, ): with open(yaml_file) as file: params = yaml.load(file, Loader=yaml.FullLoader) params["dataset"]["prediction_length"] = prediction_length - params["dataset"]["context_length"] = prediction_length * 10 + params["dataset"]["context_length"] = prediction_length * 12 model_path = os.path.join(params["save_path"], f"{target_column}.pth") @@ -39,39 +41,58 @@ def predict( logging.info( f"no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) + print("no pretrained model, unable to predict") return (None, None) data_path = os.path.join( os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) + if not os.path.isfile(data_path): + return (None, None) + dataset = pd.read_csv(data_path) + new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + if new_ts_dataset.dropped_recent_series: # series with recent data was too short + logging.info( + f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + print("Not enough fresh data, unable to predict TIME:") + return (None, None) + + dataset = new_ts_dataset.dataset if extra_data is not None: - dataset = pd.concat([dataset, extra_data], ignore_index=True) + dataset = pd.concat([dataset, extra_data[dataset.columns]], ignore_index=True) - ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + lockfile = params["dataloader_path"] + ".pickle" + lock = FileLock(lockfile + ".lock") - dataset = ts_dataset.dataset[ - lambda x: x.series == x.series.max() - ] # multiple or single series training? 
- ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + with lock: + with open(lockfile, "rb") as handle: + ts_dataset = pickle.load(handle) + ts_dataset.dataset = dataset + ts_dataset.check_gap() + ts_dataset.dataset["split"] = "train" + print("dataset downloaded from checkpoint") pred_len = params["dataset"]["prediction_length"] - - future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]] - - future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len)) - future_df["time_idx"] = future_time_idx - future_df["split"] = "future" - - ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index( - drop=True + future_df = dataset.tail(pred_len).copy() + future_df[target_column] = 0 + future_df = pd.concat( + [dataset.tail(params["dataset"]["context_length"]), future_df] + ).reset_index() + last_series_length = new_ts_dataset.dataset[ + new_ts_dataset.dataset["series"] == new_ts_dataset.dataset["series"].max() + ].shape[0] + future_df["time_idx"] = range( + last_series_length - params["dataset"]["context_length"], + last_series_length + pred_len, ) + future_df["split"] = "future" + future_df["series"] = str(new_ts_dataset.dataset.series.max()) - prediction_input = ts_dataset.inherited_dataset("val", "future") - - tft = TemporalFusionTransformer.from_dataset( + model = TemporalFusionTransformer.from_dataset( ts_dataset.ts_dataset, dropout=0.1, log_interval=-1, @@ -85,20 +106,21 @@ def predict( model_path = os.path.join(params["save_path"], f"{target_column}.pth") if not os.path.isfile(model_path): # no pretrained model, unable to predict - logging.debug(f"No pretrained model unable to predict") + logging.info(f"No pretrained model unable to predict") return (None, None) - if os.path.isfile(lockfile): - with lock: - tft.load_state_dict(torch.load(model_path)) + with lock: + model.load_state_dict(torch.load(model_path)) + prediction_input = ts_dataset.get_from_dataset(future_df) prediction_input = prediction_input.to_dataloader(train=False) - prediction = tft.predict(prediction_input, mode="raw")["prediction"] + + prediction = model.predict(future_df, mode="raw")["prediction"] predicted_values = [ - prediction[-1][-1][0].item(), - prediction[-1][-1][3].item(), - prediction[-1][-1][-1].item(), + prediction[-1][predicted_point_idx][0].item(), + prediction[-1][predicted_point_idx][3].item(), + prediction[-1][predicted_point_idx][-1].item(), ] predicted_values.sort() @@ -121,6 +143,8 @@ def predict( } logging.debug(f"prediction msg: {msg}") + future_df["split"] = "val" + future_df = future_df.tail(pred_len) future_df["split"] = "val" future_df[target_column] = prediction.permute(0, 2, 1)[0][3] return (msg, future_df) diff --git a/deployment/tft/src/model_train.py b/deployment/tft/src/model_train.py index 51191ef9edcfe5160563bf63a69479f28bf56864..795f1f114500912d2074ffb8eddee45e782e680a 100644 --- a/deployment/tft/src/model_train.py +++ b/deployment/tft/src/model_train.py @@ -10,10 +10,11 @@ from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor from pytorch_forecasting.metrics import QuantileLoss, MAE, RMSE, CrossEntropy from pytorch_forecasting import TemporalFusionTransformer from src.preprocess_dataset import Dataset +import pickle +import pytz +from datetime import datetime -logging.basicConfig( - filename=f"/wd/logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO -) +TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") """Script for temporal fusion transformer training""" @@ -32,33 +33,40 @@ def 
train(target_column, prediction_length, yaml_file="model.yaml"): with open(yaml_file) as file: params = yaml.load(file, Loader=yaml.FullLoader) params["dataset"]["prediction_length"] = prediction_length - params["dataset"]["context_length"] = prediction_length * 10 + params["dataset"]["context_length"] = prediction_length * 12 data_path = os.path.join( os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) + if not os.path.isfile(data_path): + return None + dataset = pd.read_csv(data_path) - if dataset.shape[0] < 12 * prediction_length: + if dataset.shape[0] < 14 * prediction_length: logging.info( - f"dataset len: {dataset.shape[0]}, minimum points required: {12 * prediction_length}" + f"dataset len: {dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) return None ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + lockfile = params["dataloader_path"] + ".pickle" + lock = FileLock(lockfile + ".lock") + + with lock: + with open(lockfile, "wb") as handle: + pickle.dump(ts_dataset, handle) + logging.info( + f"train dataset saved: {lockfile} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + training = ts_dataset.ts_dataset - validation = ts_dataset.inherited_dataset( - "train", "val" - ) # only train and val splits will be used - - print( - ts_dataset.dataset, - prediction_length, - "PR LEN", - "ENC LEN", - prediction_length * 10, + validation = ts_dataset.get_from_dataset( + ts_dataset.dataset[ + ts_dataset.dataset["series"] == ts_dataset.dataset["series"].max() + ].tail(14 * prediction_length) ) bs = params["training"]["bs"] @@ -76,7 +84,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): gpus=0, gradient_clip_val=0.5, # callbacks=[lr_logger, early_stop_callback], - # checkpoint_callback=False, # TODO: is pl checkpoint_callback thread safe? + checkpoint_callback=False, # TODO: is pl checkpoint_callback thread safe? 
logger=None, ) diff --git a/deployment/tft/src/preprocess_dataset.py b/deployment/tft/src/preprocess_dataset.py index 7974641ee6d12d602749cfc4d9d7309b29c1344c..1c9561d627b8cc0392519c784aecced12ca3091b 100644 --- a/deployment/tft/src/preprocess_dataset.py +++ b/deployment/tft/src/preprocess_dataset.py @@ -1,7 +1,8 @@ import pandas as pd -import numpy as np from pytorch_forecasting import TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder +import numpy as np +import logging pd.options.mode.chained_assignment = None @@ -14,15 +15,18 @@ class Dataset(object): self, dataset, target_column="value", - tv_unknown_reals="[]", - known_reals="[]", - tv_unknown_cat="[]", - static_reals="[]", + tv_unknown_reals=[], + known_reals=[], + tv_unknown_cat=[], + static_reals=[], classification=0, context_length=40, prediction_length=5, ): + self.max_missing_values = ( + 20 # max consecutive missing values allowed per series + ) self.target_column = target_column self.tv_unknown_cat = tv_unknown_cat self.known_reals = known_reals @@ -31,22 +35,24 @@ class Dataset(object): self.classification = classification self.context_length = context_length self.prediction_length = prediction_length - self.dataset = self.cut_nan_start(dataset) - self.fill_na() - self.dataset = self.add_obligatory_columns(self.dataset) - self.dataset = self.convert_formats(self.dataset) + self.dataset = dataset + self.check_gap() + # self.dataset = self.cut_nan_start(dataset) + # self.fill_na() + # self.dataset = self.add_obligatory_columns(self.dataset) + # self.dataset = self.convert_formats(self.dataset) self.n = dataset.shape[0] self.ts_dataset = self.create_time_series_dataset() def cut_nan_start(self, dataset): - first_not_nan_index = dataset[self.target_column][ - dataset[self.target_column] != "None" - ].index[0] + dataset.index = range(dataset.shape[0]) + first_not_nan_index = dataset[self.target_column].first_valid_index() return dataset[dataset.index > first_not_nan_index] - def fill_na(self): - self.dataset = self.dataset.replace("None", np.nan) - self.dataset = self.dataset.ffill(axis="rows") + def fill_na(self, dataset): + dataset = dataset.replace("None", np.nan) + dataset = dataset.ffill(axis="rows") + return dataset def convert_formats(self, dataset): if not self.classification: @@ -104,6 +110,44 @@ class Dataset(object): ) return ts_dataset + def check_gap(self): + max_gap = self.dataset["time"].diff().abs().max() + logging.info(f"Max time gap in series {max_gap}") + print(f"Max time gap in series {max_gap}") + series_freq = self.dataset["time"].diff().value_counts().index.values[0] + logging.info(f"Detected series with {series_freq} frequency") + print(f"Detected series with {series_freq} frequency") + # check series length + series = np.split( + self.dataset, + *np.where( + self.dataset["time"].diff().abs().fillna(0).astype(int) + >= np.abs(self.max_missing_values * series_freq) + ), + ) + logging.info(f"{len(series)} series found") + print(f"{len(series)} series found") + preprocessed_series = [] + for i, s in enumerate(series): + s = self.fill_na(s) + s = self.cut_nan_start(s) + s = self.add_obligatory_columns(s) + s = self.convert_formats(s) + s["split"] = "train" + if s.shape[0] > self.prediction_length * 2 + self.context_length: + s["series"] = i + preprocessed_series.append(s) + + logging.info(f"{len(preprocessed_series)} long enough series found") + print(f"{len(preprocessed_series)} long enough series found") + # logging.info(f"") + self.dataset = pd.concat(preprocessed_series) + if 
self.dataset["series"].max() != len(series) - 1: + self.dropped_recent_series = True + else: + self.dropped_recent_series = False + self.dataset.index = range(self.dataset.shape[0]) + def inherited_dataset(self, split1, split2): df1 = ( self.dataset[lambda x: x.split == split1] @@ -117,3 +161,124 @@ class Dataset(object): self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True ) return inh_dataset + + def get_from_dataset(self, dataset): + return TimeSeriesDataSet.from_dataset( + self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True + ) + + +# class Dataset(object): +# def __init__( +# self, +# dataset, +# target_column="value", +# tv_unknown_reals="[]", +# known_reals="[]", +# tv_unknown_cat="[]", +# static_reals="[]", +# classification=0, +# context_length=40, +# prediction_length=5, +# ): + +# self.target_column = target_column +# self.tv_unknown_cat = tv_unknown_cat +# self.known_reals = known_reals +# self.tv_unknown_reals = tv_unknown_reals +# self.static_reals = static_reals +# self.classification = classification +# self.context_length = context_length +# self.prediction_length = prediction_length +# self.prepare_dataset(dataset) +# # self.dataset = self.cut_nan_start(dataset) +# # self.fill_na() +# # self.dataset = self.add_obligatory_columns(self.dataset) +# # self.dataset = self.convert_formats(self.dataset) +# # self.n = dataset.shape[0] +# self.ts_dataset = self.create_time_series_dataset() + +# def prepare_dataset(self, dataset): +# self.dataset = self.cut_nan_start(dataset) +# self.fill_na() +# self.dataset = self.add_obligatory_columns(self.dataset) +# self.dataset = self.convert_formats(self.dataset) +# self.n = dataset.shape[0] + +# def cut_nan_start(self, dataset): +# first_not_nan_index = dataset[self.target_column][ +# dataset[self.target_column] != "None" +# ].index[0] +# return dataset[dataset.index > first_not_nan_index] + +# def fill_na(self): +# self.dataset = self.dataset.replace("None", np.nan) +# self.dataset = self.dataset.ffill(axis="rows") + +# def convert_formats(self, dataset): +# if not self.classification: +# dataset[self.target_column] = dataset[self.target_column].astype(float) +# else: +# dataset[self.target_column] = dataset[self.target_column].astype(int) + +# for name in self.tv_unknown_cat: +# dataset[name] = dataset[name].astype(str) + +# dataset["series"] = dataset["series"].astype(str) +# return dataset + +# def add_obligatory_columns(self, dataset): +# dataset["series"] = 0 +# dataset["split"] = "train" +# n = dataset.shape[0] +# dataset["split"][int(n * 0.9) :] = "val" +# dataset["time_idx"] = range(n) # TODO check time gaps +# return dataset + +# def create_time_series_dataset(self): +# if not self.classification: +# self.time_varying_unknown_reals = [ +# self.target_column +# ] + self.tv_unknown_reals +# self.time_varying_unknown_categoricals = self.tv_unknown_cat +# else: +# self.time_varying_unknown_reals = self.tv_unknown_reals +# self.time_varying_unknown_categoricals = [ +# self.target_column +# ] + self.tv_unknown_cat + +# ts_dataset = TimeSeriesDataSet( +# self.dataset[lambda x: x.split == "train"], +# time_idx="time_idx", +# target=self.target_column, +# categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)}, +# group_ids=["series"], +# time_varying_unknown_reals=[self.target_column], +# min_encoder_length=self.context_length, +# max_encoder_length=self.context_length, +# max_prediction_length=self.prediction_length, +# min_prediction_length=self.prediction_length, +# 
add_relative_time_idx=False, +# ) +# return ts_dataset + +# def inherited_dataset(self, split1, split2): +# df1 = ( +# self.dataset[lambda x: x.split == split1] +# .groupby("series", as_index=False) +# .apply(lambda x: x.iloc[-self.context_length :]) +# ) # previous split fragment +# df2 = self.dataset[lambda x: x.split == split2] # split part +# inh_dataset = pd.concat([df1, df2]) +# inh_dataset = inh_dataset.sort_values(by=["series", "time_idx"]) +# inh_dataset = TimeSeriesDataSet.from_dataset( +# self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True +# ) +# return inh_dataset + +# def get_from_dataset(self, dataset): +# print(self.context_length + self.prediction_length) +# print(dataset) +# return TimeSeriesDataSet.from_dataset( +# self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True +# )
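
The core behavioral change in both `preprocess_dataset.py` files is the new `check_gap()` method: instead of treating the CSV as one continuous series, it detects the dominant sampling frequency, cuts the data into separate series wherever the time gap exceeds `max_missing_values` sampling steps, preprocesses each piece, keeps only pieces long enough for the configured context and prediction lengths, and flags `dropped_recent_series` when the most recent piece was discarded. The snippet below is a minimal standalone sketch of that splitting idea, not the repository code; the constant `MAX_MISSING_VALUES`, the helper name `split_on_gaps`, and the toy data are illustrative assumptions.

```python
# Illustrative sketch (assumption: mirrors the gap-splitting idea in Dataset.check_gap()).
import numpy as np
import pandas as pd

MAX_MISSING_VALUES = 20  # assumed counterpart of self.max_missing_values in the diff


def split_on_gaps(df, time_col="time"):
    """Split df into a list of sub-frames wherever the time gap is too large."""
    diffs = df[time_col].diff()
    # most common step between consecutive samples = detected sampling frequency
    series_freq = diffs.value_counts().index.values[0]
    gap_threshold = np.abs(MAX_MISSING_VALUES * series_freq)
    # positions where a gap larger than the threshold starts a new series
    break_points = np.where(diffs.abs().fillna(0) >= gap_threshold)[0]

    parts, start = [], 0
    for bp in break_points:
        parts.append(df.iloc[start:bp])
        start = bp
    parts.append(df.iloc[start:])
    return parts


if __name__ == "__main__":
    # 1 Hz samples with a long gap after t=4: two series are expected
    times = [0, 1, 2, 3, 4, 100, 101, 102]
    demo = pd.DataFrame({"time": times, "value": range(len(times))})
    print(len(split_on_gaps(demo)), "series found")  # -> 2
```

In the diff itself, each resulting piece is then filled, trimmed, and labeled with its own `series` id, and a piece is kept only if it has more than `prediction_length * 2 + context_length` rows, which is what drives the `dropped_recent_series` check used by `model_predict.py` to skip prediction when fresh data is too short.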