Commit e798466f authored by Marta Różańska's avatar Marta Różańska
Browse files

Merge branch 'tft_nbeats' into 'morphemic-rc1.5'

ems time + delay error changed

See merge request !179
parents e1af78e1 067ca54e
Pipeline #16669 passed with stages
in 15 minutes and 9 seconds
......@@ -139,16 +139,18 @@ def main():
influxdb_conn.send_to_influxdb(metric, message)
end_time = int(time.time())
time_0 = time_0 + prediction_cycle
time_to_wait = prediction_cycle - (end_time - start_time)
time_to_wait = time_0 - end_time
if time_to_wait < 0:
time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle)
time_to_skip = (end_time - time_0) // prediction_cycle
time_0 = time_0 + (time_to_skip + 1) * prediction_cycle
time_to_wait = time_0 - end_time
logging.info(
f"Prediction time is too slow (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time_0 = time_0 + prediction_cycle
print(
f"Prediction time is too slow (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time.sleep(time_to_wait)
......
import pandas as pd
import numpy as np
import logging
import time
pd.options.mode.chained_assignment = None
......@@ -13,6 +14,7 @@ class Dataset(object):
self,
dataset,
target_column="value",
time_column="ems_time",
tv_unknown_reals=[],
known_reals=[],
tv_unknown_cat=[],
......@@ -27,6 +29,7 @@ class Dataset(object):
20 # max consecutive missing values allowed per series
)
self.target_column = target_column
self.time_column = time_column
self.tv_unknown_cat = tv_unknown_cat
self.known_reals = known_reals
self.tv_unknown_reals = tv_unknown_reals
......@@ -68,16 +71,16 @@ class Dataset(object):
def convert_time_to_ms(self):
if self.dataset.shape[0] > 0:
digit_len = len(str(int(self.dataset["time"].values[0])))
digit_len = len(str(int(self.dataset[self.time_column].values[0])))
if digit_len >= 13:
self.dataset["time"] = self.dataset["time"].apply(
self.dataset[self.time_column] = self.dataset[self.time_column].apply(
lambda x: int(str(x)[:13])
)
else:
self.dataset["time"] = self.dataset["time"].apply(
self.dataset[self.time_column] = self.dataset[self.time_column].apply(
lambda x: int(int(str(x)[:digit_len]) * 10 ** (13 - digit_len))
)
self.dataset["time"] = self.dataset["time"].apply(
self.dataset[self.time_column] = self.dataset[self.time_column].apply(
lambda x: int(x // 1e4 * 1e4)
)
......@@ -86,11 +89,22 @@ class Dataset(object):
dataset["time_idx"] = range(n) # TODO check time gaps
return dataset
def get_time_difference_current(self):
if self.dataset.shape[0] > 0:
last_timestamp_database = self.dataset[self.time_column].values[-1]
current_time = int(time.time())
print(
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
)
logging.info(
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
)
def check_gap(self):
print(self.dataset)
if self.dataset.shape[0] > 0:
self.dataset = self.dataset.groupby(by=["time"]).min()
self.dataset["time"] = self.dataset.index
self.dataset = self.dataset.groupby(by=[self.time_column]).min()
self.dataset[self.time_column] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
self.convert_time_to_ms()
print(self.dataset)
......@@ -100,14 +114,14 @@ class Dataset(object):
self.dataset = self.dataset.replace(np.inf, np.nan)
self.dataset = self.dataset.dropna(subset=[self.target_column])
if self.dataset.shape[0] > 0:
max_gap = self.dataset["time"].diff().abs().max()
max_gap = self.dataset[self.time_column].diff().abs().max()
logging.info(
f"Metric: {self.target_column} Max time gap in series {max_gap}"
)
print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
print(self.dataset["time"].diff().fillna(0).value_counts())
print(self.dataset[self.time_column].diff().fillna(0).value_counts())
series_freq = (
(self.dataset["time"])
(self.dataset[self.time_column])
.diff()
.fillna(0)
.value_counts()
......@@ -132,7 +146,11 @@ class Dataset(object):
series = np.split(
self.dataset,
*np.where(
self.dataset["time"].diff().abs().fillna(0).astype(int)
self.dataset[self.time_column]
.diff()
.abs()
.fillna(0)
.astype(int)
>= np.abs(self.max_missing_values * self.publish_rate)
),
)
......@@ -175,3 +193,5 @@ class Dataset(object):
else:
self.dataset = pd.DataFrame()
self.dropped_recent_series = True
self.get_time_difference_current()
......@@ -11,14 +11,14 @@ import random
@pytest.fixture
def df_1():
df = pd.DataFrame({"time": [], "metric_0": []})
df = pd.DataFrame({"ems_time": [], "metric_0": []})
return df
@pytest.fixture
def df_2():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
df["metric_0"] = np.nan
return df
......@@ -26,7 +26,7 @@ def df_2():
@pytest.fixture
def df_3():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.nan
return df
......@@ -35,7 +35,7 @@ def df_3():
@pytest.fixture
def df_4():
df = pd.DataFrame()
df["time"] = np.array(range(0, 3)) * 1e9
df["ems_time"] = np.array(range(0, 3)) * 1e9
for i in range(5):
df[f"metric_{i}"] = 1
return df
......@@ -44,7 +44,7 @@ def df_4():
@pytest.fixture
def df_5():
df = pd.DataFrame()
df["time"] = np.array(range(0, 3)) * 1e9
df["ems_time"] = np.array(range(0, 3)) * 1e9
for i in range(5):
df[f"metric_{i}"] = 1
return df
......@@ -53,7 +53,7 @@ def df_5():
@pytest.fixture
def df_6():
df = pd.DataFrame()
df["time"] = np.array(range(1, 1001)) * 1e9
df["ems_time"] = np.array(range(1, 1001)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
return df
......@@ -62,7 +62,7 @@ def df_6():
@pytest.fixture
def df_7():
df = pd.DataFrame()
df["time"] = np.array(range(1, 1001)) * 1e9
df["ems_time"] = np.array(range(1, 1001)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
......@@ -76,7 +76,7 @@ def df_7():
@pytest.fixture
def df_8():
df = pd.DataFrame()
df["time"] = np.array(range(1, 1001)) * 1e9
df["ems_time"] = np.array(range(1, 1001)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
......@@ -90,7 +90,7 @@ def df_8():
@pytest.fixture
def df_9():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
......@@ -103,7 +103,7 @@ def df_9():
@pytest.fixture
def df_10():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
......@@ -116,7 +116,7 @@ def df_10():
@pytest.fixture
def df_11():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
if i % 2 == 0:
......@@ -130,7 +130,7 @@ def df_11():
@pytest.fixture
def df_12():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = [
np.nan if i % 2 == 0 else random.random() for i in range(1000)
......
......@@ -11,14 +11,14 @@ import random
@pytest.fixture
def df_1():
df = pd.DataFrame({"time": [], "metric_0": []})
df = pd.DataFrame({"ems_time": [], "metric_0": []})
return df
@pytest.fixture
def df_2():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
df["metric_0"] = np.nan
return df
......@@ -26,7 +26,7 @@ def df_2():
@pytest.fixture
def df_3():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.nan
return df
......@@ -35,7 +35,7 @@ def df_3():
@pytest.fixture
def df_4():
df = pd.DataFrame()
df["time"] = np.array(range(0, 3)) * 1e9
df["ems_time"] = np.array(range(0, 3)) * 1e9
for i in range(5):
df[f"metric_{i}"] = 1
return df
......@@ -44,7 +44,7 @@ def df_4():
@pytest.fixture
def df_5():
df = pd.DataFrame()
df["time"] = np.array(range(0, 3)) * 1e9
df["ems_time"] = np.array(range(0, 3)) * 1e9
for i in range(5):
df[f"metric_{i}"] = 1
return df
......@@ -53,7 +53,7 @@ def df_5():
@pytest.fixture
def df_6():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
return df
......@@ -62,7 +62,7 @@ def df_6():
@pytest.fixture
def df_7():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
......@@ -76,7 +76,7 @@ def df_7():
@pytest.fixture
def df_8():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
......@@ -90,7 +90,7 @@ def df_8():
@pytest.fixture
def df_9():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
......@@ -103,7 +103,7 @@ def df_9():
@pytest.fixture
def df_10():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
......@@ -116,7 +116,7 @@ def df_10():
@pytest.fixture
def df_11():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
if i % 2 == 0:
......@@ -130,7 +130,7 @@ def df_11():
@pytest.fixture
def df_12():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["ems_time"] = np.array(range(0, 1000)) * 1e9
for i in range(5):
df[f"metric_{i}"] = [
np.nan if i % 2 == 0 else random.random() for i in range(1000)
......@@ -141,7 +141,7 @@ def df_12():
@pytest.fixture
def df_13():
df = pd.DataFrame()
df["time"] = 1
df["ems_time"] = 1
for i in range(5):
df[f"metric_{i}"] = [random.random() for i in range(1000)]
return df
......@@ -150,7 +150,7 @@ def df_13():
@pytest.fixture
def df_14():
df = pd.DataFrame()
df["time"] = 10
df["ems_time"] = 10
for i in range(5):
df[f"metric_{i}"] = [np.nan for i in range(1000)]
return df
......@@ -159,7 +159,7 @@ def df_14():
@pytest.fixture
def df_15():
df = pd.DataFrame()
df["time"] = [i * 30 * 1e5 for i in range(500)] + [
df["ems_time"] = [i * 30 * 1e5 for i in range(500)] + [
i * 30 * 1e5 + 10000 for i in range(500)
]
for i in range(5):
......@@ -170,7 +170,7 @@ def df_15():
@pytest.fixture
def df_16():
df = pd.DataFrame()
df["time"] = [i for i in range(500)] + [i + 10000 for i in range(500)]
df["ems_time"] = [i for i in range(500)] + [i + 10000 for i in range(500)]
for i in range(5):
df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)]
return df
......
......@@ -114,11 +114,11 @@ def main():
)
# msg1 = Msg()
# msg1.body = '[{"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "WillFinishTooSoonContext", "level": 3, "publish_rate": 30000}, {"metric": "MinimumCores", "level": 3, "publish_rate": 30000}]'
# msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["EstimatedRemainingTimeContext", "WillFinishTooSoonContext", "MinimumCores"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + "}"
# )
......
......@@ -23,3 +23,5 @@ save_path:
models
dataloader_path:
dataloader
context_length_ratio:
10
......@@ -14,6 +14,7 @@ from src.dataset_maker import CSVData
from pytz import timezone
import pytz
from datetime import datetime
import random
METHOD = os.environ.get("METHOD", "nbeats")
START_TOPIC = f"start_forecasting.{METHOD}"
......@@ -161,17 +162,18 @@ def main():
influxdb_conn.send_to_influxdb(metric, prediction_msgs)
end_time = int(time.time())
print(f"sleeping {prediction_cycle - (end_time - start_time)} seconds")
time_0 = time_0 + prediction_cycle
time_to_wait = prediction_cycle - (end_time - start_time)
time_to_wait = time_0 - end_time
if time_to_wait < 0:
time_to_wait = prediction_cycle - (time_to_wait % prediction_cycle)
time_to_skip = (end_time - time_0) // prediction_cycle
time_0 = time_0 + (time_to_skip + 1) * prediction_cycle
time_to_wait = time_0 - end_time
logging.info(
f"Prediction time is too slow (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time_0 = time_0 + prediction_cycle
print(
f"Prediction time is too slow (predictions might be delayed) TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
time.sleep(time_to_wait)
......
......@@ -12,7 +12,7 @@ from pytz import timezone
from datetime import datetime
TOPIC_NAME = "training_models"
RETRAIN_CYCLE = 10 # minutes
RETRAIN_CYCLE = 2 # minutes
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
......
......@@ -37,7 +37,9 @@ def predict(
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
params["dataset"]["prediction_length"] = prediction_length
params["dataset"]["context_length"] = prediction_length * 12
params["dataset"]["context_length"] = (
prediction_length * params["context_length_ratio"]
)
model_path = os.path.join(params["save_path"], f"{target_column}.pth")
......@@ -56,7 +58,12 @@ def predict(
return (None, None)
dataset = pd.read_csv(data_path)
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate)
new_ts_dataset = Dataset(
dataset,
target_column=target_column,
**params["dataset"],
publish_rate=publish_rate,
)
if new_ts_dataset.dropped_recent_series: # series with recent data was too short
logging.info(
f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
......
......@@ -35,7 +35,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
params["dataset"]["prediction_length"] = prediction_length
params["dataset"]["context_length"] = prediction_length * 12
params["dataset"]["context_length"] = (
prediction_length * params["context_length_ratio"]
)
data_path = os.path.join(
os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv'
......@@ -47,9 +49,9 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
dataset = pd.read_csv(data_path)
print(dataset, "dataset downloaded from persostent storage")
if dataset.shape[0] < 14 * prediction_length:
if dataset.shape[0] < (params["context_length_ratio"] + 2) * prediction_length:
logging.info(
f"METRIC: {target_column} dataset len: {dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} dataset len: {dataset.shape[0]}, minimum points required: {(params['context_length_ratio'] + 2) * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
return None
......@@ -63,7 +65,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
print(ts_dataset.dataset, "DATASEEEET")
if ts_dataset.dataset.shape[0] < 1:
logging.info(
f"METRIC: {target_column} Preprocessed dataset len: {ts_dataset.dataset.shape[0]}, minimum points required: {14 * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
f"METRIC: {target_column} Preprocessed dataset len: {ts_dataset.dataset.shape[0]}, minimum points required: {(params['context_length_ratio'] + 2) * prediction_length} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
return None
......@@ -112,6 +114,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate
widths=[32, 512],
backcast_loss_ratio=1.0,
reduce_on_plateau_patience=5,
loss=MAE(),
)
model_path = os.path.join(params["save_path"], f"{target_column}.pth")
......
......@@ -3,6 +3,7 @@ from pytorch_forecasting import TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
import numpy as np
import logging
import time
pd.options.mode.chained_assignment = None
......@@ -15,6 +16,7 @@ class Dataset(object):
self,
dataset,
target_column="value",
time_column="ems_time",
tv_unknown_reals=[],
known_reals=[],
tv_unknown_cat=[],
......@@ -29,6 +31,7 @@ class Dataset(object):
20 # max consecutive missing values allowed per series
)
self.target_column = target_column
self.time_column = time_column
self.tv_unknown_cat = tv_unknown_cat
self.known_reals = known_reals
self.tv_unknown_reals = tv_unknown_reals
......@@ -72,16 +75,16 @@ class Dataset(object):
def convert_time_to_ms(self):
if self.dataset.shape[0] > 0:
digit_len = len(str(int(self.dataset["time"].values[0])))
digit_len = len(str(int(self.dataset[self.time_column].values[0])))
if digit_len >= 13:
self.dataset["time"] = self.dataset["time"].apply(
lambda x: int(str(x)[:13])
self.dataset[self.time_column] = self.dataset[self.time_column].apply(
lambda x: int(str(int(x))[:13])
)
else:
self.dataset["time"] = self.dataset["time"].apply(
lambda x: int(int(str(x)[:digit_len]) * 10 ** (13 - digit_len))
self.dataset[self.time_column] = self.dataset[self.time_column].apply(
lambda x: int(int(str(int(x))[:digit_len]) * 10 ** (13 - digit_len))
)
self.dataset["time"] = self.dataset["time"].apply(
self.dataset[self.time_column] = self.dataset[self.time_column].apply(
lambda x: int(x // 1e4 * 1e4)
)
......@@ -90,10 +93,21 @@ class Dataset(object):
dataset["time_idx"] = range(n) # TODO check time gaps
return dataset
def get_time_difference_current(self):
if self.dataset.shape[0] > 0:
last_timestamp_database = self.dataset[self.time_column].values[-1]
current_time = int(time.time())
print(
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
)
logging.info(
f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}"
)
def check_gap(self):
if self.dataset.shape[0] > 0:
self.dataset = self.dataset.groupby(by=["time"]).min()
self.dataset["time"] = self.dataset.index
self.dataset = self.dataset.groupby(by=[self.time_column]).min()
self.dataset[self.time_column] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
self.convert_time_to_ms()
self.dataset[self.target_column] = pd.to_numeric(
......@@ -102,13 +116,13 @@ class Dataset(object):
self.dataset = self.dataset.replace(np.inf, np.nan)
self.dataset = self.dataset.dropna(subset=[self.target_column])
if self.dataset.shape[0] > 0:
max_gap = self.dataset["time"].diff().abs().max()
max_gap = self.dataset[self.time_column].diff().abs().max()
logging.info(
f"Metric: {self.target_column} Max time gap in series {max_gap}"
)
print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
series_freq = (