Commit 339842cb authored by Andreas Tsagkaropoulos

Removal of unused parameters in configuration files

Addition of a helper print_with_time method that prefixes log output with the current time, and conversion of existing print() calls to use it (a short usage sketch follows below)
parent d1c0cc7e
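
For context, a minimal sketch of the new helper and how call sites adopt it. The class and method body are taken from the Utilities diff further down; the one-line binding and the example message are reused from the forecaster module below, and all surrounding State/configuration plumbing is omitted:

import datetime

class Utilities:
    @staticmethod
    def print_with_time(x):
        # Prepend the current local time so log lines can be correlated with events,
        # e.g. "[2023-01-05 10:15:00] Checking (EMS) broker connectivity state"
        now = datetime.datetime.now()
        print("[" + now.strftime('%Y-%m-%d %H:%M:%S') + "] " + str(x))

# Call sites bind the helper once and then replace plain print() calls with it:
print_with_time = Utilities.print_with_time
print_with_time("Checking (EMS) broker connectivity state")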
......@@ -4,9 +4,8 @@
#AMQ_PORT_BROKER=61610
APP_NAME=default_application
METHOD=exponential_smoothing
# in file -> METHOD=Holt-Winters
#DATA_PATH=./
INFLUXDB_HOSTNAME=ui-influxdb
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
......@@ -24,7 +23,7 @@ forecasting_data_limit=
forecasting_data_used_for_training=0.999
path_to_datasets=C:/Users/user/Desktop/Predictions_using_R/custom_workloads/datasets
application_name=default_application
oldinput_data_file=C:/Users/user/Desktop/Predictions_using_R/custom_workloads/periodically_increasing_4_metric.csv
input_data_file=C:/Users/user/Desktop/Predictions_using_R/custom_workloads/datasets/demo.csv
clean_data_file=C:/Users/user/Desktop/Predictions_using_R/clean_data.csv
output_data_file=C:/Users/user/Desktop/Predictions_using_R/output_data.csv
......
#AMQ_HOST=ems
#AMQ_USER=aaa
#AMQ_PASSWORD=111
#AMQ_PORT_BROKER=61610
APP_NAME=default_application
METHOD=exponential_smoothing
# in file -> METHOD=Holt-Winters
#DATA_PATH=./
INFLUXDB_HOSTNAME=ui-influxdb
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
......
import datetime
import json
import threading
import time
......@@ -12,6 +13,7 @@ from messaging import morphemic
from runtime.predictions.Prediction import Prediction
from runtime.operational_status.State import State
from runtime.utilities.Utilities import Utilities
print_with_time = Utilities.print_with_time
def fix_path_ending(path):
......@@ -41,18 +43,17 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
prediction_data_file = get_prediction_data_filename(configuration_file_location)
if State.testing_prediction_functionality:
print("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
print("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute)
print_with_time("Testing, so output will be based on the horizon setting from the properties file and the last timestamp in the data")
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute)
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute]
else:
print ("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time)
print_with_time("Issuing command: Rscript forecasting_real_workload.R "+str(prediction_data_file)+" "+attribute+" "+next_prediction_time)
command = ['Rscript', 'forecasting_real_workload.R', prediction_data_file, attribute, next_prediction_time]
process_output = run(command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
if (process_output.stdout==""):
print("Empty output from R predictions")
print("The error output is the following")
print_with_time("Empty output from R predictions - the error output is the following:")
print(process_output.stderr) #There was an error during the calculation of the predicted value
process_output_string_list = process_output.stdout.replace("[1] ", "").replace("\"", "").split()
......@@ -79,10 +80,10 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
prediction_smape = string.replace("smape:", "")
if (prediction_confidence_interval_produced and prediction_value_produced):
prediction_valid = True
print("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
print_with_time("The prediction for attribute " + attribute + " is " + str(prediction_value)+ " and the confidence interval is "+prediction_confidence_interval)
else:
print("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
print(process_output.stderr)
print_with_time("There was an error during the calculation of the predicted value for "+str(attribute)+", the error log follows")
print_with_time(process_output.stderr)
output_prediction = Prediction(prediction_value, prediction_confidence_interval,prediction_valid,prediction_mae,prediction_mse,prediction_mape,prediction_smape)
return output_prediction
......@@ -90,11 +91,11 @@ def predict_attribute(attribute, configuration_file_location,next_prediction_tim
def predict_attributes(attributes,next_prediction_time):
pool = multiprocessing.Pool(len(attributes))
print("Prediction thread pool size set to " + str(len(attributes)))
print_with_time("Prediction thread pool size set to " + str(len(attributes)))
attribute_predictions = {}
for attribute in attributes:
print("Starting " + attribute + " prediction thread")
print_with_time("Starting " + attribute + " prediction thread")
start_time = time.time()
attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, State.configuration_file_location,str(next_prediction_time)])
#attribute_predictions[attribute] = pool.apply_async(predict_attribute, args=[attribute, configuration_file_location,str(next_prediction_time)]).get()
......@@ -117,7 +118,7 @@ def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_predi
if (estimated_time_after_prediction > earliest_time_to_predict_at ):
future_prediction_time_factor = 1+(estimated_time_after_prediction-earliest_time_to_predict_at)//prediction_horizon
print("Due to slowness of the prediction, skipping next prediction interval and targeting "+str(future_prediction_time_factor)+" intervals ahead (instead of simply waiting until the time that the next interval begins)")
print_with_time("Due to slowness of the prediction, skipping next prediction interval and targeting "+str(future_prediction_time_factor)+" intervals ahead (instead of simply waiting until the time that the next interval begins)")
prediction_time = earliest_time_to_predict_at+ future_prediction_time_factor*prediction_horizon
else:
prediction_time = earliest_time_to_predict_at + prediction_horizon
......@@ -126,7 +127,7 @@ def update_prediction_time(epoch_start,prediction_horizon,maximum_time_for_predi
def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_for_prediction):
while Listener.start_forecasting:
print("Using " + State.configuration_file_location + " for configuration details...")
print_with_time("Using " + State.configuration_file_location + " for configuration details...")
State.next_prediction_time = update_prediction_time(State.epoch_start, prediction_horizon,maximum_time_required_for_prediction)
for attribute in State.metrics_to_predict:
......@@ -135,7 +136,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
#Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
wait_time = State.next_prediction_time - prediction_horizon - time.time()
print("Waiting for "+str((int(wait_time*100))/100)+" seconds from time "+str(time.time()))
print_with_time("Waiting for "+str((int(wait_time*100))/100)+" seconds, until time "+datetime.datetime.fromtimestamp(State.next_prediction_time - prediction_horizon).strftime('%Y-%m-%d %H:%M:%S'))
if (wait_time>0):
time.sleep(wait_time)
if(not Listener.start_forecasting):
......@@ -146,7 +147,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
try:
prediction = predict_attributes(State.metrics_to_predict,State.next_prediction_time)
except Exception as e:
print("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time+", proceeding to next prediction time. The encountered exception trace follows:")
print_with_time("Could not create a prediction for some or all of the metrics for time point "+State.next_prediction_time+", proceeding to next prediction time. The encountered exception trace follows:")
print(e)
continue
for attribute in State.metrics_to_predict:
......@@ -178,7 +179,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
State.connection.send_to_topic('intermediate_prediction.%s.%s' % (id, attribute), prediction_message_body)
State.connection.send_to_topic('training_events',training_events_message_body)
message_not_sent = False
print("Successfully sent prediction message for "+attribute)
print_with_time("Successfully sent prediction message for "+attribute)
except ConnectionError as exception:
State.connection.disconnect()
State.connection = messaging.morphemic.Connection('admin', 'admin')
......@@ -208,7 +209,7 @@ class Listener(messaging.listener.MorphemicListener):
logging.debug("Headers %s", headers)
logging.debug(" %s", body)
print("New message on "+self.get_topic_name(headers))
print_with_time("New message on "+self.get_topic_name(headers))
print("---------------------------------------------")
if self.get_topic_name(headers) == 'metrics_to_predict':
......@@ -225,11 +226,11 @@ class Listener(messaging.listener.MorphemicListener):
try:
State.metrics_to_predict = json.loads(body)["metrics"]
except Exception as e:
print("Could not load json object to process the start forecasting message \n"+str(body))
print_with_time("Could not load json object to process the start forecasting message \n"+str(body))
return
#waitfor(first period)
if (not State.initial_metric_list_received):
print("The initial metric list has not been received, therefore no predictions are generated")
print_with_time("The initial metric list has not been received, therefore no predictions are generated")
return
prediction_horizon = None
try:
......@@ -238,7 +239,7 @@ class Listener(messaging.listener.MorphemicListener):
prediction_horizon = int(json.loads(body)["prediction_horizon"])
State.next_prediction_time = State.epoch_start
except Exception as e:
print("Problem while retrieving epoch start and/or prediction_horizon")
print_with_time("Problem while retrieving epoch start and/or prediction_horizon")
return
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction
......@@ -248,11 +249,11 @@ class Listener(messaging.listener.MorphemicListener):
elif self.get_topic_name(headers) == 'stop_forecasting.exponentialsmoothing':
#waitfor(first period)
print("Received message to stop predicting some of the metrics")
print_with_time("Received message to stop predicting some of the metrics")
metrics_to_remove = json.loads(body)["metrics"]
for metric in metrics_to_remove:
if (State.metrics_to_predict.__contains__(metric)):
print("Stopping generating predictions for metric "+metric)
print_with_time("Stopping generating predictions for metric "+metric)
State.metrics_to_predict.remove(metric)
if len(State.metrics_to_predict)==0:
Listener.start_forecasting = False
......@@ -266,7 +267,7 @@ class Listener(messaging.listener.MorphemicListener):
logging.error(" %s", body)
def on_disconnected(self):
print('Disconnected from broker, so will retry to connect...')
print_with_time('Disconnected from broker, so will retry to connect...')
State.disconnected=True
State.disconnection_handler.acquire()
State.disconnection_handler.notifyAll()
......@@ -298,20 +299,20 @@ if __name__ == "__main__":
State.connection = messaging.morphemic.Connection(State.broker_username,State.broker_password,State.broker_address,State.broker_port)
# morphemic = morphemic.model.Model(connection)
State.connection.set_listener(id, Listener())
print("Checking (EMS) broker connectivity state")
print_with_time("Checking (EMS) broker connectivity state")
if (State.disconnected or State.check_stale_connection()):
try:
State.connection.disconnect() #required to avoid the already connected exception
State.connection.connect()
State.disconnected = False
State.connection.topic("metrics_to_predict", id)
print("Successfully connected to metrics_to_predict")
print_with_time("Successfully connected to metrics_to_predict")
State.connection.topic("start_forecasting.exponentialsmoothing", id)
print("Successfully connected to start_forecasting.exponentialsmoothing")
print_with_time("Successfully connected to start_forecasting.exponentialsmoothing")
State.connection.topic("stop_forecasting.exponentialsmoothing", id)
print("Successfully connected to stop_forecasting.exponentialsmoothing")
print_with_time("Successfully connected to stop_forecasting.exponentialsmoothing")
except Exception as e:
print("Encountered exception while trying to connect to broker")
print_with_time("Encountered exception while trying to connect to broker")
print(traceback.format_exc())
State.disconnected = True
time.sleep(5)
......
import pathlib
from morphemic.dataset import DatasetMaker
import os
import datetime
from runtime.operational_status.State import State
class Utilities:
@staticmethod
def print_with_time(x):
now = datetime.datetime.now()
print("["+now.strftime('%Y-%m-%d %H:%M:%S')+"] "+str(x))
@staticmethod
def load_configuration():
with open(State.configuration_file_location,'rb') as config_file:
......@@ -32,7 +38,7 @@ class Utilities:
def update_monitoring_data():
#query(metrics_to_predict,number_of_days_for_which_data_was_retrieved)
#save_new_file()
print("Starting dataset creation process...")
Utilities.print_with_time("Starting dataset creation process...")
try:
"""
......@@ -61,12 +67,12 @@ class Utilities:
datasetmaker = DatasetMaker(State.application_name,_start_collection,configs)
response = datasetmaker.make()
print ("Dataset creation process finished with response "+str(response))
Utilities.print_with_time("Dataset creation process finished with response "+str(response))
if (str(response).startswith("4")):
print("An error response has been detected from the dataset maker, therefore asking for all data from the database in an effort to create a dataset")
Utilities.print_with_time("An error response has been detected from the dataset maker, therefore asking for all data from the database in an effort to create a dataset")
_start_collection = None
response = datasetmaker.make()
print("Second dataset creation process finished with response "+str(response))
Utilities.print_with_time("Second dataset creation process finished with response "+str(response))
except Exception as e:
print("Could not create new dataset as an exception was thrown")
Utilities.print_with_time("Could not create new dataset as an exception was thrown")
print(e)
\ No newline at end of file