From 0949cd6f76b5adf2ae33dd66a1cd0802f962a0cf Mon Sep 17 00:00:00 2001 From: Anna Warno Date: Mon, 25 Oct 2021 09:40:15 +0200 Subject: [PATCH] prediction speed corrected --- deployment/arima/main.py | 4 +- deployment/arima/predict.py | 2 + deployment/arima/src/example.py | 2 - deployment/arima/src/model_predict.py | 18 ++++- deployment/arima/src/preprocess_dataset.py | 1 - deployment/nbeats/predict.py | 56 +++++--------- deployment/nbeats/src/model_predict.py | 87 +++++++++++----------- deployment/tft/predict.py | 56 +++++--------- deployment/tft/src/model_predict.py | 75 ++++++++++--------- 9 files changed, 142 insertions(+), 159 deletions(-) delete mode 100644 deployment/arima/src/example.py diff --git a/deployment/arima/main.py b/deployment/arima/main.py index 7908bfe1..208895e5 100644 --- a/deployment/arima/main.py +++ b/deployment/arima/main.py @@ -112,11 +112,11 @@ def main(): ) # msg1 = Msg() - # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]' + # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 10000}, {"metric": "ETPercentile_Ctx", "level": 3, "publish_rate": 10000}]' # msg2 = Msg() # msg2.body = ( # "{" - # + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' + # + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext", "ETPercentile_Ctx"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' # + "}" # ) diff --git a/deployment/arima/predict.py b/deployment/arima/predict.py index c87915b9..618875ab 100644 --- a/deployment/arima/predict.py +++ b/deployment/arima/predict.py @@ -14,6 +14,7 @@ from src.dataset_maker import CSVData from pytz import timezone import pytz from datetime import datetime +import random METHOD = os.environ.get("METHOD", "nbeats") START_TOPIC = f"start_forecasting.{METHOD}" @@ -146,6 +147,7 @@ def main(): print( f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) + print() end_time = int(time.time()) diff --git a/deployment/arima/src/example.py b/deployment/arima/src/example.py deleted file mode 100644 index 75ff52b0..00000000 --- a/deployment/arima/src/example.py +++ /dev/null @@ -1,2 +0,0 @@ -def square(x): - return x * x \ No newline at end of file diff --git a/deployment/arima/src/model_predict.py b/deployment/arima/src/model_predict.py index dc191c0f..e34075d9 100644 --- a/deployment/arima/src/model_predict.py +++ b/deployment/arima/src/model_predict.py @@ -41,7 +41,12 @@ def predict( return None dataset = pd.read_csv(data_path) - new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate) + new_ts_dataset = Dataset( + dataset, + target_column=target_column, + **params["dataset"], + publish_rate=publish_rate, + ) if new_ts_dataset.dropped_recent_series: # series with recent data was too short logging.info( f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" @@ -53,7 +58,12 @@ def predict( dataset = pd.read_csv(data_path) - ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate) + ts_dataset = Dataset( + dataset, + target_column=target_column, + **params["dataset"], + publish_rate=publish_rate, + ) if ts_dataset.dataset.shape[0] < 1: logging.info( f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" @@ -110,7 +120,9 @@ def predict( predictions.conf_int(alpha=0.05).iloc[i].values[0], predictions.conf_int(alpha=0.05).iloc[i].values[1], ], # quantiles difference - "predictionTime": timestamp + (i + 1) * (prediction_hor // 1000), + "predictionTime": timestamp + + (i // single_prediction_points_length + 1) + * (prediction_hor // 1000), "refersTo": "TODO", "cloud": "TODO", "provider": "TODO", diff --git a/deployment/arima/src/preprocess_dataset.py b/deployment/arima/src/preprocess_dataset.py index 789b2132..7a8fffe2 100644 --- a/deployment/arima/src/preprocess_dataset.py +++ b/deployment/arima/src/preprocess_dataset.py @@ -101,7 +101,6 @@ class Dataset(object): ) def check_gap(self): - print(self.dataset) if self.dataset.shape[0] > 0: self.dataset = self.dataset.groupby(by=[self.time_column]).min() self.dataset[self.time_column] = self.dataset.index diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py index 1c40e843..1b8a6774 100644 --- a/deployment/nbeats/predict.py +++ b/deployment/nbeats/predict.py @@ -122,48 +122,32 @@ def main(): dataset_preprocessor.prepare_csv() global time_0 for metric in predicted_metrics: - predictions = None - for i in range(number_of_forward_predictions[metric]): - prediction_msgs, prediction = predict( - metric, - prediction_lengths[metric], - extra_data=None, - m=i + 1, - prediction_hor=prediction_horizon, - timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), - predicted_point_idx=int( - (i + 1) * prediction_points_horizons[metric] - 1 - ), - publish_rate=metrics_info[metric]["publish_rate"], - ) - if i == (number_of_forward_predictions[metric] - 1): - print( - f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" - ) - logging.info( - f"time difference in seconds between last preiction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" - ) + prediction_msgs = predict( + metric, + prediction_lengths[metric], + single_prediction_points_length=prediction_points_horizons[metric], + prediction_hor=prediction_horizon, + timestamp=time_0, + publish_rate=metrics_info[metric]["publish_rate"], + ) - else: - predictions = prediction + if prediction_msgs: + dest = f"{PRED_TOPIC_PREF}.{metric}" - if prediction_msgs: + for message in prediction_msgs: logging.info( f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) + start_conn.send_to_topic(dest, message[metric]) + influxdb_conn.send_to_influxdb(metric, message) - dest = f"{PRED_TOPIC_PREF}.{metric}" - print( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds' - ) - logging.info( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}' - ) - logging.info( - f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - ) - start_conn.send_to_topic(dest, prediction_msgs[metric]) - influxdb_conn.send_to_influxdb(metric, prediction_msgs) + current_time = int(time.time()) + logging.info( + f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + print( + f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) end_time = int(time.time()) time_0 = time_0 + prediction_cycle diff --git a/deployment/nbeats/src/model_predict.py b/deployment/nbeats/src/model_predict.py index 0ea200bf..8458b940 100644 --- a/deployment/nbeats/src/model_predict.py +++ b/deployment/nbeats/src/model_predict.py @@ -18,13 +18,14 @@ TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") """Script for nbeats fusion transformer prediction""" logging.basicConfig( - filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO + filename=f"logs/{os.environ.get('METHOD', 'model')}.out", level=logging.INFO ) def predict( target_column, prediction_length, + single_prediction_points_length=1, yaml_file="model.yaml", extra_data=None, m=1, @@ -48,14 +49,14 @@ def predict( f"no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("no pretrained model, unable to predict") - return (None, None) + return None data_path = os.path.join( os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) if not os.path.isfile(data_path): - return (None, None) + return None dataset = pd.read_csv(data_path) new_ts_dataset = Dataset( @@ -69,7 +70,7 @@ def predict( f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("Not enough fresh data, unable to predict TIME:") - return (None, None) + return None dataset = new_ts_dataset.dataset @@ -118,7 +119,7 @@ def predict( model.load_state_dict(torch.load(model_path)) logging.info("Model corrupted unable to predict") else: - return (None, None) + return None prediction_input = ts_dataset.get_from_dataset(future_df) prediction_input = prediction_input.to_dataloader(train=False) @@ -133,43 +134,43 @@ def predict( for x, _ in prediction_input: # make prediction out = model(x) - out = torch.flatten(model.transform_output(out))[predicted_point_idx] - out = out.item() + out = torch.flatten(model.transform_output(out)) predictions_with_dropout.append(out) - conf_intervals = st.t.interval( - alpha=0.95, - df=len(predictions_with_dropout) - 1, - loc=np.mean(predictions_with_dropout), - scale=st.sem(predictions_with_dropout), - ) - - predicted_values = list(conf_intervals) + [ - torch.flatten(prediction)[predicted_point_idx].item() - ] - predicted_values.sort() # ensure that predictions and confidence intervals are in correct order - - msg = { - target_column: { - "metricValue": predicted_values[1], # middle predicted value - "level": 1, # TODO - "timestamp": int(time.time()), - "probability": 0.95, - "confidence_interval": [ - predicted_values[0], - predicted_values[-1], - ], # quantiles difference - "predictionTime": timestamp, - "refersTo": "TODO", - "cloud": "TODO", - "provider": "TODO", - } - } - logging.info( - f"prediction msg: {msg} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - ) - - future_df = future_df.tail(pred_len) - future_df["split"] = "val" - future_df[target_column] = prediction[0] - return (msg, future_df) + msgs = [] + for i in range(pred_len): + if ((i + 1) % single_prediction_points_length) == 0: + predictions_intervals = [p[i].item() for p in predictions_with_dropout] + conf_intervals = st.t.interval( + alpha=0.95, + df=len(predictions_intervals) - 1, + loc=np.mean(predictions_intervals), + scale=st.sem(predictions_intervals), + ) + + predicted_values = list(conf_intervals) + [ + torch.flatten(prediction)[i].item() + ] + predicted_values.sort() # ensure that predictions and confidence intervals are in correct order + + msg = { + target_column: { + "metricValue": predicted_values[1], # middle predicted value + "level": 1, # TODO + "timestamp": int(time.time()), + "probability": 0.95, + "confidence_interval": [ + predicted_values[0], + predicted_values[-1], + ], # quantiles difference + "predictionTime": timestamp + + (i // single_prediction_points_length + 1) + * (prediction_hor // 1000), + "refersTo": "TODO", + "cloud": "TODO", + "provider": "TODO", + } + } + msgs.append(msg) + + return msgs diff --git a/deployment/tft/predict.py b/deployment/tft/predict.py index 1c40e843..1b8a6774 100644 --- a/deployment/tft/predict.py +++ b/deployment/tft/predict.py @@ -122,48 +122,32 @@ def main(): dataset_preprocessor.prepare_csv() global time_0 for metric in predicted_metrics: - predictions = None - for i in range(number_of_forward_predictions[metric]): - prediction_msgs, prediction = predict( - metric, - prediction_lengths[metric], - extra_data=None, - m=i + 1, - prediction_hor=prediction_horizon, - timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), - predicted_point_idx=int( - (i + 1) * prediction_points_horizons[metric] - 1 - ), - publish_rate=metrics_info[metric]["publish_rate"], - ) - if i == (number_of_forward_predictions[metric] - 1): - print( - f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" - ) - logging.info( - f"time difference in seconds between last preiction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" - ) + prediction_msgs = predict( + metric, + prediction_lengths[metric], + single_prediction_points_length=prediction_points_horizons[metric], + prediction_hor=prediction_horizon, + timestamp=time_0, + publish_rate=metrics_info[metric]["publish_rate"], + ) - else: - predictions = prediction + if prediction_msgs: + dest = f"{PRED_TOPIC_PREF}.{metric}" - if prediction_msgs: + for message in prediction_msgs: logging.info( f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) + start_conn.send_to_topic(dest, message[metric]) + influxdb_conn.send_to_influxdb(metric, message) - dest = f"{PRED_TOPIC_PREF}.{metric}" - print( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds' - ) - logging.info( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}' - ) - logging.info( - f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - ) - start_conn.send_to_topic(dest, prediction_msgs[metric]) - influxdb_conn.send_to_influxdb(metric, prediction_msgs) + current_time = int(time.time()) + logging.info( + f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + print( + f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) end_time = int(time.time()) time_0 = time_0 + prediction_cycle diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py index c98c90ab..217d3438 100644 --- a/deployment/tft/src/model_predict.py +++ b/deployment/tft/src/model_predict.py @@ -27,6 +27,7 @@ pd.options.mode.chained_assignment = None def predict( target_column, prediction_length, + single_prediction_points_length=1, yaml_file="model.yaml", extra_data=None, m=1, @@ -47,14 +48,14 @@ def predict( f"METRIC {target_column} no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("no pretrained model, unable to predict") - return (None, None) + return None data_path = os.path.join( os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) if not os.path.isfile(data_path): - return (None, None) + return None dataset = pd.read_csv(data_path) new_ts_dataset = Dataset( @@ -68,7 +69,7 @@ def predict( f"METRIC {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("METRIC {target_column} Not enough fresh data, unable to predict TIME:") - return (None, None) + return None dataset = new_ts_dataset.dataset @@ -119,48 +120,50 @@ def predict( logging.info( f"METRIC {target_column} No pretrained model unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - return (None, None) + return None with lock: if os.path.isfile(model_path): model.load_state_dict(torch.load(model_path)) logging.info("Model corrupted unable to predict") else: - return (None, None) + return None prediction_input = ts_dataset.get_from_dataset(future_df) prediction_input = prediction_input.to_dataloader(train=False) prediction = model.predict(future_df, mode="raw")["prediction"] - predicted_values = [ - prediction[-1][predicted_point_idx][0].item(), - prediction[-1][predicted_point_idx][3].item(), - prediction[-1][predicted_point_idx][-1].item(), - ] - - predicted_values.sort() - - msg = { - target_column: { - "metricValue": predicted_values[1], - "level": 1, # TODO - "timestamp": int(time.time()), - "probability": 0.95, - "confidence_interval": [ - predicted_values[0], - predicted_values[-1], - ], # quantiles difference - "predictionTime": timestamp, - "refersTo": "TODO", - "cloud": "TODO", - "provider": "TODO", - } - } - logging.debug(f"prediction msg: {msg}") - - future_df["split"] = "val" - future_df = future_df.tail(pred_len) - future_df["split"] = "val" - future_df[target_column] = prediction.permute(0, 2, 1)[0][3] - return (msg, future_df) + msgs = [] + for i in range(pred_len): + if ((i + 1) % single_prediction_points_length) == 0: + predicted_values = [ + prediction[-1][i][0].item(), + prediction[-1][i][3].item(), + prediction[-1][i][-1].item(), + ] + + predicted_values.sort() + + msg = { + target_column: { + "metricValue": predicted_values[1], + "level": 1, # TODO + "timestamp": int(time.time()), + "probability": 0.95, + "confidence_interval": [ + predicted_values[0], + predicted_values[-1], + ], # quantiles difference + "predictionTime": timestamp + + (i // single_prediction_points_length + 1) + * (prediction_hor // 1000), + "refersTo": "TODO", + "cloud": "TODO", + "provider": "TODO", + } + } + + msgs.append(msg) + + return msgs -- GitLab