diff --git a/deployment/arima/main.py b/deployment/arima/main.py index 7908bfe1c47e55d2dbf0151b9d684c329ece6356..208895e51b440501fa33be62d2733c9297de6610 100644 --- a/deployment/arima/main.py +++ b/deployment/arima/main.py @@ -112,11 +112,11 @@ def main(): ) # msg1 = Msg() - # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 30000}]' + # msg1.body = '[{"metric": "MinimumCores", "level": 3, "publish_rate": 30000}, {"metric": "EstimatedRemainingTimeContext", "level": 3, "publish_rate": 30000}, {"metric": "NotFinishedOnTimeContext", "level": 3, "publish_rate": 10000}, {"metric": "ETPercentile_Ctx", "level": 3, "publish_rate": 10000}]' # msg2 = Msg() # msg2.body = ( # "{" - # + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' + # + f'"metrics": ["MinimumCores", "EstimatedRemainingTimeContext", "NotFinishedOnTimeContext", "ETPercentile_Ctx"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 30' # + "}" # ) diff --git a/deployment/arima/predict.py b/deployment/arima/predict.py index c87915b9bfd82bb6131f1bd5c8b294d760b6c86a..618875abdacfa9ce5f79c448134d67be4fb932bc 100644 --- a/deployment/arima/predict.py +++ b/deployment/arima/predict.py @@ -14,6 +14,7 @@ from src.dataset_maker import CSVData from pytz import timezone import pytz from datetime import datetime +import random METHOD = os.environ.get("METHOD", "nbeats") START_TOPIC = f"start_forecasting.{METHOD}" @@ -146,6 +147,7 @@ def main(): print( f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) + print() end_time = int(time.time()) diff --git a/deployment/arima/src/example.py b/deployment/arima/src/example.py deleted file mode 100644 index 75ff52b0674ea7b5066c82ba11db7ed8c17276ed..0000000000000000000000000000000000000000 --- a/deployment/arima/src/example.py +++ /dev/null @@ -1,2 +0,0 @@ -def square(x): - return x * x \ No newline at end of file diff --git a/deployment/arima/src/model_predict.py b/deployment/arima/src/model_predict.py index dc191c0ffb0e55a9872445bc92ea4bbb2f7e033a..e34075d929a004fcc95e4a71aa09021ea6e6c75e 100644 --- a/deployment/arima/src/model_predict.py +++ b/deployment/arima/src/model_predict.py @@ -41,7 +41,12 @@ def predict( return None dataset = pd.read_csv(data_path) - new_ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate) + new_ts_dataset = Dataset( + dataset, + target_column=target_column, + **params["dataset"], + publish_rate=publish_rate, + ) if new_ts_dataset.dropped_recent_series: # series with recent data was too short logging.info( f"METRIC: {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" @@ -53,7 +58,12 @@ def predict( dataset = pd.read_csv(data_path) - ts_dataset = Dataset(dataset, target_column=target_column, **params["dataset"], publish_rate=publish_rate) + ts_dataset = Dataset( + dataset, + target_column=target_column, + **params["dataset"], + publish_rate=publish_rate, + ) if ts_dataset.dataset.shape[0] < 1: logging.info( f"METRIC: {target_column} Not enough fresh preprocessed data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" @@ -110,7 +120,9 @@ def predict( predictions.conf_int(alpha=0.05).iloc[i].values[0], predictions.conf_int(alpha=0.05).iloc[i].values[1], ], # quantiles difference - "predictionTime": timestamp + (i + 1) * (prediction_hor // 1000), + "predictionTime": timestamp + + (i // single_prediction_points_length + 1) + * (prediction_hor // 1000), "refersTo": "TODO", "cloud": "TODO", "provider": "TODO", diff --git a/deployment/arima/src/preprocess_dataset.py b/deployment/arima/src/preprocess_dataset.py index 789b21320f1697e0bbde4308de6d25afa81e3970..7a8fffe2964650cb7ca9fbb81098313c5a428b9b 100644 --- a/deployment/arima/src/preprocess_dataset.py +++ b/deployment/arima/src/preprocess_dataset.py @@ -101,7 +101,6 @@ class Dataset(object): ) def check_gap(self): - print(self.dataset) if self.dataset.shape[0] > 0: self.dataset = self.dataset.groupby(by=[self.time_column]).min() self.dataset[self.time_column] = self.dataset.index diff --git a/deployment/nbeats/predict.py b/deployment/nbeats/predict.py index 1c40e84319fb743bd0a3a26156b2c4ab5b3d9cfb..1b8a6774da11d574b83a4f7bf409bd56305ed27c 100644 --- a/deployment/nbeats/predict.py +++ b/deployment/nbeats/predict.py @@ -122,48 +122,32 @@ def main(): dataset_preprocessor.prepare_csv() global time_0 for metric in predicted_metrics: - predictions = None - for i in range(number_of_forward_predictions[metric]): - prediction_msgs, prediction = predict( - metric, - prediction_lengths[metric], - extra_data=None, - m=i + 1, - prediction_hor=prediction_horizon, - timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), - predicted_point_idx=int( - (i + 1) * prediction_points_horizons[metric] - 1 - ), - publish_rate=metrics_info[metric]["publish_rate"], - ) - if i == (number_of_forward_predictions[metric] - 1): - print( - f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" - ) - logging.info( - f"time difference in seconds between last preiction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" - ) + prediction_msgs = predict( + metric, + prediction_lengths[metric], + single_prediction_points_length=prediction_points_horizons[metric], + prediction_hor=prediction_horizon, + timestamp=time_0, + publish_rate=metrics_info[metric]["publish_rate"], + ) - else: - predictions = prediction + if prediction_msgs: + dest = f"{PRED_TOPIC_PREF}.{metric}" - if prediction_msgs: + for message in prediction_msgs: logging.info( f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) + start_conn.send_to_topic(dest, message[metric]) + influxdb_conn.send_to_influxdb(metric, message) - dest = f"{PRED_TOPIC_PREF}.{metric}" - print( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds' - ) - logging.info( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}' - ) - logging.info( - f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - ) - start_conn.send_to_topic(dest, prediction_msgs[metric]) - influxdb_conn.send_to_influxdb(metric, prediction_msgs) + current_time = int(time.time()) + logging.info( + f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + print( + f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) end_time = int(time.time()) time_0 = time_0 + prediction_cycle diff --git a/deployment/nbeats/src/model_predict.py b/deployment/nbeats/src/model_predict.py index 0ea200bf397cb326685f1862f7c7b681e6a2cd0a..8458b94012144f02638b6e18d3d98f5dae61bbfd 100644 --- a/deployment/nbeats/src/model_predict.py +++ b/deployment/nbeats/src/model_predict.py @@ -18,13 +18,14 @@ TZ = os.environ.get("TIME_ZONE", "Europe/Vienna") """Script for nbeats fusion transformer prediction""" logging.basicConfig( - filename=f"logs/{os.environ.get('METHOD', 'nbeats')}.out", level=logging.INFO + filename=f"logs/{os.environ.get('METHOD', 'model')}.out", level=logging.INFO ) def predict( target_column, prediction_length, + single_prediction_points_length=1, yaml_file="model.yaml", extra_data=None, m=1, @@ -48,14 +49,14 @@ def predict( f"no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("no pretrained model, unable to predict") - return (None, None) + return None data_path = os.path.join( os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) if not os.path.isfile(data_path): - return (None, None) + return None dataset = pd.read_csv(data_path) new_ts_dataset = Dataset( @@ -69,7 +70,7 @@ def predict( f"Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("Not enough fresh data, unable to predict TIME:") - return (None, None) + return None dataset = new_ts_dataset.dataset @@ -118,7 +119,7 @@ def predict( model.load_state_dict(torch.load(model_path)) logging.info("Model corrupted unable to predict") else: - return (None, None) + return None prediction_input = ts_dataset.get_from_dataset(future_df) prediction_input = prediction_input.to_dataloader(train=False) @@ -133,43 +134,43 @@ def predict( for x, _ in prediction_input: # make prediction out = model(x) - out = torch.flatten(model.transform_output(out))[predicted_point_idx] - out = out.item() + out = torch.flatten(model.transform_output(out)) predictions_with_dropout.append(out) - conf_intervals = st.t.interval( - alpha=0.95, - df=len(predictions_with_dropout) - 1, - loc=np.mean(predictions_with_dropout), - scale=st.sem(predictions_with_dropout), - ) - - predicted_values = list(conf_intervals) + [ - torch.flatten(prediction)[predicted_point_idx].item() - ] - predicted_values.sort() # ensure that predictions and confidence intervals are in correct order - - msg = { - target_column: { - "metricValue": predicted_values[1], # middle predicted value - "level": 1, # TODO - "timestamp": int(time.time()), - "probability": 0.95, - "confidence_interval": [ - predicted_values[0], - predicted_values[-1], - ], # quantiles difference - "predictionTime": timestamp, - "refersTo": "TODO", - "cloud": "TODO", - "provider": "TODO", - } - } - logging.info( - f"prediction msg: {msg} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - ) - - future_df = future_df.tail(pred_len) - future_df["split"] = "val" - future_df[target_column] = prediction[0] - return (msg, future_df) + msgs = [] + for i in range(pred_len): + if ((i + 1) % single_prediction_points_length) == 0: + predictions_intervals = [p[i].item() for p in predictions_with_dropout] + conf_intervals = st.t.interval( + alpha=0.95, + df=len(predictions_intervals) - 1, + loc=np.mean(predictions_intervals), + scale=st.sem(predictions_intervals), + ) + + predicted_values = list(conf_intervals) + [ + torch.flatten(prediction)[i].item() + ] + predicted_values.sort() # ensure that predictions and confidence intervals are in correct order + + msg = { + target_column: { + "metricValue": predicted_values[1], # middle predicted value + "level": 1, # TODO + "timestamp": int(time.time()), + "probability": 0.95, + "confidence_interval": [ + predicted_values[0], + predicted_values[-1], + ], # quantiles difference + "predictionTime": timestamp + + (i // single_prediction_points_length + 1) + * (prediction_hor // 1000), + "refersTo": "TODO", + "cloud": "TODO", + "provider": "TODO", + } + } + msgs.append(msg) + + return msgs diff --git a/deployment/tft/predict.py b/deployment/tft/predict.py index 1c40e84319fb743bd0a3a26156b2c4ab5b3d9cfb..1b8a6774da11d574b83a4f7bf409bd56305ed27c 100644 --- a/deployment/tft/predict.py +++ b/deployment/tft/predict.py @@ -122,48 +122,32 @@ def main(): dataset_preprocessor.prepare_csv() global time_0 for metric in predicted_metrics: - predictions = None - for i in range(number_of_forward_predictions[metric]): - prediction_msgs, prediction = predict( - metric, - prediction_lengths[metric], - extra_data=None, - m=i + 1, - prediction_hor=prediction_horizon, - timestamp=time_0 + (i + 1) * (prediction_horizon // 1000), - predicted_point_idx=int( - (i + 1) * prediction_points_horizons[metric] - 1 - ), - publish_rate=metrics_info[metric]["publish_rate"], - ) - if i == (number_of_forward_predictions[metric] - 1): - print( - f"time_0 difference seconds {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" - ) - logging.info( - f"time difference in seconds between last preiction and current time {start_time + (i + 1) * prediction_horizon // 1000 - int(time.time())}" - ) + prediction_msgs = predict( + metric, + prediction_lengths[metric], + single_prediction_points_length=prediction_points_horizons[metric], + prediction_hor=prediction_horizon, + timestamp=time_0, + publish_rate=metrics_info[metric]["publish_rate"], + ) - else: - predictions = prediction + if prediction_msgs: + dest = f"{PRED_TOPIC_PREF}.{metric}" - if prediction_msgs: + for message in prediction_msgs: logging.info( f"Sending predictions for {metric} metric TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) + start_conn.send_to_topic(dest, message[metric]) + influxdb_conn.send_to_influxdb(metric, message) - dest = f"{PRED_TOPIC_PREF}.{metric}" - print( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(prediction_msgs[metric]["timestamp"])} difference between timestamp and predicted in secnds' - ) - logging.info( - f'{int(prediction_msgs[metric]["predictionTime"]) - int(time.time())} difference between current time and predicted in secnds TIME: {datetime.now(pytz.timezone(TZ)).strftime("%d/%m/%Y %H:%M:%S")}' - ) - logging.info( - f"Message: {prediction_msgs[metric]}, destination: {dest} TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" - ) - start_conn.send_to_topic(dest, prediction_msgs[metric]) - influxdb_conn.send_to_influxdb(metric, prediction_msgs) + current_time = int(time.time()) + logging.info( + f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) + print( + f"metric: {metric}, Time difference between current and last horizon: {prediction_msgs[-1][metric]['predictionTime'] - current_time} seconds. TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" + ) end_time = int(time.time()) time_0 = time_0 + prediction_cycle diff --git a/deployment/tft/src/model_predict.py b/deployment/tft/src/model_predict.py index c98c90abcd5f29a4ef24ab510c9727405680bf29..217d34387da85fcaddf2a6bbc3e0294d486b6bbf 100644 --- a/deployment/tft/src/model_predict.py +++ b/deployment/tft/src/model_predict.py @@ -27,6 +27,7 @@ pd.options.mode.chained_assignment = None def predict( target_column, prediction_length, + single_prediction_points_length=1, yaml_file="model.yaml", extra_data=None, m=1, @@ -47,14 +48,14 @@ def predict( f"METRIC {target_column} no pretrained model, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("no pretrained model, unable to predict") - return (None, None) + return None data_path = os.path.join( os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv' ) if not os.path.isfile(data_path): - return (None, None) + return None dataset = pd.read_csv(data_path) new_ts_dataset = Dataset( @@ -68,7 +69,7 @@ def predict( f"METRIC {target_column} Not enough fresh data, unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) print("METRIC {target_column} Not enough fresh data, unable to predict TIME:") - return (None, None) + return None dataset = new_ts_dataset.dataset @@ -119,48 +120,50 @@ def predict( logging.info( f"METRIC {target_column} No pretrained model unable to predict TIME: {datetime.now(pytz.timezone(TZ)).strftime('%d/%m/%Y %H:%M:%S')}" ) - return (None, None) + return None with lock: if os.path.isfile(model_path): model.load_state_dict(torch.load(model_path)) logging.info("Model corrupted unable to predict") else: - return (None, None) + return None prediction_input = ts_dataset.get_from_dataset(future_df) prediction_input = prediction_input.to_dataloader(train=False) prediction = model.predict(future_df, mode="raw")["prediction"] - predicted_values = [ - prediction[-1][predicted_point_idx][0].item(), - prediction[-1][predicted_point_idx][3].item(), - prediction[-1][predicted_point_idx][-1].item(), - ] - - predicted_values.sort() - - msg = { - target_column: { - "metricValue": predicted_values[1], - "level": 1, # TODO - "timestamp": int(time.time()), - "probability": 0.95, - "confidence_interval": [ - predicted_values[0], - predicted_values[-1], - ], # quantiles difference - "predictionTime": timestamp, - "refersTo": "TODO", - "cloud": "TODO", - "provider": "TODO", - } - } - logging.debug(f"prediction msg: {msg}") - - future_df["split"] = "val" - future_df = future_df.tail(pred_len) - future_df["split"] = "val" - future_df[target_column] = prediction.permute(0, 2, 1)[0][3] - return (msg, future_df) + msgs = [] + for i in range(pred_len): + if ((i + 1) % single_prediction_points_length) == 0: + predicted_values = [ + prediction[-1][i][0].item(), + prediction[-1][i][3].item(), + prediction[-1][i][-1].item(), + ] + + predicted_values.sort() + + msg = { + target_column: { + "metricValue": predicted_values[1], + "level": 1, # TODO + "timestamp": int(time.time()), + "probability": 0.95, + "confidence_interval": [ + predicted_values[0], + predicted_values[-1], + ], # quantiles difference + "predictionTime": timestamp + + (i // single_prediction_points_length + 1) + * (prediction_hor // 1000), + "refersTo": "TODO", + "cloud": "TODO", + "provider": "TODO", + } + } + + msgs.append(msg) + + return msgs