From 1b646fddd4843861f86c842d5a34b016a8f735b5 Mon Sep 17 00:00:00 2001
From: Anna Warno
Date: Tue, 5 Oct 2021 11:58:11 +0200
Subject: [PATCH] duplicated rows removed

---
 deployment/arima/src/preprocess_dataset.py    |   3 +
 .../arima/test/preprocess_dataset_test.py     |  10 ++
 deployment/nbeats/main.py                     |   4 +-
 deployment/nbeats/src/preprocess_dataset.py   | 125 ++++++++++--------
 .../nbeats/test/preprocess_dataset_test.py    |  10 ++
 deployment/tft/src/preprocess_dataset.py      |   3 +
 6 files changed, 95 insertions(+), 60 deletions(-)

diff --git a/deployment/arima/src/preprocess_dataset.py b/deployment/arima/src/preprocess_dataset.py
index 8ccc6cd7..2b59a8f5 100644
--- a/deployment/arima/src/preprocess_dataset.py
+++ b/deployment/arima/src/preprocess_dataset.py
@@ -73,6 +73,9 @@ class Dataset(object):
         return dataset
 
     def check_gap(self):
+        self.dataset = self.dataset.groupby(by=["time"]).min()
+        self.dataset["time"] = self.dataset.index
+        self.dataset.index = range(self.dataset.shape[0])
         self.dataset[self.target_column] = pd.to_numeric(
             self.dataset[self.target_column], errors="coerce"
         ).fillna(np.nan)
diff --git a/deployment/arima/test/preprocess_dataset_test.py b/deployment/arima/test/preprocess_dataset_test.py
index 80e36837..e61da6fd 100644
--- a/deployment/arima/test/preprocess_dataset_test.py
+++ b/deployment/arima/test/preprocess_dataset_test.py
@@ -138,6 +138,15 @@ def df_12():
     return df
 
 
+@pytest.fixture
+def df_13():
+    df = pd.DataFrame()
+    df["time"] = 1
+    for i in range(5):
+        df[f"metric_{i}"] = [random.random() for i in range(1000)]
+    return df
+
+
 @pytest.fixture
 def df(request):
     return request.getfixturevalue(request.param)
@@ -164,6 +173,7 @@ class TestDataset:
             ("df_10", metric),
             ("df_11", metric),
             ("df_12", metric),
+            ("df_13", metric),
         ],
         indirect=True,
     )
diff --git a/deployment/nbeats/main.py b/deployment/nbeats/main.py
index 0cce7d09..5a798b1a 100644
--- a/deployment/nbeats/main.py
+++ b/deployment/nbeats/main.py
@@ -114,11 +114,11 @@ def main():
     )
 
     # msg1 = Msg()
-    # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "metric_0", "level": 3, "publish_rate": 30000}, {"metric": "metric_1", "level": 3, "publish_rate": 30000}]'
+    # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]'
     # msg2 = Msg()
     # msg2.body = (
     #     "{"
-    #     + f'"metrics": ["MinimumCores", "metric_0", "metric_1"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60'
+    #     + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30'
     #     + "}"
     # )
 
diff --git a/deployment/nbeats/src/preprocess_dataset.py b/deployment/nbeats/src/preprocess_dataset.py
index 01ae9966..876ec282 100644
--- a/deployment/nbeats/src/preprocess_dataset.py
+++ b/deployment/nbeats/src/preprocess_dataset.py
@@ -46,8 +46,9 @@ class Dataset(object):
     def cut_nan_start(self, dataset):
         dataset.index = range(dataset.shape[0])
         first_not_nan_index = dataset[self.target_column].first_valid_index()
-        if first_not_nan_index > -1:
-            return dataset[dataset.index > first_not_nan_index]
+        if first_not_nan_index:
+            if first_not_nan_index > -1:
+                return dataset[dataset.index > first_not_nan_index]
         else:
             return dataset.dropna()
 
@@ -105,69 +106,77 @@ class Dataset(object):
         return ts_dataset
 
     def check_gap(self):
-        self.dataset[self.target_column] = pd.to_numeric(
-            self.dataset[self.target_column], errors="coerce"
-        ).fillna(np.nan)
-        self.dataset = self.dataset.replace(np.inf, np.nan)
-        self.dataset = self.dataset.dropna(subset=[self.target_column])
         if self.dataset.shape[0] > 0:
-            max_gap = self.dataset["time"].diff().abs().max()
-            logging.info(
-                f"Metric: {self.target_column} Max time gap in series {max_gap}"
-            )
-            print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
-            series_freq = (
-                self.dataset["time"] // 1e9
-            ).diff().value_counts().index.values[0] * 1e9
-            logging.info(
-                f"Metric: {self.target_column} Detected series with {series_freq} frequency"
-            )
-            print(
-                f"Metric: {self.target_column} Detected series with {series_freq} frequency"
-            )
-            # check series length
-            series = np.split(
-                self.dataset,
-                *np.where(
-                    self.dataset["time"].diff().abs().fillna(0).astype(int)
-                    >= np.abs(self.max_missing_values * series_freq)
-                ),
-            )
-            logging.info(f"Metric: {self.target_column} {len(series)} series found")
-            print(f"{len(series)} series found")
-            preprocessed_series = []
-            for i, s in enumerate(series):
-                s = self.fill_na(s)
-                s = self.cut_nan_start(s)
-                s = self.add_obligatory_columns(s)
-                s = self.convert_formats(s)
-                s["split"] = "train"
+            self.dataset = self.dataset.groupby(by=["time"]).min()
+            self.dataset["time"] = self.dataset.index
+            self.dataset.index = range(self.dataset.shape[0])
+            self.dataset[self.target_column] = pd.to_numeric(
+                self.dataset[self.target_column], errors="coerce"
+            ).fillna(np.nan)
+            self.dataset = self.dataset.replace(np.inf, np.nan)
+            self.dataset = self.dataset.dropna(subset=[self.target_column])
+            if self.dataset.shape[0] > 0:
+                max_gap = self.dataset["time"].diff().abs().max()
+                logging.info(
+                    f"Metric: {self.target_column} Max time gap in series {max_gap}"
+                )
+                print(f" Metric: {self.target_column} Max time gap in series {max_gap}")
+                series_freq = (self.dataset["time"] // 1e9).diff().fillna(
+                    0
+                ).value_counts().index.values[0] * 1e9
+                logging.info(
-                    f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
+                    f"Metric: {self.target_column} Detected series with {series_freq} frequency"
+                )
+                print(
+                    f"Metric: {self.target_column} Detected series with {series_freq} frequency"
+                )
+                # check series length
+                series = np.split(
+                    self.dataset,
+                    *np.where(
+                        self.dataset["time"].diff().abs().fillna(0).astype(int)
+                        >= np.abs(self.max_missing_values * series_freq)
+                    ),
                 )
-                if s.shape[0] > self.prediction_length * 2 + self.context_length:
-                    s["series"] = i
-                    preprocessed_series.append(s)
-                if i == len(series) - 1:
+                logging.info(f"Metric: {self.target_column} {len(series)} series found")
+                print(f"{len(series)} series found")
+                preprocessed_series = []
+                for i, s in enumerate(series):
+                    s = self.fill_na(s)
+                    s = self.cut_nan_start(s)
+                    s = self.add_obligatory_columns(s)
+                    s = self.convert_formats(s)
+                    s["split"] = "train"
                     logging.info(
-                        f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
+                        f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}"
                     )
+                    if s.shape[0] > self.prediction_length * 2 + self.context_length:
+                        s["series"] = i
+                        preprocessed_series.append(s)
+                    if i == len(series) - 1:
+                        logging.info(
+                            f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}"
+                        )
-            logging.info(
-                f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found"
-            )
-            print(f"{len(preprocessed_series)} long enough series found")
-            # logging.info(f"")
-            if preprocessed_series:
-                self.dataset = pd.concat(preprocessed_series)
-                if self.dataset["series"].max() != len(series) - 1:
-                    self.dropped_recent_series = True
+                logging.info(
+                    f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found"
+                )
+                print(f"{len(preprocessed_series)} long enough series found")
+                # logging.info(f"")
+                if preprocessed_series:
+                    self.dataset = pd.concat(preprocessed_series)
+                    if self.dataset["series"].max() != len(series) - 1:
+                        self.dropped_recent_series = True
+                    else:
+                        self.dropped_recent_series = False
                 else:
-                    self.dropped_recent_series = False
-            else:
-                self.dataset = pd.DataFrame()
-                self.dropped_recent_series = True
-        self.dataset.index = range(self.dataset.shape[0])
+                    self.dataset = pd.DataFrame()
+                    self.dropped_recent_series = True
+            self.dataset.index = range(self.dataset.shape[0])
+        else:
+            self.dataset = pd.DataFrame()
+            self.dropped_recent_series = True
 
     def inherited_dataset(self, split1, split2):
         df1 = (
diff --git a/deployment/nbeats/test/preprocess_dataset_test.py b/deployment/nbeats/test/preprocess_dataset_test.py
index 80e36837..e61da6fd 100644
--- a/deployment/nbeats/test/preprocess_dataset_test.py
+++ b/deployment/nbeats/test/preprocess_dataset_test.py
@@ -138,6 +138,15 @@ def df_12():
     return df
 
 
+@pytest.fixture
+def df_13():
+    df = pd.DataFrame()
+    df["time"] = 1
+    for i in range(5):
+        df[f"metric_{i}"] = [random.random() for i in range(1000)]
+    return df
+
+
 @pytest.fixture
 def df(request):
     return request.getfixturevalue(request.param)
@@ -164,6 +173,7 @@ class TestDataset:
             ("df_10", metric),
             ("df_11", metric),
             ("df_12", metric),
+            ("df_13", metric),
         ],
         indirect=True,
     )
diff --git a/deployment/tft/src/preprocess_dataset.py b/deployment/tft/src/preprocess_dataset.py
index 84d80b05..a13e4278 100644
--- a/deployment/tft/src/preprocess_dataset.py
+++ b/deployment/tft/src/preprocess_dataset.py
@@ -113,6 +113,9 @@ class Dataset(object):
         return ts_dataset
 
     def check_gap(self):
+        self.dataset = self.dataset.groupby(by=["time"]).min()
+        self.dataset["time"] = self.dataset.index
+        self.dataset.index = range(self.dataset.shape[0])
         self.dataset[self.target_column] = pd.to_numeric(
             self.dataset[self.target_column], errors="coerce"
         ).fillna(np.nan)
-- 
GitLab
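
Note (not part of the patch): the core of this commit is the three-line
prologue that now opens every check_gap() -- rows sharing a timestamp are
collapsed into one, and "time" is then restored as an ordinary column with
a fresh RangeIndex. The new df_13 fixture targets exactly this case by
building a frame whose "time" column is constant. A minimal standalone
sketch of the deduplication step, on a made-up frame (column names and
values below are illustrative, not from the deployment code):

    import pandas as pd

    df = pd.DataFrame(
        {
            "time": [1, 1, 2, 3, 3],  # duplicated timestamps
            "metric_0": [5.0, 4.0, 7.0, 2.0, 9.0],
        }
    )

    # groupby("time").min() keeps one row per timestamp and moves "time"
    # into the index; the next two lines undo that, mirroring the patch.
    df = df.groupby(by=["time"]).min()
    df["time"] = df.index
    df.index = range(df.shape[0])

    print(df)
    #    metric_0  time
    # 0       4.0     1
    # 1       7.0     2
    # 2       2.0     3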