Commit cb5362b3 authored by Fotis Paraskevopoulos

Merge remote-tracking branch 'origin/morphemic-rc2.0' into iccs-eshybrid-2.0

parents 6bbaa3b4 a7cf6e5f
......@@ -62,7 +62,7 @@ def worker(self, body, metric):
yhat = yhats[k]
yhat_lower = yhat_lowers[k]
yhat_upper = yhat_uppers[k]
- self.connector.send_to_topic('intermediate_prediction.gluonmachines.' + metric,
+ self.connector.send_to_topic('intermediate_prediction.gluonts.' + metric,
{
"metricValue": float(yhat),
"level": 3,
......@@ -81,7 +81,7 @@ def worker(self, body, metric):
class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
id = "gluonmachines"
id = "gluonts"
def __init__(self):
self._run = False
......@@ -91,8 +91,8 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
def run(self):
self.connector.connect()
self.connector.set_listener(self.id, self)
self.connector.topic("start_forecasting.gluonmachines", self.id)
self.connector.topic("stop_forecasting.gluonmachines", self.id)
self.connector.topic("start_forecasting.gluonts", self.id)
self.connector.topic("stop_forecasting.gluonts", self.id)
self.connector.topic("metrics_to_predict", self.id)
def reconnect(self):
......@@ -100,7 +100,7 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
self.run()
pass
- def on_start_forecasting_gluonmachines(self, body):
+ def on_start_forecasting_gluonts(self, body):
sent_metrics = body["metrics"]
logging.debug(f"Gluonts Start Forecasting the following metrics: {sent_metrics}")
for metric in sent_metrics:
......@@ -134,11 +134,11 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
self.connector.send_to_topic("training_models",
{
"metrics": list(metrics),
"forecasting_method": "gluonmachines",
"forecasting_method": "gluonts",
"timestamp": int(time())
})
- def on_stop_forecasting_gluonmachines(self, body):
+ def on_stop_forecasting_gluonts(self, body):
logging.debug(f"Gluonts Stop Forecasting the following metrics: {body['metrics']}")
for metric in body["metrics"]:
if metric in metrics:
......
......@@ -63,6 +63,7 @@ def worker(self, body, metric):
yhat_lower = yhat_lowers[k]
yhat_upper = yhat_uppers[k]
# wait until epoch_start to send
+ start_sending_time = time.time()
message = {
"metricValue": yhat,
"level": 3,
......@@ -78,7 +79,8 @@ def worker(self, body, metric):
self.connector.send_to_topic('intermediate_prediction.prophet.'+metric, message)
prediction_time = prediction_time + prediction_horizon
epoch_start = epoch_start + prediction_horizon
- sleep(prediction_horizon)
+ execution_time = time.time() - start_sending_time
+ sleep(prediction_horizon - execution_time)
class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
......
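The two-line change above keeps the Prophet worker on a fixed publishing cadence: it measures how long sending the predictions took and sleeps only for the remainder of the horizon, so the loop no longer drifts by the send time on each iteration. A minimal sketch of the pattern (function and argument names are hypothetical; the sketch also adds a `max(0, ...)` guard against a negative sleep duration, which the excerpt above does not include):

```python
import time

def publish_on_fixed_cadence(publish, interval_seconds, iterations):
    """Invoke publish() once per interval, compensating for its own runtime."""
    for _ in range(iterations):
        started = time.time()
        publish()  # sending to the broker can take a non-negligible time
        elapsed = time.time() - started
        # Sleep only for the remainder of the interval, never a negative value.
        time.sleep(max(0.0, interval_seconds - elapsed))
```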
......@@ -12,7 +12,7 @@ RUN ls -la
RUN python3 setup.py sdist
RUN ls ./dist/
- FROM ubuntu:latest
+ FROM ubuntu:focal
RUN mkdir -p /home/r_predictions
RUN apt-get update
......@@ -66,4 +66,4 @@ COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/
WORKDIR /home/r_predictions/esm_forecaster-0.1.0
CMD ["/bin/sh","-c","python3 /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties > $LOG_FILE 2>&1"]
\ No newline at end of file
CMD ["/bin/sh","-c","python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties 2>&1 > $LOG_FILE "]
......@@ -6,11 +6,15 @@ The exponential smoothing predictor is based on the use of the Holt-Winters [1]
Apart from standard R and Python libraries, the libraries included in the src/requirements.txt file should be available for the Python code to be successfully executed. Moreover, the `rapportools`, `gbutils`, `forecast`, `ggplot2`, `properties`, `xts`, `anytime` and `purrr` R libraries should be available (included in the src/r_predictors/r_commands.R file).
[1] https://www.rdocumentation.org/packages/forecast/versions/8.15/topics/forecast.HoltWinters
[2] https://www.rdocumentation.org/packages/forecast/versions/8.15/topics/ets
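For intuition only: the component itself performs Holt-Winters and ets forecasting through the R `forecast` package referenced above, but the underlying idea can be sketched in Python with statsmodels (which this project does not use; the data below is synthetic):

```python
import pandas as pd
from statsmodels.tsa.holtwinters import ExponentialSmoothing

# A short per-second series of a monitored metric (synthetic values).
series = pd.Series(
    [10.0, 12.0, 11.0, 13.0, 14.0, 13.0, 15.0, 16.0, 15.0, 17.0],
    index=pd.date_range("2021-01-01", periods=10, freq="s"),
)

# Additive-trend exponential smoothing, forecasting `horizon` steps ahead.
fitted = ExponentialSmoothing(series, trend="add").fit()
horizon = 5
print(fitted.forecast(horizon))
```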
## Configuration
### Configuration file
The predictor comes with two configuration files which can be used to specify the behaviour of the component. The two files are located in the src/r_predictors directory of the project.
- The options which will most probably need to be changed before deployment are the `broker_address`,the `horizon`, the `number_of_days_to_aggregate_data_from` and the `number_of_seconds_to_aggregate_on`.
+ The options which will most probably need to be changed before deployment are the `broker_address`, the `horizon`, the `number_of_days_to_aggregate_data_from` and the `number_of_seconds_to_aggregate_on`.
+ It should be noted that the `number_of_seconds_to_aggregate_on` variable is updated at runtime to become the minimum between the configuration value and the horizon value received for the predictions (i.e. the prediction interval).
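In other words, the effective aggregation interval is the minimum of the two values; a sketch of that rule (the function name is hypothetical):

```python
def effective_aggregation_interval(configured_seconds, prediction_horizon_seconds):
    # Never aggregate on a window longer than the horizon being predicted.
    return min(int(configured_seconds), int(prediction_horizon_seconds))

# e.g. a configured 60-second window combined with a 30-second horizon
assert effective_aggregation_interval(60, 30) == 30
```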
| Option | Description |
|--------|-------------|
......@@ -25,7 +29,7 @@ The options which will most probably need to be changed before deployment are th
|horizon| The number of seconds which should be forecasted into the future|
|path_to_datasets|The absolute path to the datasets, **not** including the final slash ('/') character.|
|application_name|The application name to be used when creating a dataset|
- |number_of_seconds_to_aggregate_on| The duration of the monitoring data interval in seconds. Monitoring data inside this interval will be aggregated to a single value (the mean value of all per-second observated/interpolated values) |
+ |number_of_seconds_to_aggregate_on| The duration of the monitoring data interval in seconds (greater than or equal to one). Monitoring data inside this interval will be aggregated to a single value (the mean value of all per-second observed/interpolated values) |
|number_of_days_to_aggregate_data_from| The number of days which will be used to retrieve monitoring data from when creating a dataset|
|prediction_processing_time_safety_margin_seconds| The number of seconds which will be used as a buffer when performing a prediction in order not to delay predictions and yet use as much data as possible|
|testing_prediction_functionality| If set to 'True', then it will not send a prediction time for which predictions will be requested, but will rather allow the horizon setting to be used to create predictions|
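An illustrative excerpt of such a configuration file, using the option names documented above (the values shown are placeholders, not recommended defaults):

```properties
broker_address=localhost
horizon=120
path_to_datasets=/home/r_predictions/datasets
application_name=default_application
number_of_seconds_to_aggregate_on=30
number_of_days_to_aggregate_data_from=2
prediction_processing_time_safety_margin_seconds=20
testing_prediction_functionality=False
```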
......@@ -70,7 +74,7 @@ docker run <container_name>
### Test execution
- To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or soon will be) setup and is accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order - provided that the broker-client.jar file is available.
+ To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or will soon be) set up and is accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order - provided that the broker-client.jar file is available.
1) Publish metrics to predict:
java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 metrics_to_predict [{"metric":"latency","level":3,"publish_rate":10000},{"metric":"memory","level":3,"publish_rate":10000}]
......
......@@ -27,10 +27,15 @@ get_current_epoch_time <- function(){
#Assumes an xts time series object as input, with each record having a 1-sec difference from the previous one, and returns the last timestamp which is (or should have been) assigned (if not present).
find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){
counter <- 0
possible_timestamp <- as.numeric(end(mydata))
if(realtime_mode){
- return(min(c(possible_timestamp,next_prediction_time)))
+ #return(min(c(possible_timestamp,next_prediction_time)))
+ if (next_prediction_time>possible_timestamp){
+ return(possible_timestamp)
+ }else{
+ print("Possible problem with the requested prediction time, there is already data for a timestamp newer than the time requested to predict for. Returning the newer timestamp, being aware that this will lead to this prediction returning no meaningful output")
+ return (possible_timestamp)
+ }
}else{
return (possible_timestamp)
}
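The net effect of this change is that the function now always returns the timestamp at the end of the data; the old `min(...)` call is replaced by an explicit branch whose only purpose is to warn when the data already extends past the requested prediction time. In Python terms (a sketch, not project code):

```python
def find_last_timestamp(last_data_timestamp, next_prediction_time, realtime_mode):
    # Mirrors the revised R helper above: every branch returns the last
    # timestamp present in the data; realtime mode merely adds a warning.
    if realtime_mode and next_prediction_time <= last_data_timestamp:
        print("Warning: data already exists beyond the requested prediction"
              " time; the prediction will not produce meaningful output")
    return last_data_timestamp
```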
......@@ -49,6 +54,8 @@ time_unit_granularity <- "sec" # Handle monitoring data using this time unit gra
endpoint_time_unit_granularity <- "seconds"
#configuration_properties <- read.properties(".\\prediction_configuration-windows.properties")
print("Reading properties from the following file:")
print(paste(getwd(),"/prediction_configuration.properties",sep=''))
configuration_properties <- read.properties(paste(getwd(),"/prediction_configuration.properties",sep=''))
realtime_mode <- as.logical(configuration_properties$realtime_mode) #whether or not we should use all datapoints available (True value), or we are only partially using the available dataset (False value) e.g to test the prediction method performance
......@@ -80,14 +87,28 @@ beta_value_argument <- as.double(args[5])
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE)
#sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before now - number_of_days*24hrs*3600sec/hr seconds, and we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds in order to account for the time it takes to create the dataset and start the prediction process)
- oldest_acceptable_time_point <- get_current_epoch_time() -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
+ current_time <- get_current_epoch_time()
+ if (!realtime_mode){
+ current_time <- tail(data_to_process[time_field_name],1)
+ }
+ oldest_acceptable_time_point <- current_time -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
print(paste("Using data after time point",oldest_acceptable_time_point,"..."))
data_to_process <- data_to_process[data_to_process[[time_field_name]]>oldest_acceptable_time_point,]
+ if (length(data_to_process[,attribute_to_predict])>0){
print(paste("Constructing fine-grained data series for",attribute_to_predict,"using the requested granularity..."))
+ }else{
+ print("No valid data points remained after enforcing the number_of_days_to_use_data_from configuration option. This may mean that you are trying to predict using realtime mode, using data points older than the number of days specified in the number_of_days_to_use_data_from configuration option")
+ stop()
+ }
#Fail-safe default
df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,time_field_name]))
date_time_init <- anytime(data_to_process[,time_field_name])
- date_time_complete <- seq.POSIXt(from=min(date_time_init),
- to=max(date_time_init),by=time_unit_granularity)
+ date_time_complete <- seq.POSIXt(
+ from=as.POSIXct(min(date_time_init),origin = "1970-01-01"),
+ to=as.POSIXct(max(date_time_init),origin = "1970-01-01"),
+ by=time_unit_granularity
+ )
df2 <- merge(df1,xts(,date_time_complete))
mydata <- na.approx(df2)
colnames(mydata)<-c(attribute_to_predict)
......@@ -95,6 +116,7 @@ colnames(mydata)<-c(attribute_to_predict)
print(paste("The complete time series to be predicted for attribute",attribute_to_predict,"has been created"))
configuration_forecasting_horizon <- as.integer(configuration_properties$horizon)
+ last_timestamp_data <- 0
if (configuration_forecasting_horizon>0){
print("Using a statically defined horizon from the configuration file")
......
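Taken together, the R code above filters out stale observations, builds a complete one-second time index between the first and last remaining observation, and linearly interpolates the gaps (`xts` plus `na.approx`). A rough pandas analogue, for illustration only (assuming epoch-second timestamps; the project itself does this resampling in R):

```python
import pandas as pd

def to_per_second_series(df, time_field, value_field, oldest_acceptable_epoch):
    # Drop stale points, as the sanitization step above does.
    recent = df[df[time_field] > oldest_acceptable_epoch]
    # Index by timestamp, reindex onto a complete 1-second grid, and
    # interpolate the gaps (the counterpart of merge/na.approx in R).
    series = pd.Series(recent[value_field].values,
                       index=pd.to_datetime(recent[time_field], unit="s"))
    full_index = pd.date_range(series.index.min(), series.index.max(), freq="s")
    return series.reindex(full_index).interpolate(method="time")
```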
......@@ -42,16 +42,34 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
os.chdir(os.path.dirname(configuration_file_location))
prediction_data_file = get_prediction_data_filename(configuration_file_location)
+ from sys import platform
if State.testing_prediction_functionality:
print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute)
- command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute]
+ # Windows
+ if platform == "win32":
+ command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute]
+ # linux
+ elif platform == "linux" or platform == "linux2":
+ command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)]
+ #Choosing the solution of linux
+ else:
+ command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)]
else:
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time)
- command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
- process_output = run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
+ # Windows
+ if platform == "win32":
+ command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
+ # Linux
+ elif platform == "linux" or platform == "linux2":
+ command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)]
+ #Choosing the solution of linux
+ else:
+ command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)]
+ process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""):
print_with_time("Empty output from R predictions - the error output is the following:")
print(process_output.stderr) #There was an error during the calculation of the predicted value
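As an aside, the platform branching above exists because `run(..., shell=True)` expects a single command string on Linux, whereas the list form is interpreted differently under a shell. An alternative that needs no branching is to keep the argument-list form and omit `shell=True` on every platform (a sketch of that option, not what this commit does):

```python
from subprocess import PIPE, run

def run_rscript(script, *args):
    # An argument list avoids shell quoting pitfalls and behaves the same
    # on Windows and Linux, so no per-platform command construction is needed.
    command = ["Rscript", script, *(str(a) for a in args)]
    return run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True)

# e.g.: run_rscript("forecasting_real_workload.R", prediction_data_file,
#                   attribute, next_prediction_time)
```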
......@@ -147,7 +165,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
try:
prediction = predict_attributes(State.metrics_to_predict,State.next_prediction_time)
except Exception as e:
print_with_time("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time+", proceeding to next prediction time. The encountered exception trace follows:")
print_with_time("Could not create a prediction for some or all of the metrics for time point "+str(State.next_prediction_time)+", proceeding to next prediction time. The encountered exception trace follows:")
print(e)
continue
for attribute in State.metrics_to_predict:
......@@ -179,7 +197,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body)
State.connection.send_to_topic('training_events',training_events_message_body)
message_not_sent = False
print_with_time("Successfully sent prediction message for "+attribute)
print_with_time("Successfully sent prediction message for "+attribute+" to topic intermediate_prediction.%s.%s" % (id, attribute))
except ConnectionError as exception:
State.connection.disconnect()
State.connection = messaging.morphemic.Connection('admin', 'admin')
......@@ -242,6 +260,23 @@ class Listener(messaging.listener.MorphemicListener):
print_with_time("Problem while retrieving epoch start and/or prediction_horizon")
return
+ with open(State.configuration_file_location, "r+b") as f:
+ State.configuration_details.load(f, "utf-8")
+ # Do stuff with the p object...
+ initial_seconds_aggregation_value, metadata = State.configuration_details["number_of_seconds_to_aggregate_on"]
+ initial_seconds_aggregation_value = int(initial_seconds_aggregation_value)
+ if (prediction_horizon<initial_seconds_aggregation_value):
+ print_with_time("Changing number_of_seconds_to_aggregate_on to "+str(prediction_horizon)+" from its initial value "+str(initial_seconds_aggregation_value))
+ State.configuration_details["number_of_seconds_to_aggregate_on"] = str(prediction_horizon)
+ f.seek(0)
+ f.truncate(0)
+ State.configuration_details.store(f, encoding="utf-8")
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
......
......@@ -30,7 +30,7 @@ def predict_attribute(attribute, prediction_data_file, configuration_file_locati
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
- process_output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
+ process_output = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
if (process_output.stdout == ""):
print("Empty output from R predictions")
print("The error output is the following")
......
......@@ -191,6 +191,7 @@ public class PAResourceManagerGateway {
}
private RMStateFull getFullMonitoring() throws NotConnectedException, PermissionRestException {
reconnectIfDisconnected();
LOGGER.debug("Getting full RM state ...");
RMStateFull rmState = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId());
LOGGER.debug("Full monitoring got.");
......
......@@ -300,16 +300,13 @@ public class NodeCandidateUtils {
try {
if (paCloud.getCloudProviderName().equals("openstack")){
- if((!entries.contains(pair)) && openstackOsList.contains(os)) {
- entries.add(pair);
- JSONArray nodeCandidates = nodeCandidatesCache.get(Quartet.with(paCloud, region, imageReq, ""));
- nodeCandidates.forEach(nc -> {
- JSONObject nodeCandidate = (JSONObject) nc;
- EntityManagerHelper.persist(createLocation(nodeCandidate, paCloud));
- EntityManagerHelper.persist(createNodeCandidate(nodeCandidate, image, paCloud));
- });
- }
+ entries.add(pair);
+ JSONArray nodeCandidates = nodeCandidatesCache.get(Quartet.with(paCloud, region, imageReq, ""));
+ nodeCandidates.forEach(nc -> {
+ JSONObject nodeCandidate = (JSONObject) nc;
+ EntityManagerHelper.persist(createLocation(nodeCandidate, paCloud));
+ EntityManagerHelper.persist(createNodeCandidate(nodeCandidate, image, paCloud));
+ });
}
else {
JSONArray nodeCandidates = nodeCandidatesCache.get(Quartet.with(paCloud, region, imageReq, ""));
......