diff --git a/morphemic-forecasting-exponentialsmoothing/README.md b/morphemic-forecasting-exponentialsmoothing/README.md
index 32a4d85fd937aad71714f58059c3f69c63a02e95..405be5a1280bbc764bfa4600e2db8cf4d1932b1a 100644
--- a/morphemic-forecasting-exponentialsmoothing/README.md
+++ b/morphemic-forecasting-exponentialsmoothing/README.md
@@ -53,8 +53,8 @@ When the predictor has been started, it is ready to accept messages from the bro
 
 ## Simple usage of the core prediction functionality
 
-In order to simply test the prediction functionality, it is sufficient to use the `Rscript` command, followed by the full path of the `forecasting_real_workload.R` file which resides in the src/r_predictors directory, providing as an argument the prediction time for which a prediction is requested, and optionally the alpha/beta values for the prediction method which will be used (gamma will be estimated.)
-
+In order to simply test the prediction functionality (useful to verify that correct prediction output is produced), it is sufficient to use the `Rscript` command, followed by the full path of the `forecasting_real_workload.R` file which resides in the src/r_predictors directory, providing as arguments the full path to the dataset which should be used, the name of the metric which should be predicted (a relevant column should exist in the dataset), the prediction time for which a prediction is requested, and optionally the alpha/beta values for the prediction method which will be used (gamma will be estimated), as shown in the example below.
+More advanced usage of the core prediction functionality is possible; if this is desired, please refer to the code documentation of the forecasting script (src/r_predictors/forecasting_real_workload.R).
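+
+For illustration, such an invocation could look as follows (the dataset path, metric name, prediction time and alpha/beta values below are placeholders which should be adapted to the actual setup):
+
+Rscript forecasting_real_workload.R /path/to/dataset.csv memory 1626179464 0.5 0.5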
 
 ### Docker container build
 To run the component in Dockerized form, it is first necessary to collect the necessary assets and build the Docker container.
@@ -66,4 +66,15 @@ The correct operation of the component is also dependent on the availability of
 
 When the Docker container has been successfully built, to start it it is enough to execute the following command:
 
-docker run
\ No newline at end of file
+docker run
+
+### Test execution
+
+To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or soon will be) set up and is accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order, provided that the broker-client.jar file is available.
+
+1) Publish the metrics to predict:
+java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 metrics_to_predict [{"metric":"latency","level":3,"publish_rate":10000},{"metric":"memory","level":3,"publish_rate":10000}]
+
+2) Publish the start forecasting message:
+
+java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 start_forecasting.exponentialsmoothing {\"metrics\":[\"latency\",\"memory\"],\"timestamp\":1626179164,\"epoch_start\":1626179353,\"number_of_forward_predictions\":8,\"prediction_horizon\":120}
diff --git a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R
index 67834cb19c7e82d4dd40caed02cd9a1bb4557860..d179e12c031b67f488d56e27d09080574fd5ea8c 100644
--- a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R
+++ b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R
@@ -9,6 +9,14 @@
 library(anytime)
 library(purrr)
 
+# Outline of the operation of the forecasting script
+#
+# This forecasting script relies on the presence of a dataset which contains the metric values to be forecasted. It is called with three main parameters - the dataset path, the metric to be forecasted and the time for which the forecast should be produced - and two optional parameters, the alpha and beta coefficients to be used during forecasting. The time for which the forecast should be produced may be omitted under some circumstances.
+#
+# To create the final dataset which will be used for predictions, this script creates a timeseries containing all times from the beginning of the observations in the dataset until its end, using 1-second intervals (to allow for predictions based on epoch timestamps). In order for the exponential smoothing forecaster to operate satisfactorily, it is necessary to set the `number_of_seconds_to_aggregate_on` variable to a value which is large enough to smooth out small fluctuations, yet small enough to allow for reasonable reaction times (e.g. 300 seconds).
+# Once the creation of the dataset is over, the `configuration_forecasting_horizon` configuration property is evaluated. If this value is positive, the time for which the forecast should be made must be provided as a command-line argument, and this allows the formation of a training dataset and a test dataset. If a non-positive horizon is provided, the `realtime_mode` configuration property is evaluated. If this is false, the prediction time does not need to be provided (it means we simply want to evaluate the predictive functionality based on past data), and the next prediction time will be the time of the last observation in the dataset. If the realtime mode parameter is true, the prediction time needs to be provided, and the script will try to create a prediction using the minimum value between the next prediction time and the last observation time which is available in the dataset (TODO: perhaps this behaviour should be changed).
+# Then, the final data points which will be used for the forecasting are determined, and the forecasting models are created to produce predictions. The user of the script can also opt to try finding the best parameters manually, using the `try_to_optimize_parameters` configuration parameter.
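+#
+# As a purely illustrative example of the above (the values below are placeholders, not the shipped defaults):
+#   number_of_seconds_to_aggregate_on <- 300   # smooth the 1-second timeseries into 5-minute buckets
+#   configuration_forecasting_horizon <- 600   # positive: the prediction time argument is required and a training/test split is formed
+#   configuration_forecasting_horizon <- -1    # non-positive: the behaviour depends on realtime_mode
+#   realtime_mode <- FALSE                     # predict for the time of the last observation in the dataset
+#   realtime_mode <- TRUE                      # predict for min(requested prediction time, last observation time)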
+
 find_smape <- function(actual, forecast) {
   return (1/length(actual) * sum(2*abs(forecast-actual) / (abs(actual)+abs(forecast))*100))
 }
@@ -49,6 +57,9 @@ if (try_to_optimize_parameters){
 }else{ #downsampling to single hours
   frequency_setting <- 1
 }
+
+# Parsing of command-line arguments. Providing the alpha and beta values as arguments is entirely optional. Providing the next_prediction_time may or may not be needed, depending on the circumstances. Please refer to the execution flow which is outlined at the beginning of this program.
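+# As an illustration (placeholder values), an invocation such as
+#   Rscript forecasting_real_workload.R /path/to/dataset.csv memory 1626179464 0.5 0.5
+# would map to args[1]=dataset path, args[2]=metric to predict, args[3]=next prediction time, args[4]/args[5]=alpha/beta.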
+
 args <- commandArgs(trailingOnly=TRUE)
 dataset_to_process <- args[1]
 attribute_to_predict <- args[2]
diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py
index aad3fbbfefa584d51108d2a774766d991423a4ee..bbc5c5c83cb56890be78234a19a6df24f07c6c0e 100644
--- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py
+++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py
@@ -146,7 +146,9 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
         try:
             prediction = predict_attributes(State.metrics_to_predict,State.next_prediction_time)
         except Exception as e:
-            print("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time)
+            print("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time)+", proceeding to the next prediction time. The encountered exception trace follows:")
+            print(e)
+            continue
         for attribute in State.metrics_to_predict:
             if(not prediction[attribute].prediction_valid):
                 continue
@@ -184,7 +186,11 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
             State.disconnected = False
 
         State.previous_prediction = prediction
-        State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from)
+        #State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from)
+        #State.number_of_days_to_use_data_from = 1 + int(
+        #    (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) /
+        #    (wait_time / State.number_of_days_to_use_data_from)
+        #)
 
 
 class Listener(messaging.listener.MorphemicListener):
@@ -216,7 +222,11 @@ class Listener(messaging.listener.MorphemicListener):
             State.testing_prediction_functionality = True
 
         elif self.get_topic_name(headers) == 'start_forecasting.exponentialsmoothing':
-            State.metrics_to_predict = json.loads(body)["metrics"]
+            try:
+                State.metrics_to_predict = json.loads(body)["metrics"]
+            except Exception as e:
+                print("Could not load json object to process the start forecasting message \n"+str(body))
+                return
             #waitfor(first period)
             if (not State.initial_metric_list_received):
                 print("The initial metric list has not been received, therefore no predictions are generated")
@@ -229,8 +239,9 @@ class Listener(messaging.listener.MorphemicListener):
                 State.next_prediction_time = State.epoch_start
             except Exception as e:
                 print("Problem while retrieving epoch start and/or prediction_horizon")
+                return
 
-            maximum_time_required_for_prediction = 20 #initialization, assuming 20 seconds processing time to derive a first prediction
+            maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, using the configured safety margin as the assumed processing time needed to derive a first prediction
             if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
                 self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
                 self.prediction_thread.start()
@@ -255,8 +266,11 @@ class Listener(messaging.listener.MorphemicListener):
             logging.error(" %s", body)
 
     def on_disconnected(self):
-        print('disconnected')
+        print('Disconnected from the broker, will retry to connect...')
         State.disconnected=True
+        State.disconnection_handler.acquire()
+        State.disconnection_handler.notifyAll()
+        State.disconnection_handler.release()
         #connection.connect()
 
 
@@ -284,6 +298,7 @@ if __name__ == "__main__":
         State.connection = messaging.morphemic.Connection(State.broker_username,State.broker_password,State.broker_address,State.broker_port)
         # morphemic = morphemic.model.Model(connection)
         State.connection.set_listener(id, Listener())
+        print("Checking (EMS) broker connectivity state")
         if (State.disconnected or State.check_stale_connection()):
             try:
                 State.connection.disconnect() #required to avoid the already connected exception
@@ -299,6 +314,7 @@
                 print("Encountered exception while trying to connect to broker")
                 print(traceback.format_exc())
                 State.disconnected = True
+                time.sleep(5)
                 continue
         State.disconnection_handler.acquire()
         State.disconnection_handler.wait()
diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/utilities/Utilities.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/utilities/Utilities.py
index 55e6a00a3b0d7f4e6486fc9bb26ae125d79ecdbb..698427d644b0e0ffa34b24b42bf5215c1e6d4999 100644
--- a/morphemic-forecasting-exponentialsmoothing/src/runtime/utilities/Utilities.py
+++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/utilities/Utilities.py
@@ -52,6 +52,11 @@ class Utilities:
 
             response = datasetmaker.make()
             print ("Dataset creation process finished with response "+str(response))
+            if (str(response).startswith("4")):
+                print("An error response has been detected from the dataset maker, therefore asking for all data from the database in an effort to create a dataset")
+                _start_collection = None
+                response = datasetmaker.make()
+                print("Second dataset creation process finished with response "+str(response))
         except Exception as e:
             print("Could not create new dataset as an exception was thrown")
             print(e)
\ No newline at end of file
diff --git a/morphemic-forecasting-exponentialsmoothing/src/test/test_scheduler.py b/morphemic-forecasting-exponentialsmoothing/src/test/test_scheduler.py
index 96735714d5876492acee321852c95703b101fab9..7ab2d15115423dccac89d3f70ba27564f5abbb83 100644
--- a/morphemic-forecasting-exponentialsmoothing/src/test/test_scheduler.py
+++ b/morphemic-forecasting-exponentialsmoothing/src/test/test_scheduler.py
@@ -28,6 +28,6 @@ if __name__=="__main__":
 
     #Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
     wait_time = next_prediction_time - horizon - time.time()
-    print("Waiting for "+str((wait_time*100).__floor__()/100)+" seconds")
+    print("Waiting for "+str((int(wait_time*100))/100)+" seconds")
    if (wait_time>0):
        time.sleep(wait_time)
\ No newline at end of file