From 44e8d64755838ffb9936cd81ce34bfa722fa93a2 Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Wed, 5 Oct 2022 19:31:01 +0300 Subject: [PATCH 1/2] Adding a version file, and removing unecessary files from Dockerfile, to make docker build process faster for quicker testing on remote maching --- morphemic-forecasting-eshybrid/Dockerfile | 8 - morphemic-forecasting-eshybrid/main.py | 1 + morphemic-forecasting-eshybrid/messaging | 1 - .../messaging/Event.py | 421 ++++++++++++++++++ .../messaging/MorphemicConnection.py | 74 +++ .../messaging/MorphemicListener.py | 49 ++ .../messaging/Payloads.py | 10 + .../messaging/__init__.py | 5 + .../messaging/requirements.txt | 2 + morphemic-forecasting-eshybrid/test_model.py | 13 +- .../test_processor.py | 2 +- morphemic-forecasting-eshybrid/version | 1 + 12 files changed, 573 insertions(+), 14 deletions(-) delete mode 120000 morphemic-forecasting-eshybrid/messaging create mode 100644 morphemic-forecasting-eshybrid/messaging/Event.py create mode 100644 morphemic-forecasting-eshybrid/messaging/MorphemicConnection.py create mode 100644 morphemic-forecasting-eshybrid/messaging/MorphemicListener.py create mode 100644 morphemic-forecasting-eshybrid/messaging/Payloads.py create mode 100644 morphemic-forecasting-eshybrid/messaging/__init__.py create mode 100644 morphemic-forecasting-eshybrid/messaging/requirements.txt create mode 100644 morphemic-forecasting-eshybrid/version diff --git a/morphemic-forecasting-eshybrid/Dockerfile b/morphemic-forecasting-eshybrid/Dockerfile index 1be7f4aa..c3e48a9d 100644 --- a/morphemic-forecasting-eshybrid/Dockerfile +++ b/morphemic-forecasting-eshybrid/Dockerfile @@ -18,14 +18,6 @@ RUN apt-get update && apt-get install -y \ COPY . /app -ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc2.5/morphemic-preprocessor-morphemic-rc2.5.tar.gz /var/lib/morphemic/ - -RUN cd /var/lib/morphemic/ \ - && tar -zxf morphemic-preprocessor-morphemic-rc2.5.tar.gz \ - && rm -rf /app/messaging \ - && cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc2.5/amq-message-python-library /app/messaging \ - && rm -rf /var/lib/morphemic - COPY docker-entrypoint.sh /app RUN chmod +x /app/docker-entrypoint.sh diff --git a/morphemic-forecasting-eshybrid/main.py b/morphemic-forecasting-eshybrid/main.py index 84fdfd0d..809570ce 100644 --- a/morphemic-forecasting-eshybrid/main.py +++ b/morphemic-forecasting-eshybrid/main.py @@ -10,6 +10,7 @@ logging.basicConfig(level=logging.ERROR) def main(): + print("v2.5.1") parser = argparse.ArgumentParser(description='Run eshybrid forecaster') parser.add_argument('--config', help='Config file to run, default sync.cfg') diff --git a/morphemic-forecasting-eshybrid/messaging b/morphemic-forecasting-eshybrid/messaging deleted file mode 120000 index 6a99709a..00000000 --- a/morphemic-forecasting-eshybrid/messaging +++ /dev/null @@ -1 +0,0 @@ -../amq-message-python-library \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/messaging/Event.py b/morphemic-forecasting-eshybrid/messaging/Event.py new file mode 100644 index 00000000..5b7ee49b --- /dev/null +++ b/morphemic-forecasting-eshybrid/messaging/Event.py @@ -0,0 +1,421 @@ + + +class Metric(enumerate): + """ + [0] (current/detected) Metrics & SLOs Events Format: + + + This event is aggregated by EMS and it is persisted in InfluxDB. Moreover, + Prediction Orchestrator will subscribe and receive the current metrics in order to + evaluate the forecasting methods, according to the defined KPIs (e.g., MAPE) + + * Topic: [metric_name] + > (e.g. MaxCPULoad) + + + { + "metricValue": 12.34, + + "level": 1, + + "timestamp": 143532341251, + + "refersTo": "MySQL_12345", + + "cloud": "AWS-Dublin", + + "provider": "AWS" + + } + + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + TIMESTAMP = "timestamp" + METRIC_VALUE = "metricValue" + REFERS_TO = "refersTo" + CLOUD = "cloud" + PROVIDER = "provider" + + + +class PredictionMetric(enumerate): + + """ + [1] Predicted Metrics & SLOs Events Format + + + This event is produced by the Prediction Orchestrator and reflects the final predicted value for a metric. + + - Topic: prediction.[metric_name] + > (e.g. prediction.MaxCPULoad) + + + { + "metricValue": 12.34, + + "level": 1, + + "timestamp": 143532341251, + + "probability": 0.98, + + "confidence_interval " : [8,15] + + "predictionTime": 143532342, + + "refersTo": "MySQL_12345", + + "cloud": "AWS-Dublin", + + "provider": "AWS" + + } + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + + _match = "prediction." + + METRICVALUE= "metricValue" + '''Predicted metric value''' + LEVEL= "level" + '''Level of VM where prediction occurred or refers''' + TIMESTAMP= "timestamp" + '''Prediction creation date/time from epoch''' + PROBABILITY= "probability" + '''Probability of the predicted metric value (range 0..1)''' + CONFIDENCE_INTERVAL= "confidence_interval" + '''the probability-confidence interval for the prediction''' + PREDICTION_TIME= "predictionTime" + '''This refers to time point in the imminent future (that is relative to the time + that is needed for reconfiguration) for which the predicted value is considered + valid/accurate (in UNIX Epoch)''' + REFERSTO= "refersTo" + '''The id of the application or component or (VM) host for which the prediction refers to''' + CLOUD= "cloud" + '''Cloud provider of the VM (with location)''' + PROVIDER= "provider" + '''Cloud provider name''' + + + +class MetricsToPredict(enumerate): + + """ + [2] Translator – to – Forecasting Methods/Prediction Orchestrator Events Format + + + This event is produced by the translator, to: + + imform Dataset Maker which metrics should subscribe to in order to aggregate the appropriate tanning dataset in the time-series DB. + instruct each of the Forecasting methods to predict the values of one or more monitoring metrics + inform the Prediction Orchestrator for the metrics which will be forecasted + + * Topic: metrics_to_predict + + + *Note:* This event could be communicated through Mule + + + [ + { + + "metric": "MaxCPULoad", + + "level": 3, + + "publish_rate": 60000, + + }, + + { + + "metric": "MinCPULoad", + + "level": 3, + + "publish_rate": 50000, + + } + + ] + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + + _match = "metrics_to_predict" + + METRIC = "metric" + '''name of the metric to predict''' + LEVEL = "level" + '''Level of monitoring topology where this metric may be produced/found''' + PUBLISH_RATE = "publish_rate" + '''expected rate for datapoints regarding the specific metric (according to CAMEL)''' + + +class TraningModels(enumerate): + """ + + [3] Forecasting Methods – to – Prediction Orchestrator Events Format + + + This event is produced by each of the Forecasting methods, to inform the + Prediction Orchestrator that the method has (re-)trained its model for one or more metrics. + + * Topic: training_models + + + { + + "metrics": ["MaxCPULoad","MinCPULoad"]", + + "forecasting_method": "ESHybrid", + + "timestamp": 143532341251, + + } + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + _match = "training_models" + + METRICS = "metrics" + '''metrics for which a certain forecasting method has successfully trained or re-trained its model''' + FORECASTING_METHOD = "forecasting_method" + '''the method that is currently re-training its models''' + TIMESTAMP = "timestamp" + '''date/time of model(s) (re-)training''' + + +class IntermediatePrediction(enumerate): + """ + + [4] Forecasting Methods – to – Prediction Orchestrator Events Format + + + This event is produced by each of the Forecasting methods, and is used by the Prediction Orchestrator to determine the final prediction value for the particular metric. + + + * Topic: intermediate_prediction.[forecasting_method].[metric_name] + * (e.g. intermediate_prediction.ESHybrid.MaxCPULoad) + * We note that any component will be able to subscribe to topics like: + * intermediate_prediction.*.MaxCPULoad → gets MaxCPULoad predictions produced by all forecasting methods or + * intermediate_prediction.ESHybrid.* → gets all metrics predictions from ESHybrid method + * We consider that each forecasting method publishes a static (but configurable) number m of predicted values (under the same timestamp) for time points into the future. These time points into the future are relevant to the reconfiguration time that it is needed (and can also be updated). + * For example if we configure m=5 predictions into the future and the reconfiguration time needed is TR=10 minutes, then at t0 a forecasting method publishes 5 events with the same timestamp and prediction times t0+10, t0+20, t0+30, t0+40, t0+50. + + + + { + "metricValue": 12.34, + + "level": 3, + + "timestamp": 143532341251, + + "probability": 0.98, + + "confidence_interval " : [8,15] + + "predictionTime": 143532342, + + "refersTo": "MySQL_12345", + + "cloud": "AWS-Dublin", + + "provider": "AWS" + + } + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + + _match="intermediate_prediction." + + METRICVALUE = "metricValue" + '''Predicted metric value (more than one such events will be produced for different time points into the future – this can be valuable to the Prediction Orchestrator in certain situations e.g., forecasting method is unreachable for a time period)''' + + LEVEL = "level" + '''Level of VM where prediction occurred or refers''' + + TIMESTAMP = "timestamp" + '''Prediction creation date/time from epoch''' + + PROBABILITY = "probability" + '''Probability of the predicted metric value (range 0..1)''' + + CONFIDENCE_INTERVAL = "confidence_interval" + '''the probability-confidence interval for the prediction''' + + PREDICTION_TIME = "predictionTime" + '''This refers to time point in the imminent future (that is relative to the time that is needed for reconfiguration) for which the predicted value is considered valid/accurate (in UNIX Epoch)''' + + REFERS_TO = "refersTo" + '''The id of the application or component or (VM) host for which the prediction refers to''' + + CLOUD = "cloud" + '''Cloud provider of the VM (with location)''' + + PROVIDER = "provider" + '''Cloud provider name''' + + + +class Prediction(enumerate): + """ + + [5] Prediction Orchestrator – to – Severity-based SLO Violation Detector Events Format + + + This event is used by the Prediction Orchestrator to inform the SLO Violation Detector about the current values of a metric, which can possibly lead to an SLO Violation detection. + + * Topic: prediction.[metric_name] + * (e.g. prediction.MaxCPULoad) + + + { + "metricValue": 12.34, + + "level": 1, + + "timestamp": 143532341251, + + "probability": 0.98, + + "confidence_interval " : [8,15] + + "predictionTime": 143532342, + + "refersTo": "MySQL_12345", + + "cloud": "AWS-Dublin", + + "provider": "AWS" + + } + + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + + """ + + _match = "prediction." + + METRICVALUE = "metricValue" + '''Predicted metric value''' + + LEVEL = "level" + '''Level of VM where prediction occurred or refers''' + + TIMESTAMP = "timestamp" + '''Prediction creation date/time from epoch''' + + PROBABILITY = "probability" + '''Probability of the predicted metric value (range 0..1)''' + + CONFIDENCE_INTERVAL = "confidence_interval" + '''the probability-confidence interval for the prediction''' + + PREDICTIONTIME = "predictionTime" + '''This refers to time point in the imminent future (that is relative to the time that is needed for reconfiguration) for which the predicted value is considered valid/accurate (in UNIX Epoch)''' + + REFERSTO = "refersTo" + '''The id of the application or component or (VM) host for which the prediction refers to''' + + CLOUD = "cloud" + '''Cloud provider of the VM (with location)''' + + PROVIDER = "provider" + '''Cloud provider name''' + + +class StopForecasting(enumerate): + """ + [6] Prediction Orchestrator – to – Forecasting Methods Events Format + + + This event is used by the Prediction Orchestrator to instruct a forecasting method to stop producing predicted values for a selection of metrics. + + + * Topic: stop_forecasting.[forecasting_method] + * Each component that implements a specific forecasting method it should subscribe to its relevant topic (e.g. the ES-Hybrid component should subscribe to stop_forecasting.eshybrid topic) + + + { + "metrics": ["MaxCPULoad","MinCPULoad"], + "timestamp": 143532341251, + } + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + + """ + + _match="stop_forecasting." + + METRICS = "metrics" + '''metrics for which a certain method should stop producing predictions (because of poor results)''' + TIMESTAMP = "timestamp" + '''date/time of the command of the orchestrator''' + + +class StartForecasting(enumerate): + """ + + [7] Prediction Orchestrator – to – Forecasting Methods Events Format + + This event is used by the Prediction Orchestrator to instruct a forecasting method to start producing predicted values for a selection of metrics. + + + * Topic: start_forecasting.[forecasting_method] + * Each component that implements a specific forecasting method it should subscribe to its relevant topic (e.g. the ES-Hybrid component should subscribe to start_forecasting.eshybrid topic) + * We consider that each forecasting method should publish a static (but configurable) number m of predicted values (under the same timestamp) for time points into the future. These time points into the future are relevant to the reconfiguration time that it is needed (and can also be updated). + * For example if we configure m=5 predictions into the future and the reconfiguration time needed is TR=10 minutes, then at t0 a forecasting method publishes 5 events with the same timestamp and prediction times t0+10, t0+20, t0+30, t0+40, t0+50. + + + + + { + "metrics": ["MaxCPULoad","MinCPULoad"], + + "timestamp": 143532341251, + + "epoch_start": 143532341252, + + "number_of_forward_predictions": 5, + + "prediction_horizon": 600 + + } + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + + """ + + _match="start_forecasting." + + VERSION = "version" + METRICS = "metrics" + '''metrics for which a certain method should start producing predictions''' + TIMESTAMP = "timestamp" + '''date/time of the command of the orchestrator''' + EPOCH_START = "epoch_start" + '''this time refers to the start time after which all predictions will be considered (i.e. t0)''' + NUMBER_OF_FORWARD_PREDICTIONS = "number_of_forward_predictions" + ''' this is a number that indicates how many time points into the future do we need predictions for.''' + PREDICTION_HORIZON = "prediction_horizon" + '''This time equals to the time (in seconds) that is needed for the platform to implement an application reconfiguration (i.e. TR).''' \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/messaging/MorphemicConnection.py b/morphemic-forecasting-eshybrid/messaging/MorphemicConnection.py new file mode 100644 index 00000000..0c15fbf7 --- /dev/null +++ b/morphemic-forecasting-eshybrid/messaging/MorphemicConnection.py @@ -0,0 +1,74 @@ + +import stomp +import logging +import json + +from stomp.listener import PrintingListener + +class Connection: + + subscriptions = [] + + def __init__(self, username, password, + host='localhost', + port=61613, + debug=False, + **kwargs): + self.username = username + self.password = password + self.hosts = [(host, port)] + self.conn = stomp.Connection(host_and_ports=self.hosts, auto_content_length=False, + timeout=kwargs.get('timeout',180000),keepalive=kwargs.get('keepalive', True)) + + if debug: + logging.debug("Enabling debug") + self.conn.set_listener('print', PrintingListener()) + + def _build_id(self,topic,id): + return "id.%s-%s" % (topic,id) + + def set_listener(self, id, listener): + if self.conn: + self.conn.set_listener(id,listener) + + def subscribe(self,destination, id, ack='auto'): + if not self.conn: + raise RuntimeError('You need to connect first') + + self.conn.subscribe(destination, id, ack) + + def topic(self,destination, id, ack='auto'): + self.subscribe("/topic/%s" % destination ,self._build_id(destination,id),ack) + + def queue(self,destination, id, ack='auto'): + self.subscribe("/queue/%s" % destination ,self._build_id(destination,id),ack) + + def unsubscribe(self, topic,id): + + if not self.conn: + return + self.conn.unsubscribe(self._build_id(topic,id)) + + + def connect(self, wait=True): + + if not self.conn: + return + + self.conn.connect(self.username, self.password, wait=wait) + + def disconnect(self): + self.conn.disconnect() + + def send_to_topic(self,destination, body, headers={}, **kwargs): + + if not self.conn: + logging.error("Connect first") + return + + str = json.dumps(body) + + self.conn.send(destination="/topic/%s" % destination, + body= str, + content_type="application/json", + headers=headers, **kwargs) diff --git a/morphemic-forecasting-eshybrid/messaging/MorphemicListener.py b/morphemic-forecasting-eshybrid/messaging/MorphemicListener.py new file mode 100644 index 00000000..c48f6603 --- /dev/null +++ b/morphemic-forecasting-eshybrid/messaging/MorphemicListener.py @@ -0,0 +1,49 @@ +from json import JSONDecodeError + +from stomp.listener import ConnectionListener +import logging +import json +from slugify import slugify + +class MorphemicListener(ConnectionListener): + + def is_topic(self,headers, event): + if not hasattr(event,"_match"): + return False + match = getattr(event,'_match') + return headers.get('destination').startswith(match) + + + def has_topic_name(self,headers, string): + return headers.get('destination').startswith(string) + + def get_topic_name(self,headers): + return headers.get('destination').replace('/topic/','') + + + def has_topic_name(self,headers, string): + return headers.get('destination').startswith(string) + + def get_topic_name(self,headers): + return headers.get('destination').replace('/topic/','') + + + def on(self,headers, res): + logging.debug("Unknown message %s %s ",headers, res) + pass + + def on_message(self, headers, body): + + logging.debug("Headers %s",headers) + logging.debug(" %s",body) + + try: + res = json.loads(body) + func_name='on_%s' % slugify(headers.get('destination').replace('/topic/',''), separator='_',) + if hasattr(self,func_name): + func = getattr(self, func_name) + func(res) + else: + self.on(headers,res) + except JSONDecodeError: + logging.error("Error decoding %s", body) \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/messaging/Payloads.py b/morphemic-forecasting-eshybrid/messaging/Payloads.py new file mode 100644 index 00000000..5de1adc8 --- /dev/null +++ b/morphemic-forecasting-eshybrid/messaging/Payloads.py @@ -0,0 +1,10 @@ + +class MetricsToPredict: + + + def load(self,body): + self.metrics = body["metrics"] + self.timestamp = body["timestamp"] + self.epoch_start = body["epoch_start"] + self.number_of_forward_predictions = body["number_of_forward_predictions"] + self.prediction_horizon = body["prediction_horizon"] diff --git a/morphemic-forecasting-eshybrid/messaging/__init__.py b/morphemic-forecasting-eshybrid/messaging/__init__.py new file mode 100644 index 00000000..45fe25b1 --- /dev/null +++ b/morphemic-forecasting-eshybrid/messaging/__init__.py @@ -0,0 +1,5 @@ + +from . import MorphemicConnection as morphemic +from . import MorphemicListener as listener +from . import Event as events +from . import Payloads as payloads \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/messaging/requirements.txt b/morphemic-forecasting-eshybrid/messaging/requirements.txt new file mode 100644 index 00000000..2ba66344 --- /dev/null +++ b/morphemic-forecasting-eshybrid/messaging/requirements.txt @@ -0,0 +1,2 @@ +stomp.py==6.1.0 +python-slugify diff --git a/morphemic-forecasting-eshybrid/test_model.py b/morphemic-forecasting-eshybrid/test_model.py index 2fe3ac33..b5c1fc1c 100644 --- a/morphemic-forecasting-eshybrid/test_model.py +++ b/morphemic-forecasting-eshybrid/test_model.py @@ -17,14 +17,14 @@ from morphemic.model_manager import MetricModel logging.basicConfig(level=logging.DEBUG) metrics = [ - "WillFinishTooSoonContext" + "MinimumCoresContext" ] def test(c, metric): - dataset_file = "/Users/fotisp/Development/python/morphemic/local/out/dataset-maker/default_application_no_working.csv" + dataset_file = "/Users/fotisp/Development/python/morphemic/local/out/dataset-maker/default_application.csv" print("Training %s " % dataset_file) horizon = 120 @@ -57,9 +57,14 @@ def test(c, metric): pd_test = pd_test.shift(periods=horizon,freq=frequency) pd_test.reset_index(level=0, inplace=True) + + train_y = data_processor.train_y[data_processor.train_y['unique_id']==3] + train_y.reset_index(inplace=True) + train_x = data_processor.train_x[data_processor.train_x['unique_id']==3] + train_x.reset_index(inplace=True) model._model.fit( - data_processor.train_x, - data_processor.train_y, + train_x, + train_y, shuffle=False, verbose=True) diff --git a/morphemic-forecasting-eshybrid/test_processor.py b/morphemic-forecasting-eshybrid/test_processor.py index a9e34d5a..790c3955 100644 --- a/morphemic-forecasting-eshybrid/test_processor.py +++ b/morphemic-forecasting-eshybrid/test_processor.py @@ -17,7 +17,7 @@ metrics = [ def test(metric): - dataset_file = "/Users/fotisp/Development/python/morphemic/local/out/dataset-maker/default_application.csv" + dataset_file = "/Users/fotisp/Development/python/morphemic/local/out/dataset-maker/default_application_manual.csv" print("Training %s " % dataset_file) diff --git a/morphemic-forecasting-eshybrid/version b/morphemic-forecasting-eshybrid/version new file mode 100644 index 00000000..8c99f59f --- /dev/null +++ b/morphemic-forecasting-eshybrid/version @@ -0,0 +1 @@ +v2.5.1 \ No newline at end of file -- GitLab From b74c9704bdc0badad6880d4620e4d1c5f1ff4a1a Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Wed, 12 Oct 2022 19:22:34 +0300 Subject: [PATCH 2/2] MOR-239 Forcing stomp library in order to avoid mishandle of error --- .../src/requirements.txt | 2 +- .../src/runtime/messaging/MorphemicListener.py | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/morphemic-forecasting-exponentialsmoothing/src/requirements.txt b/morphemic-forecasting-exponentialsmoothing/src/requirements.txt index 0f7c45d0..2bc1ae39 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/requirements.txt +++ b/morphemic-forecasting-exponentialsmoothing/src/requirements.txt @@ -1,3 +1,3 @@ -stomp.py +stomp.py==6.1.0 python-slugify jproperties diff --git a/morphemic-forecasting-exponentialsmoothing/src/runtime/messaging/MorphemicListener.py b/morphemic-forecasting-exponentialsmoothing/src/runtime/messaging/MorphemicListener.py index 1af9e569..c48f6603 100644 --- a/morphemic-forecasting-exponentialsmoothing/src/runtime/messaging/MorphemicListener.py +++ b/morphemic-forecasting-exponentialsmoothing/src/runtime/messaging/MorphemicListener.py @@ -32,11 +32,8 @@ class MorphemicListener(ConnectionListener): logging.debug("Unknown message %s %s ",headers, res) pass + def on_message(self, headers, body): - #def on_message(self, headers,body): - def on_message(self,frame): - headers = frame.headers - body = frame.body logging.debug("Headers %s",headers) logging.debug(" %s",body) -- GitLab