diff --git a/deployment/arima/main.py b/deployment/arima/main.py
index 05184cf12da327bccf1dbd4f0fe2812fcca4ff82..7908bfe1c47e55d2dbf0151b9d684c329ece6356 100644
--- a/deployment/arima/main.py
+++ b/deployment/arima/main.py
@@ -3,17 +3,20 @@
 import stomp
 import json
 from amq_message_python_library import *  # python amq-message-python-library
 import logging
+import time
 from datetime import datetime
 from pytz import timezone
-from datetime import datetime
 import time
+import setproctitle
+
+# from src.log import logger
 
 AMQ_USER = os.environ.get("AMQ_USER", "admin")
 AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
 AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
 AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
 START_APP_TOPIC = "metrics_to_predict"
-METHOD = os.environ.get("METHOD", "nbeats")
+METHOD = os.environ.get("METHOD", "model")
 START_TOPIC = f"start_forecasting.{METHOD}"
 TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
@@ -75,8 +78,9 @@ class Msg(object):
 
 
 def main():
+    setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}")
     logging.basicConfig(
-        filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+        filename=f"logs/{os.environ.get('METHOD', 'model')}.out",
         level=logging.INFO,
         datefmt="%Y-%m-%d %H:%M:%S",
         format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
@@ -121,6 +125,7 @@
     # StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
 
     while True:
+        time.sleep(60)
         pass
 
 
diff --git a/deployment/arima/predict.py b/deployment/arima/predict.py
index 2460170b5608848a7f2a47998d0509b39e6d21cc..c87915b9bfd82bb6131f1bd5c8b294d760b6c86a 100644
--- a/deployment/arima/predict.py
+++ b/deployment/arima/predict.py
@@ -126,6 +126,7 @@
             single_prediction_points_length=prediction_points_horizons[metric],
             prediction_hor=prediction_horizon,
             timestamp=time_0,
+            publish_rate=metrics_info[metric]["publish_rate"],
         )
 
         if prediction_msgs:
@@ -138,7 +139,16 @@
                 start_conn.send_to_topic(dest, message[metric])
                 influxdb_conn.send_to_influxdb(metric, message)
 
+            current_time = int(time.time())
+            logging.info(
+                f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+            )
+            print(
+                f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+            )
+
         end_time = int(time.time())
+
         time_0 = time_0 + prediction_cycle
         time_to_wait = time_0 - end_time
         if time_to_wait < 0:
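The predict loop above advances time_0 by one prediction_cycle per iteration and sleeps only for the remainder, so the schedule stays anchored to the original epoch instead of drifting by each iteration's runtime. A minimal sketch of that pattern, assuming illustrative names (run_cycles and do_work do not appear in the diff):

import time

def run_cycles(time_0: int, prediction_cycle: int, do_work) -> None:
    """Fire do_work() once per cycle, anchored to the epoch time_0."""
    while True:
        do_work()
        end_time = int(time.time())
        time_0 = time_0 + prediction_cycle  # next deadline, independent of runtime
        time_to_wait = time_0 - end_time
        if time_to_wait < 0:
            # the cycle overran; wait only until the next future deadline
            time_to_wait = prediction_cycle - (end_time - time_0) % prediction_cycle
        time.sleep(time_to_wait)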
diff --git a/deployment/arima/requirements.txt b/deployment/arima/requirements.txt
index e9ce5c82c1d44e0065e7dbf17360137a45989e48..c9a336864f83b46c96967f4d7528c779079775a1 100644
--- a/deployment/arima/requirements.txt
+++ b/deployment/arima/requirements.txt
@@ -5,5 +5,4 @@ filelock==3.0.12
 pyyaml
 influxdb
 python-slugify
-
-
+setproctitle
diff --git a/deployment/arima/src/preprocess_dataset.py b/deployment/arima/src/preprocess_dataset.py
index 29cb5cd369afd9f8570cdd1bad476215d1a1dc42..789b21320f1697e0bbde4308de6d25afa81e3970 100644
--- a/deployment/arima/src/preprocess_dataset.py
+++ b/deployment/arima/src/preprocess_dataset.py
@@ -94,10 +94,10 @@ class Dataset(object):
         last_timestamp_database = self.dataset[self.time_column].values[-1]
         current_time = int(time.time())
         print(
-            f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
+            f"Time difference between last timestamp and current time: {current_time - last_timestamp_database / 1000}"
        )
         logging.info(
-            f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
+            f"Time difference between last timestamp and current time: {current_time - last_timestamp_database / 1000}"
         )
 
     def check_gap(self):
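This preprocess hunk (and its tft counterpart further down) is a unit fix: the stored timestamps are in milliseconds while time.time() returns seconds, so the logged difference was off by a factor of 1000. The arima side scales the stored timestamp down, the tft side scales the current time up; both yield the same staleness. A sketch of the intended quantity (the helper name is an assumption, not from the diff):

import time

def staleness_seconds(last_timestamp_ms: int) -> float:
    # '/' binds tighter than '-', so this subtracts the timestamp converted to seconds
    return time.time() - last_timestamp_ms / 1000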
diff --git a/deployment/arima/test/model_train_test.py b/deployment/arima/test/model_train_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..00f4ebd62dbe8ba04f7dcf0cf7a61158bd8559a2
--- /dev/null
+++ b/deployment/arima/test/model_train_test.py
@@ -0,0 +1,221 @@
+import sys
+
+sys.path.append(".")
+
+import pytest
+from src.model_train import train
+import pandas as pd
+import numpy as np
+import random
+
+
+@pytest.fixture
+def df_1():
+    df = pd.DataFrame({"ems_time": [], "metric_0": []})
+    return df
+
+
+@pytest.fixture
+def df_2():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    df["metric_0"] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_3():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(0, 1000)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_4():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :3
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = 1
+    return df
+
+
+@pytest.fixture
+def df_5():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(0, 3)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = 1
+    return df
+
+
+@pytest.fixture
+def df_6():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(1, 1001)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+    return df
+
+
+@pytest.fixture
+def df_7():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+        df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None")
+    print(df)
+    return df
+
+
+@pytest.fixture
+def df_8():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+        df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf)
+    return df
+
+
+@pytest.fixture
+def df_9():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_10():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            list(range(20, 300)),
+            f"metric_{i}",
+        ] = np.inf
+    return df
+
+
+@pytest.fixture
+def df_11():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        if i % 2 == 0:
+            df.loc[
+                list(range(20, 300)),
+                f"metric_{i}",
+            ] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_12():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :6000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = [
+            np.nan if i % 2 == 0 else random.random() for i in range(6000)
+        ]
+    return df
+
+
+@pytest.fixture
+def df(request):
+    return request.getfixturevalue(request.param)
+
+
+@pytest.fixture
+def metric():
+    return "metric_0"
+
+
+@pytest.fixture
+def prediction_length():
+    return 60
+
+
+@pytest.mark.parametrize(
+    "df,metric,prediction_length",
+    [
+        ("df_1", metric, prediction_length),
+        ("df_2", metric, prediction_length),
+        ("df_3", metric, prediction_length),
+        ("df_4", metric, prediction_length),
+        ("df_5", metric, prediction_length),
+        ("df_6", metric, prediction_length),
+        ("df_7", metric, prediction_length),
+        ("df_8", metric, prediction_length),
+        ("df_9", metric, prediction_length),
+        ("df_10", metric, prediction_length),
+        ("df_11", metric, prediction_length),
+        ("df_12", metric, prediction_length),
+    ],
+    indirect=True,
+)
+def test_predict(df, metric, prediction_length):
+    df.to_csv("demo.csv")
+    output = train(metric, prediction_length)
+    print(output)
+    if output:
+        print("True")
+    assert True
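The new test module (repeated verbatim for nbeats and tft below) drives train() with twelve degenerate-to-healthy frames via pytest's indirect parametrization: the df fixture receives a fixture *name* as its parameter and resolves it at collection time. A self-contained toy version of that pattern, with made-up fixture names:

import pytest

@pytest.fixture
def empty_frame():
    return []

@pytest.fixture
def full_frame():
    return [1, 2, 3]

@pytest.fixture
def data(request):
    # request.param holds a fixture name; resolve it to that fixture's value
    return request.getfixturevalue(request.param)

@pytest.mark.parametrize("data", ["empty_frame", "full_frame"], indirect=True)
def test_data_is_list(data):
    assert isinstance(data, list)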
diff --git a/deployment/nbeats/main.py b/deployment/nbeats/main.py
index 5a798b1a18b005b00d2e552630c91910f5584246..1e3c863b9e86b3a8b712122a26093e205c5c1330 100644
--- a/deployment/nbeats/main.py
+++ b/deployment/nbeats/main.py
@@ -7,7 +7,8 @@ import logging
 import time
 from datetime import datetime
 from pytz import timezone
-from datetime import datetime
+import time
+import setproctitle
 
 # from src.log import logger
 
@@ -81,8 +82,9 @@ class Msg(object):
 
 
 def main():
+    setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}")
     logging.basicConfig(
-        filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+        filename=f"logs/{os.environ.get('METHOD', 'model')}.out",
        level=logging.INFO,
         datefmt="%Y-%m-%d %H:%M:%S",
         format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
@@ -127,6 +129,7 @@
     # StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
 
     while True:
+        time.sleep(60)
         pass
 
 
diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py
index f12166dc43a7227cef069e964aac990ce66fbb70..1c40e84319fb743bd0a3a26156b2c4ab5b3d9cfb 100644
--- a/deployment/nbeats/predict.py
+++ b/deployment/nbeats/predict.py
@@ -15,6 +15,7 @@ from pytz import timezone
 import pytz
 from datetime import datetime
 import random
+import setproctitle
 
 METHOD = os.environ.get("METHOD", "nbeats")
 START_TOPIC = f"start_forecasting.{METHOD}"
@@ -139,6 +140,9 @@
             print(
                 f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
             )
+            logging.info(
+                f"time difference in seconds between last prediction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
+            )
 
         else:
             predictions = prediction
@@ -178,6 +182,7 @@
 
 
 if __name__ == "__main__":
+    setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}_predict")
     logging.basicConfig(
         filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
     )
diff --git a/deployment/nbeats/requirements.txt b/deployment/nbeats/requirements.txt
index 1940fae2ffeb2ab99665dc424196659443108911..ae603574f3dfba609a0ea37d1f9a7e0a34628c7d 100644
--- a/deployment/nbeats/requirements.txt
+++ b/deployment/nbeats/requirements.txt
@@ -6,5 +6,4 @@ filelock==3.0.12
 influxdb
 python-slugify
 torchmetrics==0.5.0
-
-
+setproctitle
diff --git a/deployment/nbeats/retrain.py b/deployment/nbeats/retrain.py
index 59a717d5f6c25e2ff2b8b5a1ea4c45127469bba7..fcfb77d562d14798bddbde15806ddbb585827688 100644
--- a/deployment/nbeats/retrain.py
+++ b/deployment/nbeats/retrain.py
@@ -10,9 +10,10 @@ import pytz
 import time
 from pytz import timezone
 from datetime import datetime
+import setproctitle
 
 TOPIC_NAME = "training_models"
-RETRAIN_CYCLE = 2  # minutes
+RETRAIN_CYCLE = 10  # minutes
 AMQ_USER = os.environ.get("AMQ_USER", "admin")
 AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
 AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
@@ -61,6 +62,7 @@ def main(predicted_metrics, prediction_horizons):
 
 
 if __name__ == "__main__":
+    setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}_retrain")
     logging.basicConfig(
         filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
     )
diff --git a/deployment/nbeats/src/model_predict.py b/deployment/nbeats/src/model_predict.py
index 8ed07b842e81b6bb9062d1c5e07057629baf3e5d..0ea200bf397cb326685f1862f7c7b681e6a2cd0a 100644
--- a/deployment/nbeats/src/model_predict.py
+++ b/deployment/nbeats/src/model_predict.py
@@ -114,7 +114,11 @@ def predict(
     lock = FileLock(lockfile + ".lock")
 
     with lock:
-        model.load_state_dict(torch.load(model_path))
+        if os.path.isfile(model_path):
+            model.load_state_dict(torch.load(model_path))
+        else:
+            logging.info("Model corrupted unable to predict")
+            return (None, None)
 
     prediction_input = ts_dataset.get_from_dataset(future_df)
     prediction_input = prediction_input.to_dataloader(train=False)
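The model_predict hunk above (and its tft counterpart below) stops predict() from crashing when no checkpoint file exists yet: the weights are loaded only if model_path is present, otherwise the function logs and returns (None, None). A standalone sketch of the same guard, assuming load_weights_or_none as an illustrative helper name:

import logging
import os

import torch
from filelock import FileLock

def load_weights_or_none(model, model_path: str, lockfile: str):
    """Return the model with weights loaded, or None when no checkpoint exists."""
    lock = FileLock(lockfile + ".lock")
    with lock:
        if not os.path.isfile(model_path):
            logging.info("Model corrupted unable to predict")
            return None
        model.load_state_dict(torch.load(model_path))
    return model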
diff --git a/deployment/nbeats/src/model_train.py b/deployment/nbeats/src/model_train.py
index cd7ce7b526730bdc8a105ced67ee85fcbcbc6987..cf9f8f86c10150ea6fb1073beda773140a444a47 100644
--- a/deployment/nbeats/src/model_train.py
+++ b/deployment/nbeats/src/model_train.py
@@ -62,7 +62,6 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
         publish_rate=publish_rate,
     )
 
-    print(ts_dataset.dataset, "DATASEEEET")
     if ts_dataset.dataset.shape[0] < 1:
         logging.info(
             f"METRIC: {target_column} Preprocessed dataset len: {ts_dataset.dataset.shape[0]}, minimum points required: {(params['context_length_ratio'] + 2) * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
@@ -124,8 +123,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
 
     if os.path.isfile(lockfile):
         print("downloading weigths")
-        with lock:
-            model.load_state_dict(torch.load(model_path))
+        if os.path.isfile(model_path):
+            with lock:
+                model.load_state_dict(torch.load(model_path))
 
     trainer.fit(
         model,
diff --git a/deployment/nbeats/test/model_train_test.py b/deployment/nbeats/test/model_train_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..00f4ebd62dbe8ba04f7dcf0cf7a61158bd8559a2
--- /dev/null
+++ b/deployment/nbeats/test/model_train_test.py
@@ -0,0 +1,221 @@
+import sys
+
+sys.path.append(".")
+
+import pytest
+from src.model_train import train
+import pandas as pd
+import numpy as np
+import random
+
+
+@pytest.fixture
+def df_1():
+    df = pd.DataFrame({"ems_time": [], "metric_0": []})
+    return df
+
+
+@pytest.fixture
+def df_2():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    df["metric_0"] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_3():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(0, 1000)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_4():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :3
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = 1
+    return df
+
+
+@pytest.fixture
+def df_5():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(0, 3)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = 1
+    return df
+
+
+@pytest.fixture
+def df_6():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(1, 1001)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+    return df
+
+
+@pytest.fixture
+def df_7():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+        df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None")
+    print(df)
+    return df
+
+
+@pytest.fixture
+def df_8():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+        df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf)
+    return df
+
+
+@pytest.fixture
+def df_9():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_10():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            list(range(20, 300)),
+            f"metric_{i}",
+        ] = np.inf
+    return df
+
+
+@pytest.fixture
+def df_11():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        if i % 2 == 0:
+            df.loc[
+                list(range(20, 300)),
+                f"metric_{i}",
+            ] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_12():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :6000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = [
+            np.nan if i % 2 == 0 else random.random() for i in range(6000)
+        ]
+    return df
+
+
+@pytest.fixture
+def df(request):
+    return request.getfixturevalue(request.param)
+
+
+@pytest.fixture
+def metric():
+    return "metric_0"
+
+
+@pytest.fixture
+def prediction_length():
+    return 60
+
+
+@pytest.mark.parametrize(
+    "df,metric,prediction_length",
+    [
+        ("df_1", metric, prediction_length),
+        ("df_2", metric, prediction_length),
+        ("df_3", metric, prediction_length),
+        ("df_4", metric, prediction_length),
+        ("df_5", metric, prediction_length),
+        ("df_6", metric, prediction_length),
+        ("df_7", metric, prediction_length),
+        ("df_8", metric, prediction_length),
+        ("df_9", metric, prediction_length),
+        ("df_10", metric, prediction_length),
+        ("df_11", metric, prediction_length),
+        ("df_12", metric, prediction_length),
+    ],
+    indirect=True,
+)
+def test_predict(df, metric, prediction_length):
+    df.to_csv("demo.csv")
+    output = train(metric, prediction_length)
+    print(output)
+    if output:
+        print("True")
+    assert True
diff --git a/deployment/tft/main.py b/deployment/tft/main.py
index 5a798b1a18b005b00d2e552630c91910f5584246..a0e0ca67e624fb3659f464c263b2eb16d6f729bf 100644
--- a/deployment/tft/main.py
+++ b/deployment/tft/main.py
@@ -7,7 +7,8 @@ import logging
 import time
 from datetime import datetime
 from pytz import timezone
-from datetime import datetime
+import time
+import setproctitle
 
 # from src.log import logger
 
@@ -81,8 +82,9 @@ class Msg(object):
 
 
 def main():
+    setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}")
     logging.basicConfig(
-        filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
+        filename=f"logs/{os.environ.get('METHOD', 'model')}.out",
         level=logging.INFO,
         datefmt="%Y-%m-%d %H:%M:%S",
         format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
@@ -114,11 +116,11 @@
     )
 
     # msg1 = Msg()
-    # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]'
+    # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "WillFinishTooSoonContext", "level": 3, "publish_rate": 30000}]'
     # msg2 = Msg()
     # msg2.body = (
     #     "{"
-    #     + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
+    #     + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext", "WillFinishTooSoonContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
     #     + "}"
     #     )
 
@@ -127,6 +129,7 @@
     # StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
 
     while True:
+        time.sleep(60)
         pass
 
 
diff --git a/deployment/tft/model.yaml b/deployment/tft/model.yaml
index ea58a201f41e15e0e329c736cb0cb25db351f9c5..884ba526a83ea79ef5c262711e1a351083458ee1 100644
--- a/deployment/tft/model.yaml
+++ b/deployment/tft/model.yaml
@@ -24,3 +24,5 @@
 save_path: models
 
 dataloader_path: dataloader
+context_length_ratio:
+  12
diff --git a/deployment/tft/predict.py b/deployment/tft/predict.py
index f12166dc43a7227cef069e964aac990ce66fbb70..1c40e84319fb743bd0a3a26156b2c4ab5b3d9cfb 100644
--- a/deployment/tft/predict.py
+++ b/deployment/tft/predict.py
@@ -15,6 +15,7 @@ from pytz import timezone
 import pytz
 from datetime import datetime
 import random
+import setproctitle
 
 METHOD = os.environ.get("METHOD", "nbeats")
 START_TOPIC = f"start_forecasting.{METHOD}"
@@ -139,6 +140,9 @@
             print(
                 f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
             )
+            logging.info(
+                f"time difference in seconds between last prediction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
+            )
 
         else:
             predictions = prediction
@@ -178,6 +182,7 @@
 
 
 if __name__ == "__main__":
+    setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}_predict")
     logging.basicConfig(
         filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
     )
diff --git a/deployment/tft/requirements.txt b/deployment/tft/requirements.txt
index c2e88571a4afef8440f44b1751deaa0e26bb8330..ae603574f3dfba609a0ea37d1f9a7e0a34628c7d 100644
--- a/deployment/tft/requirements.txt
+++ b/deployment/tft/requirements.txt
@@ -5,4 +5,5 @@ pytorch-forecasting==0.8.4
 filelock==3.0.12
 influxdb
 python-slugify
-torchmetrics==0.5.0
\ No newline at end of file
+torchmetrics==0.5.0
+setproctitle
diff --git a/deployment/tft/retrain.py b/deployment/tft/retrain.py
index 64f225ae6d90cdc296e46defa2ac0a8acfc252d0..fcfb77d562d14798bddbde15806ddbb585827688 100644
--- a/deployment/tft/retrain.py
+++ b/deployment/tft/retrain.py
@@ -10,6 +10,7 @@ import pytz
 import time
 from pytz import timezone
 from datetime import datetime
+import setproctitle
 
 TOPIC_NAME = "training_models"
 RETRAIN_CYCLE = 10  # minutes
@@ -61,6 +62,7 @@ def main(predicted_metrics, prediction_horizons):
 
 
 if __name__ == "__main__":
+    setproctitle.setproctitle(f"{os.environ.get('METHOD', 'model')}_retrain")
     logging.basicConfig(
         filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
     )
diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py
index 4fc3480055766e13889691eae14d8eadbab0aa1d..c98c90abcd5f29a4ef24ab510c9727405680bf29 100644
--- a/deployment/tft/src/model_predict.py
+++ b/deployment/tft/src/model_predict.py
@@ -57,7 +57,12 @@
         return (None, None)
 
     dataset = pd.read_csv(data_path)
-    new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate)
+    new_ts_dataset = Dataset(
+        dataset,
+        target_column=target_column,
+        **params["dataset"],
+        publish_rate=publish_rate,
+    )
     if new_ts_dataset.dropped_recent_series:  # series with recent data was too short
         logging.info(
             f"METRIC {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
@@ -117,7 +122,11 @@
         return (None, None)
 
     with lock:
-        model.load_state_dict(torch.load(model_path))
+        if os.path.isfile(model_path):
+            model.load_state_dict(torch.load(model_path))
+        else:
+            logging.info("Model corrupted unable to predict")
+            return (None, None)
 
     prediction_input = ts_dataset.get_from_dataset(future_df)
     prediction_input = prediction_input.to_dataloader(train=False)
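The predict.py hunks log how far the last forecast horizon lies ahead of now; prediction_horizon is in milliseconds while start_time and time.time() are in epoch seconds, hence the // 1000. A sketch of that arithmetic, with illustrative function names:

import time

def horizon_epochs(start_time, prediction_horizon_ms, n):
    """Epoch seconds of the n forward predictions after start_time."""
    return [start_time + (i + 1) * prediction_horizon_ms // 1000 for i in range(n)]

def seconds_until_last_horizon(start_time, prediction_horizon_ms, n):
    return horizon_epochs(start_time, prediction_horizon_ms, n)[-1] - int(time.time())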
diff --git a/deployment/tft/src/model_train.py b/deployment/tft/src/model_train.py
index a4e5fa3c92d5719ed9e0eca19eba13b3004568f5..92e9312cfd1dffb3ce5dd1a954d72c3a20c84bb6 100644
--- a/deployment/tft/src/model_train.py
+++ b/deployment/tft/src/model_train.py
@@ -37,7 +37,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
     with open(yaml_file) as file:
         params = yaml.load(file, Loader=yaml.FullLoader)
     params["dataset"]["prediction_length"] = prediction_length
-    params["dataset"]["context_length"] = prediction_length * 12
+    params["dataset"]["context_length"] = (
+        prediction_length * params["context_length_ratio"]
+    )
 
     data_path = os.path.join(
         os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
@@ -48,9 +50,9 @@
     )
 
     dataset = pd.read_csv(data_path)
 
-    if dataset.shape[0] < 14 * prediction_length:
+    if dataset.shape[0] < (params["context_length_ratio"] + 2) * prediction_length:
         logging.info(
-            f"dataset len: {dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+            f"dataset len: {dataset.shape[0]}, minimum points required: {(params['context_length_ratio'] + 2) * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
         )
         return None
@@ -61,6 +63,12 @@
         publish_rate=publish_rate,
     )
 
+    if ts_dataset.dataset.shape[0] < 1:
+        logging.info(
+            f"METRIC: {target_column} Preprocessed dataset len: {ts_dataset.dataset.shape[0]}, minimum points required: {(params['context_length_ratio'] + 2) * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
+        )
+        return None
+
     lockfile = f"{params['dataloader_path']}_{target_column}.pickle"
     lock = FileLock(lockfile + ".lock")
 
@@ -113,8 +121,9 @@
 
     if os.path.isfile(lockfile):
         print("downloading weigths")
-        with lock:
-            tft.load_state_dict(torch.load(model_path))
+        if os.path.isfile(model_path):
+            with lock:
+                tft.load_state_dict(torch.load(model_path))
 
     trainer.fit(
         tft,
diff --git a/deployment/tft/src/preprocess_dataset.py b/deployment/tft/src/preprocess_dataset.py
index 050c3c3701e1419c8b68fe6991032282f3cd2faa..97a6d9a8fa3542f50e263d8b415286839f709ed8 100644
--- a/deployment/tft/src/preprocess_dataset.py
+++ b/deployment/tft/src/preprocess_dataset.py
@@ -98,10 +98,10 @@ class Dataset(object):
         last_timestamp_database = self.dataset[self.time_column].values[-1]
         current_time = int(time.time())
         print(
-            f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
+            f"Time difference between last timestamp and current time: {current_time * 1000 - last_timestamp_database}"
         )
         logging.info(
-            f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
+            f"Time difference between last timestamp and current time: {current_time * 1000 - last_timestamp_database}"
         )
 
     def check_gap(self):
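With context_length_ratio promoted to model.yaml, both the encoder context and the minimum training-set size derive from one knob: the context window is ratio * prediction_length, and the dataset must hold at least (ratio + 2) * prediction_length points, i.e. the context plus two prediction windows. Worked numbers for the shipped ratio of 12, as a sketch:

import yaml

params = yaml.safe_load("context_length_ratio:\n  12\n")  # as added to model.yaml
prediction_length = 60

context_length = prediction_length * params["context_length_ratio"]
min_points = (params["context_length_ratio"] + 2) * prediction_length
print(context_length, min_points)  # 720 840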
diff --git a/deployment/tft/test/model_train_test.py b/deployment/tft/test/model_train_test.py
new file mode 100644
index 0000000000000000000000000000000000000000..00f4ebd62dbe8ba04f7dcf0cf7a61158bd8559a2
--- /dev/null
+++ b/deployment/tft/test/model_train_test.py
@@ -0,0 +1,221 @@
+import sys
+
+sys.path.append(".")
+
+import pytest
+from src.model_train import train
+import pandas as pd
+import numpy as np
+import random
+
+
+@pytest.fixture
+def df_1():
+    df = pd.DataFrame({"ems_time": [], "metric_0": []})
+    return df
+
+
+@pytest.fixture
+def df_2():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    df["metric_0"] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_3():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(0, 1000)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_4():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :3
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = 1
+    return df
+
+
+@pytest.fixture
+def df_5():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(0, 3)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = 1
+    return df
+
+
+@pytest.fixture
+def df_6():
+    df = pd.DataFrame()
+    df["ems_time"] = np.array(range(1, 1001)) * 1e9
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+    return df
+
+
+@pytest.fixture
+def df_7():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+        df[f"metric_{i}"] = df[f"metric_{i}"].fillna("None")
+    print(df)
+    return df
+
+
+@pytest.fixture
+def df_8():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+        df[f"metric_{i}"] = df[f"metric_{i}"].fillna(np.inf)
+    return df
+
+
+@pytest.fixture
+def df_9():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            np.random.randint(0, df.shape[0] - 1, 990),
+            f"metric_{i}",
+        ] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_10():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        df.loc[
+            list(range(20, 300)),
+            f"metric_{i}",
+        ] = np.inf
+    return df
+
+
+@pytest.fixture
+def df_11():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :1000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = np.random.rand(1000)
+        if i % 2 == 0:
+            df.loc[
+                list(range(20, 300)),
+                f"metric_{i}",
+            ] = np.nan
+    return df
+
+
+@pytest.fixture
+def df_12():
+    df = pd.DataFrame()
+    df["ems_time"] = [
+        int(x)
+        for x in pd.date_range(start="2016-01-01", end="2020-12-31", freq="10S").values[
+            :6000
+        ]
+    ]
+    for i in range(5):
+        df[f"metric_{i}"] = [
+            np.nan if i % 2 == 0 else random.random() for i in range(6000)
+        ]
+    return df
+
+
+@pytest.fixture
+def df(request):
+    return request.getfixturevalue(request.param)
+
+
+@pytest.fixture
+def metric():
+    return "metric_0"
+
+
+@pytest.fixture
+def prediction_length():
+    return 60
+
+
+@pytest.mark.parametrize(
+    "df,metric,prediction_length",
+    [
+        ("df_1", metric, prediction_length),
+        ("df_2", metric, prediction_length),
+        ("df_3", metric, prediction_length),
+        ("df_4", metric, prediction_length),
+        ("df_5", metric, prediction_length),
+        ("df_6", metric, prediction_length),
+        ("df_7", metric, prediction_length),
+        ("df_8", metric, prediction_length),
+        ("df_9", metric, prediction_length),
+        ("df_10", metric, prediction_length),
+        ("df_11", metric, prediction_length),
+        ("df_12", metric, prediction_length),
+    ],
+    indirect=True,
+)
+def test_predict(df, metric, prediction_length):
+    df.to_csv("demo.csv")
+    output = train(metric, prediction_length)
+    print(output)
+    if output:
+        print("True")
+    assert True
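These suites are smoke tests: for every frame, train() must either return a trained model or bail out with None, never raise. They write demo.csv because train() reads {APP_NAME or "demo"}.csv from DATA_PATH. A hedged sketch of running one healthy case by hand, in the spirit of fixture df_6 (run from a deployment directory so sys.path.append(".") can resolve src):

import numpy as np
import pandas as pd

from src.model_train import train  # assumes cwd is deployment/<method>

df = pd.DataFrame({"ems_time": np.arange(1, 1001) * 1e9})
df["metric_0"] = np.random.rand(1000)
df.to_csv("demo.csv")

output = train("metric_0", 60)  # None when the data is unusable
print(output)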