Commit dd28e8b7 authored by Marta Różańska's avatar Marta Różańska

Merge branch 'exponential_smoothing_predictor' into 'morphemic-rc2.0'

Improvements to exponential smoothing forecaster

See merge request !270
parents 0d769522 1e9d67d2
Pipeline #20308 passed with stages
in 29 minutes and 19 seconds
@@ -53,8 +53,8 @@ When the predictor has been started, it is ready to accept messages from the bro
## Simple usage of the core prediction functionality
In order to simply test the prediction functionality, it is sufficient to use the `Rscript` command, followed by the full path of the `forecasting_real_workload.R` file which resides in the src/r_predictors directory, providing as an argument the prediction time for which a prediction is requested, and optionally the alpha/beta values for the prediction method which will be used (gamma will be estimated).
In order to simply test the prediction functionality (useful when testing, to determine that correct prediction output is produced), it is sufficient to use the `Rscript` command, followed by the full path of the `forecasting_real_workload.R` file which resides in the src/r_predictors directory, providing as arguments the full path to the dataset which should be used, the name of the metric which should be predicted (a relevant column should exist in the dataset), the prediction time for which a prediction is requested, and optionally the alpha/beta values for the prediction method which will be used (gamma will be estimated).
More advanced usage of the core prediction functionality is possible; if this is desired, please refer to the code documentation of the forecasting script (src/r_predictors/forecasting_real_workload.R).
### Docker container build
To run the component in Dockerized form, it is first necessary to collect the necessary assets and build the Docker container.
@@ -66,4 +66,15 @@ The correct operation of the component is also dependent on the availability of
When the Docker container has been successfully built, to start it, it is enough to execute the following command:
docker run <container_name>
\ No newline at end of file
docker run <container_name>
### Test execution
To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or soon will be) set up and is accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order, provided that the broker-client.jar file is available.
1) Publish metrics to predict:
java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 metrics_to_predict [{"metric":"latency","level":3,"publish_rate":10000},{"metric":"memory","level":3,"publish_rate":10000}]
2) Publish start forecasting:
java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 start_forecasting.exponentialsmoothing {\"metrics\":[\"latency\",\"memory\"],\"timestamp\":1626179164,\"epoch_start\":1626179353,\"number_of_forward_predictions\":8,\"prediction_horizon\":120}
@@ -9,6 +9,14 @@ library(anytime)
library(purrr)
# Outline of the operation of the forecasting script
#
# This forecasting script relies on the presence of a dataset which contains the metric values to be forecasted. It is called with three main parameters - the dataset path, the metric to be forecasted and the time for which the forecast should be produced - and two optional parameters, the alpha and beta coefficients to be used during forecasting. The time for which the forecast should be produced may be omitted under some circumstances.
#
# To create the final dataset which will be used for predictions, this script creates a timeseries with all times from the beginning of the observations in the dataset until its end, using 1-second intervals (to allow for predictions based on epoch). In order for the exponential smoothing forecaster to operate satisfactorily, it is necessary to set the `number_of_seconds_to_aggregate_on` variable to a value which is large enough to smooth small fluctuations, yet small enough to allow for reasonable reaction times (e.g. 300 seconds).
# Once the creation of the dataset is over, the `configuration_forecasting_horizon` configuration property is evaluated. If this value is positive, the time for which the forecast should be made should be provided as a command-line argument, and this allows the formation of a training dataset and a test dataset. If a non-positive horizon is provided, then the `realtime_mode` configuration property is evaluated. If it is false, the prediction time does not need to be provided (it means we simply want to evaluate the predictive functionality based on past data), and the next prediction time will be the time of the last observation in the dataset. If the realtime mode parameter is true, then the prediction time needs to be provided, and the script will try to create a prediction using the minimum of the next prediction time and the last observation time available in the dataset - in this case the next prediction time is also needed (TODO: perhaps this behaviour should be changed).
# Then, the final data points which will be used for the forecasting are determined, and the forecasting models are created to produce predictions. The user of the script can opt to try to find the best parameters, using the `try_to_optimize_parameters` configuration parameter.
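The decision flow described in the comments above can be summarised as follows. This is a minimal Python sketch with illustrative names only (the actual script implements this logic in R):

```python
# Hypothetical sketch of the prediction-time decision flow described above.
# All names are illustrative, not the script's actual variables.
def resolve_prediction_time(forecasting_horizon, realtime_mode,
                            next_prediction_time, last_observation_time):
    if forecasting_horizon > 0:
        # A positive horizon implies a training/test split, so the time for
        # which the forecast is made must be supplied by the caller.
        if next_prediction_time is None:
            raise ValueError("prediction time required for a positive horizon")
        return next_prediction_time
    if not realtime_mode:
        # Offline evaluation over past data: predict for the last observation.
        return last_observation_time
    # Realtime mode: clamp the requested time to the data actually available.
    if next_prediction_time is None:
        raise ValueError("prediction time required in realtime mode")
    return min(next_prediction_time, last_observation_time)
```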
# Symmetric mean absolute percentage error (SMAPE), expressed as a percentage
find_smape <- function(actual, forecast) {
  return (1/length(actual) * sum(2*abs(forecast-actual) / (abs(actual)+abs(forecast))*100))
}
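For reference, the same SMAPE computation can be expressed in Python (a sketch mirroring the R function above; it is not part of the component):

```python
def find_smape(actual, forecast):
    # Symmetric mean absolute percentage error, as a percentage in [0, 200].
    n = len(actual)
    return sum(2 * abs(f - a) / (abs(a) + abs(f)) * 100
               for a, f in zip(actual, forecast)) / n
```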
@@ -49,6 +57,9 @@ if (try_to_optimize_parameters){
}else{ #downsampling to single hours
frequency_setting <- 1
}
# Parsing of command-line arguments. Providing the alpha and beta values as arguments is entirely optional. Providing the next_prediction_time may be optional or required, depending on the circumstances; please refer to the execution flow outlined at the beginning of this program.
args <- commandArgs(trailingOnly=TRUE)
dataset_to_process <- args[1]
attribute_to_predict <- args[2]
@@ -146,7 +146,9 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
try:
prediction = predict_attributes(State.metrics_to_predict,State.next_prediction_time)
except Exception as e:
print("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time)
print("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time)+", proceeding to the next prediction time. The encountered exception trace follows:")
print(e)
continue
for attribute in State.metrics_to_predict:
if(not prediction[attribute].prediction_valid):
continue
@@ -184,7 +186,11 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
State.disconnected = False
State.previous_prediction = prediction
State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from)
#State.number_of_days_to_use_data_from = (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) / (wait_time / State.number_of_days_to_use_data_from)
#State.number_of_days_to_use_data_from = 1 + int(
# (prediction_horizon - State.prediction_processing_time_safety_margin_seconds) /
# (wait_time / State.number_of_days_to_use_data_from)
#)
class Listener(messaging.listener.MorphemicListener):
@@ -216,7 +222,11 @@ class Listener(messaging.listener.MorphemicListener):
State.testing_prediction_functionality = True
elif self.get_topic_name(headers) == 'start_forecasting.exponentialsmoothing':
State.metrics_to_predict = json.loads(body)["metrics"]
try:
State.metrics_to_predict = json.loads(body)["metrics"]
except Exception as e:
print("Could not parse the json body of the start forecasting message \n"+str(body)+"\n"+str(e))
return
#waitfor(first period)
if (not State.initial_metric_list_received):
print("The initial metric list has not been received, therefore no predictions are generated")
@@ -229,8 +239,9 @@ class Listener(messaging.listener.MorphemicListener):
State.next_prediction_time = State.epoch_start
except Exception as e:
print("Problem while retrieving epoch start and/or prediction_horizon")
return
maximum_time_required_for_prediction = 20 #initialization, assuming 20 seconds processing time to derive a first prediction
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, using the configured safety margin as the processing time needed to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
self.prediction_thread.start()
@@ -255,8 +266,11 @@ class Listener(messaging.listener.MorphemicListener):
logging.error(" %s", body)
def on_disconnected(self):
print('disconnected')
print('Disconnected from broker, so will retry to connect...')
State.disconnected=True
State.disconnection_handler.acquire()
State.disconnection_handler.notifyAll()
State.disconnection_handler.release()
#connection.connect()
@@ -284,6 +298,7 @@ if __name__ == "__main__":
State.connection = messaging.morphemic.Connection(State.broker_username,State.broker_password,State.broker_address,State.broker_port)
# morphemic = morphemic.model.Model(connection)
State.connection.set_listener(id, Listener())
print("Checking (EMS) broker connectivity state")
if (State.disconnected or State.check_stale_connection()):
try:
State.connection.disconnect() #required to avoid the already connected exception
@@ -299,6 +314,7 @@
print("Encountered exception while trying to connect to broker")
print(traceback.format_exc())
State.disconnected = True
time.sleep(5)
continue
State.disconnection_handler.acquire()
State.disconnection_handler.wait()
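The disconnection handling in this file follows a standard condition-variable handshake: the listener callback notifies, and the main loop waits. A self-contained sketch (with assumed names mirroring the State class) of this pattern:

```python
import threading

# Sketch of the wait/notify handshake between the broker listener's
# on_disconnected callback and the main reconnection loop (assumed names).
disconnection_handler = threading.Condition()
disconnected = False

def on_disconnected():
    """Invoked by the messaging library when the broker connection drops."""
    global disconnected
    disconnected = True
    with disconnection_handler:             # acquire
        disconnection_handler.notify_all()  # wake the reconnection loop
    # the lock is released when the with-block exits

def wait_for_disconnection(timeout=5.0):
    """Block the main loop until a disconnection is signalled (or timeout)."""
    with disconnection_handler:
        if not disconnected:
            disconnection_handler.wait(timeout)
    return disconnected
```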
@@ -52,6 +52,11 @@ class Utilities:
response = datasetmaker.make()
print ("Dataset creation process finished with response "+str(response))
if (str(response).startswith("4")):
print("An error response has been detected from the dataset maker, therefore asking for all data from the database in an effort to create a dataset")
_start_collection = None
response = datasetmaker.make()
print("Second dataset creation process finished with response "+str(response))
except Exception as e:
print("Could not create new dataset as an exception was thrown")
print(e)
\ No newline at end of file
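The fallback above can be summarised as a retry helper. This is a hypothetical sketch; `make` and `reconfigure_full_history` are stand-ins for the actual dataset maker calls:

```python
def make_dataset_with_fallback(make, reconfigure_full_history):
    # Hypothetical helper mirroring the logic above: on a 4xx-style response
    # from the dataset maker, widen the query to all data and retry once.
    response = make()
    print("Dataset creation process finished with response " + str(response))
    if str(response).startswith("4"):
        print("Error response detected, asking for all data from the database")
        reconfigure_full_history()  # e.g. drop the start-of-collection bound
        response = make()
        print("Second dataset creation process finished with response " + str(response))
    return response
```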
@@ -28,6 +28,6 @@ if __name__=="__main__":
#Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
wait_time = next_prediction_time - horizon - time.time()
print("Waiting for "+str((wait_time*100).__floor__()/100)+" seconds")
print("Waiting for "+str((int(wait_time*100))/100)+" seconds")
if (wait_time>0):
time.sleep(wait_time)
\ No newline at end of file
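The changed print line truncates the displayed wait to two decimals with `int` instead of calling `__floor__` on the scaled float. The surrounding wait logic can be sketched and checked as:

```python
import time

def wait_until(next_prediction_time, horizon):
    # Sleep until one prediction horizon before the next prediction time,
    # printing the wait truncated to two decimal places. Negative waits
    # (the target time is already near or past) skip the sleep entirely.
    wait_time = next_prediction_time - horizon - time.time()
    print("Waiting for " + str(int(wait_time * 100) / 100) + " seconds")
    if wait_time > 0:
        time.sleep(wait_time)
```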