From 38d77b9a1ea1316678419c47eb623027820ee0c2 Mon Sep 17 00:00:00 2001 From: Bizid Imen Date: Thu, 14 Apr 2022 09:03:58 +0000 Subject: [PATCH 1/8] Update gluonts_listener.py --- forecasting_gluonts/gluonts_listener.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/forecasting_gluonts/gluonts_listener.py b/forecasting_gluonts/gluonts_listener.py index 4919c671..894ac514 100644 --- a/forecasting_gluonts/gluonts_listener.py +++ b/forecasting_gluonts/gluonts_listener.py @@ -62,7 +62,7 @@ def worker(self, body, metric): yhat = yhats[k] yhat_lower = yhat_lowers[k] yhat_upper = yhat_uppers[k] - self.connector.send_to_topic('intermediate_prediction.gluonmachines.' + metric, + self.connector.send_to_topic('intermediate_prediction.gluonts.' + metric, { "metricValue": float(yhat), "level": 3, @@ -81,7 +81,7 @@ def worker(self, body, metric): class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener): - id = "gluonmachines" + id = "gluonts" def __init__(self): self._run = False @@ -91,8 +91,8 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen def run(self): self.connector.connect() self.connector.set_listener(self.id, self) - self.connector.topic("start_forecasting.gluonmachines", self.id) - self.connector.topic("stop_forecasting.gluonmachines", self.id) + self.connector.topic("start_forecasting.gluonts", self.id) + self.connector.topic("stop_forecasting.gluonts", self.id) self.connector.topic("metrics_to_predict", self.id) def reconnect(self): @@ -100,7 +100,7 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen self.run() pass - def on_start_forecasting_gluonmachines(self, body): + def on_start_forecasting_gluonts(self, body): sent_metrics = body["metrics"] logging.debug(f"Gluonts Start Forecasting the following metrics: {sent_metrics}") for metric in sent_metrics: @@ -134,11 +134,11 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen self.connector.send_to_topic("training_models", { "metrics": list(metrics), - "forecasting_method": "gluonmachines", + "forecasting_method": "gluonts", "timestamp": int(time()) }) - def on_stop_forecasting_gluonmachines(self, body): + def on_stop_forecasting_gluonts(self, body): logging.debug(f"Gluonts Stop Forecasting the following metrics: {body['metrics']}") for metric in body["metrics"]: if metric in metrics: -- GitLab From ff3f32fd9273cde49407ac16dca74623e55803f7 Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Tue, 19 Apr 2022 00:40:32 +0300 Subject: [PATCH 2/8] Correction of complete time series creation Added appropriate code to handle cases in which predictions should be made with a horizon of less than 300 seconds in the future (setting the aggregation interval equal to this horizon) Added option in subprocess spawning to improve logging output Improvement of the handling of an unusual configuration and dataset combination (asking to use realtime mode on stale dataset) --- .../r_predictors/forecasting_real_workload.R | 22 ++++++++++++++++--- .../src/runtime/Predictor.py | 19 +++++++++++++++- .../src/test/benchmarking.py | 2 +- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R index 14c3cce5..5fe3723d 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R +++ 
b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R @@ -27,10 +27,14 @@ get_current_epoch_time <- function(){ #Assumes an xts time series object as input, with each record having a 1-sec difference from the previous one, and returns the last timestamp which is (or should have been) assigned (if not present). find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){ - counter <- 0 possible_timestamp <- as.numeric(end(mydata)) if(realtime_mode){ - return(min(c(possible_timestamp,next_prediction_time))) + #return(min(c(possible_timestamp,next_prediction_time))) + if (next_prediction_time>possible_timestamp){ + return(next_prediction_time) + }else{ + return (possible_timestamp) + } }else{ return (possible_timestamp) } @@ -80,9 +84,20 @@ beta_value_argument <- as.double(args[5]) data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE) #sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before now - number_of_days*24hrs*3600sec/hr seconds, and we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds in order to account for the time it takes to create the dataset and start the prediction process) -oldest_acceptable_time_point <- get_current_epoch_time() -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds)) +current_time <- get_current_epoch_time() +if (!realtime_mode){ + current_time <- tail(data_to_process[time_field_name],1) +} +oldest_acceptable_time_point <- current_time -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds)) +print(paste("Using data after time point",oldest_acceptable_time_point,"...")) data_to_process <- data_to_process[data_to_process[[time_field_name]]>oldest_acceptable_time_point,] +if (length(data_to_process[,attribute_to_predict])>0){ + print(paste("Constructing fine-grained data series for",attribute_to_predict,"using the requested granularity...")) +}else{ + print("No valid data points remained after enforcing the number_of_days_to_use_data_from configuration option. 
This may mean that you are trying to predict using realtime mode, using data points older than the number of days specified in the number_of_days_to_use_data_from configuration option") + stop() +} #Fail-safe default df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,time_field_name])) date_time_init <- anytime(data_to_process[,time_field_name]) @@ -98,6 +113,7 @@ colnames(mydata)<-c(attribute_to_predict) print(paste("The complete time series to be predicted for attribute",attribute_to_predict,"has been created")) configuration_forecasting_horizon <- as.integer(configuration_properties$horizon) +last_timestamp_data <- 0 if (configuration_forecasting_horizon>0){ print("Using a statically defined horizon from the configuration file") diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py index e0050728..350a8c97 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py @@ -51,7 +51,7 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time] - process_output = run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True) + process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True) if (process_output.stdout==""): print_with_time("Empty output from R predictions - the error output is the following:") print(process_output.stderr) #There was an error during the calculation of the predicted value @@ -242,6 +242,23 @@ class Listener(messaging.listener.MorphemicListener): print_with_time("Problem while retrieving epoch start and/or prediction_horizon") return + with open(State.configuration_file_location, "r+b") as f: + + State.configuration_details.load(f, "utf-8") + + # Do stuff with the p object... + initial_seconds_aggregation_value, metadata = State.configuration_details["number_of_seconds_to_aggregate_on"] + initial_seconds_aggregation_value = int(initial_seconds_aggregation_value) + + if (prediction_horizon Date: Wed, 20 Apr 2022 11:12:20 +0300 Subject: [PATCH 3/8] Improvements in README reflecting changes in the operation of the module --- morphemic-forecasting-exponentialsmoothing/README.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/README.md b/morphemic-forecasting-exponentialsmoothing/README.md index 405be5a1..1cf1594b 100644 --- a/morphemic-forecasting-exponentialsmoothing/README.md +++ b/morphemic-forecasting-exponentialsmoothing/README.md @@ -6,11 +6,15 @@ The exponential smoothing predictor is based on the use of the Holt-Winters [1] Apart from standard R and Python libraries, the libraries included in the src/requirements.txt file should be available for the Python code to be successfully executed. Moreover the `rapportools`,`gbutils`,`forecast`,`ggplot2`,`properties`,`xts`,`anytime` and `purrr` R libraries should be available (included in the src/r_predictors/r_commands.R file). +[1] https://www.rdocumentation.org/packages/forecast/versions/8.15/topics/forecast.HoltWinters +[2] https://www.rdocumentation.org/packages/forecast/versions/8.15/topics/ets + ## Configuration ### Configuration file The predictor comes with two configuration files which can be used to specify the behaviour of the component. 
The two files are located in the src/r_predictors directory of the project. -The options which will most probably need to be changed before deployment are the `broker_address`,the `horizon`, the `number_of_days_to_aggregate_data_from` and the `number_of_seconds_to_aggregate_on`. +The options which will most probably need to be changed before deployment are the `broker_address`,the `horizon`, the `number_of_days_to_aggregate_data_from` and the `number_of_seconds_to_aggregate_on`. +It should be noted that the `number_of_seconds_to_aggregate_on` variable is updated at runtime to become the minimum between the configuration value and the horizon value received for the predictions (i.e the prediction interval) | Option | Description | -------- |------------ | @@ -25,7 +29,7 @@ The options which will most probably need to be changed before deployment are th |horizon| The number of seconds which should be forecasted into the future| |path_to_datasets|The absolute path to the datasets, **not** including the final slash ('/') character.| |application_name|The application name to be used when creating a dataset| -|number_of_seconds_to_aggregate_on| The duration of the monitoring data interval in seconds. Monitoring data inside this interval will be aggregated to a single value (the mean value of all per-second observated/interpolated values) | +|number_of_seconds_to_aggregate_on| The duration of the monitoring data interval in seconds (greater than or equal to one). Monitoring data inside this interval will be aggregated to a single value (the mean value of all per-second observated/interpolated values) | |number_of_days_to_aggregate_data_from| The number of days which will be used to retrieve monitoring data from when creating a dataset| |prediction_processing_time_safety_margin_seconds| The number of seconds which will be used as a buffer when performing a prediction in order not to delay predictions and yet use as much data as possible| |testing_prediction_functionality| If set to 'True', then it will not send a prediction time for which predictions will be requested, but will rather allow the horizon setting to be used to create predictions| @@ -70,7 +74,7 @@ docker run ### Test execution -To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or soon will be) setup and is accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order - provided that the broker-client.jar file is available. +To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or will soon be) setup and accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order - provided that the broker-client.jar file is available. 
1) Publish metrics to predict: java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 metrics_to_predict [{"metric":"latency","level":3,"publish_rate":10000},{"metric":"memory","level":3,"publish_rate":10000}] -- GitLab From dced0e07cd6d426aeb1dd86a6bb772cb054bb7cd Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Tue, 3 May 2022 17:39:43 +0300 Subject: [PATCH 4/8] Fixing invalid null dataframe causing thread locking --- morphemic-forecasting-eshybrid/.gitignore | 2 ++ .../forecasting/eshybrid.py | 32 +++++++++--------- .../morphemic/model.py | 33 +++++++++++-------- morphemic-forecasting-eshybrid/sender.py | 14 +++++--- 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/morphemic-forecasting-eshybrid/.gitignore b/morphemic-forecasting-eshybrid/.gitignore index d4ecf6a3..83d129fc 100644 --- a/morphemic-forecasting-eshybrid/.gitignore +++ b/morphemic-forecasting-eshybrid/.gitignore @@ -2,3 +2,5 @@ sync.cfg.local sync.cfg.production sync.cfg.docker +sync.cfg.persistance +publisher.py diff --git a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py index 2dd225dc..f5722c8d 100644 --- a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py +++ b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py @@ -7,6 +7,7 @@ import signal import time import socket + class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler): metrics= set() @@ -19,6 +20,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen def __init__(self,config): self._run=False + self._force_train = False self._interval_count =1 self.id = (config['listener'] or {'id':'eshybrid'} )['id'] self.connector = messaging.morphemic.Connection( @@ -50,7 +52,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen ) - def wait_for_port(self, port, host='localhost', retries=10000000, timeout=5): """Wait until a port starts accepting TCP connections. 
Args: @@ -113,17 +114,17 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen def start(self): logging.debug("Starting ESHybrid") + signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) signal.signal(signal.SIGHUP, self.signal_handler) + self._run = self.run() if self._run: logging.info("ESHYBRID_STARTED") while self._run: - time.sleep(1) - if self.state=='forecasting': self.scheduler.check(self) continue @@ -136,7 +137,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen logging.info("Waiting for start_forecasting") continue - self.connector.disconnect() @@ -146,8 +146,8 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen m, times ) - - if predictions == None or not len(predictions): + logging.debug("Got metric[%s] predictions %s",(m,predictions)) + if predictions.empty: continue for index,row in predictions.iterrows(): @@ -157,31 +157,30 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen "timestamp": int(time.time()), "probability": 0.98, "confidence_interval": [float(8),float(15)], - "predictionTime": t, + "predictionTime": t.strftime('%s'), } logging.debug("Sending prediction for time %s => %s " % (t, payload) ) self.connector.send_to_topic( "intermediate_prediction.eshybrid.%s" % m, payload ) - #adding simple method to retrain the model - if self._last_training_time and time.time() - self._last_training_time > 3000: + if self._force_train or ( self._last_training_time and time.time() - self._last_training_time > 3000): self._train_model() def _train_model(self): - - self.dataset.make() - - if not os.path.exists("%s/%s.csv" % (self.data_set_path,self.application)): + dataset_results = self.dataset.make() + if not dataset_results.get('status',False): logging.error("**** NO DATA FROM DATASET MAKER ****") + self._force_train = True return - self.model.train("%s/%s.csv" % (self.data_set_path,self.application),self.metrics) - self._last_training_time = time.time() - + self._force_train = False + self.model.train(dataset_results.get('url'),self.metrics) def on_train(self,model): + self._last_training_time = time.time() + self._force_train = False self.connector.send_to_topic("training_models", { "metrics": self.metrics, @@ -189,6 +188,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen "timestamp": int(time.time() * 1000) }) + def on_metrics_to_predict(self,res): logging.debug("[2] Metrics to predict %s " % res) diff --git a/morphemic-forecasting-eshybrid/morphemic/model.py b/morphemic-forecasting-eshybrid/morphemic/model.py index 269c4c62..372c61ac 100644 --- a/morphemic-forecasting-eshybrid/morphemic/model.py +++ b/morphemic-forecasting-eshybrid/morphemic/model.py @@ -1,5 +1,6 @@ import logging import threading +import time import uuid from datetime import datetime, timedelta import pandas as pd @@ -58,6 +59,7 @@ class UUIDModel: class ModelStatus(enumerate): IDLE = "IDLE" TRAINNING = "TRAINNING" + TRAINED = "TRAINED" PREDICTING = "PREDICTING" @@ -126,34 +128,39 @@ class Model: args = self._dataHandler.to_train(metric, path) _logger.info("Retraining for %s - %s rows " % (metric, args[0].shape[0])) - model.model_for_metric(metric).fit(args[0], args[1], verbose=False) + model.model_for_metric(metric).fit(args[0], args[1], verbose=True) def _retrain(self, metrics, path): with lock: - self.status = ModelStatus.TRAINNING + while self.status != ModelStatus.TRAINED: + try: + 
_logger.debug("Starting training model") - _logger.debug("Starting training model") + model = self._new_model() - model = self._new_model() + for m in metrics: + self._retrain_for_metric(model, m,path) + _logger.debug("Model training succesful for %s ",m) - for m in metrics: - self._retrain_for_metric(model, m,path) - _logger.debug("Model training finished") + self._model = model - self._model = model + _logger.debug("set new model") + self.status = ModelStatus.TRAINED + if self._handler: + self._handler.on_train(self) + except: + _logger.error("Not enough data - not training") + _logger.info("Waiting for next training loop") + time.sleep(30) - _logger.debug("set new model") - if self._handler: - self._handler.on_train(self) self.status = ModelStatus.IDLE def train(self, dataset_path, metrics): _logger.info("Start training for %s in %s " % (metrics,dataset_path,)) - if self.status == ModelStatus.IDLE: t = threading.Thread(target=self._retrain, args=(metrics, dataset_path)) @@ -166,7 +173,7 @@ class Model: _logger.debug("Request prediction for %s @ %s " % (metric,times,)) if not self._model: _logger.error("No model trained yet") - return + return pd.DataFrame() ret = self._model.model_for_metric(metric).predict( self._dataHandler.to_predict(metric,times) diff --git a/morphemic-forecasting-eshybrid/sender.py b/morphemic-forecasting-eshybrid/sender.py index 5afb5d11..1ea1f974 100644 --- a/morphemic-forecasting-eshybrid/sender.py +++ b/morphemic-forecasting-eshybrid/sender.py @@ -1,13 +1,19 @@ +import time import logging +import messaging + logger = logging.getLogger() logger.setLevel(logging.DEBUG) -import messaging +connector = messaging.morphemic.Connection('aaa', '111',host="localhost",port=61613) +connector.connect() +while True: + connector.send_to_topic("metrics_to_predict", [{'metric':'latency'},{'metric':'response_time'}] ) + time.sleep(5) -connector = messaging.morphemic.Connection('aaa', '111',host="localhost",port=61610) -connector.connect() + connector.send_to_topic("start_forecasting.eshybrid",{'version':1, 'metrics': ['latency','response_time'], 'timestamp': 1638866003, 'epoch_start': 1638866228, 'number_of_forward_predictions': 8, 'prediction_horizon': 30} ) + time.sleep(30) -connector.send_to_topic("start_forecasting.gluonmachines",{'metrics': ['ETPercentile_Ctx','SimulationElapsedTime_Ctx','SimulationLeftNumber_Ctx','WillFinishTooSoonContext','NotFinishedOnTimeContext'], 'timestamp': 1638866003, 'epoch_start': 1638866228, 'number_of_forward_predictions': 8, 'prediction_horizon': 30} ) -- GitLab From 8dd95e99e27ee6c458ad081a7fdee0354f0bfcb7 Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Tue, 3 May 2022 18:24:50 +0300 Subject: [PATCH 5/8] Log when there are not predictions for a single metric --- morphemic-forecasting-eshybrid/forecasting/eshybrid.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py index f5722c8d..d8e8af8c 100644 --- a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py +++ b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py @@ -146,10 +146,11 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen m, times ) - logging.debug("Got metric[%s] predictions %s",(m,predictions)) if predictions.empty: + logging.debug("No prediction available yet for metric[%s]",m) continue + logging.debug("Got metric[%s] predictions %s",(m,predictions)) for index,row in predictions.iterrows(): t = row['ds'] 
payload = { -- GitLab From 8b7728ec68424dd627888025e696d403b6542623 Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Fri, 6 May 2022 00:08:01 +0300 Subject: [PATCH 6/8] Correction to spawn correctly the R forecasting subprocess from Python in Linux (and not only in Windows) Improvement of the command used to start the forecasting process, in order to flush python output and obtain logs immediately when published Logging and business logic improvement in forecasts to warn about the availability of data with a timestamp equal to or newer compared to the timestamp of the requested prediction time, and nevertheless return the last available timestamp in the dataset --- .../Dockerfile | 2 +- .../r_predictors/forecasting_real_workload.R | 5 +++- .../src/runtime/Predictor.py | 24 ++++++++++++++++--- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/Dockerfile b/morphemic-forecasting-exponentialsmoothing/Dockerfile index 8db981a4..f8f4cbd3 100644 --- a/morphemic-forecasting-exponentialsmoothing/Dockerfile +++ b/morphemic-forecasting-exponentialsmoothing/Dockerfile @@ -66,4 +66,4 @@ COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/ WORKDIR /home/r_predictions/esm_forecaster-0.1.0 -CMD ["/bin/sh","-c","python3 /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties > $LOG_FILE 2>&1"] \ No newline at end of file +CMD ["/bin/sh","-c","python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties 2>&1 > $LOG_FILE "] \ No newline at end of file diff --git a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R index 5fe3723d..9846cb84 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R +++ b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R @@ -31,8 +31,9 @@ find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){ if(realtime_mode){ #return(min(c(possible_timestamp,next_prediction_time))) if (next_prediction_time>possible_timestamp){ - return(next_prediction_time) + return(possible_timestamp) }else{ + print("Possible problem with the requested prediction time, there is already data for a timestamp newer than the time requested to predict for. 
Returning the newer timestamp, being aware that this will lead to this prediction returning no meaningful output") return (possible_timestamp) } }else{ @@ -53,6 +54,8 @@ time_unit_granularity <- "sec" # Handle monitoring data using this time unit gra endpoint_time_unit_granularity <- "seconds" #configuration_properties <- read.properties(".\\prediction_configuration-windows.properties") +print("Reading properties from the following file:") +print(paste(getwd(),"/prediction_configuration.properties",sep='')) configuration_properties <- read.properties(paste(getwd(),"/prediction_configuration.properties",sep='')) realtime_mode <- as.logical(configuration_properties$realtime_mode) #whether or not we should use all datapoints available (True value), or we are only partially using the available dataset (False value) e.g to test the prediction method performance diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py index 350a8c97..2fddbdf7 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py @@ -42,14 +42,32 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim os.chdir(os.path.dirname(configuration_file_location)) prediction_data_file = get_prediction_data_filename(configuration_file_location) + from sys import platform if State.testing_prediction_functionality: print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data") print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute) - command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute] + + # Windows + if platform == "win32": + command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute] + # linux + elif platform == "linux" or platform == "linux2": + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)] + #Choosing the solution of linux + else: + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)] else: print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time) - command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time] + # Windows + if platform == "win32": + command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time] + # Linux + elif platform == "linux" or platform == "linux2": + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)] + #Choosing the solution of linux + else: + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)] process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True) if (process_output.stdout==""): @@ -179,7 +197,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body) State.connection.send_to_topic('training_events',training_events_message_body) message_not_sent = False - print_with_time("Successfully sent prediction message for "+attribute) 
+ print_with_time("Successfully sent prediction message for "+attribute+" to topic intermediate_prediction.%s.%s" % (id, attribute)) except ConnectionError as exception: State.connection.disconnect() State.connection = messaging.morphemic.Connection('admin', 'admin') -- GitLab From d7e0eb574a13bbd31ab5fbaed5552826b25fc956 Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Fri, 6 May 2022 08:33:02 +0000 Subject: [PATCH 7/8] Update morphemic-forecasting-exponentialsmoothing/Dockerfile --- morphemic-forecasting-exponentialsmoothing/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/Dockerfile b/morphemic-forecasting-exponentialsmoothing/Dockerfile index f8f4cbd3..0a66676a 100644 --- a/morphemic-forecasting-exponentialsmoothing/Dockerfile +++ b/morphemic-forecasting-exponentialsmoothing/Dockerfile @@ -12,7 +12,7 @@ RUN ls -la RUN python3 setup.py sdist RUN ls ./dist/ -FROM ubuntu:latest +FROM ubuntu:focal RUN mkdir -p /home/r_predictions RUN apt-get update @@ -66,4 +66,4 @@ COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/ WORKDIR /home/r_predictions/esm_forecaster-0.1.0 -CMD ["/bin/sh","-c","python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties 2>&1 > $LOG_FILE "] \ No newline at end of file +CMD ["/bin/sh","-c","python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties 2>&1 > $LOG_FILE "] -- GitLab From 3f8d522d9ca55821af82efca95823be9de31bed2 Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 23 May 2022 11:16:49 +0200 Subject: [PATCH 8/8] fixed connection in SAL --- .../infrastructure/deployment/PAResourceManagerGateway.java | 1 + 1 file changed, 1 insertion(+) diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/infrastructure/deployment/PAResourceManagerGateway.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/infrastructure/deployment/PAResourceManagerGateway.java index cc22fa9b..477e34c6 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/infrastructure/deployment/PAResourceManagerGateway.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/infrastructure/deployment/PAResourceManagerGateway.java @@ -191,6 +191,7 @@ public class PAResourceManagerGateway { } private RMStateFull getFullMonitoring() throws NotConnectedException, PermissionRestException { + reconnectIfDisconnected(); LOGGER.debug("Getting full RM state ..."); RMStateFull rmState = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId()); LOGGER.debug("Full monitoring got."); -- GitLab
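
Notes on the patches above, with minimal Python sketches (assumptions are flagged inline; none of these sketches are part of the patches themselves).

1) Aggregation-interval clamping (patch 2). The commit message and the README change state that number_of_seconds_to_aggregate_on is updated at runtime to the minimum of the configured value and the prediction horizon received with the start_forecasting event. The corresponding Predictor.py hunk is truncated above, so the sketch below only mirrors the described behaviour, reusing the jproperties-style access visible in the patch ("r+b" file mode, (value, metadata) tuples); writing the clamped value back to the properties file is an assumption, not something shown in the visible hunk.

from jproperties import Properties

def clamp_aggregation_interval(configuration_file_location, prediction_horizon):
    # Load the properties file the same way the patch does (binary mode, utf-8).
    configuration_details = Properties()
    with open(configuration_file_location, "r+b") as f:
        configuration_details.load(f, "utf-8")
        # jproperties returns a (value, metadata) tuple for each key.
        initial_value, _metadata = configuration_details["number_of_seconds_to_aggregate_on"]
        initial_value = int(initial_value)
        if prediction_horizon < initial_value:
            # Clamp the aggregation interval to the prediction horizon; persisting
            # it back to the file is an assumption (the original hunk is truncated).
            configuration_details["number_of_seconds_to_aggregate_on"] = str(prediction_horizon)
            f.seek(0)
            f.truncate(0)
            configuration_details.store(f, encoding="utf-8")
    return min(initial_value, prediction_horizon)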
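
2) Retry-until-trained loop (patch 4, morphemic/model.py). The patch wraps model training in a loop that keeps retrying while the status is not TRAINED, sleeping between attempts when the dataset is not yet usable, and only then notifies the handler. The sketch below isolates that pattern in a small standalone class, assuming a caller-supplied train_fn that raises when training is not yet possible; it uses "except Exception" rather than the bare "except:" of the patch, which is a deliberate narrowing, not the author's code.

import logging
import threading
import time

_logger = logging.getLogger(__name__)

class RetryingTrainer:
    def __init__(self, train_fn, retry_seconds=30):
        self._train_fn = train_fn          # callable that raises if training cannot succeed yet
        self._retry_seconds = retry_seconds
        self.trained = False

    def _loop(self):
        # Keep attempting to train until one attempt succeeds, so transient
        # conditions (e.g. not enough data yet) do not leave the model untrained.
        while not self.trained:
            try:
                self._train_fn()
                self.trained = True
            except Exception:
                _logger.info("Not enough data - retrying in %s seconds", self._retry_seconds)
                time.sleep(self._retry_seconds)

    def start(self):
        # Run in a background thread so the caller is never blocked.
        threading.Thread(target=self._loop, daemon=True).start()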
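
3) Guarding the prediction-publishing loop (patches 4 and 5). The patches replace the "predictions == None or not len(predictions)" test with "predictions.empty" and make Model.predict() return an empty DataFrame instead of None, and they send predictionTime as t.strftime('%s'). The '%s' directive is a platform-specific (glibc) extension and yields a string, so the sketch below uses Timestamp.timestamp() instead; the 'ds' column name comes from the patch, while 'yhat' as the value column is an assumption.

import time
import pandas as pd

def build_prediction_payloads(predictions):
    # Treat "no predictions yet" uniformly: None or an empty DataFrame both skip publishing.
    if predictions is None or predictions.empty:
        return []
    payloads = []
    for _, row in predictions.iterrows():
        payloads.append({
            "metricValue": float(row["yhat"]),              # value column name is assumed
            "timestamp": int(time.time()),
            "predictionTime": int(row["ds"].timestamp()),   # portable alternative to strftime('%s')
        })
    return payloads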
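
4) Platform-aware spawning of the R forecaster (patch 6). With shell=True, the POSIX implementation of subprocess passes only args[0] to /bin/sh -c, so a Windows-style argument list would silently drop the script arguments on Linux; that is why the patch builds a single command string on Linux and keeps the list form on Windows. The sketch below extracts that branching into a helper, assuming (as the original does) that Rscript is on PATH and forecasting_real_workload.R is in the current working directory.

import sys
from subprocess import PIPE, run

def run_r_forecaster(prediction_data_file, attribute, next_prediction_time=None):
    extra_args = [str(prediction_data_file), str(attribute)]
    if next_prediction_time is not None:
        extra_args.append(str(next_prediction_time))

    if sys.platform == "win32":
        # Windows accepts an argument list together with shell=True.
        command = ["Rscript", "forecasting_real_workload.R"] + extra_args
    else:
        # POSIX: only args[0] reaches /bin/sh -c, so the whole command must be
        # a single string (the patch wraps it in a one-element list).
        command = ["Rscript forecasting_real_workload.R " + " ".join(extra_args)]

    return run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)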