diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 7d9d4ec55f24201548187b8f470f1c269c8da274..a2c5ec98935457e9c6244846349a9476e49e2a79 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -142,3 +142,33 @@ deploy:slo-severity-calculator:
     - build:slo-severity-calculator
   script:
     - $SLO_SEVERITY_CALCULATOR_CLI deploy
+
+deploy:nbeats:
+  stage: deploy
+  image: $DOCKER_DIND_IMAGE
+  only:
+    - master
+    - morphemic-rc1.5
+  services:
+    - $DOCKER_DIND_SERVICE
+  script:
+    - docker build -t nbeats -f ./deployment/nbeats/Dockerfile .
+    - docker image ls
+    - echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
+    - docker tag nbeats:latest $CI_REGISTRY_IMAGE/nbeats:$CI_COMMIT_BRANCH
+    - docker push $CI_REGISTRY_IMAGE/nbeats:$CI_COMMIT_BRANCH
+
+deploy:tft:
+  stage: deploy
+  image: $DOCKER_DIND_IMAGE
+  only:
+    - master
+    - morphemic-rc1.5
+  services:
+    - $DOCKER_DIND_SERVICE
+  script:
+    - docker build -t tft -f ./deployment/tft/Dockerfile .
+    - docker image ls
+    - echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
+    - docker tag tft:latest $CI_REGISTRY_IMAGE/tft:$CI_COMMIT_BRANCH
+    - docker push $CI_REGISTRY_IMAGE/tft:$CI_COMMIT_BRANCH
diff --git a/deployment/README.md b/deployment/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..f92464042ca79755787312a83479a4e07eacde04
--- /dev/null
+++ b/deployment/README.md
@@ -0,0 +1,65 @@
+# Model deployment
+
+Model deployment code for the tft and nbeats models. Each module consists of:
+
+- Dockerfile
+    - After installing all dependencies, runs the main.py script
+    - env (a file with environment variables) is used as a parameter
+    - docker_run.sh (docker-compose will be used later)
+    - TODO: build docker-compose, decide which volumes will be used (e.g. data path, model path, app config path)
+
+- main.py
+    - Starts the scripts after receiving a message from AMQ on the metrics_to_predict topic
+    - Runs two scripts independently (as separate processes): one for training (retrain.py), the other for prediction (predict.py)
+
+- retrain.py
+    - Trains models (at the moment as a cyclic job with a fixed 10-minute frequency)
+    - Currently one model per metric is used; however, that can be changed for tft
+    - After the models are trained, messages are sent to the training_models topic
+    - TODO: change the retraining conditions (needs to be discussed), add AutoML hyperparameter optimization
+
+- predict.py
+    - Sends predictions (at the moment a cyclic event with a configurable number of forward predictions, which may be changed with the start_forecasting message); currently every metric's predictions are published with the same frequency, but according to the examples in the Python stomp.py library the prediction frequency may differ across metrics, so this may change in the future (an example message shape is shown below)
+    - Until the first model is trained, no prediction is sent
+    - The m predictions, where m is the number of forward predictions for a given metric, are currently sent as separate messages; this could be replaced by one message containing lists
+    - TODO: verify which parameters are stable and which may be changed by AMQ messages (prediction_horizon, publish_rate), parallelize the predictions, cache models, properly fill fields like 'refersTo' and 'provider', calculate a proper confidence interval (nbeats)
+
+- src
+    - folder with all the helpers
+    - TODO: install the dataset-maker package in a cleaner way
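+
+For orientation, each individual prediction is published to the topic intermediate_prediction.[METHOD NAME].[METRIC NAME] as a message of roughly the following shape (the values below are illustrative only; several fields are still TODO placeholders in the code):
+
+    {
+        "metricValues": 61.2,
+        "level": 3,
+        "timestamp": 1623242615,
+        "probability": 0.95,
+        "confidence_interval": 8.3,
+        "horizon": 1200,
+        "refersTo": "TODO",
+        "cloud": "TODO",
+        "provider": "TODO"
+    }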
+
+## How to test?
+
+Ensure that there are no docker images or containers left over, and check that the ports for influxdb and amq are not already in use.
+
+From morphemic-persistent-storage:
+
+    docker-compose up -d
+
+From this directory:
+
+    cd tft
+    docker build --tag test .
+    ./docker_run.sh
+
+From the AMQ web console (http://localhost:8161/admin) a message in JSON format, e.g.:
+
+    [{"metric": "cpu_usage", "level": 3, "publish_rate": 60000}]
+
+can be sent to the topic metrics_to_predict,
+
+then
+
+    {
+        "metrics": ["cpu_usage"],
+        "timestamp": 143532341251,
+        "epoch_start": 143532341252,
+        "number_of_forward_predictions": 5,
+        "prediction_horizon": 600
+    }
+
+to the topic start_forecasting.[METHOD NAME], e.g. start_forecasting.tft.
+
+Until the first model is trained, predictions will not be sent.
diff --git a/deployment/nbeats/Dockerfile b/deployment/nbeats/Dockerfile
new file mode 100644
index 0000000000000000000000000000000000000000..70848da01a51226adbcd6e9c03ae7e977714ccf9
--- /dev/null
+++ b/deployment/nbeats/Dockerfile
@@ -0,0 +1,19 @@
+FROM python:3.8-slim-buster
+
+# Install Python dependencies.
+WORKDIR /wd
+COPY deployment/tft/requirements.txt .
+RUN pip3 install -r requirements.txt
+
+# Copy the rest of the codebase into the image
+COPY deployment/tft ./
+COPY morphemic-datasetmaker ./morphemic-datasetmaker
+COPY amq-message-python-library ./amq-message-python-library
+
+RUN cd morphemic-datasetmaker && python3 setup.py install && cd ..
+RUN rm -r morphemic-datasetmaker
+RUN mv amq-message-python-library amq_message_python_library
+
+CMD ["python3", "main.py"]
+
+
diff --git a/deployment/nbeats/docker_run.sh b/deployment/nbeats/docker_run.sh
new file mode 100755
index 0000000000000000000000000000000000000000..cff33ad45688c4e4f4e3b0827ec11590078c1871
--- /dev/null
+++ b/deployment/nbeats/docker_run.sh
@@ -0,0 +1 @@
+docker run -t --env-file=env --network=host stomp_app
diff --git a/deployment/nbeats/env b/deployment/nbeats/env
new file mode 100644
index 0000000000000000000000000000000000000000..b9e60a4dca00f1dee9b606e5f5867999f91a4896
--- /dev/null
+++ b/deployment/nbeats/env
@@ -0,0 +1,12 @@
+AMQ_HOSTNAME=localhost
+AMQ_USER=admin
+AMQ_PASSWORD=admin
+AMQ_PORT=61613
+APP_NAME=demo
+METHOD=nbeats
+DATA_PATH=./
+INFLUXDB_HOSTNAME=localhost
+INFLUXDB_PORT=8086
+INFLUXDB_USERNAME=morphemic
+INFLUXDB_PASSWORD=password
+INFLUXDB_DBNAME=morphemic
diff --git a/deployment/nbeats/main.py b/deployment/nbeats/main.py
new file mode 100644
index 0000000000000000000000000000000000000000..ed56ffbb785b8ba342e17c832e71354f10a3ca27
--- /dev/null
+++ b/deployment/nbeats/main.py
@@ -0,0 +1,94 @@
+import os
+from multiprocessing import Pool
+import stomp
+import json
+from amq_message_python_library import *  # python amq-message-python-library
+import logging
+
+AMQ_USER = os.environ.get("AMQ_USER", "admin")
+AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
+START_APP_TOPIC = "metrics_to_predict"
+METHOD = os.environ.get("METHOD", "tft")
+START_TOPIC = f"start_forecasting.{METHOD}"
+
+
+def run_process(args):
+    os.system(f"python {args[0]} '{args[1]}'")
+
+
+def start_training(metrics_to_predict):
+    processes = (("retrain.py", metrics_to_predict), ("predict.py", metrics_to_predict))
+
+    pool = Pool(processes=2)
+    pool.map(run_process, processes)
+
+
+class StartListener(stomp.ConnectionListener):
+    """Custom listener, parameters:
+    - conn (stomp connector)
+    - topic_name, name of topic to subscribe"""
+
+    def __init__(self, conn, topic_name):
+        self.conn = conn
+        self.topic_name = topic_name
+
+    def on_error(self, frame):
+        print('received 
an error "%s"' % frame.body) + + def on_message(self, frame): + print(self.topic_name) + logging.debug(f" Body: {frame.body}") + + message = json.loads(frame.body) + global publish_rate, all_metrics + publish_rate = message[0]["publish_rate"] + all_metrics = message + + +class StartForecastingListener(stomp.ConnectionListener): + """Custom listener, parameters: + - conn (stomp connector) + - topic_name, name of topic to subscribe""" + + def __init__(self, conn, topic_name): + self.conn = conn + self.topic_name = topic_name + + def on_error(self, frame): + print('received an error "%s"' % frame.body) + + def on_message(self, frame): + message = json.loads(frame.body) + message["publish_rate"] = publish_rate + message["all_metrics"] = all_metrics + message = json.dumps(message) + start_training(message) + self.conn.disconnect() + + + +def main(): + logging.getLogger().setLevel(logging.DEBUG) + + start_app_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + start_app_conn.connect() + start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + start_conn.connect() + + start_conn.conn.subscribe(f"/topic/{START_APP_TOPIC}", "1", ack="auto") + start_app_conn.conn.subscribe(f"/topic/{START_TOPIC}", "2", ack="auto") + + start_conn.conn.set_listener("1", StartListener(start_conn.conn, START_APP_TOPIC)) + start_app_conn.conn.set_listener( + "2", StartForecastingListener(start_conn.conn, START_TOPIC) + ) + + while True: + pass + + +if __name__ == "__main__": + publish_rate = 0 + all_metrics = {} + + main() diff --git a/deployment/nbeats/model.yaml b/deployment/nbeats/model.yaml new file mode 100644 index 0000000000000000000000000000000000000000..ed34e20d2110081370f5460ed0cbb25ae48ba91a --- /dev/null +++ b/deployment/nbeats/model.yaml @@ -0,0 +1,18 @@ +data: + csv_path: demo.csv +training: + bs: 8 + max_epochs: 1 + loss: mae +dataset: + tv_unknown_reals: [] + known_reals: [] + tv_unknown_cat: [] + static_reals: [] + context_length: 10 + prediction_length: 5 + classification: 0 +prediction: + bs: 8 +save_path: + models diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py new file mode 100644 index 0000000000000000000000000000000000000000..56097d34a5023f714e13b2cd8eadc3792e32873d --- /dev/null +++ b/deployment/nbeats/predict.py @@ -0,0 +1,145 @@ +import time +import os +import stomp +import threading +from src.model_predict import predict +from amq_message_python_library import * +from src.influxdb_predictions import InfluxdbPredictionsSender +from timeloop import Timeloop +from datetime import timedelta +import json +import sys +import pandas as pd +import logging +from src.dataset_maker import CSVData + + +METHOD = os.environ.get("METHOD", "tft") +START_TOPIC = f"start_forecasting.{METHOD}" +STOP_TOPIC = f"stop_forecasting.{METHOD}" +PRED_TOPIC_PREF = f"intermediate_prediction.{METHOD}" +PREDICTION_CYCLE = 1 # minutes +APP_NAME = os.environ.get("APP_NAME", "demo") +AMQ_USER = os.environ.get("AMQ_USER", "admin") +AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin") + + +class CustomListener(stomp.ConnectionListener): + """Custom listener, parameters: + - conn (stomp connector) + - topic_name, name of topic to subscribe, + - start , if start is set to be true recived metrics + are added to predicted metric, otherwise recived + metrics are removed from predicted metric (start + mode corresponds to start_prediction)""" + + def __init__(self, conn, topic_name, start=True): + self.conn = conn + self.topic_name = topic_name + self.start = start + + def on_error(self, frame): + 
print('received an error "%s"' % frame.body) + + def on_message(self, frame): + global predicted_metrics, lock, number_of_forward_predictions, metrics_info + lock.acquire() + try: + metrics = set(json.loads(frame.body)["metrics"]) + + if self.start: + print("STARTTTTTT", self.start) + predicted_metrics = predicted_metrics.union(metrics) + for metric in metrics: + frame_body = json.loads(frame.body) + logging.debug(frame_body) + number_of_forward_predictions[metric] = frame_body[ + "number_of_forward_predictions" + ] + self.prediction_cycle = frame_body["prediction_horizon"] + else: + predicted_metrics = predicted_metrics.difference(metrics) + + finally: + lock.release() + + +def main(): + logging.debug(f"metrics to predict {predicted_metrics}") + start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + start_conn.connect() + + stop_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + stop_conn.connect() + + start_conn.conn.subscribe(f"/topic/{START_TOPIC}", "1", ack="auto") + start_conn.conn.subscribe(f"/topic/{STOP_TOPIC}", "2", ack="auto") + + start_conn.set_listener( + "1", CustomListener(start_conn.conn, START_TOPIC, start=True) + ) + stop_conn.set_listener("2", CustomListener(stop_conn.conn, STOP_TOPIC, start=False)) + + dataset_preprocessor = CSVData(APP_NAME) + dataset_preprocessor.prepare_csv() + logging.debug("dataset downloaded") + + influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME) + + logging.debug( + f"waiting {msg['epoch_start'] - int(time.time()) - prediction_cycle} seconds" + ) + + time.sleep(max(0 + msg["epoch_start"] - int(time.time()) - prediction_cycle) + ) # time units??? + + tl = Timeloop() + + @tl.job(interval=timedelta(seconds=prediction_cycle)) + def metric_predict(): + logging.debug("prediction") + dataset_preprocessor.prepare_csv() + for metric in predicted_metrics: + predictions = None + time_0 = int(time.time()) + for i in range(number_of_forward_predictions[metric]): + prediction_msgs, predictions = predict( + metric, + prediction_cycle, + extra_data=predictions, + m=i + 1, + prediction_hor=prediction_horizon, + timestamp=time_0, + ) + if prediction_msgs: + dest = f"{PRED_TOPIC_PREF}.{metric}" + start_conn.send_to_topic(dest, prediction_msgs[metric]) + influxdb_conn.send_to_influxdb(metric, prediction_msgs) + + tl.start(block=True) + + while True: + pass + + +if __name__ == "__main__": + msg = json.loads(sys.argv[1]) + metrics_info = { + m["metric"]: { + "level": m["level"], + "publish_rate": m["publish_rate"], + } + for m in msg["all_metrics"] + } + prediction_horizon = msg["prediction_horizon"] + predicted_metrics = set(msg["metrics"]) + prediction_cycle = msg["prediction_horizon"] + + logging.debug(f"Predicted metrics: {predicted_metrics}") + number_of_forward_predictions = { + metric: msg["number_of_forward_predictions"] for metric in predicted_metrics + } # deafult number of forward predictions + lock = threading.Lock() + + main() diff --git a/deployment/nbeats/requirements.txt b/deployment/nbeats/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..b0e5b674a12fc53058b17638b5e1e0f774470ad6 --- /dev/null +++ b/deployment/nbeats/requirements.txt @@ -0,0 +1,10 @@ +stomp.py +pandas==1.1.3 +pytorch-lightning==1.2.7 +pytorch-forecasting==0.8.4 +timeloop==1.0.2 +filelock==3.0.12 +influxdb +python-slugify + + diff --git a/deployment/nbeats/retrain.py b/deployment/nbeats/retrain.py new file mode 100644 index 0000000000000000000000000000000000000000..9705fe307458eaf018f271653d60667a0c5ad82e --- /dev/null +++ 
b/deployment/nbeats/retrain.py @@ -0,0 +1,51 @@ +import stomp +import os +import sys +import json +import logging +from src.model_train import train +from amq_message_python_library import * +from timeloop import Timeloop +from datetime import timedelta +from src.dataset_maker import CSVData + +TOPIC_NAME = "training_models" +RETRAIN_CYCLE = 10 # minutes +HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613)) +AMQ_USER = os.environ.get("AMQ_USER", "admin") +AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin") +APP_NAME = os.environ.get("APP_NAME", "demo") + + +def main(predicted_metrics, prediction_horizon): + start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + start_conn.connect() + tl = Timeloop() + + dataset_preprocessor = CSVData(APP_NAME) + dataset_preprocessor.prepare_csv() + + @tl.job(interval=timedelta(seconds=60 * RETRAIN_CYCLE)) + def retrain_model(): + logging.debug("TRAINING") + for metric in predicted_metrics: + retrain_msg = train(metric, prediction_horizon) + start_conn.send_to_topic(TOPIC_NAME, retrain_msg) + + tl.start(block=True) + + +if __name__ == "__main__": + logging.debug("Training") + msg = json.loads(sys.argv[1]) + metrics_info = { + m["metric"]: { + "level": m["level"], + "publish_rate": m["publish_rate"], + } + for m in msg["all_metrics"] + } + prediction_horizon = msg["prediction_horizon"] // msg["publish_rate"] + predicted_metrics = set(metrics_info.keys()) + logging.debug(f"Predicted metrics: {predicted_metrics}") + main(predicted_metrics, prediction_horizon) diff --git a/deployment/nbeats/run.sh b/deployment/nbeats/run.sh new file mode 100644 index 0000000000000000000000000000000000000000..917126cf8580d0f453190b98c6f496f262d97d5e --- /dev/null +++ b/deployment/nbeats/run.sh @@ -0,0 +1,5 @@ +#!/bin/sh +python3 morphemic-datasetmaker/setup.py install +# rm -r morphemic-datasetmaker +# python3 main.py +# mv amq-message-python-library amq_message_python_library diff --git a/deployment/nbeats/src/dataset_maker.py b/deployment/nbeats/src/dataset_maker.py new file mode 100644 index 0000000000000000000000000000000000000000..7ffb3e5e7f6361aa2ad5462900b4dad63f95d346 --- /dev/null +++ b/deployment/nbeats/src/dataset_maker.py @@ -0,0 +1,34 @@ +from morphemic.dataset import DatasetMaker +import os +from filelock import FileLock + +"""Script for preparing csv data downloaded form InfluxDB database, data""" + + +class CSVData(object): + def __init__(self, name, start_collection=None): + self.name = name + self.config = { + "hostname": os.environ.get("INFLUXDB_HOSTNAME", "localhost"), + "port": int(os.environ.get("INFLUXDB_PORT", "8086")), + "username": os.environ.get("INFLUXDB_USERNAME", "morphemic"), + "password": os.environ.get("INFLUXDB_PASSWORD", "password"), + "dbname": os.environ.get("INFLUXDB_DBNAME", "morphemic"), + "path_dataset": os.environ.get("DATA_PATH", "./"), + } + self.start_collection = start_collection + + def prepare_csv(self): + lockfile = os.path.join(self.config["path_dataset"], f"{self.name}.csv") + lock = FileLock(lockfile + ".lock") + + if os.path.isfile(lockfile): + with lock: + datasetmaker = DatasetMaker( + self.name, self.start_collection, self.config + ) + response = datasetmaker.make() + + else: + datasetmaker = DatasetMaker(self.name, self.start_collection, self.config) + response = datasetmaker.make() diff --git a/deployment/nbeats/src/influxdb_predictions.py b/deployment/nbeats/src/influxdb_predictions.py new file mode 100644 index 
0000000000000000000000000000000000000000..1c1e0499117f801dece5f7a677951d99f2337038 --- /dev/null +++ b/deployment/nbeats/src/influxdb_predictions.py @@ -0,0 +1,42 @@ +from influxdb import * +import os +import datetime + + +"""Connects with Influxdb and sends predictet values""" + + +class InfluxdbPredictionsSender(object): + def __init__(self, method, app_name): + self.client = InfluxDBClient( + host=os.environ.get("INFLUXDB_HOSTNAME", "localhost"), + port=int(os.environ.get("INFLUXDB_PORT", "8086")), + username=os.environ.get("INFLUXDB_USERNAME", "morphemic"), + password=os.environ.get("INFLUXDB_PASSWORD", "password"), + ) + self.client.switch_database(os.environ.get("INFLUXDB_DBNAME", "morphemic")) + self.method = method + self.app_name = app_name + + def send_to_influxdb(self, metric, msg): + msg = { + "measurement": f"{self.app_name}.{metric}.prediction", + "tags": { + "method": f"{self.method}", + }, + "time": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"), + "fields": { + "value": msg[metric]["metricValues"], + "prediction_horizon": msg[metric]["horizon"], + }, + } + try: + self.client.write_points( + [ + msg, + ], + ) + print("predictions sent to influxdb") + + except Exception as e: + print("Could not send predictions to influxdb") diff --git a/deployment/nbeats/src/model_predict.py b/deployment/nbeats/src/model_predict.py new file mode 100644 index 0000000000000000000000000000000000000000..106094d184e3bc88829525175cbfc72118adc576 --- /dev/null +++ b/deployment/nbeats/src/model_predict.py @@ -0,0 +1,89 @@ +import yaml +import pandas as pd +import numpy as np +import time +import os +import torch +from filelock import FileLock +from src.preprocess_dataset import Dataset +from pytorch_forecasting import NBeats +import logging + +"""Script for nbeats fusion transformer prediction""" + + +def predict(target_column, yaml_file="nbeats.yaml", extra_data=None): + with open(yaml_file) as file: + params = yaml.load(file, Loader=yaml.FullLoader) + + data_path = os.path.join( + os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' + ) + + dataset = pd.read_csv(data_path) + + if extra_data is not None: + dataset = pd.concat([dataset, extra_data]) + + ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + + dataset = dataset[ + lambda x: x.series == x.series.max() + ] # multiple or single series training? 
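+    # The steps below extend the series with a synthetic "future" block: the last
+    # observed row is repeated prediction_length times, given consecutive time_idx
+    # values starting at ts_dataset.n and marked with split="future", so that
+    # inherited_dataset("val", "future") yields a dataloader whose decoder window
+    # covers exactly the points the model is asked to predict.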
+ ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + + pred_len = params["dataset"]["prediction_length"] + + future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]] + future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len)) + future_df["time_idx"] = future_time_idx + future_df["split"] = "future" + + ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index() + + prediction_input = ts_dataset.inherited_dataset("val", "future") + + model = NBeats.from_dataset( + ts_dataset.ts_dataset, + dropout=0.1, + log_interval=-1, + reduce_on_plateau_patience=5, + ) + + lockfile = params["save_path"] + lock = FileLock(lockfile + ".lock") + + model_path = os.path.join(params["save_path"], f"{target_column}.pth") + + if not os.path.isfile(model_path): # no pretrained model, unable to predict + print("no pretrained model, unable to predict") + return (None, None) + + if os.path.isfile(lockfile): + with lock: + model.load_state_dict(torch.load(model_path)) + + prediction_input = prediction_input.to_dataloader(train=False) + prediction = model.predict(prediction_input, mode="raw")["prediction"] + + print(prediction) + + msg = { + target_column: { + "metricValues": prediction[-1][-1].item(), + "level": 0, + "timestamp": time.time(), + "probability": 0.95, + "confidence_interval": "TODO", # quantiles difference + "horizon": 20 * 60, # TODO + "refersTo": "TODO", + "cloud": "TODO", + "provider": "TODO", + } + } + print(f"prediction msg: {msg}") + + # return predicted values + future_df["split"] = "val" + future_df[target_column] = prediction[-1] + return (msg, future_df) diff --git a/deployment/nbeats/src/model_train.py b/deployment/nbeats/src/model_train.py new file mode 100644 index 0000000000000000000000000000000000000000..3fcf70f867bb2d8462c86f3482240ab2c1141bfd --- /dev/null +++ b/deployment/nbeats/src/model_train.py @@ -0,0 +1,95 @@ +import torch +import yaml +import pandas as pd +import time +from filelock import FileLock +import pytorch_lightning as pl +import os +from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor +from pytorch_forecasting.metrics import QuantileLoss, MAE, RMSE, CrossEntropy +from pytorch_forecasting import NBeats +from src.preprocess_dataset import Dataset + + +"""Script for temporal fusion transformer training""" + + +LOSSES_DICT = { + "mae": MAE(), + "rmse": RMSE(), + "crossentropy": CrossEntropy(), +} + + +def train(target_column, yaml_file="nbeats.yaml"): + torch.manual_seed(12345) + + with open(yaml_file) as file: + params = yaml.load(file, Loader=yaml.FullLoader) + + data_path = os.path.join( + os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' + ) + + dataset = pd.read_csv(data_path) + + ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + + training = ts_dataset.ts_dataset + validation = ts_dataset.inherited_dataset( + "train", "val" + ) # only train and val splits will be used + + bs = params["training"]["bs"] + + train_dataloader = training.to_dataloader( + train=True, batch_size=bs, num_workers=6, shuffle=True + ) + val_dataloader = validation.to_dataloader(train=False, batch_size=bs, num_workers=6) + + early_stop_callback = EarlyStopping( + monitor="val_loss", min_delta=1e-5, patience=8, verbose=False, mode="min" + ) + lr_logger = LearningRateMonitor() + trainer = pl.Trainer( + max_epochs=params["training"]["max_epochs"], + gpus=0, + gradient_clip_val=0.5, + callbacks=[lr_logger, early_stop_callback], + 
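+        # Lightning checkpointing stays disabled here; weights are saved manually
+        # with torch.save() under a FileLock after fit() (see below), because
+        # retrain.py and predict.py may read/write the same <metric>.pth file
+        # concurrently.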
checkpoint_callback=False, # TODO: is pl checkpoint_callback thread safe? + ) + + model = NBeats.from_dataset( + training, + dropout=0.1, + loss=LOSSES_DICT[params["training"]["loss"]], + log_interval=-1, + reduce_on_plateau_patience=5, + ) + + model_path = os.path.join(params["save_path"], f"{target_column}.pth") + + lockfile = model_path + lock = FileLock(lockfile + ".lock") + + if os.path.isfile(lockfile): + print("downloading weigths") + with lock: + model.load_state_dict(torch.load(model_path)) + + trainer.fit( + model, + train_dataloader=train_dataloader, + val_dataloaders=val_dataloader, + ) + + with lock: + torch.save(model.state_dict(), model_path) + + msg = { + "metrics": target_column, + "forecasting_method": os.environ.get("METHOD", "nbetas"), + "timestamp": time.time(), + } + print(msg) + return msg diff --git a/deployment/nbeats/src/preprocess_dataset.py b/deployment/nbeats/src/preprocess_dataset.py new file mode 100644 index 0000000000000000000000000000000000000000..e1de9bbc0ac0e2630ea1c5cd5190f4fcbccb6f21 --- /dev/null +++ b/deployment/nbeats/src/preprocess_dataset.py @@ -0,0 +1,106 @@ +import pandas as pd +from pytorch_forecasting import TimeSeriesDataSet +from pytorch_forecasting.data import NaNLabelEncoder + +pd.options.mode.chained_assignment = None + +"""Script for preparing time series dataset from pythorch-forecasting package +TODO: add checking whether data consists of multiple series, handle nans values""" + + +class Dataset(object): + def __init__( + self, + dataset, + target_column="value", + tv_unknown_reals="[]", + known_reals="[]", + tv_unknown_cat="[]", + static_reals="[]", + classification=0, + context_length=40, + prediction_length=5, + ): + + self.target_column = target_column + self.tv_unknown_cat = tv_unknown_cat + self.known_reals = known_reals + self.tv_unknown_reals = tv_unknown_reals + self.static_reals = static_reals + self.classification = classification + self.context_length = context_length + self.prediction_length = prediction_length + self.add_obligatory_columns(dataset) + self.dataset = self.convert_formats(dataset) + self.n = dataset.shape[0] + self.ts_dataset = self.create_time_series_dataset() + + def convert_formats(self, dataset): + if not self.classification: + dataset[self.target_column] = dataset[self.target_column].astype(float) + else: + dataset[self.target_column] = dataset[self.target_column].astype(int) + + for name in self.tv_unknown_cat: + dataset[name] = dataset[name].astype(str) + + dataset["series"] = dataset["series"].astype(str) + return dataset + + def add_obligatory_columns(self, dataset): + dataset["series"] = 0 + dataset["split"] = "train" + n = dataset.shape[0] + dataset["split"][int(n * 0.8) :] = "val" + dataset["time_idx"] = range(n) # TODO check time gaps + return dataset + + def create_time_series_dataset(self): + if not self.classification: + self.time_varying_unknown_reals = [ + self.target_column + ] + self.tv_unknown_reals + self.time_varying_unknown_categoricals = self.tv_unknown_cat + else: + self.time_varying_unknown_reals = self.tv_unknown_reals + self.time_varying_unknown_categoricals = [ + self.target_column + ] + self.tv_unknown_cat + + ts_dataset = TimeSeriesDataSet( + self.dataset[lambda x: x.split == "train"], + time_idx="time_idx", + target=self.target_column, + group_ids=["series"], + min_encoder_length=self.context_length, # keep encoder length long (as it is in the validation set) + max_encoder_length=self.context_length, + min_prediction_length=self.prediction_length, + 
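+            # min/max encoder length are pinned to context_length and min/max
+            # prediction length to prediction_length, so every sample is a fixed
+            # window of context_length past points plus prediction_length targets.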
max_prediction_length=self.prediction_length, + static_categoricals=[], + static_reals=self.static_reals, + time_varying_known_categoricals=[], + categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)}, + variable_groups={}, # group of categorical variables can be treated as one variable + time_varying_known_reals=["time_idx"] + self.known_reals, + time_varying_unknown_categoricals=self.tv_unknown_cat, + time_varying_unknown_reals=[self.target_column] + self.tv_unknown_reals, + add_relative_time_idx=True, + add_target_scales=True if not self.classification else False, + add_encoder_length=True, + allow_missings=True, + ) + return ts_dataset + + def inherited_dataset(self, split1, split2): + df1 = ( + self.dataset[lambda x: x.split == split1] + .groupby("series", as_index=False) + .apply(lambda x: x.iloc[-self.context_length :]) + ) # previous split fragment + df2 = self.dataset[lambda x: x.split == split2] # split part + inh_dataset = pd.concat([df1, df2]) + inh_dataset = inh_dataset.sort_values(by=["series", "time_idx"]) + inh_dataset = TimeSeriesDataSet.from_dataset( + self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True + ) + return inh_dataset diff --git a/deployment/tft/.dockerignore b/deployment/tft/.dockerignore new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/deployment/tft/Dockerfile b/deployment/tft/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..70848da01a51226adbcd6e9c03ae7e977714ccf9 --- /dev/null +++ b/deployment/tft/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.8-slim-buster + +# Install Python dependencies. +WORKDIR /wd +COPY deployment/tft/requirements.txt . +RUN pip3 install -r requirements.txt + +# Copy the rest of the codebase into the image +COPY deployment/tft ./ +COPY morphemic-datasetmaker ./morphemic-datasetmaker +COPY amq-message-python-library ./amq-message-python-library + +RUN cd morphemic-datasetmaker && python3 setup.py install && cd .. 
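+# morphemic-datasetmaker is installed into site-packages by the setup.py step
+# above, so its source tree can be removed; amq-message-python-library is only
+# renamed so it can be imported as amq_message_python_library by main.py,
+# retrain.py and predict.py.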
+RUN rm -r morphemic-datasetmaker +RUN mv amq-message-python-library amq_message_python_library + +CMD ["python3", "main.py"] + + diff --git a/deployment/tft/docker_run.sh b/deployment/tft/docker_run.sh new file mode 100755 index 0000000000000000000000000000000000000000..cff33ad45688c4e4f4e3b0827ec11590078c1871 --- /dev/null +++ b/deployment/tft/docker_run.sh @@ -0,0 +1 @@ +docker run -t --env-file=env --network=host stomp_app diff --git a/deployment/tft/env b/deployment/tft/env new file mode 100644 index 0000000000000000000000000000000000000000..aa4cd5c1df91db819278744698b449d4d4448274 --- /dev/null +++ b/deployment/tft/env @@ -0,0 +1,12 @@ +AMQ_HOSTNAME=localhost +AMQ_USER=admin +AMQ_PASSWORD=admin +AMQ_PORT=61613 +APP_NAME=demo +METHOD=tft +DATA_PATH=./ +INFLUXDB_HOSTNAME=localhost +INFLUXDB_PORT=8086 +INFLUXDB_USERNAME=morphemic +INFLUXDB_PASSWORD=password +INFLUXDB_DBNAME=morphemic diff --git a/deployment/tft/main.py b/deployment/tft/main.py new file mode 100644 index 0000000000000000000000000000000000000000..2cc1a5d8e0c6b78c668a450e671a179a647ee0de --- /dev/null +++ b/deployment/tft/main.py @@ -0,0 +1,95 @@ +import os +from multiprocessing import Pool +import stomp +import json +from amq_message_python_library import * # python amq-message-python-library +import logging + +AMQ_USER = os.environ.get("AMQ_USER", "admin") +AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin") +START_APP_TOPIC = "metrics_to_predict" +METHOD = os.environ.get("METHOD", "tft") +START_TOPIC = f"start_forecasting.{METHOD}" + + +def run_process(args): + print("running") + os.system(f"python {args[0]} '{args[1]}'") + + +def start_training(metrics_to_predict): + processes = (("retrain.py", metrics_to_predict), ("predict.py", metrics_to_predict)) + + pool = Pool(processes=2) + pool.map(run_process, processes) + + +class StartListener(stomp.ConnectionListener): + """Custom listener, parameters: + - conn (stomp connector) + - topic_name, name of topic to subscribe""" + + def __init__(self, conn, topic_name): + self.conn = conn + self.topic_name = topic_name + + def on_error(self, frame): + print('received an error "%s"' % frame.body) + + def on_message(self, frame): + print(self.topic_name) + logging.debug(f" Body: {frame.body}") + + message = json.loads(frame.body) + global publish_rate, all_metrics + publish_rate = message[0]["publish_rate"] + all_metrics = message + + +class StartForecastingListener(stomp.ConnectionListener): + """Custom listener, parameters: + - conn (stomp connector) + - topic_name, name of topic to subscribe""" + + def __init__(self, conn, topic_name): + self.conn = conn + self.topic_name = topic_name + + def on_error(self, frame): + print('received an error "%s"' % frame.body) + + def on_message(self, frame): + message = json.loads(frame.body) + message["publish_rate"] = publish_rate + message["all_metrics"] = all_metrics + message = json.dumps(message) + start_training(message) + self.conn.disconnect() + + + +def main(): + logging.getLogger().setLevel(logging.DEBUG) + + start_app_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + start_app_conn.connect() + start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + start_conn.connect() + + start_conn.conn.subscribe(f"/topic/{START_APP_TOPIC}", "1", ack="auto") + start_app_conn.conn.subscribe(f"/topic/{START_TOPIC}", "2", ack="auto") + + start_conn.conn.set_listener("1", StartListener(start_conn.conn, START_APP_TOPIC)) + start_app_conn.conn.set_listener( + "2", StartForecastingListener(start_conn.conn, START_TOPIC) + ) + + while True: + 
pass + + +if __name__ == "__main__": + publish_rate = 0 + all_metrics = {} + + main() diff --git a/deployment/tft/model.yaml b/deployment/tft/model.yaml new file mode 100644 index 0000000000000000000000000000000000000000..b198081db24e20e18f042661ea339b7491a9b0f7 --- /dev/null +++ b/deployment/tft/model.yaml @@ -0,0 +1,24 @@ +data: + csv_path: demo.csv +training: + bs: 8 + max_epochs: 1 + loss: quantile +dataset: + tv_unknown_reals: [] + known_reals: [] + tv_unknown_cat: [] + static_reals: [] + context_length: 10 + prediction_length: 5 + classification: 0 +model: + learning_rate: 0.05 + hidden_size: 32 + attention_head_size: 1 + hidden_continuous_size: 16 + output_size: 7 +prediction: + bs: 8 +save_path: + models diff --git a/deployment/tft/predict.py b/deployment/tft/predict.py new file mode 100644 index 0000000000000000000000000000000000000000..f1d7075da7fbfa5fb4f184113dea0af0655eec39 --- /dev/null +++ b/deployment/tft/predict.py @@ -0,0 +1,145 @@ +import time +import os +import stomp +import threading +from src.model_predict import predict +from amq_message_python_library import * +from src.influxdb_predictions import InfluxdbPredictionsSender +from timeloop import Timeloop +from datetime import timedelta +import json +import sys +import pandas as pd +import logging +from src.dataset_maker import CSVData + + +METHOD = os.environ.get("METHOD", "tft") +START_TOPIC = f"start_forecasting.{METHOD}" +STOP_TOPIC = f"stop_forecasting.{METHOD}" +PRED_TOPIC_PREF = f"intermediate_prediction.{METHOD}" +PREDICTION_CYCLE = 1 # minutes +APP_NAME = os.environ.get("APP_NAME", "demo") +AMQ_USER = os.environ.get("AMQ_USER", "admin") +AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin") + + +class CustomListener(stomp.ConnectionListener): + """Custom listener, parameters: + - conn (stomp connector) + - topic_name, name of topic to subscribe, + - start , if start is set to be true recived metrics + are added to predicted metric, otherwise recived + metrics are removed from predicted metric (start + mode corresponds to start_prediction)""" + + def __init__(self, conn, topic_name, start=True): + self.conn = conn + self.topic_name = topic_name + self.start = start + + def on_error(self, frame): + print('received an error "%s"' % frame.body) + + def on_message(self, frame): + global predicted_metrics, lock, number_of_forward_predictions, metrics_info + lock.acquire() + try: + metrics = set(json.loads(frame.body)["metrics"]) + + if self.start: + print("STARTTTTTT", self.start) + predicted_metrics = predicted_metrics.union(metrics) + for metric in metrics: + frame_body = json.loads(frame.body) + logging.debug(frame_body) + number_of_forward_predictions[metric] = frame_body[ + "number_of_forward_predictions" + ] + self.prediction_cycle = frame_body["prediction_horizon"] + else: + predicted_metrics = predicted_metrics.difference(metrics) + + finally: + lock.release() + + +def main(): + logging.debug(f"metrics to predict {predicted_metrics}") + start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + start_conn.connect() + + stop_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + stop_conn.connect() + + start_conn.conn.subscribe(f"/topic/{START_TOPIC}", "1", ack="auto") + stop_conn.conn.subscribe(f"/topic/{STOP_TOPIC}", "2", ack="auto") + + start_conn.set_listener( + "1", CustomListener(start_conn.conn, START_TOPIC, start=True) + ) + stop_conn.set_listener("2", CustomListener(stop_conn.conn, STOP_TOPIC, start=False)) + + dataset_preprocessor = CSVData(APP_NAME) + dataset_preprocessor.prepare_csv() + 
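+    # prepare_csv() asks the morphemic DatasetMaker to pull the application's
+    # metrics from InfluxDB and write them to $DATA_PATH/$APP_NAME.csv, taking a
+    # FileLock on the CSV when it already exists so concurrent jobs don't clash.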
logging.debug("dataset downloaded") + + influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME) + + logging.debug( + f"waiting {msg['epoch_start'] - int(time.time()) - prediction_cycle} seconds" + ) + print() + # time.sleep(max(0 + # msg["epoch_start"] - int(time.time()) - prediction_cycle) + # ) # time units??? + + tl = Timeloop() + + @tl.job(interval=timedelta(seconds=prediction_cycle)) + def metric_predict(): + logging.debug("prediction") + dataset_preprocessor.prepare_csv() + for metric in predicted_metrics: + predictions = None + time_0 = int(time.time()) + for i in range(number_of_forward_predictions[metric]): + prediction_msgs, predictions = predict( + metric, + prediction_cycle, + extra_data=predictions, + m=i + 1, + prediction_hor=prediction_horizon, + timestamp=time_0, + ) + if prediction_msgs: + dest = f"{PRED_TOPIC_PREF}.{metric}" + start_conn.send_to_topic(dest, prediction_msgs[metric]) + influxdb_conn.send_to_influxdb(metric, prediction_msgs) + + tl.start(block=True) + + while True: + pass + + +if __name__ == "__main__": + msg = json.loads(sys.argv[1]) + metrics_info = { + m["metric"]: { + "level": m["level"], + "publish_rate": m["publish_rate"], + } + for m in msg["all_metrics"] + } + prediction_horizon = msg["prediction_horizon"] + predicted_metrics = set(msg["metrics"]) + prediction_cycle = msg["prediction_horizon"] + + logging.debug(f"Predicted metrics: {predicted_metrics}") + number_of_forward_predictions = { + metric: msg["number_of_forward_predictions"] for metric in predicted_metrics + } # deafult number of forward predictions + lock = threading.Lock() + + main() diff --git a/deployment/tft/requirements.txt b/deployment/tft/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..b0e5b674a12fc53058b17638b5e1e0f774470ad6 --- /dev/null +++ b/deployment/tft/requirements.txt @@ -0,0 +1,10 @@ +stomp.py +pandas==1.1.3 +pytorch-lightning==1.2.7 +pytorch-forecasting==0.8.4 +timeloop==1.0.2 +filelock==3.0.12 +influxdb +python-slugify + + diff --git a/deployment/tft/retrain.py b/deployment/tft/retrain.py new file mode 100644 index 0000000000000000000000000000000000000000..ca1be6c86efe72a99f411b9c4eb2fda7c8de5ee6 --- /dev/null +++ b/deployment/tft/retrain.py @@ -0,0 +1,51 @@ +import stomp +import os +import sys +import json +import logging +from src.model_train import train +from amq_message_python_library import * +from timeloop import Timeloop +from datetime import timedelta +from src.dataset_maker import CSVData + +TOPIC_NAME = "training_models" +RETRAIN_CYCLE = 2 # minutes +HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613)) +AMQ_USER = os.environ.get("AMQ_USER", "admin") +AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin") +APP_NAME = os.environ.get("APP_NAME", "demo") + + +def main(predicted_metrics, prediction_horizon): + start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD) + start_conn.connect() + tl = Timeloop() + + dataset_preprocessor = CSVData(APP_NAME) + dataset_preprocessor.prepare_csv() + + @tl.job(interval=timedelta(seconds=60 * RETRAIN_CYCLE)) + def retrain_model(): + logging.debug("TRAINING") + for metric in predicted_metrics: + retrain_msg = train(metric, prediction_horizon) + start_conn.send_to_topic(TOPIC_NAME, retrain_msg) + + tl.start(block=True) + + +if __name__ == "__main__": + logging.debug("Training") + msg = json.loads(sys.argv[1]) + metrics_info = { + m["metric"]: { + "level": m["level"], + "publish_rate": m["publish_rate"], + } + for m in msg["all_metrics"] + } + 
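+    # prediction_horizon is converted into a number of future samples by integer
+    # division with the metric publish_rate; note the README example uses
+    # milliseconds for publish_rate (60000) and a much smaller value for
+    # prediction_horizon (600), so the units still need to be confirmed (see the
+    # "time units???" note in predict.py).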
prediction_horizon = msg["prediction_horizon"] // msg["publish_rate"] + predicted_metrics = set(metrics_info.keys()) + logging.debug(f"Predicted metrics: {predicted_metrics}") + main(predicted_metrics, prediction_horizon) diff --git a/deployment/tft/run.sh b/deployment/tft/run.sh new file mode 100644 index 0000000000000000000000000000000000000000..917126cf8580d0f453190b98c6f496f262d97d5e --- /dev/null +++ b/deployment/tft/run.sh @@ -0,0 +1,5 @@ +#!/bin/sh +python3 morphemic-datasetmaker/setup.py install +# rm -r morphemic-datasetmaker +# python3 main.py +# mv amq-message-python-library amq_message_python_library diff --git a/deployment/tft/src/dataset_maker.py b/deployment/tft/src/dataset_maker.py new file mode 100644 index 0000000000000000000000000000000000000000..7ffb3e5e7f6361aa2ad5462900b4dad63f95d346 --- /dev/null +++ b/deployment/tft/src/dataset_maker.py @@ -0,0 +1,34 @@ +from morphemic.dataset import DatasetMaker +import os +from filelock import FileLock + +"""Script for preparing csv data downloaded form InfluxDB database, data""" + + +class CSVData(object): + def __init__(self, name, start_collection=None): + self.name = name + self.config = { + "hostname": os.environ.get("INFLUXDB_HOSTNAME", "localhost"), + "port": int(os.environ.get("INFLUXDB_PORT", "8086")), + "username": os.environ.get("INFLUXDB_USERNAME", "morphemic"), + "password": os.environ.get("INFLUXDB_PASSWORD", "password"), + "dbname": os.environ.get("INFLUXDB_DBNAME", "morphemic"), + "path_dataset": os.environ.get("DATA_PATH", "./"), + } + self.start_collection = start_collection + + def prepare_csv(self): + lockfile = os.path.join(self.config["path_dataset"], f"{self.name}.csv") + lock = FileLock(lockfile + ".lock") + + if os.path.isfile(lockfile): + with lock: + datasetmaker = DatasetMaker( + self.name, self.start_collection, self.config + ) + response = datasetmaker.make() + + else: + datasetmaker = DatasetMaker(self.name, self.start_collection, self.config) + response = datasetmaker.make() diff --git a/deployment/tft/src/influxdb_predictions.py b/deployment/tft/src/influxdb_predictions.py new file mode 100644 index 0000000000000000000000000000000000000000..1c1e0499117f801dece5f7a677951d99f2337038 --- /dev/null +++ b/deployment/tft/src/influxdb_predictions.py @@ -0,0 +1,42 @@ +from influxdb import * +import os +import datetime + + +"""Connects with Influxdb and sends predictet values""" + + +class InfluxdbPredictionsSender(object): + def __init__(self, method, app_name): + self.client = InfluxDBClient( + host=os.environ.get("INFLUXDB_HOSTNAME", "localhost"), + port=int(os.environ.get("INFLUXDB_PORT", "8086")), + username=os.environ.get("INFLUXDB_USERNAME", "morphemic"), + password=os.environ.get("INFLUXDB_PASSWORD", "password"), + ) + self.client.switch_database(os.environ.get("INFLUXDB_DBNAME", "morphemic")) + self.method = method + self.app_name = app_name + + def send_to_influxdb(self, metric, msg): + msg = { + "measurement": f"{self.app_name}.{metric}.prediction", + "tags": { + "method": f"{self.method}", + }, + "time": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"), + "fields": { + "value": msg[metric]["metricValues"], + "prediction_horizon": msg[metric]["horizon"], + }, + } + try: + self.client.write_points( + [ + msg, + ], + ) + print("predictions sent to influxdb") + + except Exception as e: + print("Could not send predictions to influxdb") diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py new file mode 100644 index 
0000000000000000000000000000000000000000..5007b1a6ccb0732349950fc4ff20392cdfe7cb49 --- /dev/null +++ b/deployment/tft/src/model_predict.py @@ -0,0 +1,103 @@ +import yaml +import pandas as pd +import numpy as np +import time +import os +import torch +from filelock import FileLock +from src.preprocess_dataset import Dataset +from pytorch_forecasting import TemporalFusionTransformer +import logging + +pd.options.mode.chained_assignment = None + +"""Script for temporal fusion transformer prediction""" + + +def predict( + target_column, + prediction_length, + yaml_file="model.yaml", + extra_data=None, + m=1, + prediction_hor=60, + timestamp=0, +): + with open(yaml_file) as file: + params = yaml.load(file, Loader=yaml.FullLoader) + params["dataset"]["prediction_length"] = prediction_length + + data_path = os.path.join( + os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' + ) + + dataset = pd.read_csv(data_path) + + if extra_data is not None: + dataset = pd.concat([dataset, extra_data]) + + ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + + dataset = dataset[ + lambda x: x.series == x.series.max() + ] # multiple or single series training? + ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + + pred_len = params["dataset"]["prediction_length"] + + future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]] + + future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len)) + future_df["time_idx"] = future_time_idx + future_df["split"] = "future" + + ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index( + drop=True + ) + + prediction_input = ts_dataset.inherited_dataset("val", "future") + + tft = TemporalFusionTransformer.from_dataset( + ts_dataset.ts_dataset, + dropout=0.1, + log_interval=-1, + reduce_on_plateau_patience=5, + **params["model"], + ) + + lockfile = params["save_path"] + lock = FileLock(lockfile + ".lock") + + model_path = os.path.join(params["save_path"], f"{target_column}.pth") + + if not os.path.isfile(model_path): # no pretrained model, unable to predict + logging.debug(f"No pretrained model unable to predict") + return (None, None) + + if os.path.isfile(lockfile): + with lock: + tft.load_state_dict(torch.load(model_path)) + + prediction_input = prediction_input.to_dataloader(train=False) + prediction = tft.predict(prediction_input, mode="raw")["prediction"] + + msg = { + target_column: { + "metricValues": prediction[-1][-1][2].item(), + "level": "TODO", + "timestamp": timestamp, + "probability": 0.95, + "confidence_interval": abs( + (prediction[-1][-1][-1] - prediction[-1][-1][0]).item() + ), # quantiles difference + "horizon": prediction_hor * m, + "refersTo": "TODO", + "cloud": "TODO", + "provider": "TODO", + } + } + logging.debug(f"prediction msg: {msg}") + + future_df["split"] = "val" + future_df[target_column] = prediction.permute(0, 2, 1)[0][2] + return (msg, future_df) diff --git a/deployment/tft/src/model_train.py b/deployment/tft/src/model_train.py new file mode 100644 index 0000000000000000000000000000000000000000..02c2cc771cc90506c21d979a15efd38e48285b11 --- /dev/null +++ b/deployment/tft/src/model_train.py @@ -0,0 +1,98 @@ +import torch +import yaml +import pandas as pd +import time +from filelock import FileLock +import pytorch_lightning as pl +import os +from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor +from pytorch_forecasting.metrics import QuantileLoss, MAE, RMSE, CrossEntropy +from pytorch_forecasting import 
TemporalFusionTransformer +from src.preprocess_dataset import Dataset + + +"""Script for temporal fusion transformer training""" + + +LOSSES_DICT = { + "quantile": QuantileLoss(), + "mae": MAE(), + "rmse": RMSE(), + "crossentropy": CrossEntropy(), +} + + +def train(target_column, prediction_length, yaml_file="model.yaml"): + torch.manual_seed(12345) + + with open(yaml_file) as file: + params = yaml.load(file, Loader=yaml.FullLoader) + params["dataset"]["prediction_length"] = prediction_length + + data_path = os.path.join( + os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' + ) + + dataset = pd.read_csv(data_path) + + ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + + training = ts_dataset.ts_dataset + validation = ts_dataset.inherited_dataset( + "train", "val" + ) # only train and val splits will be used + + bs = params["training"]["bs"] + + train_dataloader = training.to_dataloader( + train=True, batch_size=bs, num_workers=6, shuffle=True + ) + val_dataloader = validation.to_dataloader(train=False, batch_size=bs, num_workers=6) + + early_stop_callback = EarlyStopping( + monitor="val_loss", min_delta=1e-5, patience=8, verbose=False, mode="min" + ) + lr_logger = LearningRateMonitor() + trainer = pl.Trainer( + max_epochs=params["training"]["max_epochs"], + gpus=0, + gradient_clip_val=0.5, + callbacks=[lr_logger, early_stop_callback], + checkpoint_callback=False, # TODO: is pl checkpoint_callback thread safe? + ) + + tft = TemporalFusionTransformer.from_dataset( + training, + dropout=0.1, + loss=LOSSES_DICT[params["training"]["loss"]], + log_interval=-1, + reduce_on_plateau_patience=5, + **params["model"], + ) + + model_path = os.path.join(params["save_path"], f"{target_column}.pth") + + lockfile = model_path + lock = FileLock(lockfile + ".lock") + + if os.path.isfile(lockfile): + print("downloading weigths") + with lock: + tft.load_state_dict(torch.load(model_path)) + + trainer.fit( + tft, + train_dataloader=train_dataloader, + val_dataloaders=val_dataloader, + ) + + with lock: + torch.save(tft.state_dict(), model_path) + + msg = { + "metrics": target_column, + "forecasting_method": os.environ.get("METHOD", "tft"), + "timestamp": int(time.time()), + } + + return msg diff --git a/deployment/tft/src/preprocess_dataset.py b/deployment/tft/src/preprocess_dataset.py new file mode 100644 index 0000000000000000000000000000000000000000..e1de9bbc0ac0e2630ea1c5cd5190f4fcbccb6f21 --- /dev/null +++ b/deployment/tft/src/preprocess_dataset.py @@ -0,0 +1,106 @@ +import pandas as pd +from pytorch_forecasting import TimeSeriesDataSet +from pytorch_forecasting.data import NaNLabelEncoder + +pd.options.mode.chained_assignment = None + +"""Script for preparing time series dataset from pythorch-forecasting package +TODO: add checking whether data consists of multiple series, handle nans values""" + + +class Dataset(object): + def __init__( + self, + dataset, + target_column="value", + tv_unknown_reals="[]", + known_reals="[]", + tv_unknown_cat="[]", + static_reals="[]", + classification=0, + context_length=40, + prediction_length=5, + ): + + self.target_column = target_column + self.tv_unknown_cat = tv_unknown_cat + self.known_reals = known_reals + self.tv_unknown_reals = tv_unknown_reals + self.static_reals = static_reals + self.classification = classification + self.context_length = context_length + self.prediction_length = prediction_length + self.add_obligatory_columns(dataset) + self.dataset = self.convert_formats(dataset) + self.n = 
dataset.shape[0] + self.ts_dataset = self.create_time_series_dataset() + + def convert_formats(self, dataset): + if not self.classification: + dataset[self.target_column] = dataset[self.target_column].astype(float) + else: + dataset[self.target_column] = dataset[self.target_column].astype(int) + + for name in self.tv_unknown_cat: + dataset[name] = dataset[name].astype(str) + + dataset["series"] = dataset["series"].astype(str) + return dataset + + def add_obligatory_columns(self, dataset): + dataset["series"] = 0 + dataset["split"] = "train" + n = dataset.shape[0] + dataset["split"][int(n * 0.8) :] = "val" + dataset["time_idx"] = range(n) # TODO check time gaps + return dataset + + def create_time_series_dataset(self): + if not self.classification: + self.time_varying_unknown_reals = [ + self.target_column + ] + self.tv_unknown_reals + self.time_varying_unknown_categoricals = self.tv_unknown_cat + else: + self.time_varying_unknown_reals = self.tv_unknown_reals + self.time_varying_unknown_categoricals = [ + self.target_column + ] + self.tv_unknown_cat + + ts_dataset = TimeSeriesDataSet( + self.dataset[lambda x: x.split == "train"], + time_idx="time_idx", + target=self.target_column, + group_ids=["series"], + min_encoder_length=self.context_length, # keep encoder length long (as it is in the validation set) + max_encoder_length=self.context_length, + min_prediction_length=self.prediction_length, + max_prediction_length=self.prediction_length, + static_categoricals=[], + static_reals=self.static_reals, + time_varying_known_categoricals=[], + categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)}, + variable_groups={}, # group of categorical variables can be treated as one variable + time_varying_known_reals=["time_idx"] + self.known_reals, + time_varying_unknown_categoricals=self.tv_unknown_cat, + time_varying_unknown_reals=[self.target_column] + self.tv_unknown_reals, + add_relative_time_idx=True, + add_target_scales=True if not self.classification else False, + add_encoder_length=True, + allow_missings=True, + ) + return ts_dataset + + def inherited_dataset(self, split1, split2): + df1 = ( + self.dataset[lambda x: x.split == split1] + .groupby("series", as_index=False) + .apply(lambda x: x.iloc[-self.context_length :]) + ) # previous split fragment + df2 = self.dataset[lambda x: x.split == split2] # split part + inh_dataset = pd.concat([df1, df2]) + inh_dataset = inh_dataset.sort_values(by=["series", "time_idx"]) + inh_dataset = TimeSeriesDataSet.from_dataset( + self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True + ) + return inh_dataset
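+
+# Minimal usage sketch (illustrative only; assumes a demo.csv produced by
+# src/dataset_maker.py with a numeric "cpu_usage" column):
+#
+#   import pandas as pd
+#   df = pd.read_csv("demo.csv")
+#   ds = Dataset(df, target_column="cpu_usage", tv_unknown_reals=[],
+#                known_reals=[], tv_unknown_cat=[], static_reals=[],
+#                context_length=10, prediction_length=5)
+#   val_loader = ds.inherited_dataset("train", "val").to_dataloader(
+#       train=False, batch_size=8)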