Commit 18bd5a10 authored by Andreas Tsagkaropoulos's avatar Andreas Tsagkaropoulos
Browse files

Enriched property files to also contain configuration parameters which were expected as environmental variables

Enriched property files to also contain configuration parameters which were expected as environmental variables
Exclusively rely on property file values to connect to InfluxDB for historical metric values
Improved the arrangement of State class fields
Improved prediction logs
parent 1e9d67d2
#AMQ_HOST=ems
#AMQ_USER=aaa
#AMQ_PASSWORD=111
#AMQ_PORT_BROKER=61610
APP_NAME=default_application
METHOD=exponential_smoothing
# in file -> METHOD=Holt-Winters
#DATA_PATH=./
INFLUXDB_HOSTNAME=ui-influxdb
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
broker_address=localhost
broker_port=61610
broker_username=morphemic
......
#AMQ_HOST=ems
#AMQ_USER=aaa
#AMQ_PASSWORD=111
#AMQ_PORT_BROKER=61610
APP_NAME=default_application
METHOD=exponential_smoothing
# in file -> METHOD=Holt-Winters
#DATA_PATH=./
INFLUXDB_HOSTNAME=ui-influxdb
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
broker_address=localhost
broker_port=61610
broker_username=morphemic
......
......@@ -135,7 +135,7 @@ def calculate_and_publish_predictions(prediction_horizon,maximum_time_required_f
#Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
wait_time = State.next_prediction_time - prediction_horizon - time.time()
print("Waiting for "+str((int(wait_time*100))/100)+" seconds")
print("Waiting for "+str((int(wait_time*100))/100)+" seconds from time "+str(time.time()))
if (wait_time>0):
time.sleep(wait_time)
if(not Listener.start_forecasting):
......@@ -241,7 +241,7 @@ class Listener(messaging.listener.MorphemicListener):
print("Problem while retrieving epoch start and/or prediction_horizon")
return
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming 20 seconds processing time to derive a first prediction
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming X seconds processing time to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
self.prediction_thread = threading.Thread(target = calculate_and_publish_predictions, args =[prediction_horizon,maximum_time_required_for_prediction])
self.prediction_thread.start()
......
......@@ -5,28 +5,40 @@ class State:
"""
Fail-safe default values introduced below
"""
#Used to create the dataset from the InfluxDB
application_name = "default_application"
influxdb_dbname = "morphemic"
influxdb_password = "password"
influxdb_username = "morphemic"
influxdb_port = 8086
influxdb_hostname = "ui-influxdb"
path_to_datasets = "./datasets"
dataset_file_name = "exponential_smoothing_dataset.csv"
number_of_days_to_use_data_from = 365
#Forecaster variables
metrics_to_predict = []
epoch_start = 0
next_prediction_time = 0
previous_prediction = None
configuration_file_location="/home/src/r_predictors/prediction_configuration.properties"
configuration_details = Properties()
broker_address = "localhost"
broker_port = 61610
broker_username = "morphemic"
broker_password = "morphemic"
path_to_datasets = "./datasets"
dataset_file_name = None
number_of_days_to_use_data_from = 365
prediction_processing_time_safety_margin_seconds = None
connection = None
prediction_processing_time_safety_margin_seconds = 20
disconnected = True
disconnection_handler = threading.Condition()
initial_metric_list_received = False
testing_prediction_functionality = False
#Connection details
connection = None
broker_address = "localhost"
broker_port = 61610
broker_username = "morphemic"
broker_password = "morphemic"
@staticmethod
def check_stale_connection():
    """Return True when the underlying STOMP broker connection has dropped.

    Reads the shared class-level ``State.connection`` wrapper and inverts
    its ``conn.is_connected()`` status, so callers can poll for a needed
    reconnect. Assumes ``State.connection`` has already been established
    (it is ``None`` by default) — TODO confirm callers guard against that.
    """
    still_connected = State.connection.conn.is_connected()
    return not still_connected
......@@ -20,6 +20,13 @@ class Utilities:
State.broker_username = State.configuration_details.get("broker_username").data
State.broker_password = State.configuration_details.get("broker_password").data
State.influxdb_hostname = State.configuration_details.get("INFLUXDB_HOSTNAME").data
State.influxdb_port = int(State.configuration_details.get("INFLUXDB_PORT").data)
State.influxdb_username = State.configuration_details.get("INFLUXDB_USERNAME").data
State.influxdb_password = State.configuration_details.get("INFLUXDB_PASSWORD").data
State.influxdb_dbname = State.configuration_details.get("INFLUXDB_DBNAME").data
State.influxdb_org = State.configuration_details.get("INFLUXDB_ORG").data
State.application_name = State.configuration_details.get("APP_NAME").data
#This method accesses influx db to retrieve the most recent metric values.
@staticmethod
def update_monitoring_data():
......@@ -28,6 +35,8 @@ class Utilities:
print("Starting dataset creation process...")
try:
"""
Deprecated functionality to retrieve dataset creation details. Relevant functionality moved inside the load configuration method
influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","localhost")
influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086"))
influxdb_username = os.environ.get("INFLUXDB_USERNAME","morphemic")
......@@ -35,20 +44,21 @@ class Utilities:
influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic")
influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic")
application_name = "default_application"
"""
#_start_collection = None # '30m','1h', '2d', #None for everything
_start_collection = str(State.number_of_days_to_use_data_from)+"d" # '30m','1h', '2d', None for everything
pathlib.Path(State.path_to_datasets).mkdir(parents=True, exist_ok=True)
configs = {
'hostname': influxdb_hostname,
'port': influxdb_port,
'username': influxdb_username,
'password': influxdb_password,
'dbname': influxdb_dbname,
'hostname': State.influxdb_hostname,
'port': State.influxdb_port,
'username': State.influxdb_username,
'password': State.influxdb_password,
'dbname': State.influxdb_dbname,
'path_dataset': State.path_to_datasets
}
datasetmaker = DatasetMaker(application_name,_start_collection,configs)
datasetmaker = DatasetMaker(State.application_name,_start_collection,configs)
response = datasetmaker.make()
print ("Dataset creation process finished with response "+str(response))
......
......@@ -28,6 +28,6 @@ if __name__=="__main__":
#Below we subtract one reconfiguration interval, as we cannot send a prediction for a time point later than one prediction_horizon interval
wait_time = next_prediction_time - horizon - time.time()
print("Waiting for "+str((int(wait_time*100))/100)+" seconds")
print("Waiting for "+str((int(wait_time*100))/100)+" seconds from time "+str(time.time()))
if (wait_time>0):
time.sleep(wait_time)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment