From 153324796f129bad28b6bf6cbb4cba766552be1b Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Tue, 16 May 2023 13:15:34 +0300 Subject: [PATCH] Redirected standard error to standard output, and flushed standard output in case of an error in predictions (rather than stderr) Allowed to modify dynamically the number_of_seconds_to_aggregate_on variable value, in order to decrease the forecast depth (and consequently the time to make a new prediction), at a small cost in prediction accuracy --- .../src/r_predictors/forecasting_real_workload.R | 4 ++++ .../src/runtime/Predictor.py | 8 +++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R index 91469e30..88c08ace 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R +++ b/morphemic-forecasting-exponentialsmoothing/src/r_predictors/forecasting_real_workload.R @@ -196,6 +196,10 @@ if (write_back_clean_data_file){ preprocessing_time<-proc.time() - load_time - start_time testing_datapoints <- tail(data_points, number_of_data_points_used_for_testing) +if (number_of_seconds_to_aggregate_on<(forecasting_horizon%/%10)) { + print(paste("Setting new value for number_of_seconds_to_aggregate_on, from ",number_of_seconds_to_aggregate_on," to ",forecasting_horizon%/%10," in order not to make too far-fetched (slow) predictions")) + number_of_seconds_to_aggregate_on <- forecasting_horizon%/%10 +} mydata.test <- tail(period.apply(testing_datapoints,endpoints(testing_datapoints,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean),forecasting_horizon%/%(number_of_seconds_to_aggregate_on)) if (length(mydata.test)<=0){ diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py index 28df51f1..0fda1610 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/Predictor.py @@ -69,7 +69,7 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time] # Linux elif platform == "linux" or platform == "linux2": - command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)] + command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time) + " 2>&1"] #Choosing the solution of linux else: command = ["Rscript forecasting_real_workload.R "+str(prediction_data_file) + " "+ str(attribute)+" "+str(next_prediction_time)] @@ -106,7 +106,7 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval) else: print_with_time("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows") - print_with_time(process_output.stderr) + print_with_time(process_output.stdout) output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape) return output_prediction @@ -141,10 +141,11 @@ def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_predi if (estimated_time_after_prediction > earliest_time_to_predict_at ): future_prediction_time_factor = 1+(estimated_time_after_prediction-earliest_time_to_predict_at)//prediction_horizon - print_with_time("Due to slowness of the prediction, skipping next prediction interval and targeting "+str(future_prediction_time_factor)+" intervals ahead (instead of simply waiting until the time that the next interval begins)") prediction_time = earliest_time_to_predict_at+ future_prediction_time_factor*prediction_horizon + print_with_time("Due to slowness of the prediction, skipping next time point for prediction (prediction at " + str(earliest_time_to_predict_at-prediction_horizon)+" for "+ str(earliest_time_to_predict_at)+") and targeting "+str(future_prediction_time_factor)+" intervals ahead (prediction at time point "+str(prediction_time-prediction_horizon)+" for "+ str(prediction_time)+")") else: prediction_time = earliest_time_to_predict_at + prediction_horizon + print_with_time("Time is now "+str(current_time)+" and next prediction batch starts with prediction for time "+str(prediction_time)) return prediction_time @@ -271,6 +272,7 @@ class Listener(messaging.listener.MorphemicListener): State.epoch_start = json.loads(body)["epoch_start"] prediction_horizon = int(json.loads(body)["prediction_horizon"]) State.next_prediction_time = update_prediction_time(State.epoch_start,prediction_horizon,State.prediction_processing_time_safety_margin_seconds) # State.next_prediction_time was assigned the value of State.epoch_start here, but this re-initializes targeted prediction times after each start_forecasting message, which is not desired necessarily + print_with_time("A start_forecasting message has been received, epoch start and prediction horizon are "+str(State.epoch_start)+", and "+str(prediction_horizon)+ " seconds respectively") except Exception as e: print_with_time("Problem while retrieving epoch start and/or prediction_horizon") return -- GitLab