Commit 8b7728ec authored by Andreas Tsagkaropoulos's avatar Andreas Tsagkaropoulos
Browse files

Correction to spawn correctly the R forecasting subprocess from Python in...

Correction to spawn correctly the R forecasting subprocess from Python in Linux (and not only in Windows)
Improvement of the command used to start the forecasting process, in order to flush python output and obtain logs immediately when published
Logging and business logic improvement in forecasts to warn about the availability of data with a timestamp equal to or newer compared to the timestamp of the requested prediction time, and nevertheless return the last available timestamp in the dataset
parent 0f83288b
......@@ -66,4 +66,4 @@ COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/
WORKDIR /home/r_predictions/esm_forecaster-0.1.0
CMD ["/bin/sh","-c","python3 /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties > $LOG_FILE 2>&1"]
\ No newline at end of file
CMD ["/bin/sh","-c","python3 -u /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties 2>&1 > $LOG_FILE "]
\ No newline at end of file
......@@ -31,8 +31,9 @@ find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){
if(realtime_mode){
#return(min(c(possible_timestamp,next_prediction_time)))
if (next_prediction_time>possible_timestamp){
return(next_prediction_time)
return(possible_timestamp)
}else{
print("Possible problem with the requested prediction time, there is already data for a timestamp newer than the time requested to predict for. Returning the newer timestamp, being aware that this will lead to this prediction returning no meaningful output")
return (possible_timestamp)
}
}else{
......@@ -53,6 +54,8 @@ time_unit_granularity <- "sec" # Handle monitoring data using this time unit gra
endpoint_time_unit_granularity <- "seconds"
#configuration_properties <- read.properties(".\\prediction_configuration-windows.properties")
print("Reading properties from the following file:")
print(paste(getwd(),"/prediction_configuration.properties",sep=''))
configuration_properties <- read.properties(paste(getwd(),"/prediction_configuration.properties",sep=''))
realtime_mode <- as.logical(configuration_properties$realtime_mode) #whether or not we should use all datapoints available (True value), or we are only partially using the available dataset (False value) e.g to test the prediction method performance
......
......@@ -42,14 +42,32 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
os.chdir(os.path.dirname(configuration_file_location))
prediction_data_file = get_prediction_data_filename(configuration_file_location)
from sys import platform
if State.testing_prediction_functionality:
print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute)
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute]
# Windows
if platform == "win32":
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute]
# linux
elif platform == "linux" or platform == "linux2":
command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)]
#Choosing the solution of linux
else:
command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)]
else:
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time)
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
# Windows
if platform == "win32":
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
# Linux
elif platform == "linux" or platform == "linux2":
command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)]
#Choosing the solution of linux
else:
command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)]
process_output = run(command, shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""):
......@@ -179,7 +197,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body)
State.connection.send_to_topic('training_events',training_events_message_body)
message_not_sent = False
print_with_time("Successfully sent prediction message for "+attribute)
print_with_time("Successfully sent prediction message for "+attribute+" to topic intermediate_prediction.%s.%s" % (id, attribute))
except ConnectionError as exception:
State.connection.disconnect()
State.connection = messaging.morphemic.Connection('admin', 'admin')
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment