Commit 3a03dfb3 authored by maciek riedl

Merge branch 'exponential_smoothing_predictor' into 'morphemic-rc2.0'

Exponential Smoothing forecaster improvements

See merge request !278
parents 408ce4cf 9e6b1c58
Pipeline #20459 failed with stages in 33 minutes and 5 seconds
FROM python:3 as source
RUN pip install --upgrade pip
RUN mkdir /src
ADD ./src/ /src/
WORKDIR /src
RUN pip install -r requirements.txt
RUN ls -la
RUN python3 setup.py sdist
RUN ls ./dist/
#FROM rbase
FROM ubuntu:latest
RUN mkdir -p /home/r_predictions
#RUN mkdir -p /home/prestocloud/output
#ADD input /home/prestocloud/input
RUN apt-get update
ENV TZ=Europe/Athens
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y \
libcurl4-openssl-dev \
build-essential \
r-base-core \
r-base-dev \
r-cran-digest \
r-cran-boot \
r-cran-class \
r-cran-cluster \
r-cran-codetools \
r-cran-foreign \
r-cran-kernsmooth \
r-cran-lattice \
r-cran-littler \
r-cran-mass \
r-cran-matrix \
r-cran-mgcv \
r-cran-nlme \
r-cran-nnet \
r-cran-pkgkitten \
r-cran-rcpp \
r-cran-rpart \
r-cran-spatial \
r-cran-survival \
r-doc-html \
r-recommended \
python3 \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
#RUN mkdir -p /home/prestocloud/
COPY ./src/r_predictors/r_commands.R /home/r_predictions/
#ENTRYPOINT ["Rscript","/home/r_predictions/forecasting_real_workload.R"]
#WORKDIR /home/r_predictions
RUN Rscript /home/r_predictions/r_commands.R #install prerequisite libraries
COPY --from=source ./src/dist/esm_forecaster-0.1.0.tar.gz /home/r_predictions/
COPY ./src/requirements.txt /home/r_predictions/
COPY ./src/prepare_python_dependencies.sh /home/r_predictions/
RUN bash -x /home/r_predictions/prepare_python_dependencies.sh
#---------------------Installation up to here
#RUN apt-get update
#RUN apt-get -y install python3 python3-pip #this installation should be merged with the previous one
#COPY ./src/r_predictors/prediction_configuration.properties /home/r_predictions
#COPY ./extra_r_commands.R /home/r_predictions/
#RUN Rscript /home/r_predictions/extra_r_commands.R #these libraries should be merged with the previous ones
#WORKDIR /home/r_predictions
COPY ./src/r_predictors/forecasting_real_workload.R /home/r_predictions/
#below two commented lines only serve for experiments with predictive functionality
#COPY ./default_application.csv /home/r_predictions
#RUN Rscript forecasting_real_workload.R default_application.csv MinimumCores 1638878119
WORKDIR /home/r_predictions/esm_forecaster-0.1.0
#RUN python3 runtime/Predictor.py r_predictors/prediction_configuration-windows.properties
CMD ["/bin/sh","-c","python3 /home/r_predictions/esm_forecaster-0.1.0/runtime/Predictor.py /home/r_predictions/esm_forecaster-0.1.0/r_predictors/prediction_configuration.properties > /home/r_predictions/exponential_smoothing.log"]
@@ -20,6 +20,11 @@ library(purrr)
find_smape <- function(actual, forecast) {
return (1/length(actual) * sum(2*abs(forecast-actual) / (abs(actual)+abs(forecast))*100))
}
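
For reference, the metric above is the symmetric MAPE, (1/n) * sum(2*|F_t - A_t| / (|A_t| + |F_t|)) * 100. A minimal Python sketch of the same computation (function and variable names are illustrative, not part of this MR):

import numpy as np

def smape(actual, forecast):
    # Symmetric mean absolute percentage error, in percent, mirroring find_smape above
    actual = np.asarray(actual, dtype=float)
    forecast = np.asarray(forecast, dtype=float)
    return float(np.mean(2.0 * np.abs(forecast - actual) / (np.abs(actual) + np.abs(forecast))) * 100.0)
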
get_current_epoch_time <- function(){
return (as.integer(as.POSIXct(Sys.time())))
}
#Assumes an xts time series object as input, with each record having a 1-sec difference from the previous one, and returns the last timestamp that is (or, if missing, should have been) assigned.
find_last_timestamp <- function(mydata,next_prediction_time,realtime_mode){
counter <- 0
@@ -39,6 +44,9 @@ get_time_value <- function(time_object){
####Time the execution of the prediction
start_time <- proc.time()
time_field_name <- "ems_time" # The field holding the epoch timestamp in the generated csv
time_unit_granularity <- "sec" # Handle monitoring data using this time unit granularity
endpoint_time_unit_granularity <- "seconds"
#configuration_properties <- read.properties(".\\prediction_configuration-windows.properties")
configuration_properties <- read.properties(paste(getwd(),"/prediction_configuration.properties",sep=''))
@@ -71,16 +79,21 @@ beta_value_argument <- as.double(args[5])
#mydata <- read.csv(dataset_to_process, sep=",", header=TRUE)
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE)
#Sanitize data_to_process by removing any very old values which may have been accidentally introduced: drop all data points recorded before now - number_of_days*24*3600 seconds, additionally subtracting configuration_properties$prediction_processing_time_safety_margin_seconds to account for the time it takes to create the dataset and start the prediction process.
oldest_acceptable_time_point <- get_current_epoch_time() -(as.numeric(configuration_properties$number_of_days_to_use_data_from)*24*3600 + as.numeric(configuration_properties$prediction_processing_time_safety_margin_seconds))
data_to_process <- data_to_process[data_to_process[[time_field_name]]>oldest_acceptable_time_point,]
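
A worked example of the cutoff arithmetic above, under assumed configuration values (365 days of usable data, a 20-second safety margin):

import time

number_of_days_to_use_data_from = 365                  # assumed value
prediction_processing_time_safety_margin_seconds = 20  # assumed value
oldest_acceptable_time_point = int(time.time()) - (
    number_of_days_to_use_data_from * 24 * 3600
    + prediction_processing_time_safety_margin_seconds
)
# Any row whose ems_time is older than this epoch second gets dropped.
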
#Fail-safe default
-df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,"ems_time"]))
-date_time_init <- anytime(data_to_process[,"ems_time"])
+df1 <- xts(as.numeric(data_to_process[,attribute_to_predict]),anytime(data_to_process[,time_field_name]))
+date_time_init <- anytime(data_to_process[,time_field_name])
date_time_complete <- seq.POSIXt(from=min(date_time_init),
-to=max(date_time_init),by="sec")
+to=max(date_time_init),by=time_unit_granularity)
df2 <- merge(df1,xts(,date_time_complete))
mydata <- na.approx(df2)
colnames(mydata)<-c(attribute_to_predict)
print(paste("The complete time series to be predicted for attribute",attribute_to_predict,"has been created"))
configuration_forecasting_horizon <- as.integer(configuration_properties$horizon)
if (configuration_forecasting_horizon>0){
@@ -154,7 +167,7 @@ if (write_back_clean_data_file){
preprocessing_time<-proc.time() - load_time - start_time
testing_datapoints <- tail(data_points, number_of_data_points_used_for_testing)
-mydata.test <- tail(period.apply(testing_datapoints,endpoints(testing_datapoints,"seconds",k=number_of_seconds_to_aggregate_on),mean),forecasting_horizon%/%(number_of_seconds_to_aggregate_on))
+mydata.test <- tail(period.apply(testing_datapoints,endpoints(testing_datapoints,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean),forecasting_horizon%/%(number_of_seconds_to_aggregate_on))
if (length(mydata.test)<=0){
print(paste("Unable to generate predictions as a prediction is requested for a shorter time duration than the aggregation interval (requested prediction with horizon",forecasting_horizon," whereas the aggregation period is",number_of_seconds_to_aggregate_on,")"))
@@ -162,7 +175,7 @@ if (length(mydata.test)<=0){
}
training_datapoints <- head(data_points, number_of_data_points_used_for_training)
-mydata.train <- period.apply(training_datapoints,endpoints(training_datapoints,"seconds",k=number_of_seconds_to_aggregate_on),mean)
+mydata.train <- period.apply(training_datapoints,endpoints(training_datapoints,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean)
#print(paste("length-debugging",length(mydata.train)+1,length(mydata.train)+length(mydata.test)))
mydata_trainseries <- (ts(mydata.train,start=c(1),frequency = frequency_setting))
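
period.apply over endpoints(..., k=number_of_seconds_to_aggregate_on) collapses the one-second series into one averaged point per k-second bucket, and the test set keeps only the last forecasting_horizon %/% k of those points. A pandas sketch of the same bucketing (all values illustrative):

import pandas as pd

k = 5                     # stands in for number_of_seconds_to_aggregate_on
forecasting_horizon = 20  # illustrative
points = pd.Series(range(40), dtype=float,
                   index=pd.date_range("2021-04-28", periods=40, freq="s"))
aggregated = points.resample(f"{k}s").mean()           # one mean per 5-second bucket
test_tail = aggregated.tail(forecasting_horizon // k)  # mirrors tail(..., horizon %/% k)
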
@@ -389,7 +402,7 @@ if(prediction_method=="ETS"){
if (as.logical(configuration_properties$generate_prediction_png_output)){
print(paste("creating new figure at",configuration_properties$png_output_file))
-mydata.aggregated <- period.apply(data_points,endpoints(data_points,"seconds",k=number_of_seconds_to_aggregate_on),mean)
+mydata.aggregated <- period.apply(data_points,endpoints(data_points,endpoint_time_unit_granularity,k=number_of_seconds_to_aggregate_on),mean)
mydata_full_series <- ts(mydata.aggregated,start=c(1),frequency = frequency_setting)
png(filename=configuration_properties$png_output_file,
......
#AMQ_HOST=ems
#AMQ_USER=aaa
#AMQ_PASSWORD=111
#AMQ_PORT_BROKER=61610
APP_NAME=default_application
METHOD=exponential_smoothing
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
INFLUXDB_ORG=morphemic
broker_address=localhost
broker_port=61610
broker_username=morphemic
@@ -9,7 +23,7 @@ forecasting_data_limit=
forecasting_data_used_for_training=0.999
path_to_datasets=C:/Users/user/Desktop/Predictions_using_R/custom_workloads/datasets
application_name=default_application
-oldinput_data_file=C:/Users/user/Desktop/Predictions_using_ R/custom_workloads/periodically_increasing_4_metric.csv
+input_data_file=C:/Users/user/Desktop/Predictions_using_R/custom_workloads/datasets/demo.csv
clean_data_file=C:/Users/user/Desktop/Predictions_using_R/clean_data.csv
output_data_file=C:/Users/user/Desktop/Predictions_using_R/output_data.csv
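
On the Python side, these properties appear to be read through jproperties (State.configuration_details is a Properties object whose entries are accessed via .data in Utilities.load_configuration below); a minimal sketch of that pattern, with the file path assumed:

from jproperties import Properties

config = Properties()
with open("prediction_configuration.properties", "rb") as config_file:
    config.load(config_file)
broker_port = int(config.get("broker_port").data)  # each entry is a tuple; .data holds the value
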
......
APP_NAME=default_application
METHOD=exponential_smoothing
INFLUXDB_HOSTNAME=ui-influxdb
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
INFLUXDB_ORG=morphemic
broker_address=localhost
broker_port=61610
broker_username=morphemic
......
# Title : TODO
# Objective : TODO
# Created by: user
# Created on: 28/4/2021
dataset_to_process <- "C:/Users/user/Desktop/Predictions using R/custom_workloads/datasets/demo.csv"
data_to_process <- read.csv(dataset_to_process, sep=",", header=TRUE)
......
import datetime
import json
import threading
import time
@@ -12,6 +13,7 @@ from messaging import morphemic
from runtime.predictions.Prediction import Prediction
from runtime.operational_status.State import State
from runtime.utilities.Utilities import Utilities
print_with_time = Utilities.print_with_time
def fix_path_ending(path):
@@ -41,18 +43,17 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
prediction_data_file = get_prediction_data_filename(configuration_file_location)
if State.testing_prediction_functionality:
print("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
print("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute)
print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute)
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute]
else:
print ("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time)
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time)
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
process_output = run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""):
print("Empty output from R predictions")
print("The error output is the following")
print_with_time("Empty output from R predictions - the error output is the following:")
print(process_output.stderr) #There was an error during the calculation of the predicted value
process_output_string_list = process_output.stdout.replace("[1] ", "").replace("\"", "").split()
@@ -79,10 +80,10 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
prediction_smape = string.replace("smape:", "")
if (prediction_confidence_interval_produced and prediction_value_produced):
prediction_valid = True
print("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
else:
print("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
print(process_output.stderr)
print_with_time("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
print_with_time(process_output.stderr)
output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape)
return output_prediction
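
R's print() renders a character vector as lines such as [1] "smape:12.3", which is why the parsing above strips the [1] prefix and the quotes before splitting on whitespace. A standalone illustration with fabricated output:

raw_stdout = '[1] "prediction:42.7"\n[1] "smape:12.3"\n'  # example R output, not real logs
tokens = raw_stdout.replace("[1] ", "").replace('"', "").split()
for token in tokens:
    if token.startswith("smape:"):
        prediction_smape = token.replace("smape:", "")    # -> "12.3"
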
@@ -90,11 +91,11 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
def predict_attributes(attributes,next_prediction_time):
pool = multiprocessing.Pool(len(attributes))
print("Prediction thread pool size set to " + str(len(attributes)))
print_with_time("Prediction thread pool size set to " + str(len(attributes)))
attribute_predictions = {}
for attribute in attributes:
print("Starting " + attribute + " prediction thread")
print_with_time("Starting " + attribute + " prediction thread")
start_time = time.time()
attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, State.configuration_file_location,str(next_prediction_time)])
#attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, configuration_file_location,str(next_prediction_time)]).get()
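
Each attribute is dispatched to a worker process with apply_async, and the AsyncResult objects stored in attribute_predictions are later resolved with .get(), as the commented-out line hints. A self-contained sketch of that pattern (the worker body is a placeholder for the real R-script invocation):

import multiprocessing

def predict_attribute_stub(attribute):
    return "prediction for " + attribute  # placeholder worker

if __name__ == "__main__":
    attributes = ["cpu_usage", "memory"]  # illustrative metric names
    pool = multiprocessing.Pool(len(attributes))
    async_results = {a: pool.apply_async(predict_attribute_stub, args=[a]) for a in attributes}
    predictions = {a: r.get() for a, r in async_results.items()}  # .get() blocks per worker
    pool.close()
    pool.join()
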
@@ -117,7 +118,7 @@ def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_predi
if (estimated_time_after_prediction > earliest_time_to_predict_at ):
future_prediction_time_factor = 1+(estimated_time_after_prediction-earliest_time_to_predict_at)//prediction_horizon
print("Due to slowness of the prediction, skipping next prediction interval and targeting "+str(future_prediction_time_factor)+" intervals ahead (instead of simply waiting until the time that the next interval begins)")
print_with_time("Due to slowness of the prediction, skipping next prediction interval and targeting "+str(future_prediction_time_factor)+" intervals ahead (instead of simply waiting until the time that the next interval begins)")
prediction_time = earliest_time_to_predict_at+ future_prediction_time_factor*prediction_horizon
else:
prediction_time = earliest_time_to_predict_at + prediction_horizon
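
A worked example of the skip-ahead arithmetic above (numbers illustrative): if the earliest admissible prediction time is 1000, the horizon is 60 seconds and the prediction is estimated to finish at 1130, the factor becomes 1 + (1130-1000)//60 = 3, so the next target is 1000 + 3*60 = 1180 rather than the already-missed 1060 or 1120:

earliest_time_to_predict_at = 1000  # illustrative epoch values
prediction_horizon = 60
estimated_time_after_prediction = 1130
future_prediction_time_factor = 1 + (estimated_time_after_prediction
                                     - earliest_time_to_predict_at) // prediction_horizon
prediction_time = earliest_time_to_predict_at + future_prediction_time_factor * prediction_horizon
assert prediction_time == 1180
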
@@ -126,7 +127,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_for_prediction):
while Listener.start_forecasting:
print("Using " + State.configuration_file_location + " for configuration details...")
print_with_time("Using " + State.configuration_file_location + " for configuration details...")
State.next_prediction_time = update_prediction_time(State.epoch_start, prediction_horizon,maximum_time_required_for_prediction)
for attribute in State.metrics_to_predict:
@@ -135,7 +136,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
#Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
wait_time = State.next_prediction_time - prediction_horizon - time.time()
print("Waiting for "+str((int(wait_time*100))/100)+" seconds")
print_with_time("Waiting for "+str((int(wait_time*100))/100)+" seconds, until time "+datetime.datetime.fromtimestamp(State.next_prediction_time - prediction_horizon).strftime('%Y-%m-%d %H:%M:%S'))
if (wait_time>0):
time.sleep(wait_time)
if(not Listener.start_forecasting):
@@ -146,7 +147,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
try:
prediction = predict_attributes(State.metrics_to_predict,State.next_prediction_time)
except Exception as e:
print("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time+", proceeding to next prediction time. The encountered exception trace follows:")
print_with_time("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time+", proceeding to next prediction time. The encountered exception trace follows:")
print(e)
continue
for attribute in State.metrics_to_predict:
@@ -178,7 +179,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body)
State.connection.send_to_topic('training_events',training_events_message_body)
message_not_sent = False
print("Successfully sent prediction message for "+attribute)
print_with_time("Successfully sent prediction message for "+attribute)
except ConnectionError as exception:
State.connection.disconnect()
State.connection = messaging.morphemic.Connection('admin', 'admin')
@@ -208,7 +209,7 @@ class Listener(messaging.listener.MorphemicListener):
logging.debug("Headers %s", headers)
logging.debug(" %s", body)
print("New message on "+self.get_topic_name(headers))
print_with_time("New message on "+self.get_topic_name(headers))
print("---------------------------------------------")
if self.get_topic_name(headers) == 'metrics_to_predict':
@@ -225,11 +226,11 @@ class Listener(messaging.listener.MorphemicListener):
try:
State.metrics_to_predict = json.loads(body)["metrics"]
except Exception as e:
print("Could not load json object to process the start forecasting message \n"+str(body))
print_with_time("Could not load json object to process the start forecasting message \n"+str(body))
return
#waitfor(first period)
if (not State.initial_metric_list_received):
print("The initial metric list has not been received, therefore no predictions are generated")
print_with_time("The initial metric list has not been received, therefore no predictions are generated")
return
prediction_horizon = None
try:
@@ -238,21 +239,21 @@ class Listener(messaging.listener.MorphemicListener):
prediction_horizon = int(json.loads(body)["prediction_horizon"])
State.next_prediction_time = State.epoch_start
except Exception as e:
print("Problem while retrieving epoch start and/or prediction_horizon")
print_with_time("Problem while retrieving epoch start and/or prediction_horizon")
return
-maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming 20 seconds processing time to derive a first prediction
+maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
self.prediction_thread.start()
elif self.get_topic_name(headers) == 'stop_forecasting.exponentialsmoothing':
#waitfor(first period)
print("Received message to stop predicting some of the metrics")
print_with_time("Received message to stop predicting some of the metrics")
metrics_to_remove = json.loads(body)["metrics"]
for metric in metrics_to_remove:
if (State.metrics_to_predict.__contains__(metric)):
print("Stopping generating predictions for metric "+metric)
print_with_time("Stopping generating predictions for metric "+metric)
State.metrics_to_predict.remove(metric)
if len(State.metrics_to_predict)==0:
Listener.start_forecasting = False
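
From the handlers above, the start_forecasting and stop_forecasting messages evidently carry a JSON body with a "metrics" list, and the start message additionally supplies an epoch start and a prediction horizon. An illustrative payload (the field values, and the epoch_start key name, are assumptions inferred from the error handling above):

import json

body = json.dumps({
    "metrics": ["cpu_usage", "memory"],  # illustrative metric names
    "epoch_start": 1638878119,           # key name assumed
    "prediction_horizon": 120
})
metrics_to_predict = json.loads(body)["metrics"]
prediction_horizon = int(json.loads(body)["prediction_horizon"])
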
@@ -266,7 +267,7 @@ class Listener(messaging.listener.MorphemicListener):
logging.error(" %s", body)
def on_disconnected(self):
-print('Disconnected from broker, so will retry to connect...')
+print_with_time('Disconnected from broker, so will retry to connect...')
State.disconnected=True
State.disconnection_handler.acquire()
State.disconnection_handler.notifyAll()
@@ -298,20 +299,20 @@ if __name__ == "__main__":
State.connection = messaging.morphemic.Connection(State.broker_username,State.broker_password,State.broker_address,State.broker_port)
# morphemic = morphemic.model.Model(connection)
State.connection.set_listener(id, Listener())
print("Checking (EMS) broker connectivity state")
print_with_time("Checking (EMS) broker connectivity state")
if (State.disconnected or State.check_stale_connection()):
try:
State.connection.disconnect() #required to avoid the already connected exception
State.connection.connect()
State.disconnected = False
State.connection.topic("metrics_to_predict", id)
print("Successfully connected to metrics_to_predict")
print_with_time("Successfully connected to metrics_to_predict")
State.connection.topic("start_forecasting.exponentialsmoothing", id)
print("Successfully connected to start_forecasting.exponentialsmoothing")
print_with_time("Successfully connected to start_forecasting.exponentialsmoothing")
State.connection.topic("stop_forecasting.exponentialsmoothing", id)
print("Successfully connected to stop_forecasting.exponentialsmoothing")
print_with_time("Successfully connected to stop_forecasting.exponentialsmoothing")
except Exception as e:
print("Encountered exception while trying to connect to broker")
print_with_time("Encountered exception while trying to connect to broker")
print(traceback.format_exc())
State.disconnected = True
time.sleep(5)
......
@@ -5,28 +5,40 @@ class State:
"""
Fail-safe default values introduced below
"""
+#Used to create the dataset from the InfluxDB
+application_name = "default_application"
+influxdb_dbname = "morphemic"
+influxdb_password = "password"
+influxdb_username = "morphemic"
+influxdb_port = 8086
+influxdb_hostname = "ui-influxdb"
+path_to_datasets = "./datasets"
+dataset_file_name = "exponential_smoothing_dataset.csv"
+number_of_days_to_use_data_from = 365
+#Forecaster variables
metrics_to_predict = []
epoch_start = 0
next_prediction_time = 0
previous_prediction = None
configuration_file_location="/home/src/r_predictors/prediction_configuration.properties"
configuration_details = Properties()
-broker_address = "localhost"
-broker_port = 61610
-broker_username = "morphemic"
-broker_password = "morphemic"
-path_to_datasets = "./datasets"
-dataset_file_name = None
-number_of_days_to_use_data_from = 365
-prediction_processing_time_safety_margin_seconds = None
-connection = None
+prediction_processing_time_safety_margin_seconds = 20
disconnected = True
disconnection_handler = threading.Condition()
initial_metric_list_received = False
testing_prediction_functionality = False
+#Connection details
+connection = None
+broker_address = "localhost"
+broker_port = 61610
+broker_username = "morphemic"
+broker_password = "morphemic"
@staticmethod
def check_stale_connection():
return (not State.connection.conn.is_connected())
import pathlib
from morphemic.dataset import DatasetMaker
import os
import datetime
from runtime.operational_status.State import State
class Utilities:
@staticmethod
def print_with_time(x):
now = datetime.datetime.now()
print("["+now.strftime('%Y-%m-%d %H:%M:%S')+"] "+str(x))
@staticmethod
def load_configuration():
with open(State.configuration_file_location,'rb') as config_file:
@@ -20,14 +26,23 @@ class Utilities:
State.broker_username = State.configuration_details.get("broker_username").data
State.broker_password = State.configuration_details.get("broker_password").data
State.influxdb_hostname = State.configuration_details.get("INFLUXDB_HOSTNAME").data
State.influxdb_port = int(State.configuration_details.get("INFLUXDB_PORT").data)
State.influxdb_username = State.configuration_details.get("INFLUXDB_USERNAME").data
State.influxdb_password = State.configuration_details.get("INFLUXDB_PASSWORD").data
State.influxdb_dbname = State.configuration_details.get("INFLUXDB_DBNAME").data
State.influxdb_org = State.configuration_details.get("INFLUXDB_ORG").data
State.application_name = State.configuration_details.get("APP_NAME").data
#This method accesses influx db to retrieve the most recent metric values.
@staticmethod
def update_monitoring_data():
#query(metrics_to_predict,number_of_days_for_which_data_was_retrieved)
#save_new_file()
print("Starting dataset creation process...")
Utilities.print_with_time("Starting dataset creation process...")
try:
"""
Deprecated functionality to retrieve dataset creation details. Relevant functionality moved inside the load configuration method
influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","localhost")
influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086"))
influxdb_username = os.environ.get("INFLUXDB_USERNAME","morphemic")
@@ -35,28 +50,29 @@
influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic")
influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic")
application_name = "default_application"
"""
#_start_collection = None # '30m','1h', '2d', #None for everything
_start_collection = str(State.number_of_days_to_use_data_from)+"d" # '30m','1h', '2d', None for everything
pathlib.Path(State.path_to_datasets).mkdir(parents=True, exist_ok=True)
configs = {
-'hostname': influxdb_hostname,
-'port': influxdb_port,
-'username': influxdb_username,
-'password': influxdb_password,
-'dbname': influxdb_dbname,
+'hostname': State.influxdb_hostname,
+'port': State.influxdb_port,
+'username': State.influxdb_username,
+'password': State.influxdb_password,
+'dbname': State.influxdb_dbname,
'path_dataset': State.path_to_datasets
}
-datasetmaker = DatasetMaker(application_name,_start_collection,configs)
+datasetmaker = DatasetMaker(State.application_name,_start_collection,configs)
response = datasetmaker.make()
print ("Dataset creation process finished with response "+str(response))
Utilities.print_with_time("Dataset creation process finished with response "+str(response))
if (str(response).startswith("4")):
print("An error response has been detected from the dataset maker, therefore asking for all data from the database in an effort to create a dataset")
Utilities.print_with_time("An error response has been detected from the dataset maker, therefore asking for all data from the database in an effort to create a dataset")
_start_collection = None
response = datasetmaker.make()
print("Second dataset creation process finished with response "+str(response))
Utilities.print_with_time("Second dataset creation process finished with response "+str(response))
except Exception as e:
print("Could not create new dataset as an exception was thrown")
Utilities.print_with_time("Could not create new dataset as an exception was thrown")
print(e)
\ No newline at end of file
@@ -122,7 +122,6 @@ if (configuration_properties$forecasting_data_slicing_mode == "percentage"){
print(paste("Data points number is",data_points_number,"- from these",number_of_data_points_used_for_testing,"will be used for testing. If the horizon is too large, only half of the data points will be used to evaluate the prediction"))
}
#TODO check the code line below for validity - maybe use head and tail
data_points <-tail(head(mydata[,attribute_to_predict],forecasting_data_points_limit),data_points_number-forecasting_data_offset)
###Load time
......
@@ -28,6 +28,6 @@ if __name__=="__main__":
#Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
wait_time = next_prediction_time - horizon - time.time()
print("Waiting for "+str((int(wait_time*100))/100)+" seconds")
print("Waiting for "+str((int(wait_time*100))/100)+" seconds from time "+str(time.time()))
if (wait_time>0):
time.sleep(wait_time)
\ No newline at end of file