Commit 009e4f46 authored by Andreas Tsagkaropoulos's avatar Andreas Tsagkaropoulos
Browse files

Moved hard-coded strings to variables inside the R forecaster

Implemented some data sanitization to cover the case in which erroneous data is included in the dataset (and therefore remove timestamps which are outside the time range for which data has been requested)
parent 339842cb
......@@ -20,6 +20,11 @@ library(purrr)
# Compute the symmetric mean absolute percentage error (SMAPE), in percent.
#
# actual   - numeric vector of observed values.
# forecast - numeric vector of predicted values (expected to have the same
#            length as `actual`).
# Returns the average of 2*|forecast-actual| / (|actual|+|forecast|) * 100
# over all data points (divided by length(actual), matching the usual
# SMAPE definition).
find_smape <- function(actual, forecast) {
  pointwise_smape <- 2 * abs(forecast - actual) / (abs(actual) + abs(forecast)) * 100
  sum(pointwise_smape) / length(actual)
}
# Return the current Unix epoch time as an integer number of seconds
# (seconds elapsed since 1970-01-01 00:00:00 UTC, fractional part truncated).
get_current_epoch_time <- function() {
  current_moment <- as.POSIXct(Sys.time())
  as.integer(current_moment)
}
#Assumes an xts time series object as input, with each record having a 1-sec difference from the previous one, and returns the last timestamp which is (or should have been) assigned (if not present).
find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){
counter <- 0
......@@ -39,6 +44,9 @@ get_time_value <- function(time_object){
####Time the execution of the prediction
start_time <- proc.time()  # reference point; elapsed phase timings are computed against this later (e.g. preprocessing_time)
time_field_name <- "ems_time" # The field holding the epoch timestamp in the generated csv
time_unit_granularity <- "sec" # Handle monitoring data using this time unit granularity
endpoint_time_unit_granularity <- "seconds" # granularity keyword passed to endpoints() when aggregating data points
#configuration_properties <- read.properties(".\\prediction_configuration-windows.properties")
# Load runtime configuration from the properties file in the current working directory
configuration_properties <- read.properties(paste(getwd(),"/prediction_configuration.properties",sep=''))
......@@ -71,16 +79,21 @@ beta_value_argument <- as.double(args[5])
#mydata <- read.csv(dataset_to_process, sep=",", header=TRUE)
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE)
#Sanitize data_to_process by removing any very old values which may have been accidentally introduced. For this reason we remove all data points recorded before (now - number_of_days*24hrs*3600sec/hr seconds); we additionally subtract configuration_properties$prediction_processing_time_safety_margin_seconds in order to account for the time it takes to create the dataset and start the prediction process.
oldest_acceptable_time_point <- get_current_epoch_time() -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
data_to_process <- data_to_process[data_to_process[[time_field_name]]>oldest_acceptable_time_point,]
#Fail-safe default
df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,"ems_time"]))
date_time_init <- anytime(data_to_process[,"ems_time"])
df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,time_field_name]))
date_time_init <- anytime(data_to_process[,time_field_name])
date_time_complete <- seq.POSIXt(from=min(date_time_init),
to=max(date_time_init),by="sec")
to=max(date_time_init),by=time_unit_granularity)
df2 <- merge(df1,xts(,date_time_complete))
mydata <- na.approx(df2)
colnames(mydata)<-c(attribute_to_predict)
print(paste("The complete time series to be predicted for attribute",attribute_to_predict,"has been created"))
configuration_forecasting_horizon <- as.integer(configuration_properties$horizon)
if (configuration_forecasting_horizon>0){
......@@ -154,7 +167,7 @@ if (write_back_clean_data_file){
preprocessing_time<-proc.time() - load_time - start_time
testing_datapoints <- tail(data_points, number_of_data_points_used_for_testing)
mydata.test <- tail(period.apply(testing_datapoints,endpoints(testing_datapoints,"seconds",k=number_of_seconds_to_aggregate_on),mean),forecasting_horizon%/%(number_of_seconds_to_aggregate_on))
mydata.test <- tail(period.apply(testing_datapoints,endpoints(testing_datapoints,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean),forecasting_horizon%/%(number_of_seconds_to_aggregate_on))
if (length(mydata.test)<=0){
print(paste("Unable to generate predictions as a prediction is requested for a shorter time duration than the aggregation interval (requested prediction with horizon",forecasting_horizon," whereas the aggregation period is",number_of_seconds_to_aggregate_on,")"))
......@@ -162,7 +175,7 @@ if (length(mydata.test)<=0){
}
training_datapoints <- head(data_points, number_of_data_points_used_for_training)
mydata.train <- period.apply(training_datapoints,endpoints(training_datapoints,"seconds",k=number_of_seconds_to_aggregate_on),mean)
mydata.train <- period.apply(training_datapoints,endpoints(training_datapoints,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean)
#print(paste("length-debugging",length(mydata.train)+1,length(mydata.train)+length(mydata.test)))
mydata_trainseries <- (ts(mydata.train,start=c(1),frequency = frequency_setting))
......@@ -389,7 +402,7 @@ if(prediction_method=="ETS"){
if (as.logical(configuration_properties$generate_prediction_png_output)){
print(paste("creating new figure at",configuration_properties$png_output_file))
mydata.aggregated <- period.apply(data_points,endpoints(data_points,"seconds",k=number_of_seconds_to_aggregate_on),mean)
mydata.aggregated <- period.apply(data_points,endpoints(data_points,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean)
mydata_full_series <- ts(mydata.aggregated,start=c(1),frequency = frequency_setting)
png(filename=configuration_properties$png_output_file,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment