From a93daf214d0a2e95d7ad712a205621b468ff3e4a Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Tue, 25 Oct 2022 16:50:34 +0300 Subject: [PATCH 1/2] Improvements to allow sending a batch of predictions at once, using the exponential smoothing predictor --- .../src/runtime/Predictor.py | 91 ++++++++++--------- .../src/runtime/operational_status/State.py | 2 +- 2 files changed, 51 insertions(+), 42 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py index e0050728..3a6ff35e 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py @@ -144,49 +144,58 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f Utilities.load_configuration() Utilities.update_monitoring_data() - try: - prediction = predict_attributes(State.metrics_to_predict,State.next_prediction_time) - except Exception as e: - print_with_time("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time)+", proceeding to next prediction time. The encountered exception trace follows:") - print(e) - continue - for attribute in State.metrics_to_predict: - if(not prediction[attribute].prediction_valid): - continue - if (State.disconnected or State.check_stale_connection()): - State.connection.connect() - message_not_sent = True - current_time = int(time.time()) - prediction_message_body = { - "metricValues": prediction[attribute].value, - "level": 3, - "timestamp": current_time, - "probability": 0.95, - "confidence_interval": str(prediction[attribute].lower_confidence_interval_value) + "," + str( - prediction[attribute].upper_confidence_interval_value), - "predictionTime": int(State.next_prediction_time), - "refersTo": "Undefined",#"MySQL_12345", - "cloud": "Undefined",#"AWS-Dublin", - "provider": "Undefined",#"AWS" - } - training_events_message_body = { - "metrics": State.metrics_to_predict, - "forecasting_method": "exponentialsmoothing", - "timestamp": current_time, - } - while (message_not_sent): - try: - State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body) - State.connection.send_to_topic('training_events',training_events_message_body) - message_not_sent = False - print_with_time("Successfully sent prediction message for "+attribute) - except ConnectionError as exception: - State.connection.disconnect() - State.connection = messaging.morphemic.Connection('admin', 'admin') + first_prediction = None + for prediction_index in range(0,State.total_time_intervals_to_predict): + prediction_time = int(State.next_prediction_time)+prediction_index*prediction_horizon + try: + prediction = predict_attributes(State.metrics_to_predict,prediction_time) + if (prediction_time == int(State.next_prediction_time)): + first_prediction = prediction + except Exception as e: + print_with_time("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time)+", proceeding to next prediction time. However, "+str(prediction_index)+" predictions were produced (out of the configured "+State.total_time_intervals_to_predict+"). The encountered exception trace follows:") + print(e) + #continue was here, to continue while loop, replaced by break + break + for attribute in State.metrics_to_predict: + if(not prediction[attribute].prediction_valid): + #continue was here, to continue while loop, replaced by break + break + if (State.disconnected or State.check_stale_connection()): State.connection.connect() - State.disconnected = False + message_not_sent = True + current_time = int(time.time()) + prediction_message_body = { + "metricValues": prediction[attribute].value, + "level": 3, + "timestamp": current_time, + "probability": 0.95, + "confidence_interval": str(prediction[attribute].lower_confidence_interval_value) + "," + str( + prediction[attribute].upper_confidence_interval_value), + "predictionTime": prediction_time, + "refersTo": "Undefined",#"MySQL_12345", + "cloud": "Undefined",#"AWS-Dublin", + "provider": "Undefined",#"AWS" + } + training_events_message_body = { + "metrics": State.metrics_to_predict, + "forecasting_method": "exponentialsmoothing", + "timestamp": current_time, + } + while (message_not_sent): + try: + State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body) + State.connection.send_to_topic('training_events',training_events_message_body) + message_not_sent = False + print_with_time("Successfully sent prediction message for "+attribute) + except ConnectionError as exception: + State.connection.disconnect() + State.connection = messaging.morphemic.Connection('admin', 'admin') + State.connection.connect() + State.disconnected = False + + if (first_prediction is not None): + State.previous_prediction = first_prediction #first_prediction is the first of the batch of the predictions which are produced. The size of this batch is set by the State.total_time_intervals_to_predict (currently set to 8) - State.previous_prediction = prediction #State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from) #State.number_of_days_to_use_data_from = 1 + int( # (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/operational_status/State.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/operational_status/State.py index be0b0863..261325b6 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/operational_status/State.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/operational_status/State.py @@ -29,7 +29,7 @@ class State: disconnection_handler = threading.Condition() initial_metric_list_received = False testing_prediction_functionality = False - + total_time_intervals_to_predict = 8 #Connection details connection = None -- GitLab From 5c6f2df230e24815fc2c68ffd337c14e93510ece Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Thu, 17 Nov 2022 17:10:32 +0200 Subject: [PATCH 2/2] Change of training_events to training_models --- .../src/runtime/Predictor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py index 524bb0cf..aa0f414a 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py @@ -193,7 +193,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f "cloud": "todo", "provider": "todo", } - training_events_message_body = { + training_models_message_body = { "metrics": State.metrics_to_predict, "forecasting_method": "exponentialsmoothing", "timestamp": current_time, @@ -201,7 +201,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f while (message_not_sent): try: State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body) - State.connection.send_to_topic('training_events',training_events_message_body) + State.connection.send_to_topic('training_models',training_models_message_body) message_not_sent = False print_with_time("Successfully sent prediction message for %s to topic intermediate_prediction.%s.%s\n\n%s\n\n" % (attribute, id, attribute, prediction_message_body)) except ConnectionError as exception: -- GitLab