Commit dda3e686 authored by Marta Różańska

Merge branch 'tft_nbeats' into 'morphemic-rc1.5'

duplicated rows removed

See merge request !165
parents d6783c7b 1b646fdd
Pipeline #16401 failed in 22 minutes and 47 seconds
......
@@ -73,6 +73,9 @@ class Dataset(object):
         return dataset
 
     def check_gap(self):
+        self.dataset = self.dataset.groupby(by=["time"]).min()
+        self.dataset["time"] = self.dataset.index
+        self.dataset.index = range(self.dataset.shape[0])
         self.dataset[self.target_column] = pd.to_numeric(
             self.dataset[self.target_column], errors="coerce"
         ).fillna(np.nan)
......
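This is the change the MR title refers to: rows that share a timestamp are collapsed to one row per timestamp (taking the column-wise minimum) before any gap analysis. A minimal sketch of the behavior on a toy frame; the `cpu` column is a hypothetical metric, not from the repo:

```python
import pandas as pd

# Toy frame with duplicated timestamps; "cpu" is a hypothetical metric column.
df = pd.DataFrame({"time": [1, 1, 2, 3, 3], "cpu": [5.0, 7.0, 4.0, 9.0, 8.0]})

deduped = df.groupby(by=["time"]).min()  # one row per timestamp, column-wise minimum
deduped["time"] = deduped.index          # restore "time" as an ordinary column
deduped.index = range(deduped.shape[0])  # renumber rows 0..n-1

print(deduped)
#    cpu  time
# 0  5.0     1
# 1  4.0     2
# 2  8.0     3
```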
......
@@ -138,6 +138,15 @@ def df_12():
     return df
 
 
+@pytest.fixture
+def df_13():
+    df = pd.DataFrame()
+    df["time"] = 1
+    for i in range(5):
+        df[f"metric_{i}"] = [random.random() for i in range(1000)]
+    return df
+
+
 @pytest.fixture
 def df(request):
     return request.getfixturevalue(request.param)
......
@@ -164,6 +173,7 @@ class TestDataset:
             ("df_10", metric),
             ("df_11", metric),
             ("df_12", metric),
+            ("df_13", metric),
         ],
         indirect=True,
     )
......
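Two notes on the test change. First, the `df` fixture relies on pytest's indirect parametrization: with `indirect=True`, each string such as `"df_13"` arrives at the `df` fixture as `request.param`, and `getfixturevalue` resolves it to the named fixture's value. A minimal, self-contained sketch of the mechanism (fixture and test names here are illustrative, not from the repo):

```python
import pytest


@pytest.fixture
def small_df():
    # Stand-in for one of the DataFrame fixtures (df_1 .. df_13).
    return [1, 2, 3]


@pytest.fixture
def data(request):
    # With indirect=True, request.param holds the fixture *name*;
    # getfixturevalue resolves it to that fixture's return value.
    return request.getfixturevalue(request.param)


@pytest.mark.parametrize("data", ["small_df"], indirect=True)
def test_uses_named_fixture(data):
    assert data == [1, 2, 3]
```

Second, `df_13` is presumably meant to yield a frame whose `time` column is the constant 1, i.e. nothing but duplicated timestamps, exactly the case the new `groupby` deduplication targets. Note that in stock pandas, assigning the scalar to an empty frame creates a zero-row `time` column, so the subsequent 1000-element assignments raise a length mismatch; assigning `time` after the metric columns would avoid that.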
......
@@ -114,11 +114,11 @@ def main():
     )
 
     # msg1 = Msg()
-    # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "metric_0", "level": 3, "publish_rate": 30000}, {"metric": "metric_1", "level": 3, "publish_rate": 30000}]'
+    # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]'
     # msg2 = Msg()
     # msg2.body = (
     #     "{"
-    #     + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60'
+    #     + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
     #     + "}"
     # )
......
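These commented-out bodies are sample broker messages, updated here from the placeholder metrics `metric_0`/`metric_1` to `EstimatedRemainingTimeContext` and `NotFinishedOnTimeContext`. If they are ever revived, building the payload with `json.dumps` is less error-prone than f-string concatenation; a sketch of an equivalent `msg2` body using the same field names (the surrounding `Msg` class is whatever the script already uses):

```python
import json
import time

# Equivalent start-forecasting payload built with json.dumps instead of
# string concatenation; field names are taken from the commented-out body above.
body = json.dumps(
    {
        "metrics": [
            "MinimumCores",
            "EstimatedRemainingTimeContext",
            "NotFinishedOnTimeContext",
        ],
        "timestamp": int(time.time()),
        "epoch_start": int(time.time()) + 30,
        "number_of_forward_predictions": 8,
        "prediction_horizon": 30,
    }
)
```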
......
@@ -46,8 +46,9 @@ class Dataset(object):
 
     def cut_nan_start(self, dataset):
         dataset.index = range(dataset.shape[0])
         first_not_nan_index = dataset[self.target_column].first_valid_index()
-        if first_not_nan_index > -1:
-            return dataset[dataset.index > first_not_nan_index]
+        if first_not_nan_index:
+            if first_not_nan_index > -1:
+                return dataset[dataset.index > first_not_nan_index]
         else:
             return dataset.dropna()
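The added guard handles `first_valid_index()` returning `None` for an all-NaN target column; on Python 3 the old `None > -1` comparison raised `TypeError` instead of falling through. One subtlety the truthiness check introduces: a first valid index of `0` is also falsy, so a series that is valid from its first row now takes the `dropna()` branch rather than the slice. A quick demonstration:

```python
import numpy as np
import pandas as pd

s_all_nan = pd.Series([np.nan, np.nan])
print(s_all_nan.first_valid_index())  # None -> the old `None > -1` raises TypeError

s_valid_from_start = pd.Series([1.0, np.nan, 2.0])
print(s_valid_from_start.first_valid_index())  # 0, which is falsy: the new guard
# sends this series to the else/dropna() branch instead of the index slice
```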
......
@@ -105,69 +106,77 @@ class Dataset(object):
         return ts_dataset
 
     def check_gap(self):
+        self.dataset = self.dataset.groupby(by=["time"]).min()
+        self.dataset["time"] = self.dataset.index
+        self.dataset.index = range(self.dataset.shape[0])
         self.dataset[self.target_column] = pd.to_numeric(
             self.dataset[self.target_column], errors="coerce"
         ).fillna(np.nan)
         self.dataset = self.dataset.replace(np.inf, np.nan)
         self.dataset = self.dataset.dropna(subset=[self.target_column])
         if self.dataset.shape[0] > 0:
             max_gap = self.dataset["time"].diff().abs().max()
             logging.info(
                 f"Metric: {self.target_column} Max time gap in series {max_gap}"
             )
             print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
-            series_freq = (
-                self.dataset["time"] // 1e9
-            ).diff().value_counts().index.values[0] * 1e9
+            series_freq = (self.dataset["time"] // 1e9).diff().fillna(
+                0
+            ).value_counts().index.values[0] * 1e9
             logging.info(
                 f"Metric: {self.target_column} Detected series with {series_freq} frequency"
             )
             print(
                 f"Metric: {self.target_column} Detected series with {series_freq} frequency"
             )
             # check series length
             series = np.split(
                 self.dataset,
                 *np.where(
                     self.dataset["time"].diff().abs().fillna(0).astype(int)
                     >= np.abs(self.max_missing_values * series_freq)
                 ),
             )
             logging.info(f"Metric: {self.target_column} {len(series)} series found")
             print(f"{len(series)} series found")
             preprocessed_series = []
             for i, s in enumerate(series):
                 s = self.fill_na(s)
                 s = self.cut_nan_start(s)
                 s = self.add_obligatory_columns(s)
                 s = self.convert_formats(s)
                 s["split"] = "train"
                 logging.info(
                     f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
                 )
                 if s.shape[0] > self.prediction_length * 2 + self.context_length:
                     s["series"] = i
                     preprocessed_series.append(s)
                 if i == len(series) - 1:
                     logging.info(
                         f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
                     )
             logging.info(
                 f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found"
             )
             print(f"{len(preprocessed_series)} long enough series found")
             # logging.info(f"")
             if preprocessed_series:
                 self.dataset = pd.concat(preprocessed_series)
                 if self.dataset["series"].max() != len(series) - 1:
                     self.dropped_recent_series = True
                 else:
                     self.dropped_recent_series = False
             else:
                 self.dataset = pd.DataFrame()
                 self.dropped_recent_series = True
             self.dataset.index = range(self.dataset.shape[0])
+        else:
+            self.dataset = pd.DataFrame()
+            self.dropped_recent_series = True
 
     def inherited_dataset(self, split1, split2):
         df1 = (
......
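Two steps in the rewritten `check_gap` are worth unpacking. The sampling frequency is estimated as the most common difference between consecutive timestamps (the mode of `diff()`, read off `value_counts()`; the new `fillna(0)` guards the single-row case, where `diff()` is all-NaN and `value_counts()` would otherwise be empty). The frame is then split into independent series wherever a gap spans at least `max_missing_values` sampling periods. A standalone sketch of both steps, with hypothetical nanosecond timestamps and `max_missing_values = 2`:

```python
import numpy as np
import pandas as pd

# Hypothetical nanosecond timestamps: sampled every 30 s, with one 210 s gap.
times = np.array([0, 30, 60, 90, 300, 330, 360], dtype=np.int64) * int(1e9)
df = pd.DataFrame({"time": times, "cpu": np.arange(7.0)})  # "cpu" is illustrative
max_missing_values = 2

# Frequency = mode of consecutive time differences (in seconds, then back to ns).
series_freq = (df["time"] // 1e9).diff().fillna(0).value_counts().index.values[0] * 1e9
print(series_freq)  # 30000000000.0, i.e. 30 s

# Split into separate series wherever a gap is at least
# max_missing_values sampling periods wide (mirrors the np.split call above).
series = np.split(
    df,
    *np.where(
        df["time"].diff().abs().fillna(0).astype(int)
        >= np.abs(max_missing_values * series_freq)
    ),
)
print(len(series))  # 2 -> rows 0..3 and rows 4..6
```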
......
@@ -138,6 +138,15 @@ def df_12():
     return df
 
 
+@pytest.fixture
+def df_13():
+    df = pd.DataFrame()
+    df["time"] = 1
+    for i in range(5):
+        df[f"metric_{i}"] = [random.random() for i in range(1000)]
+    return df
+
+
 @pytest.fixture
 def df(request):
     return request.getfixturevalue(request.param)
......
@@ -164,6 +173,7 @@ class TestDataset:
             ("df_10", metric),
             ("df_11", metric),
             ("df_12", metric),
+            ("df_13", metric),
         ],
         indirect=True,
     )
......
......
@@ -113,6 +113,9 @@ class Dataset(object):
         return ts_dataset
 
     def check_gap(self):
+        self.dataset = self.dataset.groupby(by=["time"]).min()
+        self.dataset["time"] = self.dataset.index
+        self.dataset.index = range(self.dataset.shape[0])
         self.dataset[self.target_column] = pd.to_numeric(
             self.dataset[self.target_column], errors="coerce"
         ).fillna(np.nan)
......