Commit 49b0ecd7 authored by Anna Warno

arima added

parent 92b62634
FROM python:3.8-slim-buster
# Install Python dependencies.
WORKDIR /wd
# COPY deployment/nbeats/requirements.txt .
COPY deployment/tft/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
# Copy the rest of the codebase into the image
COPY deployment/tft ./
RUN cd /var/lib/morphemic/ \
&& tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc1.5 \
&& cd morphemic-datasetmaker && python3 setup.py install \
&& cd ../.. \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc1.5/amq-message-python-library /wd/amq_message_python_library \
&& rm -rf /var/lib/morphemic
CMD ["python3", "main.py"]
docker run -t --env-file=env --network=host stomp_app
AMQ_HOSTNAME=localhost
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_PORT=61613
APP_NAME=demo
METHOD=arima
DATA_PATH=./
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
import os
from multiprocessing import Pool
import stomp
import json
from amq_message_python_library import * # python amq-message-python-library
import logging
import time
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "tft")
START_TOPIC = f"start_forecasting.{METHOD}"
def run_process(args):
print("running")
os.system(f"python {args[0]} '{args[1]}'")
def start_training(metrics_to_predict):
processes = (("retrain.py", metrics_to_predict), ("predict.py", metrics_to_predict))
pool = Pool(processes=2)
pool.map(run_process, processes)
class StartListener(stomp.ConnectionListener):
"""Custom listener, parameters:
- conn (stomp connector)
- topic_name, name of topic to subscribe"""
def __init__(self, conn, topic_name):
self.conn = conn
self.topic_name = topic_name
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
global publish_rate, all_metrics
print(self.topic_name)
logging.debug(f" Body: {frame.body}")
message = json.loads(frame.body)
publish_rate = message[0]["publish_rate"]
all_metrics = message
class StartForecastingListener(stomp.ConnectionListener):
"""Custom listener, parameters:
- conn (stomp connector)
- topic_name, name of topic to subscribe"""
def __init__(self, conn, topic_name):
self.conn = conn
self.topic_name = topic_name
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
message = json.loads(frame.body)
message["publish_rate"] = publish_rate
message["all_metrics"] = all_metrics
message = json.dumps(message)
start_training(message)
self.conn.disconnect()
class Msg(object):
def __init__(self):
self.body = None
def main():
logging.getLogger().setLevel(logging.DEBUG)
start_app_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
)
start_app_conn.connect()
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
)
start_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_APP_TOPIC}", "1", ack="auto")
start_app_conn.conn.subscribe(f"/topic/{START_TOPIC}", "2", ack="auto")
start_conn.conn.set_listener("1", StartListener(start_conn.conn, START_APP_TOPIC))
start_app_conn.conn.set_listener(
"2", StartForecastingListener(start_app_conn.conn, START_TOPIC)
)
# print("START")
# msg1 = Msg()
# msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}, {"metric": "avgResponseTime", "level": 3, "publish_rate": 10000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["memory", "avgResponseTime"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 50'
# + "}"
# )
# print(msg1)
# StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
# StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
time.sleep(1)  # keep the process alive without busy-waiting
if __name__ == "__main__":
publish_rate = 0
all_metrics = {}
main()
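For manual testing, a minimal sketch of publishing the two trigger messages with stomp.py; the broker address and credentials mirror the env file above, and the payload fields mirror the commented-out msg1/msg2:

import json
import time
import stomp

conn = stomp.Connection([("localhost", 61613)])
conn.connect("morphemic", "morphemic", wait=True)
# 1) announce which metrics will be predicted
conn.send("/topic/metrics_to_predict", json.dumps([{"metric": "memory", "level": 3, "publish_rate": 10000}]))
# 2) trigger forecasting for the chosen method
conn.send("/topic/start_forecasting.arima", json.dumps({
    "metrics": ["memory"],
    "timestamp": int(time.time()),
    "epoch_start": int(time.time()) + 30,
    "number_of_forward_predictions": 8,
    "prediction_horizon": 50,
}))
conn.disconnect()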
data:
csv_path: demo.csv
training:
bs: 8
max_epochs: 1
loss: quantile
dataset:
tv_unknown_reals: []
known_reals: []
tv_unknown_cat: []
static_reals: []
context_length: 10
prediction_length: 5
classification: 0
model:
learning_rate: 0.05
hidden_size: 32
attention_head_size: 1
hidden_continuous_size: 16
output_size: 7
prediction:
bs: 32
save_path: models
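For reference, a minimal sketch of how predict.py below consumes this file (safe_load is a hedged substitute for the yaml.load call used there):

import yaml

with open("model.yaml") as f:
    params = yaml.safe_load(f)
# predict() overrides these two fields per request
params["dataset"]["prediction_length"] = 5
params["dataset"]["context_length"] = 5 * 10
print(params["training"]["bs"], params["model"]["hidden_size"])  # 8 32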
import time
import os
import stomp
import threading
from src.model_predict import predict
from amq_message_python_library import *
from src.influxdb_predictions import InfluxdbPredictionsSender
import json
import sys
import pandas as pd
import logging
from src.dataset_maker import CSVData
METHOD = os.environ.get("METHOD", "tft")
START_TOPIC = f"start_forecasting.{METHOD}"
STOP_TOPIC = f"stop_forecasting.{METHOD}"
PRED_TOPIC_PREF = f"intermediate_prediction.{METHOD}"
PREDICTION_CYCLE = 1 # minutes
APP_NAME = os.environ.get("APP_NAME", "demo")
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
class CustomListener(stomp.ConnectionListener):
"""Custom listener, parameters:
- conn (stomp connector)
- topic_name, name of topic to subscribe,
- start , if start is set to be true recived metrics
are added to predicted metric, otherwise recived
metrics are removed from predicted metric (start
mode corresponds to start_prediction)"""
def __init__(self, conn, topic_name, start=True):
self.conn = conn
self.topic_name = topic_name
self.start = start
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
global predicted_metrics, lock, number_of_forward_predictions, metrics_info
lock.acquire()
try:
metrics = set(json.loads(frame.body)["metrics"])
if self.start:
predicted_metrics = predicted_metrics.union(metrics)
for metric in metrics:
frame_body = json.loads(frame.body)
logging.debug(frame_body)
number_of_forward_predictions[metric] = frame_body[
"number_of_forward_predictions"
]
self.prediction_cycle = frame_body["prediction_horizon"]
else:
predicted_metrics = predicted_metrics.difference(metrics)
finally:
lock.release()
def main():
logging.debug(f"metrics to predict {predicted_metrics}")
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
)
start_conn.connect()
stop_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER, debug=True
)
stop_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_TOPIC}", "1", ack="auto")
stop_conn.conn.subscribe(f"/topic/{STOP_TOPIC}", "2", ack="auto")
start_conn.conn.set_listener(
"1", CustomListener(start_conn.conn, START_TOPIC, start=True)
)
stop_conn.conn.set_listener("2", CustomListener(stop_conn.conn, STOP_TOPIC, start=False))
dataset_preprocessor = CSVData(APP_NAME)
dataset_preprocessor.prepare_csv()
logging.debug("dataset downloaded")
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
logging.debug(
f"waiting {msg['epoch_start'] - int(time.time()) - prediction_cycle} seconds"
)
dataset_preprocessor.prepare_csv()
time.sleep(max(0, msg["epoch_start"] - int(time.time())))
while True:
start_time = int(time.time())
logging.debug("prediction")
dataset_preprocessor.prepare_csv()
global time_0
for metric in predicted_metrics:
predictions = None
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, prediction = predict(
metric,
(prediction_cycle * 1000) // msg["publish_rate"],
extra_data=predictions,
m=i + 1,
prediction_hor=prediction_horizon,
timestamp=time_0 + (i + 1) * (prediction_horizon // 1000),
)
if i == (number_of_forward_predictions[metric] - 1):
print(
f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
)
if predictions is not None:
predictions = pd.concat(
[predictions, prediction], ignore_index=True
)
else:
predictions = prediction
if prediction_msgs:
dest = f"{PRED_TOPIC_PREF}.{metric}"
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in seconds'
)
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in seconds'
)
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
end_time = int(time.time())
print(f"sleeping {prediction_cycle - (end_time - start_time)} seconds")
time_0 = time_0 + prediction_cycle
time.sleep(prediction_cycle - (end_time - start_time))
if __name__ == "__main__":
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
"level": m["level"],
"publish_rate": m["publish_rate"],
}
for m in msg["all_metrics"]
}
time_0 = msg["epoch_start"]
prediction_horizon = msg["prediction_horizon"] * 1000
predicted_metrics = set(msg["metrics"])
prediction_cycle = msg["prediction_horizon"]
logging.debug(f"Predicted metrics: {predicted_metrics}")
number_of_forward_predictions = {
metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
} # default number of forward predictions
lock = threading.Lock()
main()
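predict.py (like retrain.py) receives the merged start_forecasting message as one JSON command-line argument, assembled by start_training in main.py. An illustrative invocation with values borrowed from the commented-out test messages:

python3 predict.py '{"metrics": ["memory"], "timestamp": 1620000000, "epoch_start": 1620000030, "number_of_forward_predictions": 8, "prediction_horizon": 50, "publish_rate": 10000, "all_metrics": [{"metric": "memory", "level": 3, "publish_rate": 10000}]}'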
stomp.py
pandas==1.1.3
statsmodels==0.12.2
filelock==3.0.12
influxdb
python-slugify
import stomp
import os
import sys
import json
import logging
import time
from src.model_train import train
from amq_message_python_library import *
from datetime import timedelta
from src.dataset_maker import CSVData
TOPIC_NAME = "training_models"
RETRAIN_CYCLE = 10 # minutes
HOSTS = (os.environ.get("AMQ_HOSTNAME", "localhost"), os.environ.get("AMQ_PORT", 61613))
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
APP_NAME = os.environ.get("APP_NAME", "demo")
def main(predicted_metrics, prediction_horizon):
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_conn.connect()
dataset_preprocessor = CSVData(APP_NAME)
dataset_preprocessor.prepare_csv()
while True:
start_time = int(time.time())
logging.debug("TRAINING")
end_time = int(time.time())
time.sleep(60 * RETRAIN_CYCLE - (end_time - start_time))
if __name__ == "__main__":
logging.debug("Training")
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
"level": m["level"],
"publish_rate": m["publish_rate"],
}
for m in msg["all_metrics"]
}
prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"]
predicted_metrics = set(metrics_info.keys())
logging.debug(f"Predicted metrics: {predicted_metrics}")
main(predicted_metrics, prediction_horizon)
#!/bin/sh
cd morphemic-datasetmaker && python3 setup.py install
# rm -r morphemic-datasetmaker
# python3 main.py
# mv amq-message-python-library amq_message_python_library
from morphemic.dataset import DatasetMaker
import os
from filelock import FileLock
"""Script for preparing csv data downloaded form InfluxDB database, data"""
class CSVData(object):
def __init__(self, name, start_collection=None):
self.name = name
self.config = {
"hostname": os.environ.get("INFLUXDB_HOSTNAME", "localhost"),
"port": int(os.environ.get("INFLUXDB_PORT", "8086")),
"username": os.environ.get("INFLUXDB_USERNAME", "morphemic"),
"password": os.environ.get("INFLUXDB_PASSWORD", "password"),
"dbname": os.environ.get("INFLUXDB_DBNAME", "morphemic"),
"path_dataset": os.environ.get("DATA_PATH", "./"),
}
self.start_collection = start_collection
def prepare_csv(self):
csv_path = os.path.join(self.config["path_dataset"], f"{self.name}.csv")
lock = FileLock(csv_path + ".lock")
if os.path.isfile(csv_path):
with lock:
datasetmaker = DatasetMaker(
self.name, self.start_collection, self.config
)
response = datasetmaker.make()
else:
datasetmaker = DatasetMaker(self.name, self.start_collection, self.config)
response = datasetmaker.make()
return response
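A minimal usage sketch (the InfluxDB and DATA_PATH env defaults above apply; DatasetMaker itself comes from morphemic.dataset):

from src.dataset_maker import CSVData

csv_data = CSVData("demo")  # app name; the dataset lands at <DATA_PATH>/demo.csv
csv_data.prepare_csv()      # the file lock guards concurrent refreshes from predict.py and retrain.py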
from influxdb import InfluxDBClient
import os
import datetime
"""Connects with Influxdb and sends predictet values"""
class InfluxdbPredictionsSender(object):
def __init__(self, method, app_name):
self.client = InfluxDBClient(
host=os.environ.get("INFLUXDB_HOSTNAME", "localhost"),
port=int(os.environ.get("INFLUXDB_PORT", "8086")),
username=os.environ.get("INFLUXDB_USERNAME", "morphemic"),
password=os.environ.get("INFLUXDB_PASSWORD", "password"),
)
self.client.switch_database(os.environ.get("INFLUXDB_DBNAME", "morphemic"))
self.method = method
self.app_name = app_name
def send_to_influxdb(self, metric, msg):
msg = {
"measurement": f"{self.app_name}.{metric}.prediction",
"tags": {
"method": f"{self.method}",
},
"time": datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"),
"fields": {
"value": msg[metric]["metricValue"],
"prediction_horizon": msg[metric]["predictionTime"],
},
}
try:
self.client.write_points(
[
msg,
],
)
print("predictions sent to influxdb")
except Exception as e:
print(f"Could not send predictions to influxdb: {e}")
import yaml
import pandas as pd
import numpy as np
import time
import os
import statsmodels.api as sm
from filelock import FileLock
from src.preprocess_dataset import Dataset
import logging
pd.options.mode.chained_assignment = None
"""Script for temporal fusion transformer prediction"""
class SARIMAXTrainer(BaseTrainer):  # BaseTrainer is assumed to be provided elsewhere in the package; it is not defined or imported in this file
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def preprocess_data(self):
if self.exog_col is not None:
self.train_data_exog = self.train_data[self.exog_col].copy()
self.val_data_exog = self.val_data[self.exog_col].copy()
else:
self.train_data_exog = None
self.val_data_exog = None
self.train_data = self.train_data[self.col]
self.val_data = self.val_data[self.col]
def train(self):
self.model = sm.tsa.statespace.SARIMAX(
self.train_data, exog=self.train_data_exog, **self.model_kwargs
)
self.res = self.model.fit(**self.fit_kwargs)
def predict(self):
self.forecast = self.res.forecast(
steps=self.val_data.shape[0], exog=self.val_data_exog
)
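For context on the statsmodels API used by this class and by predict() below: SARIMAX.fit() returns a results wrapper, and forecasting is done on that results object rather than on the model. A self-contained toy sketch (the order parameter is illustrative):

import numpy as np
import statsmodels.api as sm

y = np.sin(np.linspace(0, 20, 200))  # toy series
model = sm.tsa.statespace.SARIMAX(y, order=(1, 0, 0))
res = model.fit(disp=False)   # results wrapper
fc = res.forecast(steps=5)    # 5-step-ahead forecast
print(fc)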
def predict(
target_column,
prediction_length,
yaml_file="model.yaml",
extra_data=None,
m=1,
prediction_hor=60,
timestamp=0,
):
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
params["dataset"]["prediction_length"] = prediction_length
params["dataset"]["context_length"] = prediction_length * 10
data_path = os.path.join(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
dataset = pd.read_csv(data_path).head(500)
if extra_data is not None:
dataset = pd.concat([dataset, extra_data], ignore_index=True)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
pred_len = params["dataset"]["prediction_length"]
# the raw series is assumed to be reachable as ts_dataset.dataset[target_column]
model = sm.tsa.statespace.SARIMAX(ts_dataset.dataset[target_column])
results = model.fit(disp=False)  # fit() returns a results wrapper
predictions = results.forecast(steps=pred_len)  # forecast() lives on the results object
future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]]
future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len))
future_df["time_idx"] = future_time_idx
future_df["split"] = "future"
ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index(
drop=True
)
msg = {
target_column: {
"metricValue": predictions[-1],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions[-1],
predictions[-1],
], # quantiles difference
"predictionTime": timestamp,