diff --git a/forecasting_gluonts/gluonts_listener.py b/forecasting_gluonts/gluonts_listener.py index 4919c67163598036ea75946ed4468b4d6d32b5f4..894ac514d97e3e97c0428a16e342a4644f4b09dd 100644 --- a/forecasting_gluonts/gluonts_listener.py +++ b/forecasting_gluonts/gluonts_listener.py @@ -62,7 +62,7 @@ def worker(self, body, metric): yhat = yhats[k] yhat_lower = yhat_lowers[k] yhat_upper = yhat_uppers[k] - self.connector.send_to_topic('intermediate_prediction.gluonmachines.' + metric, + self.connector.send_to_topic('intermediate_prediction.gluonts.' + metric, { "metricValue": float(yhat), "level": 3, @@ -81,7 +81,7 @@ def worker(self, body, metric): class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener): - id = "gluonmachines" + id = "gluonts" def __init__(self): self._run = False @@ -91,8 +91,8 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen def run(self): self.connector.connect() self.connector.set_listener(self.id, self) - self.connector.topic("start_forecasting.gluonmachines", self.id) - self.connector.topic("stop_forecasting.gluonmachines", self.id) + self.connector.topic("start_forecasting.gluonts", self.id) + self.connector.topic("stop_forecasting.gluonts", self.id) self.connector.topic("metrics_to_predict", self.id) def reconnect(self): @@ -100,7 +100,7 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen self.run() pass - def on_start_forecasting_gluonmachines(self, body): + def on_start_forecasting_gluonts(self, body): sent_metrics = body["metrics"] logging.debug(f"Gluonts Start Forecasting the following metrics: {sent_metrics}") for metric in sent_metrics: @@ -134,11 +134,11 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen self.connector.send_to_topic("training_models", { "metrics": list(metrics), - "forecasting_method": "gluonmachines", + "forecasting_method": "gluonts", "timestamp": int(time()) }) - def on_stop_forecasting_gluonmachines(self, body): + def on_stop_forecasting_gluonts(self, body): logging.debug(f"Gluonts Stop Forecasting the following metrics: {body['metrics']}") for metric in body["metrics"]: if metric in metrics: diff --git a/morphemic-forecasting-eshybrid/.gitignore b/morphemic-forecasting-eshybrid/.gitignore index d4ecf6a3c9ebca1b50a313a00bed4674734a9db5..83d129fca338c84e6cb66f538e33dad28f101486 100644 --- a/morphemic-forecasting-eshybrid/.gitignore +++ b/morphemic-forecasting-eshybrid/.gitignore @@ -2,3 +2,5 @@ sync.cfg.local sync.cfg.production sync.cfg.docker +sync.cfg.persistance +publisher.py diff --git a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py index 2dd225dc9b5ad1b14514f2577f35c39510ca2dd9..d8e8af8c36c2bf092639a22d607af0d82e0352e8 100644 --- a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py +++ b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py @@ -7,6 +7,7 @@ import signal import time import socket + class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler): metrics= set() @@ -19,6 +20,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen def __init__(self,config): self._run=False + self._force_train = False self._interval_count =1 self.id = (config['listener'] or {'id':'eshybrid'} )['id'] self.connector = messaging.morphemic.Connection( @@ -50,7 +52,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen ) - def wait_for_port(self, port, host='localhost', retries=10000000, timeout=5): """Wait until a port starts accepting TCP connections. Args: @@ -113,17 +114,17 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen def start(self): logging.debug("Starting ESHybrid") + signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) signal.signal(signal.SIGHUP, self.signal_handler) + self._run = self.run() if self._run: logging.info("ESHYBRID_STARTED") while self._run: - time.sleep(1) - if self.state=='forecasting': self.scheduler.check(self) continue @@ -136,7 +137,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen logging.info("Waiting for start_forecasting") continue - self.connector.disconnect() @@ -146,10 +146,11 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen m, times ) - - if predictions == None or not len(predictions): + if predictions.empty: + logging.debug("No prediction available yet for metric[%s]",m) continue + logging.debug("Got metric[%s] predictions %s",(m,predictions)) for index,row in predictions.iterrows(): t = row['ds'] payload = { @@ -157,31 +158,30 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen "timestamp": int(time.time()), "probability": 0.98, "confidence_interval": [float(8),float(15)], - "predictionTime": t, + "predictionTime": t.strftime('%s'), } logging.debug("Sending prediction for time %s => %s " % (t, payload) ) self.connector.send_to_topic( "intermediate_prediction.eshybrid.%s" % m, payload ) - #adding simple method to retrain the model - if self._last_training_time and time.time() - self._last_training_time > 3000: + if self._force_train or ( self._last_training_time and time.time() - self._last_training_time > 3000): self._train_model() def _train_model(self): - - self.dataset.make() - - if not os.path.exists("%s/%s.csv" % (self.data_set_path,self.application)): + dataset_results = self.dataset.make() + if not dataset_results.get('status',False): logging.error("**** NO DATA FROM DATASET MAKER ****") + self._force_train = True return - self.model.train("%s/%s.csv" % (self.data_set_path,self.application),self.metrics) - self._last_training_time = time.time() - + self._force_train = False + self.model.train(dataset_results.get('url'),self.metrics) def on_train(self,model): + self._last_training_time = time.time() + self._force_train = False self.connector.send_to_topic("training_models", { "metrics": self.metrics, @@ -189,6 +189,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen "timestamp": int(time.time() * 1000) }) + def on_metrics_to_predict(self,res): logging.debug("[2] Metrics to predict %s " % res) diff --git a/morphemic-forecasting-eshybrid/morphemic/model.py b/morphemic-forecasting-eshybrid/morphemic/model.py index 269c4c62952e536a76da34e6b6d02a8873ef00c7..372c61acc451b80dd34bfe41a70238210058c48c 100644 --- a/morphemic-forecasting-eshybrid/morphemic/model.py +++ b/morphemic-forecasting-eshybrid/morphemic/model.py @@ -1,5 +1,6 @@ import logging import threading +import time import uuid from datetime import datetime, timedelta import pandas as pd @@ -58,6 +59,7 @@ class UUIDModel: class ModelStatus(enumerate): IDLE = "IDLE" TRAINNING = "TRAINNING" + TRAINED = "TRAINED" PREDICTING = "PREDICTING" @@ -126,34 +128,39 @@ class Model: args = self._dataHandler.to_train(metric, path) _logger.info("Retraining for %s - %s rows " % (metric, args[0].shape[0])) - model.model_for_metric(metric).fit(args[0], args[1], verbose=False) + model.model_for_metric(metric).fit(args[0], args[1], verbose=True) def _retrain(self, metrics, path): with lock: - self.status = ModelStatus.TRAINNING + while self.status != ModelStatus.TRAINED: + try: + _logger.debug("Starting training model") - _logger.debug("Starting training model") + model = self._new_model() - model = self._new_model() + for m in metrics: + self._retrain_for_metric(model, m,path) + _logger.debug("Model training succesful for %s ",m) - for m in metrics: - self._retrain_for_metric(model, m,path) - _logger.debug("Model training finished") + self._model = model - self._model = model + _logger.debug("set new model") + self.status = ModelStatus.TRAINED + if self._handler: + self._handler.on_train(self) + except: + _logger.error("Not enough data - not training") + _logger.info("Waiting for next training loop") + time.sleep(30) - _logger.debug("set new model") - if self._handler: - self._handler.on_train(self) self.status = ModelStatus.IDLE def train(self, dataset_path, metrics): _logger.info("Start training for %s in %s " % (metrics,dataset_path,)) - if self.status == ModelStatus.IDLE: t = threading.Thread(target=self._retrain, args=(metrics, dataset_path)) @@ -166,7 +173,7 @@ class Model: _logger.debug("Request prediction for %s @ %s " % (metric,times,)) if not self._model: _logger.error("No model trained yet") - return + return pd.DataFrame() ret = self._model.model_for_metric(metric).predict( self._dataHandler.to_predict(metric,times) diff --git a/morphemic-forecasting-eshybrid/sender.py b/morphemic-forecasting-eshybrid/sender.py index 5afb5d11efa272e27e9551243a710986c06491f7..1ea1f974313edebd7b74c8e159514ad6812a200a 100644 --- a/morphemic-forecasting-eshybrid/sender.py +++ b/morphemic-forecasting-eshybrid/sender.py @@ -1,13 +1,19 @@ +import time import logging +import messaging + logger = logging.getLogger() logger.setLevel(logging.DEBUG) -import messaging +connector = messaging.morphemic.Connection('aaa', '111',host="localhost",port=61613) +connector.connect() +while True: + connector.send_to_topic("metrics_to_predict", [{'metric':'latency'},{'metric':'response_time'}] ) + time.sleep(5) -connector = messaging.morphemic.Connection('aaa', '111',host="localhost",port=61610) -connector.connect() + connector.send_to_topic("start_forecasting.eshybrid",{'version':1, 'metrics': ['latency','response_time'], 'timestamp': 1638866003, 'epoch_start': 1638866228, 'number_of_forward_predictions': 8, 'prediction_horizon': 30} ) + time.sleep(30) -connector.send_to_topic("start_forecasting.gluonmachines",{'metrics': ['ETPercentile_Ctx','SimulationElapsedTime_Ctx','SimulationLeftNumber_Ctx','WillFinishTooSoonContext','NotFinishedOnTimeContext'], 'timestamp': 1638866003, 'epoch_start': 1638866228, 'number_of_forward_predictions': 8, 'prediction_horizon': 30} ) diff --git a/morphemic-forecasting-exponentialsmoothing/Dockerfile b/morphemic-forecasting-exponentialsmoothing/Dockerfile index 8db981a41553ccb2106a25f23e22a13f4f959fe4..0a66676a8a49098fd78d5ad9e00b4be8a7e97d18 100644 --- a/morphemic-forecasting-exponentialsmoothing/Dockerfile +++ b/morphemic-forecasting-exponentialsmoothing/Dockerfile @@ -12,7 +12,7 @@ RUN ls -la RUN python3 setup.py sdist RUN ls ./dist/ -FROM ubuntu:latest +FROM ubuntu:focal RUN mkdir -p /home/r_predictions RUN apt-get update @@ -66,4 +66,4 @@ COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/ WORKDIR /home/r_predictions/esm_forecaster-0.1.0 -CMD ["/bin/sh","-c","python3 /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties > $LOG_FILE 2>&1"] \ No newline at end of file +CMD ["/bin/sh","-c","python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties 2>&1 > $LOG_FILE "] diff --git a/morphemic-forecasting-exponentialsmoothing/README.md b/morphemic-forecasting-exponentialsmoothing/README.md index 405be5a1280bbc764bfa4600e2db8cf4d1932b1a..1cf1594ba511afa6f34c470fecacb9d9abab491e 100644 --- a/morphemic-forecasting-exponentialsmoothing/README.md +++ b/morphemic-forecasting-exponentialsmoothing/README.md @@ -6,11 +6,15 @@ The exponential smoothing predictor is based on the use of the Holt-Winters [1] Apart from standard R and Python libraries, the libraries included in the src/requirements.txt file should be available for the Python code to be successfully executed. Moreover the `rapportools`,`gbutils`,`forecast`,`ggplot2`,`properties`,`xts`,`anytime` and `purrr` R libraries should be available (included in the src/r_predictors/r_commands.R file). +[1] https://www.rdocumentation.org/packages/forecast/versions/8.15/topics/forecast.HoltWinters +[2] https://www.rdocumentation.org/packages/forecast/versions/8.15/topics/ets + ## Configuration ### Configuration file The predictor comes with two configuration files which can be used to specify the behaviour of the component. The two files are located in the src/r_predictors directory of the project. -The options which will most probably need to be changed before deployment are the `broker_address`,the `horizon`, the `number_of_days_to_aggregate_data_from` and the `number_of_seconds_to_aggregate_on`. +The options which will most probably need to be changed before deployment are the `broker_address`,the `horizon`, the `number_of_days_to_aggregate_data_from` and the `number_of_seconds_to_aggregate_on`. +It should be noted that the `number_of_seconds_to_aggregate_on` variable is updated at runtime to become the minimum between the configuration value and the horizon value received for the predictions (i.e the prediction interval) | Option | Description | -------- |------------ | @@ -25,7 +29,7 @@ The options which will most probably need to be changed before deployment are th |horizon| The number of seconds which should be forecasted into the future| |path_to_datasets|The absolute path to the datasets, **not** including the final slash ('/') character.| |application_name|The application name to be used when creating a dataset| -|number_of_seconds_to_aggregate_on| The duration of the monitoring data interval in seconds. Monitoring data inside this interval will be aggregated to a single value (the mean value of all per-second observated/interpolated values) | +|number_of_seconds_to_aggregate_on| The duration of the monitoring data interval in seconds (greater than or equal to one). Monitoring data inside this interval will be aggregated to a single value (the mean value of all per-second observated/interpolated values) | |number_of_days_to_aggregate_data_from| The number of days which will be used to retrieve monitoring data from when creating a dataset| |prediction_processing_time_safety_margin_seconds| The number of seconds which will be used as a buffer when performing a prediction in order not to delay predictions and yet use as much data as possible| |testing_prediction_functionality| If set to 'True', then it will not send a prediction time for which predictions will be requested, but will rather allow the horizon setting to be used to create predictions| @@ -70,7 +74,7 @@ docker run ### Test execution -To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or soon will be) setup and is accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order - provided that the broker-client.jar file is available. +To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or will soon be) setup and accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order - provided that the broker-client.jar file is available. 1) Publish metrics to predict: java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 metrics_to_predict [{"metric":"latency","level":3,"publish_rate":10000},{"metric":"memory","level":3,"publish_rate":10000}] diff --git a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R index 14c3cce589121478c383fb5b99f1d85e7fe20dde..9846cb8494814f17305aed31e2b599de9187fa49 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R +++ b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R @@ -27,10 +27,15 @@ get_current_epoch_time <- function(){ #Assumes an xts time series object as input, with each record having a 1-sec difference from the previous one, and returns the last timestamp which is (or should have been) assigned (if not present). find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){ - counter <- 0 possible_timestamp <- as.numeric(end(mydata)) if(realtime_mode){ - return(min(c(possible_timestamp,next_prediction_time))) + #return(min(c(possible_timestamp,next_prediction_time))) + if (next_prediction_time>possible_timestamp){ + return(possible_timestamp) + }else{ + print("Possible problem with the requested prediction time, there is already data for a timestamp newer than the time requested to predict for. Returning the newer timestamp, being aware that this will lead to this prediction returning no meaningful output") + return (possible_timestamp) + } }else{ return (possible_timestamp) } @@ -49,6 +54,8 @@ time_unit_granularity <- "sec" # Handle monitoring data using this time unit gra endpoint_time_unit_granularity <- "seconds" #configuration_properties <- read.properties(".\\prediction_configuration-windows.properties") +print("Reading properties from the following file:") +print(paste(getwd(),"/prediction_configuration.properties",sep='')) configuration_properties <- read.properties(paste(getwd(),"/prediction_configuration.properties",sep='')) realtime_mode <- as.logical(configuration_properties$realtime_mode) #whether or not we should use all datapoints available (True value), or we are only partially using the available dataset (False value) e.g to test the prediction method performance @@ -80,9 +87,20 @@ beta_value_argument <- as.double(args[5]) data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE) #sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before now - number_of_days*24hrs*3600sec/hr seconds, and we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds in order to account for the time it takes to create the dataset and start the prediction process) -oldest_acceptable_time_point <- get_current_epoch_time() -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds)) +current_time <- get_current_epoch_time() +if (!realtime_mode){ + current_time <- tail(data_to_process[time_field_name],1) +} +oldest_acceptable_time_point <- current_time -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds)) +print(paste("Using data after time point",oldest_acceptable_time_point,"...")) data_to_process <- data_to_process[data_to_process[[time_field_name]]>oldest_acceptable_time_point,] +if (length(data_to_process[,attribute_to_predict])>0){ + print(paste("Constructing fine-grained data series for",attribute_to_predict,"using the requested granularity...")) +}else{ + print("No valid data points remained after enforcing the number_of_days_to_use_data_from configuration option. This may mean that you are trying to predict using realtime mode, using data points older than the number of days specified in the number_of_days_to_use_data_from configuration option") + stop() +} #Fail-safe default df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,time_field_name])) date_time_init <- anytime(data_to_process[,time_field_name]) @@ -98,6 +116,7 @@ colnames(mydata)<-c(attribute_to_predict) print(paste("The complete time series to be predicted for attribute",attribute_to_predict,"has been created")) configuration_forecasting_horizon <- as.integer(configuration_properties$horizon) +last_timestamp_data <- 0 if (configuration_forecasting_horizon>0){ print("Using a statically defined horizon from the configuration file") diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py index e0050728127c691f57f1bae26324bf32b129f317..2fddbdf79f80d4bfaa29e71bb44f0293d4fb71a5 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py @@ -42,16 +42,34 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim os.chdir(os.path.dirname(configuration_file_location)) prediction_data_file = get_prediction_data_filename(configuration_file_location) + from sys import platform if State.testing_prediction_functionality: print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data") print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute) - command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute] + + # Windows + if platform == "win32": + command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute] + # linux + elif platform == "linux" or platform == "linux2": + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)] + #Choosing the solution of linux + else: + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)] else: print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time) - command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time] - - process_output = run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True) + # Windows + if platform == "win32": + command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time] + # Linux + elif platform == "linux" or platform == "linux2": + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)] + #Choosing the solution of linux + else: + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)] + + process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True) if (process_output.stdout==""): print_with_time("Empty output from R predictions - the error output is the following:") print(process_output.stderr) #There was an error during the calculation of the predicted value @@ -179,7 +197,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body) State.connection.send_to_topic('training_events',training_events_message_body) message_not_sent = False - print_with_time("Successfully sent prediction message for "+attribute) + print_with_time("Successfully sent prediction message for "+attribute+" to topic intermediate_prediction.%s.%s" % (id, attribute)) except ConnectionError as exception: State.connection.disconnect() State.connection = messaging.morphemic.Connection('admin', 'admin') @@ -242,6 +260,23 @@ class Listener(messaging.listener.MorphemicListener): print_with_time("Problem while retrieving epoch start and/or prediction_horizon") return + with open(State.configuration_file_location, "r+b") as f: + + State.configuration_details.load(f, "utf-8") + + # Do stuff with the p object... + initial_seconds_aggregation_value, metadata = State.configuration_details["number_of_seconds_to_aggregate_on"] + initial_seconds_aggregation_value = int(initial_seconds_aggregation_value) + + if (prediction_horizon