Commit ff3f32fd authored by Andreas Tsagkaropoulos's avatar Andreas Tsagkaropoulos
Browse files

Correction of complete time series creation

Added code to handle cases in which predictions should be made with a horizon of less than 300 seconds into the future (by setting the aggregation interval equal to this horizon)
Added option in subprocess spawning to improve logging output
Improved the handling of an unusual configuration and dataset combination (requesting realtime mode on a stale dataset)
parent 5e73ffe8
......@@ -27,10 +27,14 @@ get_current_epoch_time <- function(){
#Assumes an xts time series object as input, with each record being 1 second after the previous one, and returns the last timestamp which is assigned (or, if not present, should have been assigned).
find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){
counter <- 0
possible_timestamp <- as.numeric(end(mydata))
if(realtime_mode){
return(min(c(possible_timestamp,next_prediction_time)))
#return(min(c(possible_timestamp,next_prediction_time)))
if (next_prediction_time>possible_timestamp){
return(next_prediction_time)
}else{
return (possible_timestamp)
}
}else{
return (possible_timestamp)
}
......@@ -80,9 +84,20 @@ beta_value_argument <- as.double(args[5])
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE)
#Sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points before (now - number_of_days*24hrs*3600sec/hr seconds), and we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds in order to account for the time it takes to create the dataset and start the prediction process.
oldest_acceptable_time_point <- get_current_epoch_time() -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
current_time <- get_current_epoch_time()
if (!realtime_mode){
current_time <- tail(data_to_process[time_field_name],1)
}
oldest_acceptable_time_point <- current_time -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
print(paste("Using data after time point",oldest_acceptable_time_point,"..."))
data_to_process <- data_to_process[data_to_process[[time_field_name]]>oldest_acceptable_time_point,]
if (length(data_to_process[,attribute_to_predict])>0){
print(paste("Constructing fine-grained data series for",attribute_to_predict,"using the requested granularity..."))
}else{
print("No valid data points remained after enforcing the number_of_days_to_use_data_from configuration option. This may mean that you are trying to predict using realtime mode, using data points older than the number of days specified in the number_of_days_to_use_data_from configuration option")
stop()
}
#Fail-safe default
df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,time_field_name]))
date_time_init <- anytime(data_to_process[,time_field_name])
......@@ -98,6 +113,7 @@ colnames(mydata)<-c(attribute_to_predict)
print(paste("The complete time series to be predicted for attribute",attribute_to_predict,"has been created"))
configuration_forecasting_horizon <- as.integer(configuration_properties$horizon)
last_timestamp_data <- 0
if (configuration_forecasting_horizon>0){
print("Using a statically defined horizon from the configuration file")
......
......@@ -51,7 +51,7 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
process_output = run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""):
print_with_time("Empty output from R predictions - the error output is the following:")
print(process_output.stderr) #There was an error during the calculation of the predicted value
......@@ -242,6 +242,23 @@ class Listener(messaging.listener.MorphemicListener):
print_with_time("Problem while retrieving epoch start and/or prediction_horizon")
return
with open(State.configuration_file_location, "r+b") as f:
State.configuration_details.load(f, "utf-8")
# Read the configured aggregation interval and, if the prediction horizon is shorter, overwrite it in the configuration file
initial_seconds_aggregation_value, metadata = State.configuration_details["number_of_seconds_to_aggregate_on"]
initial_seconds_aggregation_value = int(initial_seconds_aggregation_value)
if (prediction_horizon<initial_seconds_aggregation_value):
print_with_time("Changing number_of_seconds_to_aggregate_on to "+str(prediction_horizon)+" from its initial value "+str(initial_seconds_aggregation_value))
State.configuration_details["number_of_seconds_to_aggregate_on"] = str(prediction_horizon)
f.seek(0)
f.truncate(0)
State.configuration_details.store(f, encoding="utf-8")
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
......
......@@ -30,7 +30,7 @@ def predict_attribute(attribute, prediction_data_file, configuration_file_locati
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
process_output = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
process_output = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
if (process_output.stdout == ""):
print("Empty output from R predictions")
print("The error output is the following")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment