From f559ea1eb985a4ed267e2bff875f0e563c25ad06 Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Fri, 18 Mar 2022 23:11:54 +0200 Subject: [PATCH 1/2] Added a description of the overall flow of execution in the forecasting script Improved the description of how to run the core predictive functionality, also adding details on two missing parameters Added better exception logging Enabled the use of `State.prediction_processing_time_safety_margin_seconds` variable instead of a hard-coded initialization Fixed the use of the floor function Removed the dynamic modification of the `State.number_of_days_to_use_data_from` variable --- morphemic-forecasting-exponentialsmoothing/README.md | 4 ++-- .../src/r_predictors/forecasting_real_workload.R | 11 +++++++++++ .../src/runtime/Predictor.py | 12 +++++++++--- .../src/test/test_scheduler.py | 2 +- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/README.md b/morphemic-forecasting-exponentialsmoothing/README.md index 32a4d85f..058ac3fc 100644 --- a/morphemic-forecasting-exponentialsmoothing/README.md +++ b/morphemic-forecasting-exponentialsmoothing/README.md @@ -53,8 +53,8 @@ When the predictor has been started, it is ready to accept messages from the bro ## Simple usage of the core prediction functionality -In order to simply test the prediction functionality, it is sufficient to use the `Rscript` command, followed by the full path of the `forecasting_real_workload.R` file which resides in the src/r_predictors directory, providing as an argument the prediction time for which a prediction is requested, and optionally the alpha/beta values for the prediction method which will be used (gamma will be estimated.) 
- +To run the prediction functionality on its own (useful when testing, to verify that correct prediction output is produced), it is sufficient to use the `Rscript` command, followed by the full path of the `forecasting_real_workload.R` file which resides in the src/r_predictors directory, providing as arguments the full path to the dataset which should be used, the name of the metric which should be predicted (a relevant column should exist in the dataset), the prediction time for which a prediction is requested, and optionally the alpha/beta values for the prediction method which will be used (gamma will be estimated). +More advanced usage of the core prediction functionality is possible; if this is desired, please refer to the code documentation of the forecasting script (src/r_predictors/forecasting_real_workload.R). ### Docker container build To run the component in Dockerized form, it is first necessary to collect the necessary assets and build the Docker container. diff --git a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R index 67834cb1..d179e12c 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R +++ b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R @@ -9,6 +9,14 @@ library(anytime) library(purrr) +# Outline of the operation of the forecasting script +# +# This forecasting script relies on the presence of a dataset which contains the metric values to be forecasted. It is called with three main parameters - dataset path, metric to be forecasted and the time for which the forecast should be produced - and two optional parameters, the alpha and beta coefficients to be used during forecasting. The time for which the forecast should be produced may be omitted under some circumstances. 
+# +# To create the final dataset which will be used for predictions, this script creates a timeseries with all times from the beginning of the observations in the dataset, until its end, using 1-second intervals (to allow for predictions based on epoch). In order for the exponential smoothing forecaster to operate satisfactorily, it is necessary to set the `number_of_seconds_to_aggregate_on` variable to a value which is large enough to smooth small fluctuations, yet small enough to allow for reasonable reaction times (e.g. 300 seconds). +# Once the creation of the dataset is over, the `configuration_forecasting_horizon` configuration property is evaluated. If this value is positive, the time for which the forecast should be made should be provided as a command line argument, and this allows the formation of a training dataset and a test dataset. If a non-positive horizon is provided, then the `realtime_mode` configuration property is evaluated. If this is false, the prediction time does not need to be provided (it means we simply want to evaluate the predictive functionality based on past data), and the next prediction time will be the time of the last observation in the dataset. If the realtime mode parameter is true, then the prediction time needs to be provided, and the script will try to create a prediction using the minimum of the next prediction time and the last observation time which is available in the dataset - in this case the next prediction time is also needed (TODO: perhaps this behaviour should be changed). +# Then, the final data points which will be used for the forecasting are determined, and the forecasting models are created, to produce predictions. The user of the script can opt to try finding the best parameters manually, using the `try_to_optimize_parameters` configuration parameter. 
+ find_smape <- function(actual, forecast) { return (1/length(actual) * sum(2*abs(forecast-actual) / (abs(actual)+abs(forecast))*100)) } @@ -49,6 +57,9 @@ if (try_to_optimize_parameters){ }else{ #downsampling to single hours frequency_setting <- 1 } + +#Parsing of command-line arguments. Providing the alpha and beta values as arguments is entirely optional. Providing the next_prediction_time may be optional or it may be needed, depending on the circumstances. Please refer to the execution flow which is outlined in the beginning of this program + args <- commandArgs(trailingOnly=TRUE) dataset_to_process <- args[1] attribute_to_predict <- args[2] diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py index aad3fbbf..191c8f0c 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py @@ -146,7 +146,9 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f try: prediction = predict_attributes(State.metrics_to_predict,State.next_prediction_time) except Exception as e: - print("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time) + print("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time+", proceeding to next prediction time. 
The encountered exception trace follows:") + print(e) + continue for attribute in State.metrics_to_predict: if(not prediction[attribute].prediction_valid): continue @@ -184,7 +186,11 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f State.disconnected = False State.previous_prediction = prediction - State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from) + #State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from) + #State.number_of_days_to_use_data_from = 1 + int( + # (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / + # (wait_time / State.number_of_days_to_use_data_from) + #) class Listener(messaging.listener.MorphemicListener): @@ -230,7 +236,7 @@ class Listener(messaging.listener.MorphemicListener): except Exception as e: print("Problem while retrieving epoch start and/or prediction_horizon") - maximum_time_required_for_prediction = 20 #initialization, assuming 20 seconds processing time to derive a first prediction + maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming 20 seconds processing time to derive a first prediction if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())): self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction]) self.prediction_thread.start() diff --git a/morphemic-forecasting-exponentialsmoothing/src/test/test_scheduler.py b/morphemic-forecasting-exponentialsmoothing/src/test/test_scheduler.py index 96735714..7ab2d151 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/test/test_scheduler.py +++ 
b/morphemic-forecasting-exponentialsmoothing/src/test/test_scheduler.py @@ -28,6 +28,6 @@ if __name__=="__main__": #Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval wait_time = next_prediction_time - horizon - time.time() - print("Waiting for "+str((wait_time*100).__floor__()/100)+" seconds") + print("Waiting for "+str((int(wait_time*100))/100)+" seconds") if (wait_time>0): time.sleep(wait_time) \ No newline at end of file -- GitLab From 1e9d67d2e07732fdabda9f8546c7c43dea38ecfe Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Sat, 19 Mar 2022 01:34:04 +0200 Subject: [PATCH 2/2] Improvements to logging and control flow Try to reconnect to broker only if 5 seconds have elapsed Implemented an attempt to create a dataset using all data if this is not possible using the partial data set in the configuration file Improvements to README file --- .../README.md | 13 ++++++++++++- .../src/runtime/Predictor.py | 14 ++++++++++++-- .../src/runtime/utilities/Utilities.py | 5 +++++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/README.md b/morphemic-forecasting-exponentialsmoothing/README.md index 058ac3fc..405be5a1 100644 --- a/morphemic-forecasting-exponentialsmoothing/README.md +++ b/morphemic-forecasting-exponentialsmoothing/README.md @@ -66,4 +66,15 @@ The correct operation of the component is also dependent on the availability of When the Docker container has been successfully built, to start it it is enough to execute the following command: -docker run \ No newline at end of file +docker run + +### Test execution + +To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or soon will be) set up and is accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be 
issued in order - provided that the broker-client.jar file is available. + +1) Publish metrics to predict: +java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 metrics_to_predict [{"metric":"latency","level":3,"publish_rate":10000},{"metric":"memory","level":3,"publish_rate":10000}] + +2) Publish start forecasting: + +java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 start_forecasting.exponentialsmoothing {\"metrics\":[\"latency\",\"memory\"],\"timestamp\":1626179164,\"epoch_start\":1626179353,\"number_of_forward_predictions\":8,\"prediction_horizon\":120} diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py index 191c8f0c..bbc5c5c8 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py @@ -222,7 +222,11 @@ class Listener(messaging.listener.MorphemicListener): State.testing_prediction_functionality = True elif self.get_topic_name(headers) == 'start_forecasting.exponentialsmoothing': - State.metrics_to_predict = json.loads(body)["metrics"] + try: + State.metrics_to_predict = json.loads(body)["metrics"] + except Exception as e: + print("Could not load json object to process the start forecasting message \n"+str(body)) + return #waitfor(first period) if (not State.initial_metric_list_received): print("The initial metric list has not been received, therefore no predictions are generated") @@ -235,6 +239,7 @@ class Listener(messaging.listener.MorphemicListener): State.next_prediction_time = State.epoch_start except Exception as e: print("Problem while retrieving epoch start and/or prediction_horizon") + return maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming 20 seconds processing time to derive a first prediction if ((self.prediction_thread is 
None) or (not self.prediction_thread.is_alive())): @@ -261,8 +266,11 @@ class Listener(messaging.listener.MorphemicListener): logging.error(" %s", body) def on_disconnected(self): - print('disconnected') + print('Disconnected from broker, so will retry to connect...') State.disconnected=True + State.disconnection_handler.acquire() + State.disconnection_handler.notifyAll() + State.disconnection_handler.release() #connection.connect() @@ -290,6 +298,7 @@ if __name__ == "__main__": State.connection = messaging.morphemic.Connection(State.broker_username,State.broker_password,State.broker_address,State.broker_port) # morphemic = morphemic.model.Model(connection) State.connection.set_listener(id, Listener()) + print("Checking (EMS) broker connectivity state") if (State.disconnected or State.check_stale_connection()): try: State.connection.disconnect() #required to avoid the already connected exception @@ -305,6 +314,7 @@ if __name__ == "__main__": print("Encountered exception while trying to connect to broker") print(traceback.format_exc()) State.disconnected = True + time.sleep(5) continue State.disconnection_handler.acquire() State.disconnection_handler.wait() diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/utilities/Utilities.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/utilities/Utilities.py index 55e6a00a..698427d6 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/utilities/Utilities.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/utilities/Utilities.py @@ -52,6 +52,11 @@ class Utilities: response = datasetmaker.make() print ("Dataset creation process finished with response "+str(response)) + if (str(response).startswith("4")): + print("An error response has been detected from the dataset maker, therefore asking for all data from the database in an effort to create a dataset") + _start_collection = None + response = datasetmaker.make() + print("Second dataset creation process finished with response 
"+str(response)) except Exception as e: print("Could not create new dataset as an exception was thrown") print(e) \ No newline at end of file -- GitLab
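Reviewer note: the fallback added to Utilities.py in PATCH 2/2 (retry the dataset creation with all data when the first response starts with "4") can be sketched in isolation as below. This is a minimal illustration under stated assumptions, not the component's code: `make_dataset_with_fallback` and the `make` callable are hypothetical names, and the real `datasetmaker` object and its constructor arguments are not shown in this patch. In particular, the sketch passes the start of the collection window explicitly on each call; whether resetting the local `_start_collection` variable affects an already constructed dataset maker depends on code outside this diff.

```python
from typing import Callable, Optional


def make_dataset_with_fallback(
    make: Callable[[Optional[str]], object],
    start_collection: Optional[str],
) -> object:
    """Try a partial dataset first, then fall back to all available data.

    `make` is a hypothetical stand-in for the dataset maker: it takes the
    start of the collection window (None meaning "all data") and returns a
    response whose string form begins with an HTTP-like status code.
    """
    response = make(start_collection)
    print("Dataset creation process finished with response " + str(response))

    # A response starting with "4" is treated as an error, as in the patch.
    # Retrying only makes sense if the first attempt was restricted.
    if str(response).startswith("4") and start_collection is not None:
        print("Error response detected, retrying with all data")
        response = make(None)
        print("Second dataset creation process finished with response " + str(response))
    return response


if __name__ == "__main__":
    # Hypothetical maker: fails for a restricted window, succeeds for all data.
    def fake_make(start: Optional[str]) -> int:
        return 404 if start is not None else 200

    print(make_dataset_with_fallback(fake_make, "1d"))
```

Passing the window to each call keeps the retry explicit, so the second attempt cannot silently reuse the restricted window.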