Commit 1e9d67d2 authored by Andreas Tsagkaropoulos's avatar Andreas Tsagkaropoulos
Browse files

Improvements to logging and control flow

Try to reconnect to broker only if 5 seconds have elapsed
Implemented an attempt to create a dataset using all data if this is not possible using the partial data set in the configuration file
Improvements to README file
parent f559ea1e
......@@ -66,4 +66,15 @@ The correct operation of the component is also dependent on the availability of
When the Docker container has been successfully built, to start it it is enough to execute the following command:
docker run <container_name>
\ No newline at end of file
docker run <container_name>
### Test execution
To quickly test the functionality of the forecaster, assuming that the EMS (or an ActiveMQ broker) has been (or soon will be) setup and is accessible, that the persistence storage module is available, and that the 'latency' and 'memory' metrics are being published to it, the following commands can be issued in order - provided that the broker-client.jar file is available.
1) Publish metrics to predict:
java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 metrics_to_predict [{"metric":"latency","level":3,"publish_rate":10000},{"metric":"memory","level":3,"publish_rate":10000}]
2) Publish start forecasting:
java -jar broker-client.jar publish3 -Umorphemic -Pmorphemic tcp://localhost:61616 start_forecasting.exponentialsmoothing {\"metrics\":[\"latency\",\"memory\"],\"timestamp\":1626179164,\"epoch_start\":1626179353,\"number_of_forward_predictions\":8,\"prediction_horizon\":120}
......@@ -222,7 +222,11 @@ class Listener(messaging.listener.MorphemicListener):
State.testing_prediction_functionality = True
elif self.get_topic_name(headers) == 'start_forecasting.exponentialsmoothing':
State.metrics_to_predict = json.loads(body)["metrics"]
try:
State.metrics_to_predict = json.loads(body)["metrics"]
except Exception as e:
print("Could not load json object to process the start forecasting message \n"+str(body))
return
#waitfor(first period)
if (not State.initial_metric_list_received):
print("The initial metric list has not been received, therefore no predictions are generated")
......@@ -235,6 +239,7 @@ class Listener(messaging.listener.MorphemicListener):
State.next_prediction_time = State.epoch_start
except Exception as e:
print("Problem while retrieving epoch start and/or prediction_horizon")
return
maximum_time_required_for_prediction = State.prediction_processing_time_safety_margin_seconds #initialization, assuming 20 seconds processing time to derive a first prediction
if ((self.prediction_thread is None) or (not self.prediction_thread.is_alive())):
......@@ -261,8 +266,11 @@ class Listener(messaging.listener.MorphemicListener):
logging.error(" %s", body)
def on_disconnected(self):
print('disconnected')
print('Disconnected from broker, so will retry to connect...')
State.disconnected=True
State.disconnection_handler.acquire()
State.disconnection_handler.notifyAll()
State.disconnection_handler.release()
#connection.connect()
......@@ -290,6 +298,7 @@ if __name__ == "__main__":
State.connection = messaging.morphemic.Connection(State.broker_username,State.broker_password,State.broker_address,State.broker_port)
# morphemic = morphemic.model.Model(connection)
State.connection.set_listener(id, Listener())
print("Checking (EMS) broker connectivity state")
if (State.disconnected or State.check_stale_connection()):
try:
State.connection.disconnect() #required to avoid the already connected exception
......@@ -305,6 +314,7 @@ if __name__ == "__main__":
print("Encountered exception while trying to connect to broker")
print(traceback.format_exc())
State.disconnected = True
time.sleep(5)
continue
State.disconnection_handler.acquire()
State.disconnection_handler.wait()
......
......@@ -52,6 +52,11 @@ class Utilities:
response = datasetmaker.make()
print ("Dataset creation process finished with response "+str(response))
if (str(response).startswith("4")):
print("An error response has been detected from the dataset maker, therefore asking for all data from the database in an effort to create a dataset")
_start_collection = None
response = datasetmaker.make()
print("Second dataset creation process finished with response "+str(response))
except Exception as e:
print("Could not create new dataset as an exception was thrown")
print(e)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment