Commit e5b3f5ba authored by maciek riedl's avatar maciek riedl
Browse files

Merge branch 'exponential_smoothing_predictor' into 'morphemic-rc2.0'

Improvements to allow exponential smoothing forecasts using smaller forecasting horizons

See merge request !286
parents 8fcd5ae7 ff3f32fd
Pipeline #20713 passed with stages
in 34 minutes and 15 seconds
......@@ -27,10 +27,14 @@ get_current_epoch_time <- function(){
#Assumes an xts time series object as input, with each record having a 1-sec difference from the previous one, and returns the last timestamp which is (or should have been) assigned (if not present).
find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){
counter <- 0
possible_timestamp <- as.numeric(end(mydata))
if(realtime_mode){
return(min(c(possible_timestamp,next_prediction_time)))
#return(min(c(possible_timestamp,next_prediction_time)))
if (next_prediction_time>possible_timestamp){
return(next_prediction_time)
}else{
return (possible_timestamp)
}
}else{
return (possible_timestamp)
}
......@@ -80,9 +84,20 @@ beta_value_argument <- as.double(args[5])
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE)
#Sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before (now - number_of_days*24hrs*3600sec/hr) seconds; we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds, in order to account for the time it takes to create the dataset and start the prediction process.
oldest_acceptable_time_point <- get_current_epoch_time() -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
#Reference time for the data-age cut-off: the value returned by get_current_epoch_time() in realtime mode, otherwise the time value of the last row of the dataset.
current_time <- get_current_epoch_time()
if (!realtime_mode){
#Extract the column with [[ ]] so that a plain value is obtained. Single-bracket indexing returns a one-cell data.frame, which turns the cut-off below into a data.frame and breaks the row filter that uses it.
current_time <- tail(data_to_process[[time_field_name]],1)
}
#Cut-off = reference time - (number_of_days_to_use_data_from expressed in seconds + the safety margin in seconds)
oldest_acceptable_time_point <- current_time -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
print(paste("Using data after time point",oldest_acceptable_time_point,"..."))
data_to_process <- data_to_process[data_to_process[[time_field_name]]>oldest_acceptable_time_point,]
if (length(data_to_process[,attribute_to_predict])>0){
print(paste("Constructing fine-grained data series for",attribute_to_predict,"using the requested granularity..."))
}else{
print("No valid data points remained after enforcing the number_of_days_to_use_data_from configuration option. This may mean that you are trying to predict using realtime mode, using data points older than the number of days specified in the number_of_days_to_use_data_from configuration option")
stop()
}
#Fail-safe default
df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,time_field_name]))
date_time_init <- anytime(data_to_process[,time_field_name])
......@@ -98,6 +113,7 @@ colnames(mydata)<-c(attribute_to_predict)
print(paste("The complete time series to be predicted for attribute",attribute_to_predict,"has been created"))
configuration_forecasting_horizon <- as.integer(configuration_properties$horizon)
last_timestamp_data <- 0
if (configuration_forecasting_horizon>0){
print("Using a statically defined horizon from the configuration file")
......
......@@ -51,7 +51,7 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
process_output = run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""):
print_with_time("Empty output from R predictions - the error output is the following:")
print(process_output.stderr) #There was an error during the calculation of the predicted value
......@@ -242,6 +242,23 @@ class Listener(messaging.listener.MorphemicListener):
print_with_time("Problem while retrieving epoch start and/or prediction_horizon")
return
with open(State.configuration_file_location, "r+b") as f:
State.configuration_details.load(f, "utf-8")
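# Read number_of_seconds_to_aggregate_on; if the prediction horizon is smaller, lower it to the horizon and rewrite the configuration file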
initial_seconds_aggregation_value, metadata = State.configuration_details["number_of_seconds_to_aggregate_on"]
initial_seconds_aggregation_value = int(initial_seconds_aggregation_value)
if (prediction_horizon<initial_seconds_aggregation_value):
print_with_time("Changing number_of_seconds_to_aggregate_on to "+str(prediction_horizon)+" from its initial value "+str(initial_seconds_aggregation_value))
State.configuration_details["number_of_seconds_to_aggregate_on"] = str(prediction_horizon)
f.seek(0)
f.truncate(0)
State.configuration_details.store(f, encoding="utf-8")
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
......
......@@ -30,7 +30,7 @@ def predict_attribute(attribute, prediction_data_file, configuration_file_locati
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
process_output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
process_output = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
if (process_output.stdout == ""):
print("Empty output from R predictions")
print("The error output is the following")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment