Commit 64f4675e authored by Anna Warno's avatar Anna Warno
Browse files

metric not present error corrected, tft flexible minimum points added

parent 0949cd6f
Pipeline #16939 passed with stage
in 1 minute and 19 seconds
......@@ -101,7 +101,7 @@ class Dataset(object):
)
def check_gap(self):
if self.dataset.shape[0] > 0:
if (self.dataset.shape[0] > 0) and (self.target_column in self.dataset.columns):
self.dataset = self.dataset.groupby(by=[self.time_column]).min()
self.dataset[self.time_column] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
......
......@@ -176,6 +176,15 @@ def df_16():
return df
@pytest.fixture
def df_17():
    """1000-row frame with an 'ems_time' column (nanosecond ticks) and five
    random 'other_metric_*' columns — i.e. a dataset where the target metric
    column is absent, used to exercise the metric-not-present path."""
    frame = pd.DataFrame()
    frame["ems_time"] = np.array(range(0, 1000)) * 1e9
    for metric_idx in range(5):
        frame[f"other_metric_{metric_idx}"] = np.random.rand(1000)
    return frame
@pytest.fixture
def df(request):
    # Indirect fixture: `request.param` holds a fixture name supplied via
    # `@pytest.mark.parametrize(..., indirect=True)` (e.g. "df_17");
    # resolve it dynamically and hand back that fixture's value.
    return request.getfixturevalue(request.param)
......@@ -206,6 +215,7 @@ class TestDataset:
("df_14", metric),
("df_15", metric),
("df_16", metric),
("df_17", metric),
],
indirect=True,
)
......
......@@ -138,6 +138,9 @@ def main():
logging.info(
f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
logging.info(
f"Sending predictions for {metric}: {message} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(dest, message[metric])
influxdb_conn.send_to_influxdb(metric, message)
......
......@@ -105,7 +105,7 @@ class Dataset(object):
)
def check_gap(self):
if self.dataset.shape[0] > 0:
if (self.dataset.shape[0] > 0) and (self.target_column in self.dataset.columns):
self.dataset = self.dataset.groupby(by=[self.time_column]).min()
self.dataset[self.time_column] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
......@@ -193,6 +193,7 @@ class Dataset(object):
else:
self.dataset = pd.DataFrame()
self.dropped_recent_series = True
logging.info(f"metric: {self.target_column} no data found")
if self.dataset.shape[0] > 0:
self.get_time_difference_current()
......
......@@ -138,6 +138,9 @@ def main():
logging.info(
f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
logging.info(
f"Sending predictions for {metric}: {message} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}"
)
start_conn.send_to_topic(dest, message[metric])
influxdb_conn.send_to_influxdb(metric, message)
......
......@@ -90,14 +90,18 @@ def predict(
pred_len = params["dataset"]["prediction_length"]
future_df = dataset.tail(pred_len).copy()
future_df[target_column] = 0
future_df = pd.concat(
[dataset.tail(params["dataset"]["context_length"]), future_df]
).reset_index()
last_series_length = new_ts_dataset.dataset[
new_ts_dataset.dataset["series"] == new_ts_dataset.dataset["series"].max()
].shape[0]
future_df = pd.concat(
[
dataset.tail(min(params["dataset"]["context_length"], last_series_length)),
future_df,
]
).reset_index()
future_df["time_idx"] = range(
last_series_length - params["dataset"]["context_length"],
last_series_length
- min(params["dataset"]["context_length"], last_series_length),
last_series_length + pred_len,
)
future_df["split"] = "future"
......
......@@ -105,7 +105,7 @@ class Dataset(object):
)
def check_gap(self):
if self.dataset.shape[0] > 0:
if (self.dataset.shape[0] > 0) and (self.target_column in self.dataset.columns):
self.dataset = self.dataset.groupby(by=[self.time_column]).min()
self.dataset[self.time_column] = self.dataset.index
self.dataset.index = range(self.dataset.shape[0])
......@@ -167,14 +167,17 @@ class Dataset(object):
s["split"] = "train"
s = self.convert_formats(s)
logging.info(
f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length // 3}"
)
if s.shape[0] > self.prediction_length * 2 + self.context_length:
if (
s.shape[0]
> self.prediction_length * 2 + self.context_length // 3
):
s["series"] = i
preprocessed_series.append(s)
if i == len(series) - 1:
logging.info(
f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length // 3}"
)
logging.info(
......@@ -214,7 +217,8 @@ class Dataset(object):
time_idx="time_idx",
target=self.target_column,
group_ids=["series"],
min_encoder_length=self.context_length, # keep encoder length long (as it is in the validation set)
min_encoder_length=self.context_length
// 3, # keep encoder length long (as it is in the validation set)
max_encoder_length=self.context_length,
min_prediction_length=self.prediction_length,
max_prediction_length=self.prediction_length,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment