Commit 0bff53d7 authored by Tomek Piekarz

Merge branch 'exponential_smoothing_predictor' into 'morphemic-rc2.0'

Miscellaneous improvements and changes to the Dockerfile and CI/CD script, as proposed by Tomasz Piekarz

See merge request !254
parents 46eddb4a 77ea55cb
Pipeline #19141 passed with stages
in 28 minutes and 49 seconds
@@ -229,7 +229,7 @@ deploy:exponential_smoothing:
# - apt-get install python3 python3-pip
- cd morphemic-forecasting-exponentialsmoothing
# - python3 src/setup.py sdist
- docker build -t exponential_smoothing -f ./Dockerfile2 .
- docker build -t exponential_smoothing -f ./Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag exponential_smoothing:latest $CI_REGISTRY_IMAGE/exponential_smoothing:$CI_COMMIT_BRANCH
......
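# --- Build stage: package the Python sources into a source distribution ---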
FROM python:3 as source
RUN pip install --upgrade pip
RUN mkdir /src
ADD ./src/ /src/
WORKDIR /src
RUN pip install -r requirements.txt
RUN ls -la
RUN python3 setup.py sdist
RUN ls ./dist/
#FROM rbase
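# --- Runtime stage: Ubuntu image providing R plus the Python runtime dependencies ---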
FROM ubuntu:latest
RUN mkdir -p /home/r_predictions
#RUN mkdir -p /home/prestocloud/output
#ADD input /home/prestocloud/input
RUN apt-get update
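# Setting the timezone non-interactively avoids tzdata prompts during package installation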
ENV TZ=Europe/Athens
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
@@ -39,29 +50,14 @@ python3-pip \
&& rm -rf /var/lib/apt/lists/*
#RUN mkdir -p /home/prestocloud/
COPY ./src/r_predictors/r_commands.R /home/r_predictions/
#ENTRYPOINT ["Rscript","/home/r_predictions/forecasting_real_workload.R"]
#WORKDIR /home/r_predictions
RUN Rscript /home/r_predictions/r_commands.R #install prerequisite libraries
COPY ./src/dist/esm_forecaster-0.1.0.tar.gz /home/r_predictions/
COPY --from=source ./src/dist/esm_forecaster-0.1.0.tar.gz /home/r_predictions/
COPY ./src/requirements.txt /home/r_predictions/
COPY ./src/prepare_python_dependencies.sh /home/r_predictions/
RUN bash -x /home/r_predictions/prepare_python_dependencies.sh
#---------------------Installation up to here
#RUN apt-get update
#RUN apt-get -y install python3 python3-pip #this installation should be merged with the previous one
#COPY ./src/r_predictors/prediction_configuration.properties /home/r_predictions
#COPY ./extra_r_commands.R /home/r_predictions/
#RUN Rscript /home/r_predictions/extra_r_commands.R #these libraries should be merged with the previous ones
#WORKDIR /home/r_predictions
COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/
#below two commented lines only serve for experiments with predictive functionality
@@ -69,6 +65,5 @@ COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/
#RUN Rscript forecasting_real_workload.R default_application.csv MinimumCores 1638878119
WORKDIR /home/r_predictions/esm_forecaster-0.1.0
#RUN python3 runtime/Predictor.py r_predictors/prediction_configuration-windows.properties
CMD ["/bin/sh","-c","python3 /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties > /home/r_predictions/exponential_smoothing.log"]
CMD ["/bin/sh","-c","python3 /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties > /home/r_predictions/exponential_smoothing.log"]
\ No newline at end of file
@@ -5,7 +5,7 @@
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
</content>
<orderEntry type="jdk" jdkName="Python 3.9 (ExponentialSmoothingPredictor)" jdkType="Python SDK" />
<orderEntry type="jdk" jdkName="Python 3.10 (python3_10_venv)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
\ No newline at end of file
# Exponential Smoothing Predictor
## Introduction
The exponential smoothing predictor is based on the Holt-Winters [1] and ETS [2] methods. It is coded in Python and R: the core predictive functionality is available as an R script, while the Python implementation is responsible for organizing the communication to/from the Morphemic Forecasting module broker and the Dataset provider (InfluxDB).
Apart from standard R and Python libraries, the `jproperties` library should be available for the Python code to be executed successfully. Moreover, the `rapportools`, `gbutils`, `forecast`, `ggplot2`, `properties`, `xts`, `anytime` and `purrr` R libraries should be available.
## Configuration
The predictor comes with two configuration files which can be used to specify the behaviour of the component. Both files are located in the `src/r_predictors` directory of the project.
The options which will most probably need to be changed before deployment are `broker_address`, `horizon`, `number_of_days_to_aggregate_data_from` and `number_of_seconds_to_aggregate_on`.
| Option | Description |
| ------ | ----------- |
| realtime_mode | Whether predictions are produced in real time, or the R script is simply used to make predictions on a given dataset |
| broker_address | The IP address of the broker (or localhost) |
| broker_port | The port which should be used for the broker |
| prediction_method | The name of the prediction method which will be used to create the predictions, either Holt-Winters or ETS (written without quotes) |
| forecasting_data_slicing_mode | 'none' (or any other string value) to indicate that all of the data should be used, or 'percentage' to indicate that only some of the data should be used |
| forecasting_data_offset | The percentage of (initial) data points which should not be used if `forecasting_data_slicing_mode` has been set to 'percentage' |
| forecasting_data_limit | The percentage of (final) data points which should not be used if `forecasting_data_slicing_mode` has been set to 'percentage' |
| forecasting_data_used_for_training | The percentage of data (after trimming using `forecasting_data_offset` and `forecasting_data_limit`) which should be used for training, assuming that no forecasting horizon has been set and `realtime_mode` is False |
| horizon | The number of seconds which should be forecast into the future |
| path_to_datasets | The absolute path to the datasets, **not** including the final slash ('/') character |
| application_name | The application name to be used when creating a dataset |
| number_of_seconds_to_aggregate_on | The duration, in seconds, of the monitoring data interval. Monitoring data inside this interval is aggregated to a single value (the mean of all per-second observed/interpolated values) |
| number_of_days_to_aggregate_data_from | The number of days of monitoring data which will be retrieved when creating a dataset |
| prediction_processing_time_safety_margin_seconds | The number of seconds used as a buffer when performing a prediction, in order not to delay predictions while still using as much data as possible |
| testing_prediction_functionality | If set to 'True', no prediction time for which predictions are requested is sent; instead, the horizon setting is used to create predictions |
| try_to_optimize_parameters | If set to 'True', enables a custom search for the optimal (alpha, beta, gamma) parameters, using a number of steps defined inside the code. This option may be unstable when used in conjunction with the ETS method and needs more time to finish |
| generate_prediction_png_output | A boolean variable which, if set to 'True', triggers the creation of a png illustrating the prediction |
| png_output_file | The absolute file path of the png file which should be generated |
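As a minimal sketch of how these options are consumed (assuming the `jproperties` library mentioned above; the configuration file name here is illustrative), they can be read the same way the predictor's initializer does:

```python
# Minimal sketch: reading configuration options with jproperties,
# as the predictor's own initializer does; the file name is illustrative.
from jproperties import Properties

p = Properties()
with open("prediction_configuration.properties", "rb") as f:
    p.load(f, "utf-8")

broker_address, _ = p["broker_address"]  # each lookup returns a (value, metadata) tuple
horizon, _ = p["horizon"]
print("Broker at " + str(broker_address) + ", forecasting " + str(horizon) + " seconds ahead")
```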
## Simple execution of the Predictor component with full Morphemic functionality
When the component is started from the Python initializer script, it listens to the Forecasting module broker topics which are relevant to it, with the exception of the `training_models` event (as the predictor currently retrains each time it is run). Furthermore, it publishes prediction events under the `intermediate_prediction.exponentialsmoothing` topic.
To start the predictor in this mode, start the `src/runtime/predictions/Prediction.py` script, passing the location of the configuration file as an argument.
Once the predictor has started, it is ready to accept messages from the broker, provided the broker has been started as well.
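For example, assuming the configuration file shipped in `src/r_predictors` is used and the paths are adjusted to your checkout, the invocation could look like `python3 src/runtime/predictions/Prediction.py src/r_predictors/prediction_configuration.properties`.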
## Simple usage of the core prediction functionality
To test the prediction functionality on its own, it is sufficient to use the `Rscript` command, followed by the full path of the `forecasting_real_workload.R` file which resides in the `src/r_predictors` directory, providing as arguments the dataset file, the metric to be predicted, the prediction time for which a prediction is requested, and optionally the alpha and beta values for the prediction method (gamma will be estimated).
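The same invocation can also be scripted. The sketch below mirrors the subprocess call the Python runtime itself issues; the dataset, metric and prediction time are illustrative values taken from a commented example in the repository's Dockerfile:

```python
# Sketch: calling the core R script directly, mirroring the runtime's own
# subprocess invocation; the argument values are illustrative.
import subprocess

command = ["Rscript", "src/r_predictors/forecasting_real_workload.R",
           "default_application.csv",  # dataset file
           "MinimumCores",             # metric to predict
           "1638878119"]               # prediction time (epoch seconds)
result = subprocess.run(command, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE, universal_newlines=True)
print(result.stdout if result.stdout else result.stderr)
```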
## Dockerized execution of the component with full Morphemic functionality
TBD
\ No newline at end of file
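(A plausible starting point, based on the CI script above: build the image with `docker build -t exponential_smoothing -f ./Dockerfile .` from the `morphemic-forecasting-exponentialsmoothing` directory, then run `docker run exponential_smoothing`; the image's default command starts the predictor with the bundled configuration file.)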
import csv
import multiprocessing
import os,subprocess
import os, subprocess
import pickle
import sys
import math
import pandas as pd
from runtime.predictions.Prediction import Prediction
from jproperties import Properties
# Assuming that the timestamps are in input_dataset_timestamp_column field of the input dataset, and that input_dataset_train_label/input_dataset_test_label dataset split is in the input_dataset_split_column field
def predict_attribute(attribute, prediction_data_file,configuration_file_location,next_prediction_time):
input_dataset_timestamp_column="ems_time"
input_dataset_split_column="split"
input_dataset_train_label="train"
input_dataset_test_label="test"
custom_split = False # Whether the data are split based on a tag, or based on an 80%/20% train/test dataset split
custom_split_train_percentage = 0.8 # The amount of data that should be used for training when a custom split is desired.
def predict_attribute(attribute, prediction_data_file, configuration_file_location, next_prediction_time):
prediction_confidence_interval_produced = False
prediction_value_produced = False
prediction_valid = False
#configuration_file_location = os.path.dirname(os.path.realpath(__file__))
os.chdir(os.path.dirname(configuration_file_location))
#prediction_data_file = get_prediction_data_filename(configuration_file_location)
print ("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time)
print("Issuing command: Rscript forecasting_real_workload.R " + str(
prediction_data_file) + " " + attribute + " " + next_prediction_time)
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
process_output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
if (process_output.stdout==""):
if (process_output.stdout == ""):
print("Empty output from R predictions")
print("The error output is the following")
print(process_output.stderr) #There was an error during the calculation of the predicted value
print(process_output.stderr) # There was an error during the calculation of the predicted value
process_output_string_list = process_output.stdout.replace("[1] ", "").replace("\"", "").split()
prediction_value = 0
@@ -53,12 +60,15 @@ def predict_attribute(attribute, prediction_data_file,configuration_file_locatio
prediction_smape = string.replace("smape:", "")
if (prediction_confidence_interval_produced and prediction_value_produced):
prediction_valid = True
print("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
print("The prediction for attribute " + attribute + " is " + str(
prediction_value) + " and the confidence interval is " + prediction_confidence_interval)
else:
print("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
print("There was an error during the calculation of the predicted value for " + str(
attribute) + ", the error log follows")
print(process_output.stderr)
output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape)
output_prediction = Prediction(prediction_value, prediction_confidence_interval, prediction_valid, prediction_mae,
prediction_mse, prediction_mape, prediction_smape)
return output_prediction
@@ -73,40 +83,61 @@ def get_prediction_data_filename(configuration_file_location):
return "" + str(path_to_datasets) + str(application_name) + ".csv"
def fix_path_ending(path):
if (path[-1] is os.sep):
last_path_char = path[-1]
if (last_path_char == os.sep or last_path_char == "/" or last_path_char == "\\"):  # compare with ==, not identity
return path
else:
return path + os.sep
return path + "/"
def preprocess_dataset(dataset, metric):
import pandas as pd
data=pd.read_csv(dataset)
data = pd.read_csv(dataset)
for column in data.columns:
if(column!=metric and column!="ems_time"):
data.drop(column,axis=1,inplace=True)
data.dropna(inplace = True)
data.sort_values("ems_time")
if (column != metric and column != input_dataset_timestamp_column and column != input_dataset_split_column):
data.drop(column, axis=1, inplace=True)
data.dropna(inplace=True)
data.sort_values(input_dataset_timestamp_column)
return data
def get_times_to_predict(dataset):
return dataset["ems_time"].values[round(len(dataset)*0.9):] #Use only the last 10% of data for testing
def get_initial_time_to_predict(dataset):
#the first after the 90th percentile element - the first element of the test dataset since we adopt a 90/10 split for training/test datasets
return dataset["ems_time"].values[round(len(dataset)*0.9)]
#dataset = sorted(csvreader, key=metric)
def get_times_to_predict(dataset, initial_time_to_predict, final_time_to_predict):
#The assumption here is that the initial and final times to predict are unique values in the input_dataset_timestamp_column column of the dataset
if custom_split:
return dataset[input_dataset_timestamp_column].values[math.ceil(len(dataset) * custom_split_train_percentage):] # Use only the last percentage of data for testing
else:
initial_time_to_predict_index = list(dataset[input_dataset_timestamp_column].values).index(initial_time_to_predict)
final_time_to_predict_index = list(dataset[input_dataset_timestamp_column].values).index(final_time_to_predict)
return dataset[input_dataset_timestamp_column].values[initial_time_to_predict_index:final_time_to_predict_index + 1]
def get_initial_time_to_predict(dataset, metric_name):
time_values = dataset[input_dataset_timestamp_column].values
metric_values = dataset[metric_name].values
if (custom_split):
# the first after the custom_split_train_percentage percentile element - the first element of the test dataset since we adopt a custom split for training/test datasets
calculated_element_index = math.ceil(len(dataset) * custom_split_train_percentage)
return time_values[calculated_element_index]
else:
first_test_element = list(dataset[input_dataset_split_column].values).index(input_dataset_test_label)
calculated_element_index = 0
for time_element_index in range(first_test_element, len(time_values)):
if not math.isnan(metric_values[time_element_index]):
calculated_element_index = time_element_index
break
return time_values[calculated_element_index]
# dataset = sorted(csvreader, key=metric)
def get_final_time_to_predict(dataset):
return dataset["ems_time"].values[-1]
return dataset[input_dataset_timestamp_column].values[-1]
def save_object(obj, filename):
with open(filename, 'wb') as output: # Overwrites any existing file.
pickle.dump(obj, output, pickle.HIGHEST_PROTOCOL)
if __name__ == "__main__":
p = Properties()
@@ -115,88 +146,65 @@ if __name__ == "__main__":
p.load(f, "utf-8")
path_to_datasets, metadata = p["path_to_datasets"]
application_name, metadata = p["application_name"]
path_to_datasets = fix_path_ending(path_to_datasets)
dataset= "" + str(path_to_datasets) + str(application_name) + ".csv"
path_to_application_datasets = fix_path_ending(path_to_datasets) + str(application_name) + "/"
main_dataset_location = str(path_to_application_datasets) + str(
application_name) + ".csv" # the main dataset is named for example genome12final.csv, and is inside a folder named genome12final, inside the `path_to_datasets` folder
#dataset = application_name #or genome12
datasets_location=""
if application_name=="genome12":
datasets_location = "C:/Users/user/IdeaProjects/morphemic-preprocessor/morphemic-forecasting-exponentialsmoothing/src/test/datasets/genome12"
# dataset = application_name #or genome12
"""
initial_time_to_predict = 1613135625
#final_time_to_predict = 1613167107
final_time_to_predict = 1613193287
"""
elif application_name=="genome18":
output_file_name_prefix = path_to_application_datasets + "prediction_results"
datasets_location = "C:/Users/user/IdeaProjects/morphemic-preprocessor/morphemic-forecasting-exponentialsmoothing/src/test/datasets/genome18"
"""
initial_time_to_predict = 1613644750
#dummy - final_time_to_predict = 1613648333
final_time_to_predict = 1613648472
"""
output_file_name_prefix = dataset+"_prediction_results"
metrics_to_predict = ["EstimatedRemainingTimeContext", "SimulationLeftNumber", "SimulationElapsedTime",
"NotFinishedOnTime", "MinimumCoresContext", "NotFinished", "WillFinishTooSoonContext",
"NotFinishedOnTimeContext", "MinimumCores", "ETPercentile", "RemainingSimulationTimeMetric",
"TotalCores"]
metrics_to_predict = ["EstimatedRemainingTimeContext","SimulationLeftNumber","SimulationElapsedTime","NotFinishedOnTime","MinimumCoresContext","NotFinished","WillFinishTooSoonContext","NotFinishedOnTimeContext","MinimumCores","ETPercentile","RemainingSimulationTimeMetric","TotalCores"]
# metrics_to_predict = ["EstimatedRemainingTimeContext"]
#metrics_to_predict = ["EstimatedRemainingTimeContext"]
saved_object=True
saved_object=False
if saved_object:
for metric in metrics_to_predict:
with open("C:/Users/user/IdeaProjects/morphemic-preprocessor/morphemic-forecasting-exponentialsmoothing/src/test/datasets/"+str(application_name)+"/prediction_lists_"+str(metric)+".pkl", "rb") as input:
with open("C:/Users/user/IdeaProjects/morphemic-preprocessor/morphemic-forecasting-exponentialsmoothing/src/test/datasets/"+str(application_name)+"/output_"+str(metric)+".csv", "w",newline="") as output:
with open(path_to_application_datasets + "prediction_lists_" + str(metric) + ".pkl", "rb") as input:
with open(path_to_application_datasets + "output_" + str(metric) + ".csv", "w", newline="") as output:
prediction_lists = pickle.load(input)
writer = csv.writer(output)
#writer.writeheader()
# writer.writeheader()
writer.writerows(prediction_lists[metric])
#writer.writerows(list(prediction_lists.values())[0])
# writer.writerows(list(prediction_lists.values())[0])
exit(0)
pool_size = max(min(len(metrics_to_predict),12),12)
pool_size = max(min(len(metrics_to_predict), 12), 12)  # note: as written, this expression always evaluates to 12
pool = multiprocessing.Pool(pool_size)  # pass the size explicitly so the printed value matches the actual pool size
print("Prediction thread pool size set to " + str(pool_size))
prediction_lists = {}
for metric in metrics_to_predict:
prediction_lists[metric] = [["Timestamp", metric+"_actual_value", metric+"_predicted_value"]]
preprocessed_dataset = preprocess_dataset(dataset, metric)
dataset_location = datasets_location+"/"+metric+".csv"
preprocessed_dataset.to_csv(dataset_location)
prediction_lists[metric] = [["Timestamp", metric + "_actual_value", metric + "_predicted_value"]]
preprocessed_dataset = preprocess_dataset(main_dataset_location, metric)
metric_dataset_location = path_to_application_datasets + metric + ".csv"
preprocessed_dataset.to_csv(metric_dataset_location)
initial_time_to_predict = get_initial_time_to_predict(preprocessed_dataset)
initial_time_to_predict = get_initial_time_to_predict(preprocessed_dataset, metric)
final_time_to_predict = get_final_time_to_predict(preprocessed_dataset)
times_to_predict = get_times_to_predict(preprocessed_dataset)
times_to_predict = get_times_to_predict(preprocessed_dataset, initial_time_to_predict, final_time_to_predict)
prediction_processing = {}
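# Dispatch one asynchronous prediction task per timestamp; results are collected in the loop below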
for time_to_predict in times_to_predict:
#predictions = {}
#predictions["Timestamp"] = time_to_predict
#received_prediction = runtime.predict_attribute(metric,"",time_to_predict)
print("Starting " + metric + " prediction thread")
prediction_processing[time_to_predict] = pool.apply_async(predict_attribute, args=[metric, dataset_location,configuration_file_location,str(time_to_predict)])
#predictions[metric] = (prediction_processing[metric].get()).get_error_metrics_string()
#time_to_predict = time_to_predict + 30
prediction_processing[time_to_predict] = pool.apply_async(predict_attribute, args=[metric, metric_dataset_location, configuration_file_location, str(time_to_predict)])
for prediction_time in times_to_predict:
real_value = preprocessed_dataset.loc[preprocessed_dataset['ems_time'] == prediction_time, metric].iloc[0]
prediction_lists[metric].append([prediction_time,real_value,(prediction_processing[prediction_time].get()).value]) #get the results of the processing
real_value = preprocessed_dataset.loc[preprocessed_dataset[input_dataset_timestamp_column] == prediction_time, metric].iloc[0]
prediction_lists[metric].append([prediction_time, real_value, (
prediction_processing[prediction_time].get()).value]) # get the results of the processing
field_names=["Timestamp"]
field_names = ["Timestamp"]
field_names.append(metric)
#for metric in metrics_to_predict:
# field_names.append(metric)
output_file_name=output_file_name_prefix+"_"+metric+".csv"
with open(output_file_name, 'w',newline="") as csvfile:
# sample usage
save_object(prediction_lists, 'C:/Users/user/IdeaProjects/morphemic-preprocessor/morphemic-forecasting-exponentialsmoothing/src/test/datasets/'+str(application_name)+'/prediction_lists_'+str(metric)+'.pkl')
output_file_name = output_file_name_prefix + "_" + metric + ".csv"
with open(output_file_name, 'w', newline="") as csvfile:
save_object(prediction_lists, path_to_application_datasets + 'prediction_lists_' + str(metric) + '.pkl')
writer = csv.writer(csvfile)
writer.writerows(prediction_lists[metric])
......
Metric,MAE,MAPE,sMAPE,MSE
EstimatedRemainingTimeContext,658910.5683596935,inf,1.7098672650226405,619828944081.6696
EstimatedRemainingTimeContext_AltCalc,658910.5683596936,,1.7098672650226394,619828944081.6699
SimulationLeftNumber,10.806496601814302,inf,0.1305127284233977,210.01936099824968
SimulationLeftNumber_AltCalc,10.806496601814306,,0.1305127284233977,210.0193609982496
SimulationElapsedTime,150.5,0.0024213399538184875,0.0024242776660612616,22650.25
SimulationElapsedTime_AltCalc,150.5,0.0024213399538184853,0.0024242776660612616,22650.25
NotFinishedOnTime,24272585.722914636,182.9538346668795,1.7287615913218626,772582844416910.5
NotFinishedOnTime_AltCalc,24272585.722914625,182.95383466687957,1.7287615913218612,772582844416911.0
MinimumCoresContext,1.1301461218757547,inf,2.0,1.2815430687117086
MinimumCoresContext_AltCalc,1.1301461218757525,,2.0,1.2815430687117102
NotFinished,24272585.722914636,182.9538346668795,1.7287615913218626,772582844416910.5
NotFinished_AltCalc,24272585.722914625,182.95383466687957,1.7287615913218612,772582844416911.0
WillFinishTooSoonContext,19823380.16689823,189.17288588510704,1.7267734400368235,502440416587275.9
WillFinishTooSoonContext_AltCalc,19823380.16689823,189.17288588510718,1.7267734400368242,502440416587275.6
NotFinishedOnTimeContext,24272585.722914636,182.9538346668795,1.7287615913218626,772582844416910.5
NotFinishedOnTimeContext_AltCalc,24272585.722914625,182.95383466687957,1.7287615913218612,772582844416911.0
MinimumCores,1.1301461218757547,inf,2.0,1.2815430687117086
MinimumCores_AltCalc,1.1301461218757525,,2.0,1.2815430687117102
ETPercentile,1.9897584298620399,0.02673866959099785,0.025052151584509092,11.542048697873208
ETPercentile_AltCalc,1.9897584298620392,0.026738669590997834,0.025052151584509092,11.542048697873208
RemainingSimulationTimeMetric,150.5,0.002738945711594705,0.0027427062154541495,22650.25
RemainingSimulationTimeMetric_AltCalc,150.5,0.0027389457115947068,0.00274270621545415,22650.25
TotalCores,0.0,0.0,0.0,0.0
TotalCores_AltCalc,0.0,0.0,0.0,0.0