Commit 13f0d022 authored by Fotis Paraskevopoulos

Merge remote-tracking branch 'origin/morphemic-rc1.5' into iccs-eshybrid

parents ee371d0d fdb56126
@@ -36,7 +36,11 @@ def main(predicted_metrics, prediction_horizon):
while True:
start_time = int(time.time())
for metric in predicted_metrics:
retrain_msg = train(metric, prediction_horizon)
retrain_msg = train(
metric,
prediction_horizon,
publish_rate=metrics_info[metric]["publish_rate"],
)
if retrain_msg:
logging.info(
f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
......
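For reference, the retraining loop above assumes a metrics_info mapping keyed by metric name. A hypothetical sketch of its shape, with metric names and rates borrowed from the commented-out test messages later in this diff:

# Hypothetical shape of metrics_info; names and rates are illustrative.
metrics_info = {
    "EstimatedRemainingTimeContext": {"publish_rate": 30000},  # ms between samples
    "MinimumCores": {"publish_rate": 30000},
}

# Each metric's declared rate is then forwarded to train():
# train(metric, prediction_horizon, publish_rate=metrics_info[metric]["publish_rate"])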
@@ -25,6 +25,7 @@ def predict(
yaml_file="model.yaml",
prediction_hor=60,
timestamp=0,
publish_rate=10000,
):
with open(yaml_file) as file:
params = yaml.load(file, Loader=yaml.FullLoader)
@@ -40,7 +41,7 @@ def predict(
return None
dataset = pd.read_csv(data_path)
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate)
if new_ts_dataset.dropped_recent_series: # series with recent data was too short
logging.info(
f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
@@ -52,7 +53,7 @@ def predict(
dataset = pd.read_csv(data_path)
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate)
if ts_dataset.dataset.shape[0] < 1:
logging.info(
f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
......
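A minimal sketch of the updated call path on the predict side, assuming Dataset is importable from this package (import path hypothetical) and using made-up values; the publish_rate keyword is the only new argument:

import pandas as pd

from dataset import Dataset  # hypothetical import path

params = {"dataset": {"context_length": 40, "prediction_length": 5}}  # stands in for model.yaml
dataset = pd.read_csv("demo.csv")
ts_dataset = Dataset(
    dataset,
    target_column="metric_0",
    **params["dataset"],
    publish_rate=10000,  # expected sampling interval in ms
)
if ts_dataset.dropped_recent_series:
    print("not enough fresh data, unable to predict")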
@@ -20,6 +20,7 @@ class Dataset(object):
classification=0,
context_length=40,
prediction_length=5,
publish_rate=10000,
):
self.max_missing_values = (
@@ -33,6 +34,7 @@ class Dataset(object):
self.classification = classification
self.context_length = context_length
self.prediction_length = prediction_length
self.publish_rate = publish_rate
self.dataset = dataset
self.dropped_recent_series = True # default set to be true
if self.dataset.shape[0] > 0:
@@ -43,13 +45,14 @@ class Dataset(object):
dataset.index = range(dataset.shape[0])
first_not_nan_index = dataset[self.target_column].first_valid_index()
if first_not_nan_index == first_not_nan_index: # check if it's not np.nan (NaN != NaN)
if first_not_nan_index > -1:
return dataset[dataset.index > first_not_nan_index]
if first_not_nan_index is not None:
if first_not_nan_index > -1:
return dataset[dataset.index > first_not_nan_index]
else:
return dataset.dropna()
def fill_na(self, dataset):
dataset = dataset.replace("None", np.nan)
dataset = dataset.replace(np.inf, np.nan)
dataset = dataset.ffill(axis="rows")
return dataset
@@ -61,79 +64,114 @@ class Dataset(object):
for name in self.tv_unknown_cat:
dataset[name] = dataset[name].astype(str)
dataset["series"] = dataset["series"].astype(str)
return dataset
def convert_time_to_ms(self):
if self.dataset.shape[0] > 0:
digit_len = len(str(int(self.dataset["time"].values[0])))
if digit_len >= 13:
self.dataset["time"] = self.dataset["time"].apply(
lambda x: int(str(x)[:13])
)
else:
self.dataset["time"] = self.dataset["time"].apply(
lambda x: int(int(str(x)[:digit_len]) * 10 ** (13 - digit_len))
)
self.dataset["time"] = self.dataset["time"].apply(
lambda x: int(x // 1e4 * 1e4)
)
def add_obligatory_columns(self, dataset):
dataset["series"] = 0
dataset["split"] = "train"
n = dataset.shape[0]
dataset["split"][int(n * 0.8) :] = "val"
dataset["time_idx"] = range(n)
dataset["time_idx"] = range(n) # TODO check time gaps
return dataset
def check_gap(self):
self.dataset = self.dataset.groupby(by=["time"]).min()
self.dataset["time"] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
self.dataset[self.target_column] = pd.to_numeric(
self.dataset[self.target_column], errors="coerce"
).fillna(np.nan)
self.dataset = self.dataset.dropna(subset=[self.target_column])
print(self.dataset)
if self.dataset.shape[0] > 0:
max_gap = self.dataset["time"].diff().abs().max()
logging.info(
f"Metric: {self.target_column} Max time gap in series {max_gap}"
)
print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
self.series_freq = (
self.dataset["time"] // 1e9
).diff().value_counts().index.values[0] * 1e9
logging.info(
f"Metric: {self.target_column} Detected series with {self.series_freq} frequency"
)
print(
f"Metric: {self.target_column} Detected series with {self.series_freq} frequency"
)
# check series length
series = np.split(
self.dataset,
*np.where(
self.dataset["time"].diff().abs().fillna(0).astype(int)
>= np.abs(self.max_missing_values * self.series_freq)
),
)
logging.info(f"Metric: {self.target_column} {len(series)} series found")
print(f"{len(series)} series found")
preprocessed_series = []
for i, s in enumerate(series):
s = self.fill_na(s)
s = self.cut_nan_start(s)
s = self.add_obligatory_columns(s)
s = self.convert_formats(s)
s["split"] = "train"
self.dataset = self.dataset.groupby(by=["time"]).min()
self.dataset["time"] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
self.convert_time_to_ms()
print(self.dataset)
self.dataset[self.target_column] = pd.to_numeric(
self.dataset[self.target_column], errors="coerce"
).fillna(np.nan)
self.dataset = self.dataset.replace(np.inf, np.nan)
self.dataset = self.dataset.dropna(subset=[self.target_column])
if self.dataset.shape[0] > 0:
max_gap = self.dataset["time"].diff().abs().max()
logging.info(
f"Metric: {self.target_column} Max time gap in series {max_gap}"
)
print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
print(self.dataset["time"].diff().fillna(0).value_counts())
series_freq = (
(self.dataset["time"])
.diff()
.fillna(0)
.value_counts()
.index.values[0]
)
logging.info(
f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
f"Metric: {self.target_column} Detected series with {series_freq} frequency"
)
print(
f"Metric: {self.target_column} Detected series with {series_freq} frequency"
)
if s.shape[0] > self.prediction_length * 2 + self.context_length:
s["series"] = i
preprocessed_series.append(s)
if i == len(series) - 1:
if series_freq != self.publish_rate:
logging.info(
f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!"
)
print(
f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!"
)
logging.info(
f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found"
)
print(f"{len(preprocessed_series)} long enough series found")
if preprocessed_series:
self.dataset = pd.concat(preprocessed_series)
if self.dataset["series"].max() != len(series) - 1:
self.dropped_recent_series = True
# check series length
series = np.split(
self.dataset,
*np.where(
self.dataset["time"].diff().abs().fillna(0).astype(int)
>= np.abs(self.max_missing_values * self.publish_rate)
),
)
logging.info(f"Metric: {self.target_column} {len(series)} series found")
print(f"{len(series)} series found")
preprocessed_series = []
for i, s in enumerate(series):
s = self.fill_na(s)
s = self.cut_nan_start(s)
s = self.add_obligatory_columns(s)
s["split"] = "train"
s = self.convert_formats(s)
print(s.shape)
logging.info(
f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
)
if s.shape[0] > self.prediction_length * 2 + self.context_length:
s["series"] = i
preprocessed_series.append(s)
if i == len(series) - 1:
logging.info(
f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
)
logging.info(
f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found"
)
print(f"{len(preprocessed_series)} long enough series found")
# logging.info(f"")
if preprocessed_series:
self.dataset = pd.concat(preprocessed_series)
if self.dataset["series"].max() != len(series) - 1:
self.dropped_recent_series = True
else:
self.dropped_recent_series = False
else:
self.dropped_recent_series = False
else:
self.dataset = pd.DataFrame()
self.dataset.index = range(self.dataset.shape[0])
self.dataset = pd.DataFrame()
self.dropped_recent_series = True
self.dataset.index = range(self.dataset.shape[0])
else:
self.dataset = pd.DataFrame()
self.dropped_recent_series = True
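The key behavioral change in check_gap() above: series are now split wherever the timestamp gap reaches max_missing_values * publish_rate (the declared rate) rather than the detected series_freq, which is now only logged and compared against the declared rate. A standalone sketch of the splitting rule with made-up numbers:

import numpy as np
import pandas as pd

publish_rate = 10_000    # declared ms between samples
max_missing_values = 2   # tolerated run of missing samples (assumed value)

# Five samples with one 50-second hole between the third and fourth.
df = pd.DataFrame({
    "time": [0, 10_000, 20_000, 70_000, 80_000],
    "metric_0": [1.0, 2.0, 3.0, 4.0, 5.0],
})

# Cut wherever the gap reaches max_missing_values * publish_rate (20 s here);
# this mirrors the np.split usage in check_gap().
series = np.split(
    df,
    *np.where(
        df["time"].diff().abs().fillna(0).astype(int)
        >= np.abs(max_missing_values * publish_rate)
    ),
)
print(len(series))  # 2 sub-series; the last one holds the freshest data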
@@ -53,7 +53,7 @@ def df_5():
@pytest.fixture
def df_6():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["time"] = np.array(range(1, 1001)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
return df
@@ -62,7 +62,7 @@ def df_6():
@pytest.fixture
def df_7():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["time"] = np.array(range(1, 1001)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
@@ -76,7 +76,7 @@ def df_7():
@pytest.fixture
def df_8():
df = pd.DataFrame()
df["time"] = np.array(range(0, 1000)) * 1e9
df["time"] = np.array(range(1, 1001)) * 1e9
for i in range(5):
df[f"metric_{i}"] = np.random.rand(1000)
df.loc[
@@ -174,6 +174,7 @@ def prediction_length():
def test_predict(df, metric, prediction_length):
df.to_csv("demo.csv")
output = predict(metric, prediction_length)
print(output)
if output:
print("True")
assert True
@@ -147,6 +147,35 @@ def df_13():
return df
@pytest.fixture
def df_14():
df = pd.DataFrame()
df["time"] = 10
for i in range(5):
df[f"metric_{i}"] = [np.nan for i in range(1000)]
return df
@pytest.fixture
def df_15():
df = pd.DataFrame()
df["time"] = [i * 30 * 1e5 for i in range(500)] + [
i * 30 * 1e5 + 10000 for i in range(500)
]
for i in range(5):
df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)]
return df
@pytest.fixture
def df_16():
df = pd.DataFrame()
df["time"] = [i for i in range(500)] + [i + 10000 for i in range(500)]
for i in range(5):
df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)]
return df
@pytest.fixture
def df(request):
return request.getfixturevalue(request.param)
@@ -174,6 +203,9 @@ class TestDataset:
("df_11", metric),
("df_12", metric),
("df_13", metric),
("df_14", metric),
("df_15", metric),
("df_16", metric),
],
indirect=True,
)
......
AMQ_HOSTNAME=localhost
AMQ_USER=admin
AMQ_PASSWORD=admin
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_PORT=61613
APP_NAME=demo
METHOD=tft
......
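For context, one way a client might consume the updated credentials, assuming the stomp.py library (the actual consumer code is outside this diff):

import os
import stomp  # assumed client library

conn = stomp.Connection(
    [(os.environ["AMQ_HOSTNAME"], int(os.environ["AMQ_PORT"]))]
)
conn.connect(os.environ["AMQ_USER"], os.environ["AMQ_PASSWORD"], wait=True)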
@@ -114,11 +114,11 @@ def main():
)
# msg1 = Msg()
# msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]'
# msg1.body = '[{"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "WillFinishTooSoonContext", "level": 3, "publish_rate": 30000}, {"metric": "MinimumCores", "level": 3, "publish_rate": 30000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + f'"metrics": ["EstimatedRemainingTimeContext", "WillFinishTooSoonContext", "MinimumCores"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
# + "}"
# )
......
@@ -132,6 +132,7 @@ def main():
predicted_point_idx=int(
(i + 1) * prediction_points_horizons[metric] - 1
),
publish_rate=metrics_info[metric]["publish_rate"],
)
if i == (number_of_forward_predictions[metric] - 1):
print(
......
@@ -33,7 +33,11 @@ def main(predicted_metrics, prediction_horizons):
while True:
start_time = int(time.time())
for metric in predicted_metrics:
retrain_msg = train(metric, prediction_horizons[metric])
retrain_msg = train(
metric,
prediction_horizons[metric],
publish_rate=metrics_info[metric]["publish_rate"],
)
if retrain_msg:
logging.info(
f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
......
@@ -31,6 +31,7 @@ def predict(
prediction_hor=60,
timestamp=0,
predicted_point_idx=0,
publish_rate=10000,
):
with open(yaml_file) as file:
@@ -55,7 +56,7 @@ def predict(
return (None, None)
dataset = pd.read_csv(data_path)
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate)
if new_ts_dataset.dropped_recent_series: # series with recent data was too short
logging.info(
f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
......
@@ -29,7 +29,7 @@ logging.basicConfig(
)
def train(target_column, prediction_length, yaml_file="model.yaml"):
def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate=10000):
torch.manual_seed(12345)
with open(yaml_file) as file:
@@ -45,6 +45,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
return None
dataset = pd.read_csv(data_path)
print(dataset, "dataset downloaded from persistent storage")
if dataset.shape[0] < 14 * prediction_length:
logging.info(
@@ -52,7 +53,12 @@ def train(target_column, prediction_length, yaml_file="model.yaml"):
)
return None
ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"])
ts_dataset = Dataset(
dataset,
target_column=target_column,
**params["dataset"],
publish_rate=publish_rate,
)
print(ts_dataset.dataset, "DATASEEEET")
if ts_dataset.dataset.shape[0] < 1:
......
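An assumed example invocation of the extended train() signature; the metric name, horizon, and rate are illustrative only:

# publish_rate is forwarded from metrics_info[metric]["publish_rate"]
# by the retraining loop shown earlier in this diff.
retrain_msg = train(
    "metric_0",
    prediction_length=60,
    publish_rate=30000,
)
if retrain_msg:
    print("training completed, retrain message ready to publish")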
@@ -22,6 +22,7 @@ class Dataset(object):
classification=0,
context_length=40,
prediction_length=5,
publish_rate=10000,
):
self.max_missing_values = (
@@ -35,6 +36,7 @@ class Dataset(object):
self.classification = classification
self.context_length = context_length
self.prediction_length = prediction_length
self.publish_rate = publish_rate
self.dataset = dataset
self.dropped_recent_series = True # default set to be true
if self.dataset.shape[0] > 0:
@@ -47,8 +49,9 @@ class Dataset(object):
dataset.index = range(dataset.shape[0])
first_not_nan_index = dataset[self.target_column].first_valid_index()
if first_not_nan_index == first_not_nan_index: # check if it's not np.nan (NaN != NaN)
if first_not_nan_index > -1:
return dataset[dataset.index > first_not_nan_index]
if first_not_nan_index is not None:
if first_not_nan_index > -1:
return dataset[dataset.index > first_not_nan_index]
else:
return dataset.dropna()
@@ -65,66 +68,52 @@ class Dataset(object):
for name in self.tv_unknown_cat:
dataset[name] = dataset[name].astype(str)
dataset["series"] = dataset["series"].astype(str)
return dataset
def convert_time_to_ms(self):
if self.dataset.shape[0] > 0:
digit_len = len(str(int(self.dataset["time"].values[0])))
if digit_len >= 13:
self.dataset["time"] = self.dataset["time"].apply(
lambda x: int(str(x)[:13])
)
else:
self.dataset["time"] = self.dataset["time"].apply(
lambda x: int(int(str(x)[:digit_len]) * 10 ** (13 - digit_len))
)
self.dataset["time"] = self.dataset["time"].apply(
lambda x: int(x // 1e4 * 1e4)
)
def add_obligatory_columns(self, dataset):
dataset["series"] = 0
dataset["split"] = "train"
n = dataset.shape[0]
dataset["split"][int(n * 0.8) :] = "val"
dataset["time_idx"] = range(n) # TODO check time gaps
return dataset
def create_time_series_dataset(self):
if not self.classification:
self.time_varying_unknown_reals = [
self.target_column
] + self.tv_unknown_reals
self.time_varying_unknown_categoricals = self.tv_unknown_cat
else:
self.time_varying_unknown_reals = self.tv_unknown_reals
self.time_varying_unknown_categoricals = [
self.target_column
] + self.tv_unknown_cat
ts_dataset = TimeSeriesDataSet(
self.dataset[lambda x: x.split == "train"],
time_idx="time_idx",
target=self.target_column,
categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)},
group_ids=["series"],
time_varying_unknown_reals=[self.target_column],
min_encoder_length=self.context_length,
max_encoder_length=self.context_length,
max_prediction_length=self.prediction_length,
min_prediction_length=self.prediction_length,
add_relative_time_idx=False,
allow_missings=False,
)
return ts_dataset
def check_gap(self):
if self.dataset.shape[0] > 0:
self.dataset = self.dataset.groupby(by=["time"]).min()
self.dataset["time"] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
self.convert_time_to_ms()
self.dataset[self.target_column] = pd.to_numeric(
self.dataset[self.target_column], errors="coerce"
).fillna(np.nan)
self.dataset = self.dataset.replace(np.inf, np.nan)
self.dataset = self.dataset.dropna(subset=[self.target_column])
print(self.dataset)
if self.dataset.shape[0] > 0:
max_gap = self.dataset["time"].diff().abs().max()
logging.info(
f"Metric: {self.target_column} Max time gap in series {max_gap}"
)
print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
series_freq = (self.dataset["time"] // 1e9).diff().fillna(
0
).value_counts().index.values[0] * 1e9
series_freq = (
(self.dataset["time"])
.diff()
.fillna(0)
.value_counts()
.index.values[0]
)
logging.info(
f"Metric: {self.target_column} Detected series with {series_freq} frequency"
@@ -132,12 +121,20 @@ class Dataset(object):
print(
f"Metric: {self.target_column} Detected series with {series_freq} frequency"
)
if series_freq != self.publish_rate:
logging.info(
f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!"
)
print(
f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!"
)
# check series length
series = np.split(
self.dataset,
*np.where(
self.dataset["time"].diff().abs().fillna(0).astype(int)
>= np.abs(self.max_missing_values * series_freq)
>= np.abs(self.max_missing_values * self.publish_rate)
),
)
logging.info(f"Metric: {self.target_column} {len(series)} series found")
@@ -147,8 +144,8 @@ class Dataset(object):
s = self.fill_na(s)
s = self.cut_nan_start(s)
s = self.add_obligatory_columns(s)
s = self.convert_formats(s)
s["split"] = "train"
s = self.convert_formats(s)
logging.info(
f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
)
@@ -193,6 +190,34 @@ class Dataset(object):
)
return inh_dataset
def create_time_series_dataset(self):