From f0dc964b1e6221a0decee2ef1351d4b741886856 Mon Sep 17 00:00:00 2001 From: Anna Warno Date: Fri, 8 Oct 2021 11:43:58 +0200 Subject: [PATCH] timestamp series frequency error corrected --- deployment/arima/retrain.py | 6 +- deployment/arima/src/model_predict.py | 5 +- deployment/arima/src/preprocess_dataset.py | 168 +++++++++------- deployment/arima/test/model_predict_test.py | 7 +- .../arima/test/preprocess_dataset_test.py | 32 ++++ deployment/nbeats/env | 4 +- deployment/nbeats/main.py | 4 +- deployment/nbeats/predict.py | 1 + deployment/nbeats/retrain.py | 6 +- deployment/nbeats/src/model_predict.py | 3 +- deployment/nbeats/src/model_train.py | 10 +- deployment/nbeats/src/preprocess_dataset.py | 107 +++++++---- deployment/nbeats/test/model_predict_test.py | 1 + .../nbeats/test/preprocess_dataset_test.py | 32 ++++ deployment/tft/predict.py | 2 +- deployment/tft/retrain.py | 6 +- deployment/tft/src/model_predict.py | 3 +- deployment/tft/src/model_train.py | 9 +- deployment/tft/src/preprocess_dataset.py | 181 ++++++++++-------- deployment/tft/test/model_predict_test.py | 1 + .../tft/test/preprocess_dataset_test.py | 42 ++++ 21 files changed, 430 insertions(+), 200 deletions(-) diff --git a/deployment/arima/retrain.py b/deployment/arima/retrain.py index 2a6e654d..4fe0dfbb 100644 --- a/deployment/arima/retrain.py +++ b/deployment/arima/retrain.py @@ -36,7 +36,11 @@ def main(predicted_metrics, prediction_horizon): while True: start_time = int(time.time()) for metric in predicted_metrics: - retrain_msg = train(metric, prediction_horizon) + retrain_msg = train( + metric, + prediction_horizon, + publish_rate=metrics_info[metric]["publish_rate"], + ) if retrain_msg: logging.info( f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" diff --git a/deployment/arima/src/model_predict.py b/deployment/arima/src/model_predict.py index 7c519a0e..dc191c0f 100644 --- a/deployment/arima/src/model_predict.py +++ b/deployment/arima/src/model_predict.py @@ -25,6 +25,7 @@ def predict( yaml_file="model.yaml", prediction_hor=60, timestamp=0, + publish_rate=10000, ): with open(yaml_file) as file: params = yaml.load(file, Loader=yaml.FullLoader) @@ -40,7 +41,7 @@ def predict( return None dataset = pd.read_csv(data_path) - new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate) if new_ts_dataset.dropped_recent_series: # series with recent data was too short logging.info( f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" @@ -52,7 +53,7 @@ def predict( dataset = pd.read_csv(data_path) - ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate) if ts_dataset.dataset.shape[0] < 1: logging.info( f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" diff --git a/deployment/arima/src/preprocess_dataset.py b/deployment/arima/src/preprocess_dataset.py index 5e91731c..ff71eaa8 100644 --- a/deployment/arima/src/preprocess_dataset.py +++ b/deployment/arima/src/preprocess_dataset.py @@ -20,6 +20,7 @@ class Dataset(object): classification=0, context_length=40, prediction_length=5, + publish_rate=10000, ): self.max_missing_values = ( @@ -33,6 +34,7 @@ class Dataset(object): self.classification = classification self.context_length = context_length self.prediction_length = prediction_length + self.publish_rate = publish_rate self.dataset = dataset self.dropped_recent_series = True # default set to be true if self.dataset.shape[0] > 0: @@ -43,13 +45,14 @@ class Dataset(object): dataset.index = range(dataset.shape[0]) first_not_nan_index = dataset[self.target_column].first_valid_index() if first_not_nan_index == first_not_nan_index: # check is if it;s not np.nan - if first_not_nan_index > -1: - return dataset[dataset.index > first_not_nan_index] + if first_not_nan_index is not None: + if first_not_nan_index > -1: + return dataset[dataset.index > first_not_nan_index] else: return dataset.dropna() def fill_na(self, dataset): - dataset = dataset.replace("None", np.nan) + dataset = dataset.replace(np.inf, np.nan) dataset = dataset.ffill(axis="rows") return dataset @@ -61,79 +64,114 @@ class Dataset(object): for name in self.tv_unknown_cat: dataset[name] = dataset[name].astype(str) - - dataset["series"] = dataset["series"].astype(str) return dataset + def convert_time_to_ms(self): + if self.dataset.shape[0] > 0: + digit_len = len(str(int(self.dataset["time"].values[0]))) + if digit_len >= 13: + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(str(x)[:13]) + ) + else: + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(int(str(x)[:digit_len]) * 10 ** (13 - digit_len)) + ) + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(x // 1e4 * 1e4) + ) + def add_obligatory_columns(self, dataset): - dataset["series"] = 0 - dataset["split"] = "train" n = dataset.shape[0] - dataset["split"][int(n * 0.8) :] = "val" - dataset["time_idx"] = range(n) + dataset["time_idx"] = range(n) # TODO check time gaps return dataset def check_gap(self): - self.dataset = self.dataset.groupby(by=["time"]).min() - self.dataset["time"] = self.dataset.index - self.dataset.index = range(self.dataset.shape[0]) - self.dataset[self.target_column] = pd.to_numeric( - self.dataset[self.target_column], errors="coerce" - ).fillna(np.nan) - self.dataset = self.dataset.dropna(subset=[self.target_column]) + print(self.dataset) if self.dataset.shape[0] > 0: - max_gap = self.dataset["time"].diff().abs().max() - logging.info( - f"Metric: {self.target_column} Max time gap in series {max_gap}" - ) - print(f" Metric: {self.target_column} Max time gap in series {max_gap}") - self.series_freq = ( - self.dataset["time"] // 1e9 - ).diff().value_counts().index.values[0] * 1e9 - logging.info( - f"Metric: {self.target_column} Detected series with {self.series_freq} frequency" - ) - print( - f"Metric: {self.target_column} Detected series with {self.series_freq} frequency" - ) - # check series length - series = np.split( - self.dataset, - *np.where( - self.dataset["time"].diff().abs().fillna(0).astype(int) - >= np.abs(self.max_missing_values * self.series_freq) - ), - ) - logging.info(f"Metric: {self.target_column} {len(series)} series found") - print(f"{len(series)} series found") - preprocessed_series = [] - for i, s in enumerate(series): - s = self.fill_na(s) - s = self.cut_nan_start(s) - s = self.add_obligatory_columns(s) - s = self.convert_formats(s) - s["split"] = "train" + self.dataset = self.dataset.groupby(by=["time"]).min() + self.dataset["time"] = self.dataset.index + self.dataset.index = range(self.dataset.shape[0]) + self.convert_time_to_ms() + print(self.dataset) + self.dataset[self.target_column] = pd.to_numeric( + self.dataset[self.target_column], errors="coerce" + ).fillna(np.nan) + self.dataset = self.dataset.replace(np.inf, np.nan) + self.dataset = self.dataset.dropna(subset=[self.target_column]) + if self.dataset.shape[0] > 0: + max_gap = self.dataset["time"].diff().abs().max() + logging.info( + f"Metric: {self.target_column} Max time gap in series {max_gap}" + ) + print(f" Metric: {self.target_column} Max time gap in series {max_gap}") + print(self.dataset["time"].diff().fillna(0).value_counts()) + series_freq = ( + (self.dataset["time"]) + .diff() + .fillna(0) + .value_counts() + .index.values[0] + ) + logging.info( - f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" ) - if s.shape[0] > self.prediction_length * 2 + self.context_length: - s["series"] = i - preprocessed_series.append(s) - if i == len(series) - 1: + if series_freq != self.publish_rate: logging.info( - f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!" ) - logging.info( - f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found" - ) - print(f"{len(preprocessed_series)} long enough series found") - if preprocessed_series: - self.dataset = pd.concat(preprocessed_series) - if self.dataset["series"].max() != len(series) - 1: - self.dropped_recent_series = True + # check series length + series = np.split( + self.dataset, + *np.where( + self.dataset["time"].diff().abs().fillna(0).astype(int) + >= np.abs(self.max_missing_values * self.publish_rate) + ), + ) + logging.info(f"Metric: {self.target_column} {len(series)} series found") + print(f"{len(series)} series found") + preprocessed_series = [] + for i, s in enumerate(series): + s = self.fill_na(s) + s = self.cut_nan_start(s) + s = self.add_obligatory_columns(s) + s["split"] = "train" + s = self.convert_formats(s) + print(s.shape) + logging.info( + f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" + ) + if s.shape[0] > self.prediction_length * 2 + self.context_length: + s["series"] = i + preprocessed_series.append(s) + if i == len(series) - 1: + logging.info( + f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + ) + + logging.info( + f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found" + ) + print(f"{len(preprocessed_series)} long enough series found") + # logging.info(f"") + if preprocessed_series: + self.dataset = pd.concat(preprocessed_series) + if self.dataset["series"].max() != len(series) - 1: + self.dropped_recent_series = True + else: + self.dropped_recent_series = False else: - self.dropped_recent_series = False - else: - self.dataset = pd.DataFrame() - self.dataset.index = range(self.dataset.shape[0]) + self.dataset = pd.DataFrame() + self.dropped_recent_series = True + self.dataset.index = range(self.dataset.shape[0]) + else: + self.dataset = pd.DataFrame() + self.dropped_recent_series = True diff --git a/deployment/arima/test/model_predict_test.py b/deployment/arima/test/model_predict_test.py index 5be30b00..60b6d5fd 100644 --- a/deployment/arima/test/model_predict_test.py +++ b/deployment/arima/test/model_predict_test.py @@ -53,7 +53,7 @@ def df_5(): @pytest.fixture def df_6(): df = pd.DataFrame() - df["time"] = np.array(range(0, 1000)) * 1e9 + df["time"] = np.array(range(1, 1001)) * 1e9 for i in range(5): df[f"metric_{i}"] = np.random.rand(1000) return df @@ -62,7 +62,7 @@ def df_6(): @pytest.fixture def df_7(): df = pd.DataFrame() - df["time"] = np.array(range(0, 1000)) * 1e9 + df["time"] = np.array(range(1, 1001)) * 1e9 for i in range(5): df[f"metric_{i}"] = np.random.rand(1000) df.loc[ @@ -76,7 +76,7 @@ def df_7(): @pytest.fixture def df_8(): df = pd.DataFrame() - df["time"] = np.array(range(0, 1000)) * 1e9 + df["time"] = np.array(range(1, 1001)) * 1e9 for i in range(5): df[f"metric_{i}"] = np.random.rand(1000) df.loc[ @@ -174,6 +174,7 @@ def prediction_length(): def test_predict(df, metric, prediction_length): df.to_csv("demo.csv") output = predict(metric, prediction_length) + print(output) if output: print("True") assert True diff --git a/deployment/arima/test/preprocess_dataset_test.py b/deployment/arima/test/preprocess_dataset_test.py index e61da6fd..96835f44 100644 --- a/deployment/arima/test/preprocess_dataset_test.py +++ b/deployment/arima/test/preprocess_dataset_test.py @@ -147,6 +147,35 @@ def df_13(): return df +@pytest.fixture +def df_14(): + df = pd.DataFrame() + df["time"] = 10 + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(1000)] + return df + + +@pytest.fixture +def df_15(): + df = pd.DataFrame() + df["time"] = [i * 30 * 1e5 for i in range(500)] + [ + i * 30 * 1e5 + 10000 for i in range(500) + ] + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)] + return df + + +@pytest.fixture +def df_16(): + df = pd.DataFrame() + df["time"] = [i for i in range(500)] + [i + 10000 for i in range(500)] + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)] + return df + + @pytest.fixture def df(request): return request.getfixturevalue(request.param) @@ -174,6 +203,9 @@ class TestDataset: ("df_11", metric), ("df_12", metric), ("df_13", metric), + ("df_14", metric), + ("df_15", metric), + ("df_16", metric), ], indirect=True, ) diff --git a/deployment/nbeats/env b/deployment/nbeats/env index aa4cd5c1..6f917a73 100644 --- a/deployment/nbeats/env +++ b/deployment/nbeats/env @@ -1,6 +1,6 @@ AMQ_HOSTNAME=localhost -AMQ_USER=admin -AMQ_PASSWORD=admin +AMQ_USER=morphemic +AMQ_PASSWORD=morphemic AMQ_PORT=61613 APP_NAME=demo METHOD=tft diff --git a/deployment/nbeats/main.py b/deployment/nbeats/main.py index 5a798b1a..80f60003 100644 --- a/deployment/nbeats/main.py +++ b/deployment/nbeats/main.py @@ -114,11 +114,11 @@ def main(): ) # msg1 = Msg() - # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]' + # msg1.body = '[{"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "WillFinishTooSoonContext", "level": 3, "publish_rate": 30000}, {"metric": "MinimumCores", "level": 3, "publish_rate": 30000}]' # msg2 = Msg() # msg2.body = ( # "{" - # + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' + # + f'"metrics": ["EstimatedRemainingTimeContext", "WillFinishTooSoonContext", "MinimumCores"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' # + "}" # ) diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py index 8d3985ab..773bff0d 100644 --- a/deployment/nbeats/predict.py +++ b/deployment/nbeats/predict.py @@ -132,6 +132,7 @@ def main(): predicted_point_idx=int( (i + 1) * prediction_points_horizons[metric] - 1 ), + publish_rate=metrics_info[metric]["publish_rate"], ) if i == (number_of_forward_predictions[metric] - 1): print( diff --git a/deployment/nbeats/retrain.py b/deployment/nbeats/retrain.py index fa69c0d5..64f225ae 100644 --- a/deployment/nbeats/retrain.py +++ b/deployment/nbeats/retrain.py @@ -33,7 +33,11 @@ def main(predicted_metrics, prediction_horizons): while True: start_time = int(time.time()) for metric in predicted_metrics: - retrain_msg = train(metric, prediction_horizons[metric]) + retrain_msg = train( + metric, + prediction_horizons[metric], + publish_rate=metrics_info[metric]["publish_rate"], + ) if retrain_msg: logging.info( f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" diff --git a/deployment/nbeats/src/model_predict.py b/deployment/nbeats/src/model_predict.py index 770089cf..ef334827 100644 --- a/deployment/nbeats/src/model_predict.py +++ b/deployment/nbeats/src/model_predict.py @@ -31,6 +31,7 @@ def predict( prediction_hor=60, timestamp=0, predicted_point_idx=0, + publish_rate=10000, ): with open(yaml_file) as file: @@ -55,7 +56,7 @@ def predict( return (None, None) dataset = pd.read_csv(data_path) - new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate) if new_ts_dataset.dropped_recent_series: # series with recent data was too short logging.info( f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" diff --git a/deployment/nbeats/src/model_train.py b/deployment/nbeats/src/model_train.py index 9c697f18..794ef9c5 100644 --- a/deployment/nbeats/src/model_train.py +++ b/deployment/nbeats/src/model_train.py @@ -29,7 +29,7 @@ logging.basicConfig( ) -def train(target_column, prediction_length, yaml_file="model.yaml"): +def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate=10000): torch.manual_seed(12345) with open(yaml_file) as file: @@ -45,6 +45,7 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): return None dataset = pd.read_csv(data_path) + print(dataset, "dataset downloaded from persostent storage") if dataset.shape[0] < 14 * prediction_length: logging.info( @@ -52,7 +53,12 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): ) return None - ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + ts_dataset = Dataset( + dataset, + target_column=target_column, + **params["dataset"], + publish_rate=publish_rate, + ) print(ts_dataset.dataset, "DATASEEEET") if ts_dataset.dataset.shape[0] < 1: diff --git a/deployment/nbeats/src/preprocess_dataset.py b/deployment/nbeats/src/preprocess_dataset.py index adc66470..c2b0ba13 100644 --- a/deployment/nbeats/src/preprocess_dataset.py +++ b/deployment/nbeats/src/preprocess_dataset.py @@ -22,6 +22,7 @@ class Dataset(object): classification=0, context_length=40, prediction_length=5, + publish_rate=10000, ): self.max_missing_values = ( @@ -35,6 +36,7 @@ class Dataset(object): self.classification = classification self.context_length = context_length self.prediction_length = prediction_length + self.publish_rate = publish_rate self.dataset = dataset self.dropped_recent_series = True # default set to be true if self.dataset.shape[0] > 0: @@ -47,8 +49,9 @@ class Dataset(object): dataset.index = range(dataset.shape[0]) first_not_nan_index = dataset[self.target_column].first_valid_index() if first_not_nan_index == first_not_nan_index: # check is if it;s not np.nan - if first_not_nan_index > -1: - return dataset[dataset.index > first_not_nan_index] + if first_not_nan_index is not None: + if first_not_nan_index > -1: + return dataset[dataset.index > first_not_nan_index] else: return dataset.dropna() @@ -65,66 +68,52 @@ class Dataset(object): for name in self.tv_unknown_cat: dataset[name] = dataset[name].astype(str) - - dataset["series"] = dataset["series"].astype(str) return dataset + def convert_time_to_ms(self): + if self.dataset.shape[0] > 0: + digit_len = len(str(int(self.dataset["time"].values[0]))) + if digit_len >= 13: + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(str(x)[:13]) + ) + else: + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(int(str(x)[:digit_len]) * 10 ** (13 - digit_len)) + ) + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(x // 1e4 * 1e4) + ) + def add_obligatory_columns(self, dataset): - dataset["series"] = 0 - dataset["split"] = "train" n = dataset.shape[0] - dataset["split"][int(n * 0.8) :] = "val" dataset["time_idx"] = range(n) # TODO check time gaps return dataset - def create_time_series_dataset(self): - if not self.classification: - self.time_varying_unknown_reals = [ - self.target_column - ] + self.tv_unknown_reals - self.time_varying_unknown_categoricals = self.tv_unknown_cat - else: - self.time_varying_unknown_reals = self.tv_unknown_reals - self.time_varying_unknown_categoricals = [ - self.target_column - ] + self.tv_unknown_cat - - ts_dataset = TimeSeriesDataSet( - self.dataset[lambda x: x.split == "train"], - time_idx="time_idx", - target=self.target_column, - categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)}, - group_ids=["series"], - time_varying_unknown_reals=[self.target_column], - min_encoder_length=self.context_length, - max_encoder_length=self.context_length, - max_prediction_length=self.prediction_length, - min_prediction_length=self.prediction_length, - add_relative_time_idx=False, - allow_missings=False, - ) - return ts_dataset - def check_gap(self): if self.dataset.shape[0] > 0: self.dataset = self.dataset.groupby(by=["time"]).min() self.dataset["time"] = self.dataset.index self.dataset.index = range(self.dataset.shape[0]) + self.convert_time_to_ms() self.dataset[self.target_column] = pd.to_numeric( self.dataset[self.target_column], errors="coerce" ).fillna(np.nan) self.dataset = self.dataset.replace(np.inf, np.nan) self.dataset = self.dataset.dropna(subset=[self.target_column]) - print(self.dataset) if self.dataset.shape[0] > 0: max_gap = self.dataset["time"].diff().abs().max() logging.info( f"Metric: {self.target_column} Max time gap in series {max_gap}" ) print(f" Metric: {self.target_column} Max time gap in series {max_gap}") - series_freq = (self.dataset["time"] // 1e9).diff().fillna( - 0 - ).value_counts().index.values[0] * 1e9 + series_freq = ( + (self.dataset["time"]) + .diff() + .fillna(0) + .value_counts() + .index.values[0] + ) logging.info( f"Metric: {self.target_column} Detected series with {series_freq} frequency" @@ -132,12 +121,20 @@ class Dataset(object): print( f"Metric: {self.target_column} Detected series with {series_freq} frequency" ) + if series_freq != self.publish_rate: + logging.info( + f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!" + ) + # check series length series = np.split( self.dataset, *np.where( self.dataset["time"].diff().abs().fillna(0).astype(int) - >= np.abs(self.max_missing_values * series_freq) + >= np.abs(self.max_missing_values * self.publish_rate) ), ) logging.info(f"Metric: {self.target_column} {len(series)} series found") @@ -147,8 +144,8 @@ class Dataset(object): s = self.fill_na(s) s = self.cut_nan_start(s) s = self.add_obligatory_columns(s) - s = self.convert_formats(s) s["split"] = "train" + s = self.convert_formats(s) logging.info( f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" ) @@ -193,6 +190,34 @@ class Dataset(object): ) return inh_dataset + def create_time_series_dataset(self): + if not self.classification: + self.time_varying_unknown_reals = [ + self.target_column + ] + self.tv_unknown_reals + self.time_varying_unknown_categoricals = self.tv_unknown_cat + else: + self.time_varying_unknown_reals = self.tv_unknown_reals + self.time_varying_unknown_categoricals = [ + self.target_column + ] + self.tv_unknown_cat + + ts_dataset = TimeSeriesDataSet( + self.dataset[lambda x: x.split == "train"], + time_idx="time_idx", + target=self.target_column, + categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)}, + group_ids=["series"], + time_varying_unknown_reals=[self.target_column], + min_encoder_length=self.context_length, + max_encoder_length=self.context_length, + max_prediction_length=self.prediction_length, + min_prediction_length=self.prediction_length, + add_relative_time_idx=False, + allow_missings=False, + ) + return ts_dataset + def get_from_dataset(self, dataset): return TimeSeriesDataSet.from_dataset( self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True diff --git a/deployment/nbeats/test/model_predict_test.py b/deployment/nbeats/test/model_predict_test.py index 5be30b00..d0baa170 100644 --- a/deployment/nbeats/test/model_predict_test.py +++ b/deployment/nbeats/test/model_predict_test.py @@ -174,6 +174,7 @@ def prediction_length(): def test_predict(df, metric, prediction_length): df.to_csv("demo.csv") output = predict(metric, prediction_length) + print(output) if output: print("True") assert True diff --git a/deployment/nbeats/test/preprocess_dataset_test.py b/deployment/nbeats/test/preprocess_dataset_test.py index e61da6fd..96835f44 100644 --- a/deployment/nbeats/test/preprocess_dataset_test.py +++ b/deployment/nbeats/test/preprocess_dataset_test.py @@ -147,6 +147,35 @@ def df_13(): return df +@pytest.fixture +def df_14(): + df = pd.DataFrame() + df["time"] = 10 + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(1000)] + return df + + +@pytest.fixture +def df_15(): + df = pd.DataFrame() + df["time"] = [i * 30 * 1e5 for i in range(500)] + [ + i * 30 * 1e5 + 10000 for i in range(500) + ] + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)] + return df + + +@pytest.fixture +def df_16(): + df = pd.DataFrame() + df["time"] = [i for i in range(500)] + [i + 10000 for i in range(500)] + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)] + return df + + @pytest.fixture def df(request): return request.getfixturevalue(request.param) @@ -174,6 +203,9 @@ class TestDataset: ("df_11", metric), ("df_12", metric), ("df_13", metric), + ("df_14", metric), + ("df_15", metric), + ("df_16", metric), ], indirect=True, ) diff --git a/deployment/tft/predict.py b/deployment/tft/predict.py index 1bd61287..773bff0d 100644 --- a/deployment/tft/predict.py +++ b/deployment/tft/predict.py @@ -132,6 +132,7 @@ def main(): predicted_point_idx=int( (i + 1) * prediction_points_horizons[metric] - 1 ), + publish_rate=metrics_info[metric]["publish_rate"], ) if i == (number_of_forward_predictions[metric] - 1): print( @@ -194,7 +195,6 @@ if __name__ == "__main__": metric["metric"]: msg["prediction_horizon"] * 1000 // metric["publish_rate"] for metric in msg["all_metrics"] } - print(prediction_points_horizons) predicted_metrics = set(msg["metrics"]) prediction_cycle = msg["prediction_horizon"] prediction_lengths = { diff --git a/deployment/tft/retrain.py b/deployment/tft/retrain.py index fa69c0d5..64f225ae 100644 --- a/deployment/tft/retrain.py +++ b/deployment/tft/retrain.py @@ -33,7 +33,11 @@ def main(predicted_metrics, prediction_horizons): while True: start_time = int(time.time()) for metric in predicted_metrics: - retrain_msg = train(metric, prediction_horizons[metric]) + retrain_msg = train( + metric, + prediction_horizons[metric], + publish_rate=metrics_info[metric]["publish_rate"], + ) if retrain_msg: logging.info( f"Training completed for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py index 91bbe32d..4fc34800 100644 --- a/deployment/tft/src/model_predict.py +++ b/deployment/tft/src/model_predict.py @@ -33,6 +33,7 @@ def predict( prediction_hor=60, timestamp=0, predicted_point_idx=0, + publish_rate=10000, ): with open(yaml_file) as file: params = yaml.load(file, Loader=yaml.FullLoader) @@ -56,7 +57,7 @@ def predict( return (None, None) dataset = pd.read_csv(data_path) - new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate) if new_ts_dataset.dropped_recent_series: # series with recent data was too short logging.info( f"METRIC {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" diff --git a/deployment/tft/src/model_train.py b/deployment/tft/src/model_train.py index fa7e43c9..a4e5fa3c 100644 --- a/deployment/tft/src/model_train.py +++ b/deployment/tft/src/model_train.py @@ -31,7 +31,7 @@ LOSSES_DICT = { } -def train(target_column, prediction_length, yaml_file="model.yaml"): +def train(target_column, prediction_length, yaml_file="model.yaml", publish_rate=10000): torch.manual_seed(12345) with open(yaml_file) as file: @@ -54,7 +54,12 @@ def train(target_column, prediction_length, yaml_file="model.yaml"): ) return None - ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"]) + ts_dataset = Dataset( + dataset, + target_column=target_column, + **params["dataset"], + publish_rate=publish_rate, + ) lockfile = f"{params['dataloader_path']}_{target_column}.pickle" lock = FileLock(lockfile + ".lock") diff --git a/deployment/tft/src/preprocess_dataset.py b/deployment/tft/src/preprocess_dataset.py index b08e570a..b9f10273 100644 --- a/deployment/tft/src/preprocess_dataset.py +++ b/deployment/tft/src/preprocess_dataset.py @@ -22,6 +22,7 @@ class Dataset(object): classification=0, context_length=40, prediction_length=5, + publish_rate=10000, ): self.max_missing_values = ( @@ -35,6 +36,7 @@ class Dataset(object): self.classification = classification self.context_length = context_length self.prediction_length = prediction_length + self.publish_rate = publish_rate self.dataset = dataset self.dropped_recent_series = True # default set to be true if self.dataset.shape[0] > 0: @@ -47,8 +49,9 @@ class Dataset(object): dataset.index = range(dataset.shape[0]) first_not_nan_index = dataset[self.target_column].first_valid_index() if first_not_nan_index == first_not_nan_index: # check is if it;s not np.nan - if first_not_nan_index > -1: - return dataset[dataset.index > first_not_nan_index] + if first_not_nan_index is not None: + if first_not_nan_index > -1: + return dataset[dataset.index > first_not_nan_index] else: return dataset.dropna() @@ -65,18 +68,114 @@ class Dataset(object): for name in self.tv_unknown_cat: dataset[name] = dataset[name].astype(str) - - dataset["series"] = dataset["series"].astype(str) return dataset + def convert_time_to_ms(self): + if self.dataset.shape[0] > 0: + digit_len = len(str(int(self.dataset["time"].values[0]))) + if digit_len >= 13: + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(str(x)[:13]) + ) + else: + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(int(str(x)[:digit_len]) * 10 ** (13 - digit_len)) + ) + self.dataset["time"] = self.dataset["time"].apply( + lambda x: int(x // 1e4 * 1e4) + ) + def add_obligatory_columns(self, dataset): - dataset["series"] = 0 - dataset["split"] = "train" n = dataset.shape[0] - dataset["split"][int(n * 0.8) :] = "val" dataset["time_idx"] = range(n) # TODO check time gaps return dataset + def check_gap(self): + if self.dataset.shape[0] > 0: + self.dataset = self.dataset.groupby(by=["time"]).min() + self.dataset["time"] = self.dataset.index + self.dataset.index = range(self.dataset.shape[0]) + self.convert_time_to_ms() + self.dataset[self.target_column] = pd.to_numeric( + self.dataset[self.target_column], errors="coerce" + ).fillna(np.nan) + self.dataset = self.dataset.replace(np.inf, np.nan) + self.dataset = self.dataset.dropna(subset=[self.target_column]) + if self.dataset.shape[0] > 0: + max_gap = self.dataset["time"].diff().abs().max() + logging.info( + f"Metric: {self.target_column} Max time gap in series {max_gap}" + ) + print(f" Metric: {self.target_column} Max time gap in series {max_gap}") + series_freq = ( + (self.dataset["time"]) + .diff() + .fillna(0) + .value_counts() + .index.values[0] + ) + + logging.info( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + if series_freq != self.publish_rate: + logging.info( + f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!" + ) + + # check series length + series = np.split( + self.dataset, + *np.where( + self.dataset["time"].diff().abs().fillna(0).astype(int) + >= np.abs(self.max_missing_values * self.publish_rate) + ), + ) + logging.info(f"Metric: {self.target_column} {len(series)} series found") + print(f"{len(series)} series found") + preprocessed_series = [] + for i, s in enumerate(series): + s = self.fill_na(s) + s = self.cut_nan_start(s) + s = self.add_obligatory_columns(s) + s["split"] = "train" + s = self.convert_formats(s) + logging.info( + f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" + ) + if s.shape[0] > self.prediction_length * 2 + self.context_length: + s["series"] = i + preprocessed_series.append(s) + if i == len(series) - 1: + logging.info( + f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + ) + + logging.info( + f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found" + ) + print(f"{len(preprocessed_series)} long enough series found") + # logging.info(f"") + if preprocessed_series: + self.dataset = pd.concat(preprocessed_series) + if self.dataset["series"].max() != len(series) - 1: + self.dropped_recent_series = True + else: + self.dropped_recent_series = False + else: + self.dataset = pd.DataFrame() + self.dropped_recent_series = True + self.dataset.index = range(self.dataset.shape[0]) + else: + self.dataset = pd.DataFrame() + self.dropped_recent_series = True + def create_time_series_dataset(self): if not self.classification: self.time_varying_unknown_reals = [ @@ -113,74 +212,6 @@ class Dataset(object): ) return ts_dataset - def check_gap(self): - self.dataset = self.dataset.groupby(by=["time"]).min() - self.dataset["time"] = self.dataset.index - self.dataset.index = range(self.dataset.shape[0]) - self.dataset[self.target_column] = pd.to_numeric( - self.dataset[self.target_column], errors="coerce" - ).fillna(np.nan) - self.dataset = self.dataset.replace(np.inf, np.nan) - self.dataset = self.dataset.dropna(subset=[self.target_column]) - if self.dataset.shape[0] > 0: - max_gap = self.dataset["time"].diff().abs().max() - logging.info( - f"Metric: {self.target_column} Max time gap in series {max_gap}" - ) - print(f" Metric: {self.target_column} Max time gap in series {max_gap}") - series_freq = ( - self.dataset["time"] // 1e9 - ).diff().value_counts().index.values[0] * 1e9 - logging.info( - f"Metric: {self.target_column} Detected series with {series_freq} frequency" - ) - print( - f"Metric: {self.target_column} Detected series with {series_freq} frequency" - ) - # check series length - series = np.split( - self.dataset, - *np.where( - self.dataset["time"].diff().abs().fillna(0).astype(int) - >= np.abs(self.max_missing_values * series_freq) - ), - ) - logging.info(f"Metric: {self.target_column} {len(series)} series found") - print(f"{len(series)} series found") - preprocessed_series = [] - for i, s in enumerate(series): - s = self.fill_na(s) - s = self.cut_nan_start(s) - s = self.add_obligatory_columns(s) - s = self.convert_formats(s) - s["split"] = "train" - logging.info( - f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" - ) - if s.shape[0] > self.prediction_length * 2 + self.context_length: - s["series"] = i - preprocessed_series.append(s) - if i == len(series) - 1: - logging.info( - f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" - ) - - logging.info( - f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found" - ) - print(f"{len(preprocessed_series)} long enough series found") - # logging.info(f"") - if preprocessed_series: - self.dataset = pd.concat(preprocessed_series) - if self.dataset["series"].max() != len(series) - 1: - self.dropped_recent_series = True - else: - self.dropped_recent_series = False - else: - self.dataset = pd.DataFrame() - self.dropped_recent_series = True - self.dataset.index = range(self.dataset.shape[0]) - def inherited_dataset(self, split1, split2): df1 = ( self.dataset[lambda x: x.split == split1] diff --git a/deployment/tft/test/model_predict_test.py b/deployment/tft/test/model_predict_test.py index 5be30b00..d0baa170 100644 --- a/deployment/tft/test/model_predict_test.py +++ b/deployment/tft/test/model_predict_test.py @@ -174,6 +174,7 @@ def prediction_length(): def test_predict(df, metric, prediction_length): df.to_csv("demo.csv") output = predict(metric, prediction_length) + print(output) if output: print("True") assert True diff --git a/deployment/tft/test/preprocess_dataset_test.py b/deployment/tft/test/preprocess_dataset_test.py index 80e36837..96835f44 100644 --- a/deployment/tft/test/preprocess_dataset_test.py +++ b/deployment/tft/test/preprocess_dataset_test.py @@ -138,6 +138,44 @@ def df_12(): return df +@pytest.fixture +def df_13(): + df = pd.DataFrame() + df["time"] = 1 + for i in range(5): + df[f"metric_{i}"] = [random.random() for i in range(1000)] + return df + + +@pytest.fixture +def df_14(): + df = pd.DataFrame() + df["time"] = 10 + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(1000)] + return df + + +@pytest.fixture +def df_15(): + df = pd.DataFrame() + df["time"] = [i * 30 * 1e5 for i in range(500)] + [ + i * 30 * 1e5 + 10000 for i in range(500) + ] + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)] + return df + + +@pytest.fixture +def df_16(): + df = pd.DataFrame() + df["time"] = [i for i in range(500)] + [i + 10000 for i in range(500)] + for i in range(5): + df[f"metric_{i}"] = [np.nan for i in range(500)] + [2 for i in range(500)] + return df + + @pytest.fixture def df(request): return request.getfixturevalue(request.param) @@ -164,6 +202,10 @@ class TestDataset: ("df_10", metric), ("df_11", metric), ("df_12", metric), + ("df_13", metric), + ("df_14", metric), + ("df_15", metric), + ("df_16", metric), ], indirect=True, ) -- GitLab