Commit 426b891a authored by Anna Warno's avatar Anna Warno
Browse files

arima and logging added

parent 49b0ecd7
......@@ -3,13 +3,13 @@ FROM python:3.8-slim-buster
# Install Python dependencies.
WORKDIR /wd
# COPY deployment/nbeats/requirements.txt .
COPY deployment/tft/requirements.txt .
COPY deployment/arima/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
# Copy the rest of the codebase into the image
COPY deployment/tft ./
COPY deployment/arima ./
RUN cd /var/lib/morphemic/ \
&& tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
......
AMQ_HOSTNAME=localhost
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_PORT=61613
APP_NAME=demo
AMQ_HOST=147.102.17.76
AMQ_PORT_BROKER=61610
APP_NAME=default_application
METHOD=arima
DATA_PATH=./
INFLUXDB_HOSTNAME=localhost
INFLUXDB_HOSTNAME=147.102.17.76
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
......@@ -5,15 +5,20 @@ import json
from amq_message_python_library import * # python amq-message-python-library
import logging
import time
from datetime import datetime
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "tft")
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
)
def run_process(args):
print("running")
......@@ -77,6 +82,10 @@ class Msg(object):
def main():
logging.getLogger().setLevel(logging.DEBUG)
logging.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
)
logging.info(f"TIME: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
start_app_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
......@@ -94,15 +103,12 @@ def main():
start_app_conn.conn.set_listener(
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
# print("START")
# msg1 = Msg()
# msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}, {"metric": "avgResponseTime", "level": 3, "publish_rate": 10000}]'
# msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["memory", "avgResponseTime"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 50'
# + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 120'
# + "}"
# )
......
......@@ -9,10 +9,15 @@ import json
import sys
import pandas as pd
import logging
import datetime
from src.dataset_maker import CSVData
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
METHOD = os.environ.get("METHOD", "tft")
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
STOP_TOPIC = f"stop_forecasting.{METHOD}"
PRED_TOPIC_PREF = f"intermediate_prediction.{METHOD}"
......@@ -86,6 +91,7 @@ def main():
dataset_preprocessor = CSVData(APP_NAME)
dataset_preprocessor.prepare_csv()
logging.debug("dataset downloaded")
logging.info(f"Dataset downloaded")
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
......@@ -106,6 +112,7 @@ def main():
while True:
start_time = int(time.time())
logging.debug("prediction")
logging.info(f"prediction loop started")
dataset_preprocessor.prepare_csv()
global time_0
for metric in predicted_metrics:
......@@ -131,6 +138,10 @@ def main():
predictions = prediction
if prediction_msgs:
logging.info(f"Sending predictions for {metric} metric")
logging.info(
f"time: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}"
)
dest = f"{PRED_TOPIC_PREF}.{metric}"
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds'
......@@ -138,6 +149,9 @@ def main():
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds'
)
logging.info(
f"Message: {prediction_msgs[metric]}, destination: {dest}"
)
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
......
......@@ -2,6 +2,7 @@ stomp.py
pandas==1.1.3
statsmodels==0.12.2
filelock==3.0.12
pyyaml
influxdb
python-slugify
......
import stomp
import os
import sys
import json
......@@ -6,18 +5,22 @@ import logging
import time
from src.model_train import train
from amq_message_python_library import *
from datetime import timedelta
from src.dataset_maker import CSVData
TOPIC_NAME = "training_models"
RETRAIN_CYCLE = 10 # minutes
HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613))
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
APP_NAME = os.environ.get("APP_NAME", "demo")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
)
print(os.listdir("./"), "files")
def main(predicted_metrics, prediction_horizon):
start_conn = morphemic.Connection(
......@@ -30,13 +33,22 @@ def main(predicted_metrics, prediction_horizon):
while True:
start_time = int(time.time())
logging.debug("TRAINING")
for metric in predicted_metrics:
retrain_msg = train(metric, prediction_horizon)
if retrain_msg:
logging.info(f"Training completed for {metric} metric")
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
else:
print("Not enough data for model training, waiting ...")
logging.info("Not enough data for model training, waiting ...")
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
end_time = int(time.time())
time.sleep(60 * RETRAIN_CYCLE - (end_time - start_time))
if __name__ == "__main__":
logging.debug("Training")
logging.info(f"Training loop started")
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
......
......@@ -14,33 +14,6 @@ pd.options.mode.chained_assignment = None
"""Script for temporal fusion transformer prediction"""
class SARIMAXTrainer(BaseTrainer):
    """Trainer backed by a statsmodels SARIMAX model.

    Fits the model on the target column of the training split, optionally
    conditioned on an exogenous column, and forecasts over the validation
    horizon. Results are stored on the instance (``self.res``,
    ``self.forecast``) rather than returned.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def preprocess_data(self):
        """Split train/val frames into target series plus optional exogenous data.

        When no exogenous column is configured the exog attributes are left
        as None, which SARIMAX accepts as "no regressors".
        """
        has_exog = self.exog_col is not None
        self.train_data_exog = (
            self.train_data[self.exog_col].copy() if has_exog else None
        )
        self.val_data_exog = (
            self.val_data[self.exog_col].copy() if has_exog else None
        )
        self.train_data = self.train_data[self.col]
        self.val_data = self.val_data[self.col]

    def train(self):
        """Build a SARIMAX model on the training series and fit it."""
        model = sm.tsa.statespace.SARIMAX(
            self.train_data, exog=self.train_data_exog, **self.model_kwargs
        )
        self.model = model
        self.res = model.fit(**self.fit_kwargs)

    def predict(self):
        """Forecast one value per validation row; stored in ``self.forecast``."""
        horizon = self.val_data.shape[0]
        self.forecast = self.res.forecast(steps=horizon, exog=self.val_data_exog)
def predict(
target_column,
prediction_length,
......@@ -67,9 +40,9 @@ def predict(
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
pred_len = params["dataset"]["prediction_length"]
model = sm.tsa.statespace.SARIMAX(ts_dataset)
model.fit()
predictions = model.forecast(steps=pred_len)
sarima = sm.tsa.statespace.SARIMAX(ts_dataset.dataset[target_column])
model = sarima.fit()
predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]]
......@@ -83,13 +56,13 @@ def predict(
msg = {
target_column: {
"metricValue": predictions[-1],
"metricValue": predictions.predicted_mean.values[-1],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions[-1],
predictions[-1],
predictions.conf_int(alpha=0.05).iloc[-1].values[0],
predictions.conf_int(alpha=0.05).iloc[-1].values[1],
], # quantiles difference
"predictionTime": timestamp,
"refersTo": "TODO",
......@@ -100,5 +73,5 @@ def predict(
logging.debug(f"prediction msg: {msg}")
future_df["split"] = "val"
future_df[target_column] = predictions
future_df[target_column] = predictions.predicted_mean.values
return (msg, future_df)
import torch
import yaml
import pandas as pd
import time
from filelock import FileLock
import pytorch_lightning as pl
import os
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor
from pytorch_forecasting.metrics import QuantileLoss, MAE, RMSE, CrossEntropy
from pytorch_forecasting import TemporalFusionTransformer
from src.preprocess_dataset import Dataset
"""Script for temporal fusion transformer training"""
# Maps the loss name given in the training config (params["training"]["loss"])
# to the pytorch_forecasting metric instance handed to the TFT model.
LOSSES_DICT = {
    "quantile": QuantileLoss(),
    "mae": MAE(),
    "rmse": RMSE(),
    "crossentropy": CrossEntropy(),
}
def train(target_column, prediction_length, yaml_file="model.yaml"):
    """Train (or fine-tune) a Temporal Fusion Transformer for one metric.

    Args:
        target_column: name of the metric/column to forecast.
        prediction_length: forecast horizon in steps; the encoder context
            length is fixed to 10x this value.
        yaml_file: path to the YAML file with dataset/model/training params.

    Returns:
        A dict describing the retrained model (metrics, forecasting method,
        timestamp) for publishing on the training topic, or ``None`` when
        the dataset is too small (< 12 * prediction_length rows) to train.
    """
    torch.manual_seed(12345)  # deterministic init across retrain cycles
    with open(yaml_file) as file:
        params = yaml.load(file, Loader=yaml.FullLoader)
    params["dataset"]["prediction_length"] = prediction_length
    params["dataset"]["context_length"] = prediction_length * 10

    data_path = os.path.join(
        os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
    )
    dataset = pd.read_csv(data_path).head(4000)
    # Not enough history to build train/val windows for this horizon.
    if dataset.shape[0] < 12 * prediction_length:
        return None

    ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
    training = ts_dataset.ts_dataset
    validation = ts_dataset.inherited_dataset(
        "train", "val"
    )  # only train and val splits will be used
    print(
        ts_dataset.dataset,
        prediction_length,
        "PR LEN",
        "ENC LEN",
        prediction_length * 10,
    )

    bs = params["training"]["bs"]
    train_dataloader = training.to_dataloader(
        train=True, batch_size=bs, num_workers=6, shuffle=True
    )
    val_dataloader = validation.to_dataloader(train=False, batch_size=bs, num_workers=6)

    early_stop_callback = EarlyStopping(
        monitor="val_loss", min_delta=1e-5, patience=8, verbose=False, mode="min"
    )
    lr_logger = LearningRateMonitor()
    trainer = pl.Trainer(
        max_epochs=params["training"]["max_epochs"],
        gpus=0,
        gradient_clip_val=0.5,
        callbacks=[lr_logger, early_stop_callback],
        checkpoint_callback=False,  # TODO: is pl checkpoint_callback thread safe?
    )
    tft = TemporalFusionTransformer.from_dataset(
        training,
        dropout=0.1,
        loss=LOSSES_DICT[params["training"]["loss"]],
        log_interval=-1,
        reduce_on_plateau_patience=5,
        **params["model"],
    )

    model_path = os.path.join(params["save_path"], f"{target_column}.pth")
    lock = FileLock(model_path + ".lock")
    # Warm-start from previously saved weights when a checkpoint exists; the
    # file lock guards against readers seeing a half-written file.
    if os.path.isfile(model_path):
        print("downloading weights")  # fixed typo: "weigths"
        with lock:
            tft.load_state_dict(torch.load(model_path))

    trainer.fit(
        tft,
        train_dataloader=train_dataloader,
        val_dataloaders=val_dataloader,
    )

    # Always persist under the lock: the original only locked when the file
    # already existed, leaving the very first save unprotected against a
    # concurrent reader.
    with lock:
        torch.save(tft.state_dict(), model_path)

    # BUG FIX: the original built this message and then immediately dead-stored
    # it with ``msg = None``, so the function always returned None and the
    # caller's ``if retrain_msg:`` notification branch could never fire.
    msg = {
        "metrics": [target_column],
        "forecasting_method": os.environ.get("METHOD", "tft"),
        "timestamp": int(time.time()),
    }
    return msg
......@@ -17,7 +17,9 @@ RUN cd /var/lib/morphemic/ \
&& cd morphemic-datasetmaker && python3 setup.py install \
&& cd ../.. \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc1.5/amq-message-python-library /wd/amq_message_python_library \
&& rm -rf /var/lib/morphemic
&& rm -rf /var/lib/morphemic \
&& mkdir -p /wd/logs
CMD ["python3", "main.py"]
......
......@@ -5,6 +5,7 @@ import json
from amq_message_python_library import * # python amq-message-python-library
import logging
import time
from datetime import datetime
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
......@@ -14,6 +15,10 @@ START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
)
def run_process(args):
print("running")
......@@ -77,6 +82,10 @@ class Msg(object):
def main():
logging.getLogger().setLevel(logging.DEBUG)
logging.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
)
logging.info(f"TIME: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
start_app_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
......@@ -94,7 +103,6 @@ def main():
start_app_conn.conn.set_listener(
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
# msg1 = Msg()
# msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}]'
# msg2 = Msg()
......
......@@ -9,10 +9,15 @@ import json
import sys
import pandas as pd
import logging
import datetime
from src.dataset_maker import CSVData
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
METHOD = os.environ.get("METHOD", "tft")
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
STOP_TOPIC = f"stop_forecasting.{METHOD}"
PRED_TOPIC_PREF = f"intermediate_prediction.{METHOD}"
......@@ -86,6 +91,7 @@ def main():
dataset_preprocessor = CSVData(APP_NAME)
dataset_preprocessor.prepare_csv()
logging.debug("dataset downloaded")
logging.info(f"Dataset downloaded")
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
......@@ -106,6 +112,7 @@ def main():
while True:
start_time = int(time.time())
logging.debug("prediction")
logging.info(f"prediction loop started")
dataset_preprocessor.prepare_csv()
global time_0
for metric in predicted_metrics:
......@@ -131,6 +138,10 @@ def main():
predictions = prediction
if prediction_msgs:
logging.info(f"Sending predictions for {metric} metric")
logging.info(
f"time: {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}"
)
dest = f"{PRED_TOPIC_PREF}.{metric}"
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds'
......@@ -138,6 +149,9 @@ def main():
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds'
)
logging.info(
f"Message: {prediction_msgs[metric]}, destination: {dest}"
)
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
......
import stomp
import os
import sys
import json
......@@ -6,18 +5,22 @@ import logging
import time
from src.model_train import train
from amq_message_python_library import *
from datetime import timedelta
from src.dataset_maker import CSVData
TOPIC_NAME = "training_models"
RETRAIN_CYCLE = 10 # minutes
HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613))
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
APP_NAME = os.environ.get("APP_NAME", "demo")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
print(os.listdir("./logs"), "files")
def main(predicted_metrics, prediction_horizon):
start_conn = morphemic.Connection(
......@@ -30,13 +33,14 @@ def main(predicted_metrics, prediction_horizon):
while True:
start_time = int(time.time())
logging.debug("TRAINING")
for metric in predicted_metrics:
retrain_msg = train(metric, prediction_horizon)
if retrain_msg:
logging.info(f"Training completed for {metric} metric")
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
else:
print("Not enough data for model training, waiting ...")
logging.info("Not enough data for model training, waiting ...")
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
end_time = int(time.time())
......@@ -44,7 +48,7 @@ def main(predicted_metrics, prediction_horizon):
if __name__ == "__main__":
logging.debug("Training")
logging.info(f"Training loop started")
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
......
......@@ -12,6 +12,9 @@ import scipy.stats as st
import logging
"""Script for nbeats fusion transformer prediction"""
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
def predict(
......@@ -65,6 +68,7 @@ def predict(
model_path = os.path.join(params["save_path"], f"{target_column}.pth")
if not os.path.isfile(model_path): # no pretrained model, unable to predict
logging.info("no pretrained model, unable to predict")
print("no pretrained model, unable to predict")
return (None, None)
......
......@@ -2,14 +2,14 @@ FROM python:3.8-slim-buster
# Install Python dependencies.
WORKDIR /wd
# COPY deployment/nbeats/requirements.txt .
# COPY deployment/tft/requirements.txt .
COPY deployment/tft/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
# Copy the rest of the codebase into the image
COPY deployment/tft ./
COPY deployment/tft/ ./
RUN cd /var/lib/morphemic/ \
&& tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
......@@ -17,7 +17,9 @@ RUN cd /var/lib/morphemic/ \
&& cd morphemic-datasetmaker && python3 setup.py install \
&& cd ../.. \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc1.5/amq-message-python-library /wd/amq_message_python_library \
&& rm -rf /var/lib/morphemic
&& rm -rf /var/lib/morphemic \
&& mkdir -p /wd/logs
CMD ["python3", "main.py"]
......
......@@ -5,15 +5,20 @@ import json
from amq_message_python_library import * # python amq-message-python-library
import logging
import time
from datetime import datetime
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "tft")
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
)
def run_process(args):
print("running")
......@@ -77,6 +82,10 @@ class Msg(object):