diff --git a/deployment/arima/.~lock.demo.csv# b/deployment/arima/.~lock.demo.csv# new file mode 100644 index 0000000000000000000000000000000000000000..07cbfa7de1074723757c6642d9a245df9fe18428 --- /dev/null +++ b/deployment/arima/.~lock.demo.csv# @@ -0,0 +1 @@ +,awarno,bulls-ThinkPad-T480,28.09.2021 09:11,file:///home/awarno/.config/libreoffice/4; \ No newline at end of file diff --git a/deployment/arima/env b/deployment/arima/env index 150f8c5eea3783eb32376e5738c6d15b08689080..1d3b68b4ee14a2de71cd64d909ad4fbcc84aedf4 100644 --- a/deployment/arima/env +++ b/deployment/arima/env @@ -1,15 +1,12 @@ AMQ_HOSTNAME=localhost -AMQ_USER=morphemic -AMQ_PASSWORD=morphemic -AMQ_HOST=147.102.17.76 -AMQ_PORT_BROKER=61610 -APP_NAME=default_application +AMQ_USER=admin +AMQ_PASSWORD=admin +AMQ_PORT=61613 +APP_NAME=demo METHOD=arima DATA_PATH=./ -INFLUXDB_HOSTNAME=147.102.17.76 +INFLUXDB_HOSTNAME=localhost INFLUXDB_PORT=8086 INFLUXDB_USERNAME=morphemic INFLUXDB_PASSWORD=password INFLUXDB_DBNAME=morphemic - - diff --git a/deployment/arima/main.py b/deployment/arima/main.py index de065605b1e95b7047e6e4832415499dc57eeef7..e5675fb894c45e6469b041ee8250b0cf63f1a75f 100644 --- a/deployment/arima/main.py +++ b/deployment/arima/main.py @@ -108,11 +108,11 @@ def main(): ) # msg1 = Msg() - # msg1.body = '[{"metric": "value", "level": 3, "publish_rate": 30000}]' + # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "metric_0", "level": 3, "publish_rate": 30000}, {"metric": "metric_1", "level": 3, "publish_rate": 30000}]' # msg2 = Msg() # msg2.body = ( # "{" - # + f'"metrics": ["value"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' + # + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60' # + "}" # ) diff --git a/deployment/arima/predict.py b/deployment/arima/predict.py index 4c44195786f3221f7b406322007c128bdbe2d930..4d2942f2b5ac5de0882d29217bbb045a35fa0d43 100644 --- a/deployment/arima/predict.py +++ b/deployment/arima/predict.py @@ -122,7 +122,8 @@ def main(): for metric in predicted_metrics: prediction_msgs = predict( metric, - prediction_length, + prediction_lengths[metric], + single_prediction_points_length=prediction_points_horizons[metric], prediction_hor=prediction_horizon, timestamp=time_0, ) @@ -153,15 +154,10 @@ def main(): if __name__ == "__main__": logging.basicConfig( - filename=f"logs/{os.environ.get('METHOD', 'arima')}.out", + filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO, - datefmt="%Y-%m-%d %H:%M:%S", - format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", ) - logging.Formatter.converter = lambda *args: datetime.now( - tz=timezone(TZ) - ).timetuple() msg = json.loads(sys.argv[1]) metrics_info = { m["metric"]: { @@ -173,15 +169,19 @@ if __name__ == "__main__": time_0 = msg["epoch_start"] prediction_horizon = msg["prediction_horizon"] * 1000 - prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"] + prediction_points_horizons = { + metric["metric"]: msg["prediction_horizon"] * 1000 // metric["publish_rate"] + for metric in msg["all_metrics"] + } predicted_metrics = set(msg["metrics"]) prediction_cycle = msg["prediction_horizon"] - prediction_length = ( - msg["prediction_horizon"] + prediction_lengths = { + metric["metric"]: msg["prediction_horizon"] * 1000 - // msg["publish_rate"] + // metric["publish_rate"] * msg["number_of_forward_predictions"] - ) + for metric in msg["all_metrics"] + } logging.info(f"Predicted metrics: {predicted_metrics}") number_of_forward_predictions = { metric: msg["number_of_forward_predictions"] for metric in predicted_metrics diff --git a/deployment/arima/src/example.py b/deployment/arima/src/example.py new file mode 100644 index 0000000000000000000000000000000000000000..75ff52b0674ea7b5066c82ba11db7ed8c17276ed --- /dev/null +++ b/deployment/arima/src/example.py @@ -0,0 +1,2 @@ +def square(x): + return x * x \ No newline at end of file diff --git a/deployment/arima/src/model_predict.py b/deployment/arima/src/model_predict.py index 9ffa970c066efe6dc34fd99d6f957dcd21e621f8..7c519a0ec8f16e892fb90dd8b3357cf86c94f893 100644 --- a/deployment/arima/src/model_predict.py +++ b/deployment/arima/src/model_predict.py @@ -21,6 +21,7 @@ TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") def predict( target_column, prediction_length, + single_prediction_points_length=1, yaml_file="model.yaml", prediction_hor=60, timestamp=0, @@ -42,51 +43,77 @@ def predict( new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) if new_ts_dataset.dropped_recent_series: # series with recent data was too short logging.info( - f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + print( + f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - print("Not enough fresh data, unable to predict TIME:") return None dataset = pd.read_csv(data_path) ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + if ts_dataset.dataset.shape[0] < 1: + logging.info( + f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + print( + f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + return None + ts_dataset = ts_dataset.dataset[ ts_dataset.dataset.series == ts_dataset.dataset.series.max() - ] + ].tail(1000) pred_len = params["dataset"]["prediction_length"] best_model_aic = None - for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0)]: - sarima = sm.tsa.statespace.SARIMAX(ts_dataset[target_column], order=order) - model = sarima.fit() - if best_model_aic: - if model.aic < best_model_aic: + for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0), (1, 0, 0), (4, 0, 2)]: + try: + sarima = sm.tsa.statespace.SARIMAX( + ts_dataset[target_column], order=order, enforce_stationarity=False + ) + model = sarima.fit() + if best_model_aic: + if model.aic < best_model_aic: + predictions = model.get_forecast( + pred_len, return_conf_int=True, alpha=0.05 + ) + best_model_aic = model.aic + else: predictions = model.get_forecast( pred_len, return_conf_int=True, alpha=0.05 ) best_model_aic = model.aic - else: - predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05) - best_model_aic = model.aic + except np.linalg.LinAlgError: + logging.info( + f"METRIC: {target_column} SARIMAX model order: {order} failed for metric {target_column} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + if not best_model_aic: + logging.info( + f"METRIC: {target_column} All SARIMAX failed for metric: {target_column}, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + return None msgs = [] for i in range(pred_len): - msg = { - target_column: { - "metricValue": predictions.predicted_mean.values[i], - "level": 1, # TODO - "timestamp": int(time.time()), - "probability": 0.95, - "confidence_interval": [ - predictions.conf_int(alpha=0.05).iloc[i].values[0], - predictions.conf_int(alpha=0.05).iloc[i].values[1], - ], # quantiles difference - "predictionTime": timestamp + (i + 1) * (prediction_hor // 1000), - "refersTo": "TODO", - "cloud": "TODO", - "provider": "TODO", + if ((i + 1) % single_prediction_points_length) == 0: + msg = { + target_column: { + "metricValue": predictions.predicted_mean.values[i], + "level": 1, # TODO + "timestamp": int(time.time()), + "probability": 0.95, + "confidence_interval": [ + predictions.conf_int(alpha=0.05).iloc[i].values[0], + predictions.conf_int(alpha=0.05).iloc[i].values[1], + ], # quantiles difference + "predictionTime": timestamp + (i + 1) * (prediction_hor // 1000), + "refersTo": "TODO", + "cloud": "TODO", + "provider": "TODO", + } } - } - msgs.append(msg) + msgs.append(msg) return msgs diff --git a/deployment/arima/src/preprocess_dataset.py b/deployment/arima/src/preprocess_dataset.py index a39aedeb8a93880dc0fd4044661f31ca7e9f2396..8ccc6cd73395e4fbb116ad671e93fb0381e41060 100644 --- a/deployment/arima/src/preprocess_dataset.py +++ b/deployment/arima/src/preprocess_dataset.py @@ -34,13 +34,18 @@ class Dataset(object): self.context_length = context_length self.prediction_length = prediction_length self.dataset = dataset - self.check_gap() + self.dropped_recent_series = True # default set to be true + if self.dataset.shape[0] > 0: + self.check_gap() self.n = dataset.shape[0] def cut_nan_start(self, dataset): dataset.index = range(dataset.shape[0]) first_not_nan_index = dataset[self.target_column].first_valid_index() - return dataset[dataset.index > first_not_nan_index] + if first_not_nan_index > -1: + return dataset[dataset.index > first_not_nan_index] + else: + return dataset.dropna() def fill_na(self, dataset): dataset = dataset.replace("None", np.nan) @@ -68,43 +73,63 @@ class Dataset(object): return dataset def check_gap(self): - max_gap = self.dataset["time"].diff().abs().max() - logging.info(f"Max time gap in series {max_gap}") - print(f"Max time gap in series {max_gap}") - series_freq = self.dataset["time"].diff().value_counts().index.values[0] - logging.info(f"Detected series with {series_freq} frequency") - print(f"Detected series with {series_freq} frequency") - # check series length - series = np.split( - self.dataset, - *np.where( - self.dataset["time"].diff().abs().fillna(0).astype(int) - >= np.abs(self.max_missing_values * series_freq) - ), - ) - logging.info(f"{len(series)} series found") - print(f"{len(series)} series found") - preprocessed_series = [] - for i, s in enumerate(series): - s = self.fill_na(s) - s = self.cut_nan_start(s) - s = self.add_obligatory_columns(s) - s = self.convert_formats(s) - s["split"] = "train" - if s.shape[0] > self.prediction_length * 2 + self.context_length: - s["series"] = i - preprocessed_series.append(s) - if i == len(series) - 1: + self.dataset[self.target_column] = pd.to_numeric( + self.dataset[self.target_column], errors="coerce" + ).fillna(np.nan) + self.dataset = self.dataset.dropna(subset=[self.target_column]) + if self.dataset.shape[0] > 0: + max_gap = self.dataset["time"].diff().abs().max() + logging.info( + f"Metric: {self.target_column} Max time gap in series {max_gap}" + ) + print(f" Metric: {self.target_column} Max time gap in series {max_gap}") + self.series_freq = ( + self.dataset["time"] // 1e9 + ).diff().value_counts().index.values[0] * 1e9 + logging.info( + f"Metric: {self.target_column} Detected series with {self.series_freq} frequency" + ) + print( + f"Metric: {self.target_column} Detected series with {self.series_freq} frequency" + ) + # check series length + series = np.split( + self.dataset, + *np.where( + self.dataset["time"].diff().abs().fillna(0).astype(int) + >= np.abs(self.max_missing_values * self.series_freq) + ), + ) + logging.info(f"Metric: {self.target_column} {len(series)} series found") + print(f"{len(series)} series found") + preprocessed_series = [] + for i, s in enumerate(series): + s = self.fill_na(s) + s = self.cut_nan_start(s) + s = self.add_obligatory_columns(s) + s = self.convert_formats(s) + s["split"] = "train" logging.info( - f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" ) + if s.shape[0] > self.prediction_length * 2 + self.context_length: + s["series"] = i + preprocessed_series.append(s) + if i == len(series) - 1: + logging.info( + f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + ) - logging.info(f"{len(preprocessed_series)} long enough series found") - print(f"{len(preprocessed_series)} long enough series found") - # logging.info(f"") - self.dataset = pd.concat(preprocessed_series) - if self.dataset["series"].max() != len(series) - 1: - self.dropped_recent_series = True - else: - self.dropped_recent_series = False - self.dataset.index = range(self.dataset.shape[0]) + logging.info( + f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found" + ) + print(f"{len(preprocessed_series)} long enough series found") + if preprocessed_series: + self.dataset = pd.concat(preprocessed_series) + if self.dataset["series"].max() != len(series) - 1: + self.dropped_recent_series = True + else: + self.dropped_recent_series = False + else: + self.dataset = pd.DataFrame() + self.dataset.index = range(self.dataset.shape[0]) diff --git a/deployment/arima/test/model_predict_test.py b/deployment/arima/test/model_predict_test.py new file mode 100644 index 0000000000000000000000000000000000000000..5be30b004d5a26ecc270b5263c063d40b7b59e02 --- /dev/null +++ b/deployment/arima/test/model_predict_test.py @@ -0,0 +1,179 @@ +import sys + +sys.path.append(".") + +import pytest +from src.model_predict import predict +import pandas as pd +import numpy as np +import random + + +@pytest.fixture +def df_1(): + df = pd.DataFrame({"time": [], "metric_0": []}) + return df + + +@pytest.fixture +def df_2(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + df["metric_0"] = np.nan + return df + + +@pytest.fixture +def df_3(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.nan + return df + + +@pytest.fixture +def df_4(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_5(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_6(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + return df + + +@pytest.fixture +def df_7(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None") + return df + + +@pytest.fixture +def df_8(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf) + return df + + +@pytest.fixture +def df_9(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_10(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.inf + return df + + +@pytest.fixture +def df_11(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + if i % 2 == 0: + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_12(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = [ + np.nan if i % 2 == 0 else random.random() for i in range(1000) + ] + return df + + +@pytest.fixture +def df(request): + return request.getfixturevalue(request.param) + + +@pytest.fixture +def metric(): + return "metric_0" + + +@pytest.fixture +def prediction_length(): + return 60 + + +@pytest.mark.parametrize( + "df,metric,prediction_length", + [ + ("df_1", metric, prediction_length), + ("df_2", metric, prediction_length), + ("df_3", metric, prediction_length), + ("df_4", metric, prediction_length), + ("df_5", metric, prediction_length), + ("df_6", metric, prediction_length), + ("df_7", metric, prediction_length), + ("df_8", metric, prediction_length), + ("df_9", metric, prediction_length), + ("df_10", metric, prediction_length), + ("df_11", metric, prediction_length), + ("df_12", metric, prediction_length), + ], + indirect=True, +) +def test_predict(df, metric, prediction_length): + df.to_csv("demo.csv") + output = predict(metric, prediction_length) + if output: + print("True") + assert True diff --git a/deployment/arima/test/preprocess_dataset_test.py b/deployment/arima/test/preprocess_dataset_test.py new file mode 100644 index 0000000000000000000000000000000000000000..80e36837f8ecabf504d19ee45df9b48f038a177a --- /dev/null +++ b/deployment/arima/test/preprocess_dataset_test.py @@ -0,0 +1,172 @@ +import sys + +sys.path.append(".") + +import pytest +from src.preprocess_dataset import Dataset +import pandas as pd +import numpy as np +import random + + +@pytest.fixture +def df_1(): + df = pd.DataFrame({"time": [], "metric_0": []}) + return df + + +@pytest.fixture +def df_2(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + df["metric_0"] = np.nan + return df + + +@pytest.fixture +def df_3(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.nan + return df + + +@pytest.fixture +def df_4(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_5(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_6(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + return df + + +@pytest.fixture +def df_7(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None") + return df + + +@pytest.fixture +def df_8(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf) + return df + + +@pytest.fixture +def df_9(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_10(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.inf + return df + + +@pytest.fixture +def df_11(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + if i % 2 == 0: + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_12(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = [ + np.nan if i % 2 == 0 else random.random() for i in range(1000) + ] + return df + + +@pytest.fixture +def df(request): + return request.getfixturevalue(request.param) + + +@pytest.fixture +def metric(): + return "metric_0" + + +class TestDataset: + @pytest.mark.parametrize( + "df,metric", + [ + ("df_1", metric), + ("df_2", metric), + ("df_3", metric), + ("df_4", metric), + ("df_5", metric), + ("df_6", metric), + ("df_7", metric), + ("df_8", metric), + ("df_9", metric), + ("df_10", metric), + ("df_11", metric), + ("df_12", metric), + ], + indirect=True, + ) + def test_init(self, df, metric): + preprocessed_dataset = Dataset(df, metric) + assert isinstance(preprocessed_dataset, Dataset) diff --git a/deployment/nbeats/env b/deployment/nbeats/env index a83d6aec69c8f0a1d106dee9a64b527c6e667e77..aa4cd5c1df91db819278744698b449d4d4448274 100644 --- a/deployment/nbeats/env +++ b/deployment/nbeats/env @@ -1,15 +1,12 @@ AMQ_HOSTNAME=localhost -AMQ_USER=morphemic -AMQ_PASSWORD=morphemic -AMQ_HOST=147.102.17.76 -AMQ_PORT_BROKER=61610 -APP_NAME=default_application -METHOD=nbeats +AMQ_USER=admin +AMQ_PASSWORD=admin +AMQ_PORT=61613 +APP_NAME=demo +METHOD=tft DATA_PATH=./ -INFLUXDB_HOSTNAME=147.102.17.76 +INFLUXDB_HOSTNAME=localhost INFLUXDB_PORT=8086 INFLUXDB_USERNAME=morphemic INFLUXDB_PASSWORD=password INFLUXDB_DBNAME=morphemic -TIME_ZONE=Europe/Vienna - diff --git a/deployment/nbeats/main.py b/deployment/nbeats/main.py index 4427fce4c7b7bfdfe61af0af79b58b5e9813159c..0cce7d090a7f4ee8830b91e14cf3cfb9f7a5344e 100644 --- a/deployment/nbeats/main.py +++ b/deployment/nbeats/main.py @@ -114,11 +114,11 @@ def main(): ) # msg1 = Msg() - # msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 30000}]' + # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "metric_0", "level": 3, "publish_rate": 30000}, {"metric": "metric_1", "level": 3, "publish_rate": 30000}]' # msg2 = Msg() # msg2.body = ( # "{" - # + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' + # + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60' # + "}" # ) diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py index 2dd0091d5d75ef89739190384fdab82fcd1d85fb..8d3985ab6aa21ae0de4e9574e493bef96d9de2f0 100644 --- a/deployment/nbeats/predict.py +++ b/deployment/nbeats/predict.py @@ -122,15 +122,16 @@ def main(): for metric in predicted_metrics: predictions = None for i in range(number_of_forward_predictions[metric]): - print(int((i + 1) * prediction_points_horizon), "point idx") prediction_msgs, prediction = predict( metric, - prediction_length, + prediction_lengths[metric], extra_data=None, m=i + 1, prediction_hor=prediction_horizon, timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), - predicted_point_idx=int((i + 1) * prediction_points_horizon - 1), + predicted_point_idx=int( + (i + 1) * prediction_points_horizons[metric] - 1 + ), ) if i == (number_of_forward_predictions[metric] - 1): print( @@ -189,15 +190,19 @@ if __name__ == "__main__": time_0 = msg["epoch_start"] prediction_horizon = msg["prediction_horizon"] * 1000 - prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"] + prediction_points_horizons = { + metric["metric"]: msg["prediction_horizon"] * 1000 // metric["publish_rate"] + for metric in msg["all_metrics"] + } predicted_metrics = set(msg["metrics"]) prediction_cycle = msg["prediction_horizon"] - prediction_length = ( - msg["prediction_horizon"] + prediction_lengths = { + metric["metric"]: msg["prediction_horizon"] * 1000 - // msg["publish_rate"] + // metric["publish_rate"] * msg["number_of_forward_predictions"] - ) + for metric in msg["all_metrics"] + } logging.info(f"Predicted metrics: {predicted_metrics}") number_of_forward_predictions = { metric: msg["number_of_forward_predictions"] for metric in predicted_metrics diff --git a/deployment/nbeats/retrain.py b/deployment/nbeats/retrain.py index 55eae32bcb58f7b1e10993d3a62cf1885b390e5a..fa69c0d5d64a267b98df27bce0f845d216d6d9b7 100644 --- a/deployment/nbeats/retrain.py +++ b/deployment/nbeats/retrain.py @@ -21,7 +21,7 @@ APP_NAME = os.environ.get("APP_NAME", "demo") TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") -def main(predicted_metrics, prediction_horizon): +def main(predicted_metrics, prediction_horizons): start_conn = morphemic.Connection( AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER ) @@ -33,7 +33,7 @@ def main(predicted_metrics, prediction_horizon): while True: start_time = int(time.time()) for metric in predicted_metrics: - retrain_msg = train(metric, prediction_horizon) + retrain_msg = train(metric, prediction_horizons[metric]) if retrain_msg: logging.info( f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" @@ -43,7 +43,7 @@ def main(predicted_metrics, prediction_horizon): else: print("Not enough data for model training, waiting ...") logging.info( - f"Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + f"METRIC: {metric} Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) end_time = int(time.time()) @@ -70,14 +70,15 @@ if __name__ == "__main__": } for m in msg["all_metrics"] } - prediction_horizon = ( - msg["prediction_horizon"] + prediction_horizons = { + metric["metric"]: msg["prediction_horizon"] * 1000 - // msg["publish_rate"] + // metric["publish_rate"] * msg["number_of_forward_predictions"] - ) + for metric in msg["all_metrics"] + } predicted_metrics = set(metrics_info.keys()) logging.info( f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - main(predicted_metrics, prediction_horizon) + main(predicted_metrics, prediction_horizons) diff --git a/deployment/nbeats/src/model_predict.py b/deployment/nbeats/src/model_predict.py index 21f7fa2118a9ee073b02e9ef6fc612056a17110f..770089cfdeaa68be8f6923ea04bca7b9ec6fb31c 100644 --- a/deployment/nbeats/src/model_predict.py +++ b/deployment/nbeats/src/model_predict.py @@ -68,7 +68,7 @@ def predict( if extra_data is not None: dataset = pd.concat([dataset, extra_data[dataset.columns]], ignore_index=True) - lockfile = params["dataloader_path"] + ".pickle" + lockfile = f"{params['dataloader_path']}_{target_column}.pickle" lock = FileLock(lockfile + ".lock") with lock: diff --git a/deployment/nbeats/src/model_train.py b/deployment/nbeats/src/model_train.py index 785d9febd98a5617689144631103369abfb17383..9c697f1888d530d17fff9ecfc4e140a25f247adc 100644 --- a/deployment/nbeats/src/model_train.py +++ b/deployment/nbeats/src/model_train.py @@ -48,20 +48,27 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): if dataset.shape[0] < 14 * prediction_length: logging.info( - f"dataset len: {dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + f"METRIC: {target_column} dataset len: {dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) return None ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) - lockfile = params["dataloader_path"] + ".pickle" + print(ts_dataset.dataset, "DATASEEEET") + if ts_dataset.dataset.shape[0] < 1: + logging.info( + f"METRIC: {target_column} Preprocessed dataset len: {ts_dataset.dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + return None + + lockfile = f"{params['dataloader_path']}_{target_column}.pickle" lock = FileLock(lockfile + ".lock") with lock: with open(lockfile, "wb") as handle: pickle.dump(ts_dataset, handle) logging.info( - f"train dataset saved: {lockfile} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + f"METRIC: {target_column} train dataset saved: {lockfile} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) training = ts_dataset.ts_dataset diff --git a/deployment/nbeats/src/preprocess_dataset.py b/deployment/nbeats/src/preprocess_dataset.py index 9383d3d352c04a4da870943570c6d96eb749d556..01ae99663be229f1fbba9f60bf90ca00d79e5418 100644 --- a/deployment/nbeats/src/preprocess_dataset.py +++ b/deployment/nbeats/src/preprocess_dataset.py @@ -36,17 +36,23 @@ class Dataset(object): self.context_length = context_length self.prediction_length = prediction_length self.dataset = dataset - self.check_gap() + self.dropped_recent_series = True # default set to be true + if self.dataset.shape[0] > 0: + self.check_gap() self.n = dataset.shape[0] - self.ts_dataset = self.create_time_series_dataset() + if self.dataset.shape[0] > 0: + self.ts_dataset = self.create_time_series_dataset() def cut_nan_start(self, dataset): dataset.index = range(dataset.shape[0]) first_not_nan_index = dataset[self.target_column].first_valid_index() - return dataset[dataset.index > first_not_nan_index] + if first_not_nan_index > -1: + return dataset[dataset.index > first_not_nan_index] + else: + return dataset.dropna() def fill_na(self, dataset): - dataset = dataset.replace("None", np.nan) + dataset = dataset.replace(np.inf, np.nan) dataset = dataset.ffill(axis="rows") return dataset @@ -99,46 +105,69 @@ class Dataset(object): return ts_dataset def check_gap(self): - max_gap = self.dataset["time"].diff().abs().max() - logging.info(f"Max time gap in series {max_gap}") - print(f"Max time gap in series {max_gap}") - series_freq = self.dataset["time"].diff().value_counts().index.values[0] - logging.info(f"Detected series with {series_freq} frequency") - print(f"Detected series with {series_freq} frequency") - # check series length - series = np.split( - self.dataset, - *np.where( - self.dataset["time"].diff().abs().fillna(0).astype(int) - >= np.abs(self.max_missing_values * series_freq) - ), - ) - logging.info(f"{len(series)} series found") - print(f"{len(series)} series found") - preprocessed_series = [] - for i, s in enumerate(series): - s = self.fill_na(s) - s = self.cut_nan_start(s) - s = self.add_obligatory_columns(s) - s = self.convert_formats(s) - s["split"] = "train" - if s.shape[0] > self.prediction_length * 2 + self.context_length: - s["series"] = i - preprocessed_series.append(s) - if i == len(series) - 1: + self.dataset[self.target_column] = pd.to_numeric( + self.dataset[self.target_column], errors="coerce" + ).fillna(np.nan) + self.dataset = self.dataset.replace(np.inf, np.nan) + self.dataset = self.dataset.dropna(subset=[self.target_column]) + if self.dataset.shape[0] > 0: + max_gap = self.dataset["time"].diff().abs().max() + logging.info( + f"Metric: {self.target_column} Max time gap in series {max_gap}" + ) + print(f" Metric: {self.target_column} Max time gap in series {max_gap}") + series_freq = ( + self.dataset["time"] // 1e9 + ).diff().value_counts().index.values[0] * 1e9 + logging.info( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + # check series length + series = np.split( + self.dataset, + *np.where( + self.dataset["time"].diff().abs().fillna(0).astype(int) + >= np.abs(self.max_missing_values * series_freq) + ), + ) + logging.info(f"Metric: {self.target_column} {len(series)} series found") + print(f"{len(series)} series found") + preprocessed_series = [] + for i, s in enumerate(series): + s = self.fill_na(s) + s = self.cut_nan_start(s) + s = self.add_obligatory_columns(s) + s = self.convert_formats(s) + s["split"] = "train" logging.info( - f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" ) - - logging.info(f"{len(preprocessed_series)} long enough series found") - print(f"{len(preprocessed_series)} long enough series found") - # logging.info(f"") - self.dataset = pd.concat(preprocessed_series) - if self.dataset["series"].max() != len(series) - 1: - self.dropped_recent_series = True - else: - self.dropped_recent_series = False - self.dataset.index = range(self.dataset.shape[0]) + if s.shape[0] > self.prediction_length * 2 + self.context_length: + s["series"] = i + preprocessed_series.append(s) + if i == len(series) - 1: + logging.info( + f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + ) + + logging.info( + f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found" + ) + print(f"{len(preprocessed_series)} long enough series found") + # logging.info(f"") + if preprocessed_series: + self.dataset = pd.concat(preprocessed_series) + if self.dataset["series"].max() != len(series) - 1: + self.dropped_recent_series = True + else: + self.dropped_recent_series = False + else: + self.dataset = pd.DataFrame() + self.dropped_recent_series = True + self.dataset.index = range(self.dataset.shape[0]) def inherited_dataset(self, split1, split2): df1 = ( diff --git a/deployment/nbeats/test/model_predict_test.py b/deployment/nbeats/test/model_predict_test.py new file mode 100644 index 0000000000000000000000000000000000000000..5be30b004d5a26ecc270b5263c063d40b7b59e02 --- /dev/null +++ b/deployment/nbeats/test/model_predict_test.py @@ -0,0 +1,179 @@ +import sys + +sys.path.append(".") + +import pytest +from src.model_predict import predict +import pandas as pd +import numpy as np +import random + + +@pytest.fixture +def df_1(): + df = pd.DataFrame({"time": [], "metric_0": []}) + return df + + +@pytest.fixture +def df_2(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + df["metric_0"] = np.nan + return df + + +@pytest.fixture +def df_3(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.nan + return df + + +@pytest.fixture +def df_4(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_5(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_6(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + return df + + +@pytest.fixture +def df_7(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None") + return df + + +@pytest.fixture +def df_8(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf) + return df + + +@pytest.fixture +def df_9(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_10(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.inf + return df + + +@pytest.fixture +def df_11(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + if i % 2 == 0: + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_12(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = [ + np.nan if i % 2 == 0 else random.random() for i in range(1000) + ] + return df + + +@pytest.fixture +def df(request): + return request.getfixturevalue(request.param) + + +@pytest.fixture +def metric(): + return "metric_0" + + +@pytest.fixture +def prediction_length(): + return 60 + + +@pytest.mark.parametrize( + "df,metric,prediction_length", + [ + ("df_1", metric, prediction_length), + ("df_2", metric, prediction_length), + ("df_3", metric, prediction_length), + ("df_4", metric, prediction_length), + ("df_5", metric, prediction_length), + ("df_6", metric, prediction_length), + ("df_7", metric, prediction_length), + ("df_8", metric, prediction_length), + ("df_9", metric, prediction_length), + ("df_10", metric, prediction_length), + ("df_11", metric, prediction_length), + ("df_12", metric, prediction_length), + ], + indirect=True, +) +def test_predict(df, metric, prediction_length): + df.to_csv("demo.csv") + output = predict(metric, prediction_length) + if output: + print("True") + assert True diff --git a/deployment/nbeats/test/preprocess_dataset_test.py b/deployment/nbeats/test/preprocess_dataset_test.py new file mode 100644 index 0000000000000000000000000000000000000000..80e36837f8ecabf504d19ee45df9b48f038a177a --- /dev/null +++ b/deployment/nbeats/test/preprocess_dataset_test.py @@ -0,0 +1,172 @@ +import sys + +sys.path.append(".") + +import pytest +from src.preprocess_dataset import Dataset +import pandas as pd +import numpy as np +import random + + +@pytest.fixture +def df_1(): + df = pd.DataFrame({"time": [], "metric_0": []}) + return df + + +@pytest.fixture +def df_2(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + df["metric_0"] = np.nan + return df + + +@pytest.fixture +def df_3(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.nan + return df + + +@pytest.fixture +def df_4(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_5(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_6(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + return df + + +@pytest.fixture +def df_7(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None") + return df + + +@pytest.fixture +def df_8(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf) + return df + + +@pytest.fixture +def df_9(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_10(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.inf + return df + + +@pytest.fixture +def df_11(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + if i % 2 == 0: + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_12(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = [ + np.nan if i % 2 == 0 else random.random() for i in range(1000) + ] + return df + + +@pytest.fixture +def df(request): + return request.getfixturevalue(request.param) + + +@pytest.fixture +def metric(): + return "metric_0" + + +class TestDataset: + @pytest.mark.parametrize( + "df,metric", + [ + ("df_1", metric), + ("df_2", metric), + ("df_3", metric), + ("df_4", metric), + ("df_5", metric), + ("df_6", metric), + ("df_7", metric), + ("df_8", metric), + ("df_9", metric), + ("df_10", metric), + ("df_11", metric), + ("df_12", metric), + ], + indirect=True, + ) + def test_init(self, df, metric): + preprocessed_dataset = Dataset(df, metric) + assert isinstance(preprocessed_dataset, Dataset) diff --git a/deployment/tft/env b/deployment/tft/env index 1a2ad8d813c13f5a3f67c5d194cf952d1ed8a683..aa4cd5c1df91db819278744698b449d4d4448274 100644 --- a/deployment/tft/env +++ b/deployment/tft/env @@ -1,15 +1,12 @@ AMQ_HOSTNAME=localhost -AMQ_USER=morphemic -AMQ_PASSWORD=morphemic -AMQ_HOST=147.102.17.76 -AMQ_PORT_BROKER=61610 -APP_NAME=default_application +AMQ_USER=admin +AMQ_PASSWORD=admin +AMQ_PORT=61613 +APP_NAME=demo METHOD=tft DATA_PATH=./ -INFLUXDB_HOSTNAME=147.102.17.76 +INFLUXDB_HOSTNAME=localhost INFLUXDB_PORT=8086 INFLUXDB_USERNAME=morphemic INFLUXDB_PASSWORD=password INFLUXDB_DBNAME=morphemic - - diff --git a/deployment/tft/main.py b/deployment/tft/main.py index 361b146d36f09411874b42157a5f166b5c8a0c77..4896ffe6a04f800faa7a707da5fc9b5c42febdfd 100644 --- a/deployment/tft/main.py +++ b/deployment/tft/main.py @@ -9,6 +9,8 @@ from datetime import datetime from pytz import timezone from datetime import datetime +# from src.log import logger + AMQ_USER = os.environ.get("AMQ_USER", "admin") AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin") AMQ_HOST = os.environ.get("AMQ_HOST", "localhost") @@ -112,11 +114,11 @@ def main(): ) # msg1 = Msg() - # msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 30000}]' + # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "metric_0", "level": 3, "publish_rate": 30000}, {"metric": "metric_1", "level": 3, "publish_rate": 30000}]' # msg2 = Msg() # msg2.body = ( # "{" - # + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' + # + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' # + "}" # ) diff --git a/deployment/tft/predict.py b/deployment/tft/predict.py index dc057f4219a9716bd7da1cbf11ee968111531558..1bd61287b649071fd360a47d2dfa9e42d03926f8 100644 --- a/deployment/tft/predict.py +++ b/deployment/tft/predict.py @@ -122,24 +122,22 @@ def main(): for metric in predicted_metrics: predictions = None for i in range(number_of_forward_predictions[metric]): - print(int((i + 1) * prediction_points_horizon), "point idx") prediction_msgs, prediction = predict( metric, - prediction_length, + prediction_lengths[metric], extra_data=None, m=i + 1, prediction_hor=prediction_horizon, timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), - predicted_point_idx=int((i + 1) * prediction_points_horizon - 1), + predicted_point_idx=int( + (i + 1) * prediction_points_horizons[metric] - 1 + ), ) if i == (number_of_forward_predictions[metric] - 1): print( f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" ) - # if predictions is not None: - # predictions = pd.concat( - # [predictions, prediction], ignore_index=True - # ) + else: predictions = prediction @@ -179,14 +177,8 @@ def main(): if __name__ == "__main__": logging.basicConfig( filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", - level=logging.INFO, - datefmt="%Y-%m-%d %H:%M:%S", - format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s", ) - logging.Formatter.converter = lambda *args: datetime.now( - tz=timezone(TZ) - ).timetuple() msg = json.loads(sys.argv[1]) metrics_info = { m["metric"]: { @@ -198,15 +190,20 @@ if __name__ == "__main__": time_0 = msg["epoch_start"] prediction_horizon = msg["prediction_horizon"] * 1000 - prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"] + prediction_points_horizons = { + metric["metric"]: msg["prediction_horizon"] * 1000 // metric["publish_rate"] + for metric in msg["all_metrics"] + } + print(prediction_points_horizons) predicted_metrics = set(msg["metrics"]) prediction_cycle = msg["prediction_horizon"] - prediction_length = ( - msg["prediction_horizon"] + prediction_lengths = { + metric["metric"]: msg["prediction_horizon"] * 1000 - // msg["publish_rate"] + // metric["publish_rate"] * msg["number_of_forward_predictions"] - ) + for metric in msg["all_metrics"] + } logging.info(f"Predicted metrics: {predicted_metrics}") number_of_forward_predictions = { metric: msg["number_of_forward_predictions"] for metric in predicted_metrics diff --git a/deployment/tft/retrain.py b/deployment/tft/retrain.py index 2a6e654d17e71abb3ec2aac8cdc632634c556200..fa69c0d5d64a267b98df27bce0f845d216d6d9b7 100644 --- a/deployment/tft/retrain.py +++ b/deployment/tft/retrain.py @@ -8,6 +8,7 @@ from amq_message_python_library import * from src.dataset_maker import CSVData import pytz import time +from pytz import timezone from datetime import datetime TOPIC_NAME = "training_models" @@ -19,12 +20,8 @@ AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613") APP_NAME = os.environ.get("APP_NAME", "demo") TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") -logging.basicConfig( - filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO -) - -def main(predicted_metrics, prediction_horizon): +def main(predicted_metrics, prediction_horizons): start_conn = morphemic.Connection( AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER ) @@ -36,7 +33,7 @@ def main(predicted_metrics, prediction_horizon): while True: start_time = int(time.time()) for metric in predicted_metrics: - retrain_msg = train(metric, prediction_horizon) + retrain_msg = train(metric, prediction_horizons[metric]) if retrain_msg: logging.info( f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" @@ -46,7 +43,7 @@ def main(predicted_metrics, prediction_horizon): else: print("Not enough data for model training, waiting ...") logging.info( - f"Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + f"METRIC: {metric} Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) end_time = int(time.time()) @@ -60,6 +57,10 @@ def main(predicted_metrics, prediction_horizon): if __name__ == "__main__": + logging.basicConfig( + filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", + ) + logging.info(f"Training loop started") msg = json.loads(sys.argv[1]) metrics_info = { @@ -69,14 +70,15 @@ if __name__ == "__main__": } for m in msg["all_metrics"] } - prediction_horizon = ( - msg["prediction_horizon"] + prediction_horizons = { + metric["metric"]: msg["prediction_horizon"] * 1000 - // msg["publish_rate"] + // metric["publish_rate"] * msg["number_of_forward_predictions"] - ) + for metric in msg["all_metrics"] + } predicted_metrics = set(metrics_info.keys()) logging.info( f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - main(predicted_metrics, prediction_horizon) + main(predicted_metrics, prediction_horizons) diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py index 2f7f9b5c073da5907ac44d254ce2e864b497eba1..91bbe32dca961802dd8ee0df65ec34b3df14a470 100644 --- a/deployment/tft/src/model_predict.py +++ b/deployment/tft/src/model_predict.py @@ -43,7 +43,7 @@ def predict( if not os.path.isfile(model_path): # no pretrained model, unable to predict logging.info( - f"no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + f"METRIC {target_column} no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("no pretrained model, unable to predict") return (None, None) @@ -59,9 +59,9 @@ def predict( new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) if new_ts_dataset.dropped_recent_series: # series with recent data was too short logging.info( - f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + f"METRIC {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - print("Not enough fresh data, unable to predict TIME:") + print("METRIC {target_column} Not enough fresh data, unable to predict TIME:") return (None, None) dataset = new_ts_dataset.dataset @@ -69,7 +69,7 @@ def predict( if extra_data is not None: dataset = pd.concat([dataset, extra_data[dataset.columns]], ignore_index=True) - lockfile = params["dataloader_path"] + ".pickle" + lockfile = f"{params['dataloader_path']}_{target_column}.pickle" lock = FileLock(lockfile + ".lock") with lock: @@ -110,7 +110,9 @@ def predict( model_path = os.path.join(params["save_path"], f"{target_column}.pth") if not os.path.isfile(model_path): # no pretrained model, unable to predict - logging.info(f"No pretrained model unable to predict") + logging.info( + f"METRIC {target_column} No pretrained model unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) return (None, None) with lock: diff --git a/deployment/tft/src/model_train.py b/deployment/tft/src/model_train.py index 8a5e139378e9485f1db6cacb9ab5da37f6145a95..fa7e43c940cf99594c15d0182763399532844b89 100644 --- a/deployment/tft/src/model_train.py +++ b/deployment/tft/src/model_train.py @@ -56,7 +56,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) - lockfile = params["dataloader_path"] + ".pickle" + lockfile = f"{params['dataloader_path']}_{target_column}.pickle" lock = FileLock(lockfile + ".lock") with lock: diff --git a/deployment/tft/src/preprocess_dataset.py b/deployment/tft/src/preprocess_dataset.py index 1880094eb7955e8062377ca74f91928e27f02317..84d80b05db2b4545cda75ebf43a3926fbc7afa3e 100644 --- a/deployment/tft/src/preprocess_dataset.py +++ b/deployment/tft/src/preprocess_dataset.py @@ -36,17 +36,23 @@ class Dataset(object): self.context_length = context_length self.prediction_length = prediction_length self.dataset = dataset - self.check_gap() + self.dropped_recent_series = True # default set to be true + if self.dataset.shape[0] > 0: + self.check_gap() self.n = dataset.shape[0] - self.ts_dataset = self.create_time_series_dataset() + if self.dataset.shape[0] > 0: + self.ts_dataset = self.create_time_series_dataset() def cut_nan_start(self, dataset): dataset.index = range(dataset.shape[0]) first_not_nan_index = dataset[self.target_column].first_valid_index() - return dataset[dataset.index > first_not_nan_index] + if first_not_nan_index > -1: + return dataset[dataset.index > first_not_nan_index] + else: + return dataset.dropna() def fill_na(self, dataset): - dataset = dataset.replace("None", np.nan) + dataset = dataset.replace(np.inf, np.nan) dataset = dataset.ffill(axis="rows") return dataset @@ -107,46 +113,69 @@ class Dataset(object): return ts_dataset def check_gap(self): - max_gap = self.dataset["time"].diff().abs().max() - logging.info(f"Max time gap in series {max_gap}") - print(f"Max time gap in series {max_gap}") - series_freq = self.dataset["time"].diff().value_counts().index.values[0] - logging.info(f"Detected series with {series_freq} frequency") - print(f"Detected series with {series_freq} frequency") - # check series length - series = np.split( - self.dataset, - *np.where( - self.dataset["time"].diff().abs().fillna(0).astype(int) - >= np.abs(self.max_missing_values * series_freq) - ), - ) - logging.info(f"{len(series)} series found") - print(f"{len(series)} series found") - preprocessed_series = [] - for i, s in enumerate(series): - s = self.fill_na(s) - s = self.cut_nan_start(s) - s = self.add_obligatory_columns(s) - s = self.convert_formats(s) - s["split"] = "train" - if s.shape[0] > self.prediction_length * 2 + self.context_length: - s["series"] = i - preprocessed_series.append(s) - if i == len(series) - 1: + self.dataset[self.target_column] = pd.to_numeric( + self.dataset[self.target_column], errors="coerce" + ).fillna(np.nan) + self.dataset = self.dataset.replace(np.inf, np.nan) + self.dataset = self.dataset.dropna(subset=[self.target_column]) + if self.dataset.shape[0] > 0: + max_gap = self.dataset["time"].diff().abs().max() + logging.info( + f"Metric: {self.target_column} Max time gap in series {max_gap}" + ) + print(f" Metric: {self.target_column} Max time gap in series {max_gap}") + series_freq = ( + self.dataset["time"] // 1e9 + ).diff().value_counts().index.values[0] * 1e9 + logging.info( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + # check series length + series = np.split( + self.dataset, + *np.where( + self.dataset["time"].diff().abs().fillna(0).astype(int) + >= np.abs(self.max_missing_values * series_freq) + ), + ) + logging.info(f"Metric: {self.target_column} {len(series)} series found") + print(f"{len(series)} series found") + preprocessed_series = [] + for i, s in enumerate(series): + s = self.fill_na(s) + s = self.cut_nan_start(s) + s = self.add_obligatory_columns(s) + s = self.convert_formats(s) + s["split"] = "train" logging.info( - f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" ) - - logging.info(f"{len(preprocessed_series)} long enough series found") - print(f"{len(preprocessed_series)} long enough series found") - # logging.info(f"") - self.dataset = pd.concat(preprocessed_series) - if self.dataset["series"].max() != len(series) - 1: - self.dropped_recent_series = True - else: - self.dropped_recent_series = False - self.dataset.index = range(self.dataset.shape[0]) + if s.shape[0] > self.prediction_length * 2 + self.context_length: + s["series"] = i + preprocessed_series.append(s) + if i == len(series) - 1: + logging.info( + f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + ) + + logging.info( + f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found" + ) + print(f"{len(preprocessed_series)} long enough series found") + # logging.info(f"") + if preprocessed_series: + self.dataset = pd.concat(preprocessed_series) + if self.dataset["series"].max() != len(series) - 1: + self.dropped_recent_series = True + else: + self.dropped_recent_series = False + else: + self.dataset = pd.DataFrame() + self.dropped_recent_series = True + self.dataset.index = range(self.dataset.shape[0]) def inherited_dataset(self, split1, split2): df1 = ( diff --git a/deployment/tft/test/model_predict_test.py b/deployment/tft/test/model_predict_test.py new file mode 100644 index 0000000000000000000000000000000000000000..5be30b004d5a26ecc270b5263c063d40b7b59e02 --- /dev/null +++ b/deployment/tft/test/model_predict_test.py @@ -0,0 +1,179 @@ +import sys + +sys.path.append(".") + +import pytest +from src.model_predict import predict +import pandas as pd +import numpy as np +import random + + +@pytest.fixture +def df_1(): + df = pd.DataFrame({"time": [], "metric_0": []}) + return df + + +@pytest.fixture +def df_2(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + df["metric_0"] = np.nan + return df + + +@pytest.fixture +def df_3(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.nan + return df + + +@pytest.fixture +def df_4(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_5(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_6(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + return df + + +@pytest.fixture +def df_7(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None") + return df + + +@pytest.fixture +def df_8(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf) + return df + + +@pytest.fixture +def df_9(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_10(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.inf + return df + + +@pytest.fixture +def df_11(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + if i % 2 == 0: + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_12(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = [ + np.nan if i % 2 == 0 else random.random() for i in range(1000) + ] + return df + + +@pytest.fixture +def df(request): + return request.getfixturevalue(request.param) + + +@pytest.fixture +def metric(): + return "metric_0" + + +@pytest.fixture +def prediction_length(): + return 60 + + +@pytest.mark.parametrize( + "df,metric,prediction_length", + [ + ("df_1", metric, prediction_length), + ("df_2", metric, prediction_length), + ("df_3", metric, prediction_length), + ("df_4", metric, prediction_length), + ("df_5", metric, prediction_length), + ("df_6", metric, prediction_length), + ("df_7", metric, prediction_length), + ("df_8", metric, prediction_length), + ("df_9", metric, prediction_length), + ("df_10", metric, prediction_length), + ("df_11", metric, prediction_length), + ("df_12", metric, prediction_length), + ], + indirect=True, +) +def test_predict(df, metric, prediction_length): + df.to_csv("demo.csv") + output = predict(metric, prediction_length) + if output: + print("True") + assert True diff --git a/deployment/tft/test/preprocess_dataset_test.py b/deployment/tft/test/preprocess_dataset_test.py new file mode 100644 index 0000000000000000000000000000000000000000..80e36837f8ecabf504d19ee45df9b48f038a177a --- /dev/null +++ b/deployment/tft/test/preprocess_dataset_test.py @@ -0,0 +1,172 @@ +import sys + +sys.path.append(".") + +import pytest +from src.preprocess_dataset import Dataset +import pandas as pd +import numpy as np +import random + + +@pytest.fixture +def df_1(): + df = pd.DataFrame({"time": [], "metric_0": []}) + return df + + +@pytest.fixture +def df_2(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + df["metric_0"] = np.nan + return df + + +@pytest.fixture +def df_3(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.nan + return df + + +@pytest.fixture +def df_4(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_5(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 3)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = 1 + return df + + +@pytest.fixture +def df_6(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + return df + + +@pytest.fixture +def df_7(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None") + return df + + +@pytest.fixture +def df_8(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf) + return df + + +@pytest.fixture +def df_9(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + np.random.randint(0, df.shape[0] - 1, 990), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_10(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.inf + return df + + +@pytest.fixture +def df_11(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = np.random.rand(1000) + if i % 2 == 0: + df.loc[ + list(range(20, 300)), + f"metric_{i}", + ] = np.nan + return df + + +@pytest.fixture +def df_12(): + df = pd.DataFrame() + df["time"] = np.array(range(0, 1000)) * 1e9 + for i in range(5): + df[f"metric_{i}"] = [ + np.nan if i % 2 == 0 else random.random() for i in range(1000) + ] + return df + + +@pytest.fixture +def df(request): + return request.getfixturevalue(request.param) + + +@pytest.fixture +def metric(): + return "metric_0" + + +class TestDataset: + @pytest.mark.parametrize( + "df,metric", + [ + ("df_1", metric), + ("df_2", metric), + ("df_3", metric), + ("df_4", metric), + ("df_5", metric), + ("df_6", metric), + ("df_7", metric), + ("df_8", metric), + ("df_9", metric), + ("df_10", metric), + ("df_11", metric), + ("df_12", metric), + ], + indirect=True, + ) + def test_init(self, df, metric): + preprocessed_dataset = Dataset(df, metric) + assert isinstance(preprocessed_dataset, Dataset)