Commit 35cfdc64 authored by Anna Warno

arima model corrected

parent 0985e9ce
docker run -t --env-file=env --network=host stomp_app
......@@ -6,7 +6,8 @@ from amq_message_python_library import * # python amq-message-python-library
import logging
import time
from datetime import datetime
import pytz
from pytz import timezone
from datetime import datetime
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
......@@ -17,10 +18,6 @@ METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
)
def run_process(args):
print("running")
......@@ -47,8 +44,7 @@ class StartListener(stomp.ConnectionListener):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print(self.topic_name)
logging.debug(f" Body: {frame.body}")
logging.info(f" Body: {frame.body}")
message = json.loads(frame.body)
global publish_rate, all_metrics
......@@ -83,12 +79,19 @@ class Msg(object):
def main():
logging.getLogger().setLevel(logging.DEBUG)
logging.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.info(
f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
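# Overriding Formatter.converter makes %(asctime)s render in the configured TZ timezone instead of the server's local time.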
log = logging.getLogger()
log.info(
f"EXPERIMENT STARTED FOR APPLICATION: {os.environ.get('APP_NAME', 'demo')}"
)
start_app_conn = morphemic.Connection(
......@@ -108,18 +111,18 @@ def main():
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
# msg1 = Msg()
# msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 10000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 120'
# + "}"
# )
# print(msg1)
# StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
# StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
msg1 = Msg()
msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 30000}]'
msg2 = Msg()
msg2.body = (
"{"
+ f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
+ "}"
)
print(msg1)
StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
pass
......
......@@ -11,12 +11,9 @@ import pandas as pd
import logging
from datetime import datetime
from src.dataset_maker import CSVData
from pytz import timezone
import pytz
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
from datetime import datetime
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
......@@ -92,13 +89,14 @@ def main():
dataset_preprocessor = CSVData(APP_NAME)
dataset_preprocessor.prepare_csv()
logging.debug("dataset downloaded")
logging.info(f"Dataset downloaded")
logging.info(
f"Dataset downloaded TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
influxdb_conn = InfluxdbPredictionsSender(METHOD, APP_NAME)
logging.debug(
f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds"
logging.info(
f"waiting {(msg['epoch_start'] * 1000 - int(time.time()) * 1000 - prediction_cycle * 1000) // 1000} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dataset_preprocessor.prepare_csv()
......@@ -113,57 +111,63 @@ def main():
while True:
start_time = int(time.time())
logging.debug("prediction")
logging.info(f"prediction loop started")
logging.info(
f"prediction TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
logging.info(
f"prediction loop started TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dataset_preprocessor.prepare_csv()
global time_0
for metric in predicted_metrics:
predictions = None
for i in range(number_of_forward_predictions[metric]):
prediction_msgs, prediction = predict(
prediction_msgs = predict(
metric,
(prediction_cycle * 1000) // msg["publish_rate"],
extra_data=predictions,
m=i + 1,
prediction_length,
prediction_hor=prediction_horizon,
timestamp=time_0 + (i + 1) * (prediction_horizon // 1000),
timestamp=time_0,
)
if i == (number_of_forward_predictions[metric] - 1):
print(
f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}"
)
if predictions is not None:
predictions = pd.concat(
[predictions, prediction], ignore_index=True
)
else:
predictions = prediction
if prediction_msgs:
logging.info(f"Sending predictions for {metric} metric")
logging.info(
f"TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
dest = f"{PRED_TOPIC_PREF}.{metric}"
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in seconds'
)
print(
f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in seconds'
)
logging.info(
f"Message: {prediction_msgs[metric]}, destination: {dest}"
)
start_conn.send_to_topic(dest, prediction_msgs[metric])
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
for message in prediction_msgs:
logging.info(
f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(dest, message[metric])
influxdb_conn.send_to_influxdb(metric, message)
end_time = int(time.time())
print(f"sleeping {prediction_cycle - (end_time - start_time)} seconds")
time_0 = time_0 + prediction_cycle
time.sleep(prediction_cycle - (end_time - start_time))
time_to_wait = prediction_cycle - (end_time - start_time)
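# A negative wait means this cycle's predictions overran the prediction cycle; sleep only until the next cycle boundary instead of passing a negative value to time.sleep.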
if time_to_wait < 0:
time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle)
logging.info(
f"Prediction time is too slow (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time_0 = time_0 + prediction_cycle
time.sleep(time_to_wait)
if __name__ == "__main__":
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'arima')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
......@@ -175,10 +179,16 @@ if __name__ == "__main__":
time_0 = msg["epoch_start"]
prediction_horizon = msg["prediction_horizon"] * 1000
prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"]
predicted_metrics = set(msg["metrics"])
prediction_cycle = msg["prediction_horizon"]
logging.debug(f"Predicted metrics: {predicted_metrics}")
prediction_length = (
msg["prediction_horizon"]
* 1000
// msg["publish_rate"]
* msg["number_of_forward_predictions"]
)
logging.info(f"Predicted metrics: {predicted_metrics}")
number_of_forward_predictions = {
metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
} # default number of forward predictions
......
......@@ -6,6 +6,9 @@ import time
from src.model_train import train
from amq_message_python_library import *
from src.dataset_maker import CSVData
import pytz
import time
from datetime import datetime
TOPIC_NAME = "training_models"
RETRAIN_CYCLE = 10 # minutes
......@@ -14,13 +17,12 @@ AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
APP_NAME = os.environ.get("APP_NAME", "demo")
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'tft')}.out", level=logging.INFO
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO
)
print(os.listdir("./"), "files")
def main(predicted_metrics, prediction_horizon):
start_conn = morphemic.Connection(
......@@ -36,15 +38,25 @@ def main(predicted_metrics, prediction_horizon):
for metric in predicted_metrics:
retrain_msg = train(metric, prediction_horizon)
if retrain_msg:
logging.info(f"Training completed for {metric} metric")
logging.info(
f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
else:
print("Not enough data for model training, waiting ...")
logging.info("Not enough data for model training, waiting ...")
start_conn.send_to_topic(TOPIC_NAME, retrain_msg)
logging.info(
f"Not enough data for model training, waiting ... TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
end_time = int(time.time())
time.sleep(60 * RETRAIN_CYCLE - (end_time - start_time))
time_to_wait = 60 * RETRAIN_CYCLE - (end_time - start_time)
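# A negative wait means training overran the retrain cycle; wait only until the next cycle boundary.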
if time_to_wait < 0:
time_to_wait = 60 * RETRAIN_CYCLE - (time_to_wait % (60 * RETRAIN_CYCLE))
logging.info(
f"Waiting for the next training: {time_to_wait} seconds TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time.sleep(time_to_wait)
if __name__ == "__main__":
......@@ -57,7 +69,14 @@ if __name__ == "__main__":
}
for m in msg["all_metrics"]
}
prediction_horizon = (msg["prediction_horizon"] * 1000) // msg["publish_rate"]
prediction_horizon = (
msg["prediction_horizon"]
* 1000
// msg["publish_rate"]
* msg["number_of_forward_predictions"]
)
predicted_metrics = set(metrics_info.keys())
logging.debug(f"Predicted metrics: {predicted_metrics}")
logging.info(
f"Predicted metrics: {predicted_metrics} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
main(predicted_metrics, prediction_horizon)
#!/bin/sh
python3 morphemic-datasetmaker/setup.py install
# rm -r morphemic-datasetmaker
# python3 main.py
# mv amq-message-python-library amq_message_python_library
......@@ -18,60 +18,59 @@ def predict(
target_column,
prediction_length,
yaml_file="model.yaml",
extra_data=None,
m=1,
prediction_hor=60,
timestamp=0,
):
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
params["dataset"]["prediction_length"] = prediction_length
params["dataset"]["context_length"] = prediction_length * 10
params["dataset"]["context_length"] = prediction_length * 5
data_path = os.path.join(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
)
dataset = pd.read_csv(data_path).head(500)
if extra_data is not None:
dataset = pd.concat([dataset, extra_data], ignore_index=True)
dataset = pd.read_csv(data_path)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
ts_dataset = ts_dataset.dataset[
ts_dataset.dataset.series == ts_dataset.dataset.series.max()
]
pred_len = params["dataset"]["prediction_length"]
best_model_aic = None
sarima = sm.tsa.statespace.SARIMAX(ts_dataset.dataset[target_column])
model = sarima.fit()
predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
future_df = ts_dataset.dataset.iloc[[-1 for _ in range(pred_len)]]
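# Fit a small grid of candidate (p, d, q) orders and keep the forecast from the fit with the lowest AIC.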
for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0)]:
sarima = sm.tsa.statespace.SARIMAX(ts_dataset[target_column], order=order)
model = sarima.fit()
if best_model_aic:
if model.aic < best_model_aic:
predictions = model.get_forecast(
pred_len, return_conf_int=True, alpha=0.05
)
best_model_aic = model.aic
else:
predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
best_model_aic = model.aic
future_time_idx = list(range(ts_dataset.n, ts_dataset.n + pred_len))
future_df["time_idx"] = future_time_idx
future_df["split"] = "future"
ts_dataset.dataset = pd.concat([ts_dataset.dataset, future_df]).reset_index(
drop=True
)
msgs = []
msg = {
target_column: {
"metricValue": predictions.predicted_mean.values[-1],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions.conf_int(alpha=0.05).iloc[-1].values[0],
predictions.conf_int(alpha=0.05).iloc[-1].values[1],
], # quantiles difference
"predictionTime": timestamp,
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
for i in range(pred_len):
msg = {
target_column: {
"metricValue": predictions.predicted_mean.values[i],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions.conf_int(alpha=0.05).iloc[i].values[0],
predictions.conf_int(alpha=0.05).iloc[i].values[1],
], # quantiles difference
"predictionTime": timestamp + (i + 1) * (prediction_hor // 1000),
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
}
}
}
logging.debug(f"prediction msg: {msg}")
msgs.append(msg)
future_df["split"] = "val"
future_df[target_column] = predictions.predicted_mean.values
return (msg, future_df)
return msgs
import yaml
import pandas as pd
from filelock import FileLock
from src.preprocess_dataset import Dataset
"""Script for temporal fusion transformer training"""
import time
import os
def train(target_column, prediction_length, yaml_file="model.yaml"):
msg = None
msg = {
"metrics": [target_column],
"forecasting_method": os.environ.get("METHOD", "tft"),
"timestamp": int(time.time()),
}
return msg
import pandas as pd
import numpy as np
import logging
pd.options.mode.chained_assignment = None
......@@ -12,15 +13,18 @@ class Dataset(object):
self,
dataset,
target_column="value",
tv_unknown_reals="[]",
known_reals="[]",
tv_unknown_cat="[]",
static_reals="[]",
tv_unknown_reals=[],
known_reals=[],
tv_unknown_cat=[],
static_reals=[],
classification=0,
context_length=40,
prediction_length=5,
):
self.max_missing_values = (
20 # max consecutive missing values allowed per series
)
self.target_column = target_column
self.tv_unknown_cat = tv_unknown_cat
self.known_reals = known_reals
......@@ -29,22 +33,19 @@ class Dataset(object):
self.classification = classification
self.context_length = context_length
self.prediction_length = prediction_length
self.dataset = self.cut_nan_start(dataset)
self.fill_na()
self.dataset = self.add_obligatory_columns(self.dataset)
self.dataset = self.convert_formats(self.dataset)
self.dataset = dataset
self.check_gap()
self.n = dataset.shape[0]
self.ts_dataset = self.create_time_series_dataset()
def cut_nan_start(self, dataset):
first_not_nan_index = dataset[self.target_column][
dataset[self.target_column] != "None"
].index[0]
dataset.index = range(dataset.shape[0])
first_not_nan_index = dataset[self.target_column].first_valid_index()
return dataset[dataset.index > first_not_nan_index]
def fill_na(self):
self.dataset = self.dataset.replace("None", np.nan)
self.dataset = self.dataset.ffill(axis="rows")
def fill_na(self, dataset):
dataset = dataset.replace("None", np.nan)
dataset = dataset.ffill(axis="rows")
return dataset
def convert_formats(self, dataset):
if not self.classification:
......@@ -63,14 +64,47 @@ class Dataset(object):
dataset["split"] = "train"
n = dataset.shape[0]
dataset["split"][int(n * 0.8) :] = "val"
dataset["time_idx"] = range(n) # TODO check time gaps
dataset["time_idx"] = range(n)
return dataset
def create_time_series_dataset(self):
return self.dataset
def check_gap(self):
max_gap = self.dataset["time"].diff().abs().max()
logging.info(f"Max time gap in series {max_gap}")
print(f"Max time gap in series {max_gap}")
series_freq = self.dataset["time"].diff().value_counts().index.values[0]
logging.info(f"Detected series with {series_freq} frequency")
print(f"Detected series with {series_freq} frequency")
# check series length
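# Split the data into separate series wherever the gap between consecutive timestamps is at least max_missing_values times the detected frequency.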
series = np.split(
self.dataset,
*np.where(
self.dataset["time"].diff().abs().fillna(0).astype(int)
>= np.abs(self.max_missing_values * series_freq)
),
)
logging.info(f"{len(series)} series found")
print(f"{len(series)} series found")
preprocessed_series = []
for i, s in enumerate(series):
s = self.fill_na(s)
s = self.cut_nan_start(s)
s = self.add_obligatory_columns(s)
s = self.convert_formats(s)
s["split"] = "train"
if s.shape[0] > self.prediction_length * 2 + self.context_length:
s["series"] = i
preprocessed_series.append(s)
if i == len(series) - 1:
logging.info(
f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
)
def inherited_dataset(self, split1, split2):
df1 = self.dataset[lambda x: x.split == split1] # previous split fragment
df2 = self.dataset[lambda x: x.split == split2] # split part
inh_dataset = pd.concat([df1, df2])
return inh_dataset
logging.info(f"{len(preprocessed_series)} long enough series found")
print(f"{len(preprocessed_series)} long enough series found")
# logging.info(f"")
self.dataset = pd.concat(preprocessed_series)
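# If the most recent series was filtered out above for being too short, flag it so callers know the freshest data is missing.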
if self.dataset["series"].max() != len(series) - 1:
self.dropped_recent_series = True
else:
self.dropped_recent_series = False
self.dataset.index = range(self.dataset.shape[0])