Commit 539ff450 authored by Fotis Paraskevopoulos's avatar Fotis Paraskevopoulos
Browse files

Merge remote-tracking branch 'origin/morphemic-rc1.5' into iccs-eshybrid

parents 4e6ba7a2 91728d56
,awarno,bulls-ThinkPad-T480,28.09.2021 09:11,file:///home/awarno/.config/libreoffice/4;
\ No newline at end of file
AMQ_HOSTNAME=localhost
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_HOST=147.102.17.76
AMQ_PORT_BROKER=61610
APP_NAME=default_application
AMQ_USER=admin
AMQ_PASSWORD=admin
AMQ_PORT=61613
APP_NAME=demo
METHOD=arima
DATA_PATH=./
INFLUXDB_HOSTNAME=147.102.17.76
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
......@@ -108,11 +108,11 @@ def main():
)
# msg1 = Msg()
# msg1.body = '[{"metric": "value", "level": 3, "publish_rate": 30000}]'
# msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["value"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + "}"
# )
......
......@@ -122,7 +122,8 @@ def main():
for metric in predicted_metrics:
prediction_msgs = predict(
metric,
prediction_length,
prediction_lengths[metric],
single_prediction_points_length=prediction_points_horizons[metric],
prediction_hor=prediction_horizon,
timestamp=time_0,
)
......@@ -153,15 +154,10 @@ def main():
if __name__ == "__main__":
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'arima')}.out",
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
level=logging.INFO,
datefmt="%Y-%m-%d %H:%M:%S",
format="START %(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s",
)
logging.Formatter.converter = lambda *args: datetime.now(
tz=timezone(TZ)
).timetuple()
msg = json.loads(sys.argv[1])
metrics_info = {
m["metric"]: {
......@@ -173,15 +169,19 @@ if __name__ == "__main__":
time_0 = msg["epoch_start"]
prediction_horizon = msg["prediction_horizon"] * 1000
prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"]
prediction_points_horizons = {
metric["metric"]: msg["prediction_horizon"] * 1000 // metric["publish_rate"]
for metric in msg["all_metrics"]
}
predicted_metrics = set(msg["metrics"])
prediction_cycle = msg["prediction_horizon"]
prediction_length = (
msg["prediction_horizon"]
prediction_lengths = {
metric["metric"]: msg["prediction_horizon"]
* 1000
// msg["publish_rate"]
// metric["publish_rate"]
* msg["number_of_forward_predictions"]
)
for metric in msg["all_metrics"]
}
logging.info(f"Predicted metrics: {predicted_metrics}")
number_of_forward_predictions = {
metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
......
def square(x):
    """Return *x* multiplied by itself."""
    result = x * x
    return result
\ No newline at end of file
......@@ -8,15 +8,20 @@ from filelock import FileLock
from src.preprocess_dataset import Dataset
import time
import logging
import pytz
from datetime import datetime
pd.options.mode.chained_assignment = None
"""Script for temporal fusion transformer prediction"""
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
def predict(
target_column,
prediction_length,
single_prediction_points_length=1,
yaml_file="model.yaml",
prediction_hor=60,
timestamp=0,
......@@ -38,51 +43,77 @@ def predict(
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
if new_ts_dataset.dropped_recent_series: # series with recent data was too short
logging.info(
f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print(
f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print("Not enough fresh data, unable to predict TIME:")
return None
dataset = pd.read_csv(data_path)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
if ts_dataset.dataset.shape[0] < 1:
logging.info(
f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print(
f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
return None
ts_dataset = ts_dataset.dataset[
ts_dataset.dataset.series == ts_dataset.dataset.series.max()
]
].tail(1000)
pred_len = params["dataset"]["prediction_length"]
best_model_aic = None
for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0)]:
sarima = sm.tsa.statespace.SARIMAX(ts_dataset[target_column], order=order)
model = sarima.fit()
if best_model_aic:
if model.aic < best_model_aic:
for order in [(2, 1, 2), (3, 0, 1), (1, 0, 1), (3, 1, 0), (1, 0, 0), (4, 0, 2)]:
try:
sarima = sm.tsa.statespace.SARIMAX(
ts_dataset[target_column], order=order, enforce_stationarity=False
)
model = sarima.fit()
if best_model_aic:
if model.aic < best_model_aic:
predictions = model.get_forecast(
pred_len, return_conf_int=True, alpha=0.05
)
best_model_aic = model.aic
else:
predictions = model.get_forecast(
pred_len, return_conf_int=True, alpha=0.05
)
best_model_aic = model.aic
else:
predictions = model.get_forecast(pred_len, return_conf_int=True, alpha=0.05)
best_model_aic = model.aic
except np.linalg.LinAlgError:
logging.info(
f"METRIC: {target_column} SARIMAX model order: {order} failed for metric {target_column} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
if not best_model_aic:
logging.info(
f"METRIC: {target_column} All SARIMAX failed for metric: {target_column}, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
return None
msgs = []
for i in range(pred_len):
msg = {
target_column: {
"metricValue": predictions.predicted_mean.values[i],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions.conf_int(alpha=0.05).iloc[i].values[0],
predictions.conf_int(alpha=0.05).iloc[i].values[1],
], # quantiles difference
"predictionTime": timestamp + (i + 1) * (prediction_hor // 1000),
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
if ((i + 1) % single_prediction_points_length) == 0:
msg = {
target_column: {
"metricValue": predictions.predicted_mean.values[i],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions.conf_int(alpha=0.05).iloc[i].values[0],
predictions.conf_int(alpha=0.05).iloc[i].values[1],
], # quantiles difference
"predictionTime": timestamp + (i + 1) * (prediction_hor // 1000),
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
}
}
}
msgs.append(msg)
msgs.append(msg)
return msgs
......@@ -34,13 +34,19 @@ class Dataset(object):
self.context_length = context_length
self.prediction_length = prediction_length
self.dataset = dataset
self.check_gap()
self.dropped_recent_series = True # default set to be true
if self.dataset.shape[0] > 0:
self.check_gap()
self.n = dataset.shape[0]
def cut_nan_start(self, dataset):
dataset.index = range(dataset.shape[0])
first_not_nan_index = dataset[self.target_column].first_valid_index()
return dataset[dataset.index > first_not_nan_index]
if first_not_nan_index == first_not_nan_index: # check is if it;s not np.nan
if first_not_nan_index > -1:
return dataset[dataset.index > first_not_nan_index]
else:
return dataset.dropna()
def fill_na(self, dataset):
dataset = dataset.replace("None", np.nan)
......@@ -68,43 +74,66 @@ class Dataset(object):
return dataset
def check_gap(self):
max_gap = self.dataset["time"].diff().abs().max()
logging.info(f"Max time gap in series {max_gap}")
print(f"Max time gap in series {max_gap}")
series_freq = self.dataset["time"].diff().value_counts().index.values[0]
logging.info(f"Detected series with {series_freq} frequency")
print(f"Detected series with {series_freq} frequency")
# check series length
series = np.split(
self.dataset,
*np.where(
self.dataset["time"].diff().abs().fillna(0).astype(int)
>= np.abs(self.max_missing_values * series_freq)
),
)
logging.info(f"{len(series)} series found")
print(f"{len(series)} series found")
preprocessed_series = []
for i, s in enumerate(series):
s = self.fill_na(s)
s = self.cut_nan_start(s)
s = self.add_obligatory_columns(s)
s = self.convert_formats(s)
s["split"] = "train"
if s.shape[0] > self.prediction_length * 2 + self.context_length:
s["series"] = i
preprocessed_series.append(s)
if i == len(series) - 1:
self.dataset = self.dataset.groupby(by=["time"]).min()
self.dataset["time"] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
self.dataset[self.target_column] = pd.to_numeric(
self.dataset[self.target_column], errors="coerce"
).fillna(np.nan)
self.dataset = self.dataset.dropna(subset=[self.target_column])
if self.dataset.shape[0] > 0:
max_gap = self.dataset["time"].diff().abs().max()
logging.info(
f"Metric: {self.target_column} Max time gap in series {max_gap}"
)
print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
self.series_freq = (
self.dataset["time"] // 1e9
).diff().value_counts().index.values[0] * 1e9
logging.info(
f"Metric: {self.target_column} Detected series with {self.series_freq} frequency"
)
print(
f"Metric: {self.target_column} Detected series with {self.series_freq} frequency"
)
# check series length
series = np.split(
self.dataset,
*np.where(
self.dataset["time"].diff().abs().fillna(0).astype(int)
>= np.abs(self.max_missing_values * self.series_freq)
),
)
logging.info(f"Metric: {self.target_column} {len(series)} series found")
print(f"{len(series)} series found")
preprocessed_series = []
for i, s in enumerate(series):
s = self.fill_na(s)
s = self.cut_nan_start(s)
s = self.add_obligatory_columns(s)
s = self.convert_formats(s)
s["split"] = "train"
logging.info(
f"Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
)
if s.shape[0] > self.prediction_length * 2 + self.context_length:
s["series"] = i
preprocessed_series.append(s)
if i == len(series) - 1:
logging.info(
f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
)
logging.info(f"{len(preprocessed_series)} long enough series found")
print(f"{len(preprocessed_series)} long enough series found")
# logging.info(f"")
self.dataset = pd.concat(preprocessed_series)
if self.dataset["series"].max() != len(series) - 1:
self.dropped_recent_series = True
else:
self.dropped_recent_series = False
self.dataset.index = range(self.dataset.shape[0])
logging.info(
f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found"
)
print(f"{len(preprocessed_series)} long enough series found")
if preprocessed_series:
self.dataset = pd.concat(preprocessed_series)
if self.dataset["series"].max() != len(series) - 1:
self.dropped_recent_series = True
else:
self.dropped_recent_series = False
else:
self.dataset = pd.DataFrame()
self.dataset.index = range(self.dataset.shape[0])
import sys
sys.path.append(".")
import pytest
from src.model_predict import predict
import pandas as pd
import numpy as np
import random
@pytest.fixture
def df_1():
    """Frame with the expected columns but zero rows."""
    return pd.DataFrame({"time": [], "metric_0": []})
@pytest.fixture
def df_2():
    """1000-row frame (nanosecond-scale timestamps) whose only metric is all-NaN."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    frame["metric_0"] = np.nan
    return frame
@pytest.fixture
def df_3():
    """1000-row frame with five metric columns, every value NaN."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        frame[f"metric_{idx}"] = np.nan
    return frame
@pytest.fixture
def df_4():
    """Very short series: only three rows, five constant metric columns."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(3) * 1e9
    for idx in range(5):
        frame[f"metric_{idx}"] = 1
    return frame
@pytest.fixture
def df_5():
    """Three-row constant-metric frame (identical in content to df_4)."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(3) * 1e9
    for idx in range(5):
        frame[f"metric_{idx}"] = 1
    return frame
@pytest.fixture
def df_6():
    """Healthy 1000-row frame: five fully-populated random metric columns."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        frame[f"metric_{idx}"] = np.random.rand(1000)
    return frame
@pytest.fixture
def df_7():
    """Sparse metrics whose missing entries are encoded as the string "None".

    Positions are drawn with replacement, so the number of distinct gaps
    is at most 990.
    """
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        col = f"metric_{idx}"
        frame[col] = np.random.rand(1000)
        frame.loc[np.random.randint(0, frame.shape[0] - 1, 990), col] = np.nan
        frame[col] = frame[col].fillna("None")
    return frame
@pytest.fixture
def df_8():
    """Sparse metrics whose missing entries are replaced by +inf."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        col = f"metric_{idx}"
        frame[col] = np.random.rand(1000)
        frame.loc[np.random.randint(0, frame.shape[0] - 1, 990), col] = np.nan
        frame[col] = frame[col].fillna(np.inf)
    return frame
@pytest.fixture
def df_9():
    """Sparse metrics with genuine NaN gaps left in place."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        col = f"metric_{idx}"
        frame[col] = np.random.rand(1000)
        frame.loc[np.random.randint(0, frame.shape[0] - 1, 990), col] = np.nan
    return frame
@pytest.fixture
def df_10():
    """Metrics with a contiguous block of +inf values at rows 20..299."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        col = f"metric_{idx}"
        frame[col] = np.random.rand(1000)
        frame.loc[list(range(20, 300)), col] = np.inf
    return frame
@pytest.fixture
def df_11():
    """Even-indexed metric columns get a contiguous NaN block at rows 20..299."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        col = f"metric_{idx}"
        frame[col] = np.random.rand(1000)
        if idx % 2 == 0:
            frame.loc[list(range(20, 300)), col] = np.nan
    return frame
@pytest.fixture
def df_12():
    """Metric columns where every even-indexed row is NaN (alternating gaps).

    The original comprehension reused `i` for both the column loop and the
    row index; the row variable is renamed here for clarity — values are
    unchanged (they depend only on the row index).
    """
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        frame[f"metric_{idx}"] = [
            np.nan if row % 2 == 0 else random.random() for row in range(1000)
        ]
    return frame
@pytest.fixture
def df(request):
    """Resolve the fixture named by the indirect parameter into a DataFrame."""
    fixture_name = request.param
    return request.getfixturevalue(fixture_name)
@pytest.fixture
def metric():
    """Target column name shared by all parametrized cases."""
    return "metric_0"
@pytest.fixture
def prediction_length():
    """Forecast horizon (number of points) shared by all parametrized cases."""
    return 60
@pytest.mark.parametrize(
    "df,metric,prediction_length",
    [
        ("df_1", metric, prediction_length),
        ("df_2", metric, prediction_length),
        ("df_3", metric, prediction_length),
        ("df_4", metric, prediction_length),
        ("df_5", metric, prediction_length),
        ("df_6", metric, prediction_length),
        ("df_7", metric, prediction_length),
        ("df_8", metric, prediction_length),
        ("df_9", metric, prediction_length),
        ("df_10", metric, prediction_length),
        ("df_11", metric, prediction_length),
        ("df_12", metric, prediction_length),
    ],
    indirect=True,
)
def test_predict(df, metric, prediction_length):
    """Smoke-test predict() across degenerate and well-formed datasets.

    predict() legitimately returns None (or nothing usable) when the data
    is too short or too sparse, so a falsy result is accepted.  When
    predictions ARE produced, each message must be a dict keyed by the
    target metric name.  The previous body ended in `assert True`, which
    could never fail; these structural assertions make the test meaningful
    while tolerating the same outcomes.
    """
    df.to_csv("demo.csv")  # predict() reads its input dataset from this CSV
    output = predict(metric, prediction_length)
    if output:
        assert isinstance(output, list)
        # every per-point prediction message is keyed by the metric it predicts
        assert all(metric in msg for msg in output)
import sys
sys.path.append(".")
import pytest
from src.preprocess_dataset import Dataset
import pandas as pd
import numpy as np
import random
@pytest.fixture
def df_1():
    """Frame with the expected columns but zero rows."""
    return pd.DataFrame({"time": [], "metric_0": []})
@pytest.fixture
def df_2():
    """1000-row frame (nanosecond-scale timestamps) whose only metric is all-NaN."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    frame["metric_0"] = np.nan
    return frame
@pytest.fixture
def df_3():
    """1000-row frame with five metric columns, every value NaN."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(1000) * 1e9
    for idx in range(5):
        frame[f"metric_{idx}"] = np.nan
    return frame
@pytest.fixture
def df_4():
    """Very short series: only three rows, five constant metric columns."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(3) * 1e9
    for idx in range(5):
        frame[f"metric_{idx}"] = 1
    return frame
@pytest.fixture
def df_5():
    """Three-row constant-metric frame (identical in content to df_4)."""
    frame = pd.DataFrame()
    frame["time"] = np.arange(3) * 1e9
    for idx in range(5):
        frame[f"metric_{idx}"] = 1
    return frame
@pytest.fixture
def df_6():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9