Commit 18b1275b authored by Anna Warno
Browse files

errors corrected, unit tests added

parent 994e528a
AMQ_HOSTNAME=localhost
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_HOST=147.102.17.76
AMQ_PORT_BROKER=61610
APP_NAME=default_application
AMQ_USER=admin
AMQ_PASSWORD=admin
AMQ_PORT=61613
APP_NAME=demo
METHOD=arima
DATA_PATH=./
INFLUXDB_HOSTNAME=147.102.17.76
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
......@@ -112,7 +112,7 @@ def main():
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60'
# + "}"
# )
......
......@@ -123,6 +123,7 @@ def main():
prediction_msgs = predict(
metric,
prediction_lengths[metric],
single_prediction_points_length=prediction_points_horizons[metric],
prediction_hor=prediction_horizon,
timestamp=time_0,
)
......@@ -154,6 +155,7 @@ def main():
if __name__ == "__main__":
logging.basicConfig(
filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out",
level=logging.INFO,
)
msg = json.loads(sys.argv[1])
......@@ -171,7 +173,6 @@ if __name__ == "__main__":
metric["metric"]: msg["prediction_horizon"] * 1000 // metric["publish_rate"]
for metric in msg["all_metrics"]
}
print(prediction_points_horizons)
predicted_metrics = set(msg["metrics"])
prediction_cycle = msg["prediction_horizon"]
prediction_lengths = {
......
......@@ -21,6 +21,7 @@ TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
def predict(
target_column,
prediction_length,
single_prediction_points_length=1,
yaml_file="model.yaml",
prediction_hor=60,
timestamp=0,
......@@ -42,10 +43,10 @@ def predict(
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
if new_ts_dataset.dropped_recent_series: # series with recent data was too short
logging.info(
f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print(
f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
return None
......@@ -54,16 +55,16 @@ def predict(
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
if ts_dataset.dataset.shape[0] < 1:
logging.info(
f"Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
print(
f"Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
return None
ts_dataset = ts_dataset.dataset[
ts_dataset.dataset.series == ts_dataset.dataset.series.max()
]
].tail(1000)
pred_len = params["dataset"]["prediction_length"]
best_model_aic = None
......@@ -86,32 +87,33 @@ def predict(
best_model_aic = model.aic
except np.linalg.LinAlgError:
logging.info(
f"SARIMAX model order: {order} failed for metric {target_column} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} SARIMAX model order: {order} failed for metric {target_column} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
if not best_model_aic:
logging.info(
f"All SARIMAX failed for metric: {target_column}, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} All SARIMAX failed for metric: {target_column}, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
return None
msgs = []
for i in range(pred_len):
msg = {
target_column: {
"metricValue": predictions.predicted_mean.values[i],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions.conf_int(alpha=0.05).iloc[i].values[0],
predictions.conf_int(alpha=0.05).iloc[i].values[1],
], # quantiles difference
"predictionTime": timestamp + (i + 1) * (prediction_hor // 1000),
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
if ((i + 1) % single_prediction_points_length) == 0:
msg = {
target_column: {
"metricValue": predictions.predicted_mean.values[i],
"level": 1, # TODO
"timestamp": int(time.time()),
"probability": 0.95,
"confidence_interval": [
predictions.conf_int(alpha=0.05).iloc[i].values[0],
predictions.conf_int(alpha=0.05).iloc[i].values[1],
], # quantiles difference
"predictionTime": timestamp + (i + 1) * (prediction_hor // 1000),
"refersTo": "TODO",
"cloud": "TODO",
"provider": "TODO",
}
}
}
msgs.append(msg)
msgs.append(msg)
return msgs
import sys
sys.path.append(".")
import pytest
from src.model_predict import predict
import pandas as pd
import numpy as np
import random
@pytest.fixture
def df_1():
    """Return an empty frame that only has ``time`` and ``metric_0`` columns."""
    empty_columns = {name: [] for name in ("time", "metric_0")}
    return pd.DataFrame(empty_columns)
@pytest.fixture
def df_2():
    """Return 1000 rows whose only metric column, ``metric_0``, is all NaN."""
    timestamps = np.array(range(1000)) * 1e9
    frame = pd.DataFrame()
    frame["time"] = timestamps
    frame["metric_0"] = np.nan
    return frame
@pytest.fixture
def df_3():
    """Return 1000 rows in which all five metric columns are entirely NaN."""
    timestamps = np.array(range(1000)) * 1e9
    frame = pd.DataFrame()
    frame["time"] = timestamps
    for column in (f"metric_{idx}" for idx in range(5)):
        frame[column] = np.nan
    return frame
@pytest.fixture
def df_4():
    """Return a very short frame: 3 rows, five metric columns all equal to 1."""
    timestamps = np.array(range(3)) * 1e9
    frame = pd.DataFrame()
    frame["time"] = timestamps
    for column in (f"metric_{idx}" for idx in range(5)):
        frame[column] = 1
    return frame
@pytest.fixture
def df_5():
    """Return a very short frame: 3 rows, five metric columns all equal to 1.

    NOTE(review): this builds exactly the same data as ``df_4``.
    """
    timestamps = np.array(range(3)) * 1e9
    frame = pd.DataFrame()
    frame["time"] = timestamps
    for column in (f"metric_{idx}" for idx in range(5)):
        frame[column] = 1
    return frame
@pytest.fixture
def df_6():
    """Return 1000 fully populated rows with five columns of uniform random values."""
    row_count = 1000
    frame = pd.DataFrame()
    frame["time"] = np.array(range(row_count)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(row_count)
    return frame
@pytest.fixture
def df_7():
    """Return 1000 rows of random metrics where most values are the string "None".

    Each of the five metric columns starts as random floats; a random set of
    rows is blanked to NaN and the NaNs are then replaced by the literal
    string ``"None"``.
    """
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        # 990 draws with replacement, so fewer than 990 distinct rows are hit.
        blanked_rows = np.random.randint(0, frame.shape[0] - 1, 990)
        frame.loc[blanked_rows, column] = np.nan
        frame[column] = frame[column].fillna("None")
    return frame
@pytest.fixture
def df_8():
    """Return 1000 rows of random metrics where most values are ``inf``.

    Each of the five metric columns starts as random floats; a random set of
    rows is blanked to NaN and the NaNs are then replaced by ``np.inf``.
    """
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        # 990 draws with replacement, so fewer than 990 distinct rows are hit.
        blanked_rows = np.random.randint(0, frame.shape[0] - 1, 990)
        frame.loc[blanked_rows, column] = np.nan
        frame[column] = frame[column].fillna(np.inf)
    return frame
@pytest.fixture
def df_9():
    """Return 1000 rows of random metrics where most values are NaN.

    Each of the five metric columns starts as random floats and then has a
    random set of rows overwritten with NaN.
    """
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        # 990 draws with replacement, so fewer than 990 distinct rows are hit.
        blanked_rows = np.random.randint(0, frame.shape[0] - 1, 990)
        frame.loc[blanked_rows, column] = np.nan
    return frame
@pytest.fixture
def df_10():
    """Return 1000 rows of random metrics with rows 20..299 set to ``inf``."""
    infinite_rows = list(range(20, 300))
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        frame.loc[infinite_rows, column] = np.inf
    return frame
@pytest.fixture
def df_11():
    """Return 1000 rows of random metrics with a NaN gap in every other column.

    Columns ``metric_0``, ``metric_2`` and ``metric_4`` have rows 20..299
    set to NaN; the odd-numbered columns stay fully populated.
    """
    gap_rows = list(range(20, 300))
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        if idx % 2 != 0:
            continue
        frame.loc[gap_rows, column] = np.nan
    return frame
@pytest.fixture
def df_12():
    """Return 1000 rows where every metric alternates NaN and a random value.

    In each of the five metric columns the even-numbered rows are NaN and
    the odd-numbered rows hold ``random.random()`` values.
    """
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        # The parity test is on the row number, not on the column number.
        alternating = [
            np.nan if row % 2 == 0 else random.random() for row in range(1000)
        ]
        frame[f"metric_{idx}"] = alternating
    return frame
@pytest.fixture
def df(request):
    """Resolve the fixture whose name was supplied as the indirect parameter."""
    fixture_name = request.param
    return request.getfixturevalue(fixture_name)
@pytest.fixture
def metric():
    """Return the name of the metric column the tests target."""
    target_metric = "metric_0"
    return target_metric
@pytest.fixture
def prediction_length():
    """Return the prediction length passed to ``predict`` in the tests."""
    length = 60
    return length
@pytest.mark.parametrize(
    "df",
    [
        "df_1",
        "df_2",
        "df_3",
        "df_4",
        "df_5",
        "df_6",
        "df_7",
        "df_8",
        "df_9",
        "df_10",
        "df_11",
        "df_12",
    ],
    indirect=True,
)
def test_predict(df, metric, prediction_length):
    """Smoke-test ``predict`` against every degenerate input frame.

    Only ``df`` is parametrized (indirectly, by fixture name). ``metric`` and
    ``prediction_length`` are resolved as ordinary fixtures instead of being
    passed as fixture function objects that the fixtures silently ignored.

    The test passes as long as ``predict`` copes with the input without
    raising; its return value is deliberately not constrained here.
    """
    # The frame is written to "demo.csv" before the call.
    # NOTE(review): presumably predict() reads its input from this file -
    # confirm against the DATA_PATH / APP_NAME configuration.
    df.to_csv("demo.csv")
    predict(metric, prediction_length)
import sys
sys.path.append(".")
import pytest
from src.preprocess_dataset import Dataset
import pandas as pd
import numpy as np
import random
@pytest.fixture
def df_1():
    """Return an empty frame that only has ``time`` and ``metric_0`` columns."""
    empty_columns = {name: [] for name in ("time", "metric_0")}
    return pd.DataFrame(empty_columns)
@pytest.fixture
def df_2():
    """Return 1000 rows whose only metric column, ``metric_0``, is all NaN."""
    timestamps = np.array(range(1000)) * 1e9
    frame = pd.DataFrame()
    frame["time"] = timestamps
    frame["metric_0"] = np.nan
    return frame
@pytest.fixture
def df_3():
    """Return 1000 rows in which all five metric columns are entirely NaN."""
    timestamps = np.array(range(1000)) * 1e9
    frame = pd.DataFrame()
    frame["time"] = timestamps
    for column in (f"metric_{idx}" for idx in range(5)):
        frame[column] = np.nan
    return frame
@pytest.fixture
def df_4():
    """Return a very short frame: 3 rows, five metric columns all equal to 1."""
    timestamps = np.array(range(3)) * 1e9
    frame = pd.DataFrame()
    frame["time"] = timestamps
    for column in (f"metric_{idx}" for idx in range(5)):
        frame[column] = 1
    return frame
@pytest.fixture
def df_5():
    """Return a very short frame: 3 rows, five metric columns all equal to 1.

    NOTE(review): this builds exactly the same data as ``df_4``.
    """
    timestamps = np.array(range(3)) * 1e9
    frame = pd.DataFrame()
    frame["time"] = timestamps
    for column in (f"metric_{idx}" for idx in range(5)):
        frame[column] = 1
    return frame
@pytest.fixture
def df_6():
    """Return 1000 fully populated rows with five columns of uniform random values."""
    row_count = 1000
    frame = pd.DataFrame()
    frame["time"] = np.array(range(row_count)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(row_count)
    return frame
@pytest.fixture
def df_7():
    """Return 1000 rows of random metrics where most values are the string "None".

    Each of the five metric columns starts as random floats; a random set of
    rows is blanked to NaN and the NaNs are then replaced by the literal
    string ``"None"``.
    """
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        # 990 draws with replacement, so fewer than 990 distinct rows are hit.
        blanked_rows = np.random.randint(0, frame.shape[0] - 1, 990)
        frame.loc[blanked_rows, column] = np.nan
        frame[column] = frame[column].fillna("None")
    return frame
@pytest.fixture
def df_8():
    """Return 1000 rows of random metrics where most values are ``inf``.

    Each of the five metric columns starts as random floats; a random set of
    rows is blanked to NaN and the NaNs are then replaced by ``np.inf``.
    """
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        # 990 draws with replacement, so fewer than 990 distinct rows are hit.
        blanked_rows = np.random.randint(0, frame.shape[0] - 1, 990)
        frame.loc[blanked_rows, column] = np.nan
        frame[column] = frame[column].fillna(np.inf)
    return frame
@pytest.fixture
def df_9():
    """Return 1000 rows of random metrics where most values are NaN.

    Each of the five metric columns starts as random floats and then has a
    random set of rows overwritten with NaN.
    """
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        # 990 draws with replacement, so fewer than 990 distinct rows are hit.
        blanked_rows = np.random.randint(0, frame.shape[0] - 1, 990)
        frame.loc[blanked_rows, column] = np.nan
    return frame
@pytest.fixture
def df_10():
    """Return 1000 rows of random metrics with rows 20..299 set to ``inf``."""
    infinite_rows = list(range(20, 300))
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        frame.loc[infinite_rows, column] = np.inf
    return frame
@pytest.fixture
def df_11():
    """Return 1000 rows of random metrics with a NaN gap in every other column.

    Columns ``metric_0``, ``metric_2`` and ``metric_4`` have rows 20..299
    set to NaN; the odd-numbered columns stay fully populated.
    """
    gap_rows = list(range(20, 300))
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        column = f"metric_{idx}"
        frame[column] = np.random.rand(1000)
        if idx % 2 != 0:
            continue
        frame.loc[gap_rows, column] = np.nan
    return frame
@pytest.fixture
def df_12():
    """Return 1000 rows where every metric alternates NaN and a random value.

    In each of the five metric columns the even-numbered rows are NaN and
    the odd-numbered rows hold ``random.random()`` values.
    """
    frame = pd.DataFrame()
    frame["time"] = np.array(range(1000)) * 1e9
    for idx in range(5):
        # The parity test is on the row number, not on the column number.
        alternating = [
            np.nan if row % 2 == 0 else random.random() for row in range(1000)
        ]
        frame[f"metric_{idx}"] = alternating
    return frame
@pytest.fixture
def df(request):
    """Resolve the fixture whose name was supplied as the indirect parameter."""
    fixture_name = request.param
    return request.getfixturevalue(fixture_name)
@pytest.fixture
def metric():
    """Return the name of the metric column the tests target."""
    target_metric = "metric_0"
    return target_metric
class TestDataset:
    """Construction tests for ``Dataset`` on well-formed and degenerate frames."""

    @pytest.mark.parametrize(
        "df",
        [
            "df_1",
            "df_2",
            "df_3",
            "df_4",
            "df_5",
            "df_6",
            "df_7",
            "df_8",
            "df_9",
            "df_10",
            "df_11",
            "df_12",
        ],
        indirect=True,
    )
    def test_init(self, df, metric):
        """Check that ``Dataset`` can be built from every input frame.

        Only ``df`` is parametrized (indirectly, by fixture name). ``metric``
        is resolved as an ordinary fixture instead of being passed as a
        fixture function object that the fixture silently ignored.
        """
        preprocessed_dataset = Dataset(df, metric)
        assert isinstance(preprocessed_dataset, Dataset)
AMQ_HOSTNAME=localhost
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_HOST=147.102.17.76
AMQ_PORT_BROKER=61610
APP_NAME=default_application
METHOD=nbeats
AMQ_USER=admin
AMQ_PASSWORD=admin
AMQ_PORT=61613
APP_NAME=demo
METHOD=tft
DATA_PATH=./
INFLUXDB_HOSTNAME=147.102.17.76
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
TIME_ZONE=Europe/Vienna
......@@ -114,11 +114,11 @@ def main():
)
# msg1 = Msg()
# msg1.body = '[{"metric": "memory", "level": 3, "publish_rate": 30000}]'
# msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "metric_0", "level": 3, "publish_rate": 30000}, {"metric": "metric_1", "level": 3, "publish_rate": 30000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["memory"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60'
# + "}"
# )
......
......@@ -122,15 +122,16 @@ def main():
for metric in predicted_metrics:
predictions = None
for i in range(number_of_forward_predictions[metric]):
print(int((i + 1) * prediction_points_horizon), "point idx")
prediction_msgs, prediction = predict(
metric,
prediction_length,
prediction_lengths[metric],
extra_data=None,
m=i + 1,
prediction_hor=prediction_horizon,
timestamp=time_0 + (i + 1) * (prediction_horizon // 1000),
predicted_point_idx=int((i + 1) * prediction_points_horizon - 1),
predicted_point_idx=int(
(i + 1) * prediction_points_horizons[metric] - 1
),
)
if i == (number_of_forward_predictions[metric] - 1):
print(
......@@ -189,15 +190,19 @@ if __name__ == "__main__":
time_0 = msg["epoch_start"]
prediction_horizon = msg["prediction_horizon"] * 1000
prediction_points_horizon = msg["prediction_horizon"] * 1000 / msg["publish_rate"]
prediction_points_horizons = {
metric["metric"]: msg["prediction_horizon"] * 1000 // metric["publish_rate"]
for metric in msg["all_metrics"]
}
predicted_metrics = set(msg["metrics"])
prediction_cycle = msg["prediction_horizon"]
prediction_length = (
msg["prediction_horizon"]
prediction_lengths = {
metric["metric"]: msg["prediction_horizon"]
* 1000
// msg["publish_rate"]
// metric["publish_rate"]
* msg["number_of_forward_predictions"]
)
for metric in msg["all_metrics"]
}
logging.info(f"Predicted metrics: {predicted_metrics}")
number_of_forward_predictions = {
metric: msg["number_of_forward_predictions"] for metric in predicted_metrics
......
......@@ -21,7 +21,7 @@ APP_NAME = os.environ.get("APP_NAME", "demo")
TZ = os.environ.get("TIME_ZONE", "Europe/Vienna")
def main(predicted_metrics, prediction_horizon):
def main(predicted_metrics, prediction_horizons):