diff --git a/morphemic-persistent-storage/database/data/meta/meta.db b/morphemic-persistent-storage/database/data/meta/meta.db deleted file mode 100644 index 1ffc2dba105f54095bffb34565de252b010024af..0000000000000000000000000000000000000000 Binary files a/morphemic-persistent-storage/database/data/meta/meta.db and /dev/null differ diff --git a/morphemic-persistent-storage/database/inputapi/requirements.txt b/morphemic-persistent-storage/database/inputapi/requirements.txt index 07cf6ff5372397994516407d19602e0a75680843..6bf935bd10389e09c76729f7d32c8c150c8d3a89 100644 --- a/morphemic-persistent-storage/database/inputapi/requirements.txt +++ b/morphemic-persistent-storage/database/inputapi/requirements.txt @@ -1,4 +1,4 @@ -flask influxdb stomp.py -requests \ No newline at end of file +requests +python-slugify \ No newline at end of file diff --git a/morphemic-persistent-storage/database/inputapi/src/__pycache__/activemqlistermanager.cpython-36.pyc b/morphemic-persistent-storage/database/inputapi/src/__pycache__/activemqlistermanager.cpython-36.pyc index 4a08853516ab37d344a4a8a80e464636a73e4c98..6b3b6a0e6389e65bd3c7b37bfadf26b59154fbbf 100644 Binary files a/morphemic-persistent-storage/database/inputapi/src/__pycache__/activemqlistermanager.cpython-36.pyc and b/morphemic-persistent-storage/database/inputapi/src/__pycache__/activemqlistermanager.cpython-36.pyc differ diff --git a/morphemic-persistent-storage/database/inputapi/src/activemqlistermanager.py b/morphemic-persistent-storage/database/inputapi/src/activemqlistermanager.py index 542c15dbb462463a5e1ab592b73a07fd35d7ef92..a90c677ca0b574ea382c7034d6dec8aea0df3d5f 100644 --- a/morphemic-persistent-storage/database/inputapi/src/activemqlistermanager.py +++ b/morphemic-persistent-storage/database/inputapi/src/activemqlistermanager.py @@ -1,13 +1,16 @@ -import stomp, os, json, time +import os, json, time from threading import Thread +from amq_client.MorphemicConnection import Connection +from amq_client.MorphemicListener import MorphemicListener data_format = os.environ.get("DATA_FORMAT", "json") class Listener(object): - def __init__(self, conn, handler): + def __init__(self, conn, handler, topic): self.conn = conn self.count = 0 + self.topic = topic self.handler = handler self.start = time.time() @@ -15,7 +18,7 @@ class Listener(object): print("received an error %s" % frame.body) def on_message(self, frame): - self.handler(frame.body) + self.handler(frame.body, self.topic) class Worker(Thread): @@ -55,10 +58,12 @@ class Worker(Thread): break print("Trying to connect ...") try: - conn = stomp.Connection(host_and_ports=[(self.hostname, self.port)]) - conn.set_listener("", Listener(conn, self.handler)) - conn.connect(login=self.username, passcode=self.password) - conn.subscribe(destination=self.topic, id=1, ack="auto") + #### + conn = Connection(username=self.username, password=self.password, host=self.hostname,port=self.port, debug=True) + conn.connect() + conn.set_listener('', Listener(conn, self.handler, self.topic)) + conn.subscribe(destination=self.topic, id=1, ack='auto') + #### self.status = "started" print("Waiting for messages...") while 1: diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/Event.py b/morphemic-persistent-storage/database/inputapi/src/amq_client/Event.py new file mode 100644 index 0000000000000000000000000000000000000000..52dbc842ef3c729386bed8f8e7cd4b92bc9420c6 --- /dev/null +++ b/morphemic-persistent-storage/database/inputapi/src/amq_client/Event.py @@ -0,0 +1,420 @@ + + +class Metric(enumerate): + """ + [0] (current/detected) Metrics & SLOs Events Format: + + + This event is aggregated by EMS and it is persisted in InfluxDB. Moreover, + Prediction Orchestrator will subscribe and receive the current metrics in order to + evaluate the forecasting methods, according to the defined KPIs (e.g., MAPE) + + * Topic: [metric_name] + > (e.g. MaxCPULoad) + + + { + "metricValue": 12.34, + + "level": 1, + + "timestamp": 143532341251, + + "refersTo": "MySQL_12345", + + "cloud": "AWS-Dublin", + + "provider": "AWS" + + } + + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + TIMESTAMP = "timestamp" + METRIC_VALUE = "metricValue" + REFERS_TO = "refersTo" + CLOUD = "cloud" + PROVIDER = "provider" + + + +class PredictionMetric(enumerate): + + """ + [1] Predicted Metrics & SLOs Events Format + + + This event is produced by the Prediction Orchestrator and reflects the final predicted value for a metric. + + - Topic: prediction.[metric_name] + > (e.g. prediction.MaxCPULoad) + + + { + "metricValue": 12.34, + + "level": 1, + + "timestamp": 143532341251, + + "probability": 0.98, + + "confidence_interval " : [8,15] + + "predictionTime": 143532342, + + "refersTo": "MySQL_12345", + + "cloud": "AWS-Dublin", + + "provider": "AWS" + + } + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + + _match = "prediction." + + METRICVALUE= "metricValue" + '''Predicted metric value''' + LEVEL= "level" + '''Level of VM where prediction occurred or refers''' + TIMESTAMP= "timestamp" + '''Prediction creation date/time from epoch''' + PROBABILITY= "probability" + '''Probability of the predicted metric value (range 0..1)''' + CONFIDENCE_INTERVAL= "confidence_interval" + '''the probability-confidence interval for the prediction''' + PREDICTION_TIME= "predictionTime" + '''This refers to time point in the imminent future (that is relative to the time + that is needed for reconfiguration) for which the predicted value is considered + valid/accurate (in UNIX Epoch)''' + REFERSTO= "refersTo" + '''The id of the application or component or (VM) host for which the prediction refers to''' + CLOUD= "cloud" + '''Cloud provider of the VM (with location)''' + PROVIDER= "provider" + '''Cloud provider name''' + + + +class MetricsToPredict(enumerate): + + """ + [2] Translator – to – Forecasting Methods/Prediction Orchestrator Events Format + + + This event is produced by the translator, to: + + imform Dataset Maker which metrics should subscribe to in order to aggregate the appropriate tanning dataset in the time-series DB. + instruct each of the Forecasting methods to predict the values of one or more monitoring metrics + inform the Prediction Orchestrator for the metrics which will be forecasted + + * Topic: metrics_to_predict + + + *Note:* This event could be communicated through Mule + + + [ + { + + "metric": "MaxCPULoad", + + "level": 3, + + "publish_rate": 60000, + + }, + + { + + "metric": "MinCPULoad", + + "level": 3, + + "publish_rate": 50000, + + } + + ] + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + + _match = "metrics_to_predict" + + METRIC = "metric" + '''name of the metric to predict''' + LEVEL = "level" + '''Level of monitoring topology where this metric may be produced/found''' + PUBLISH_RATE = "publish_rate" + '''expected rate for datapoints regarding the specific metric (according to CAMEL)''' + + +class TraningModels(enumerate): + """ + + [3] Forecasting Methods – to – Prediction Orchestrator Events Format + + + This event is produced by each of the Forecasting methods, to inform the + Prediction Orchestrator that the method has (re-)trained its model for one or more metrics. + + * Topic: training_models + + + { + + "metrics": ["MaxCPULoad","MinCPULoad"]", + + "forecasting_method": "ESHybrid", + + "timestamp": 143532341251, + + } + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + _match = "training_models" + + METRICS = "metrics" + '''metrics for which a certain forecasting method has successfully trained or re-trained its model''' + FORECASTING_METHOD = "forecasting_method" + '''the method that is currently re-training its models''' + TIMESTAMP = "timestamp" + '''date/time of model(s) (re-)training''' + + +class IntermediatePrediction(enumerate): + """ + + [4] Forecasting Methods – to – Prediction Orchestrator Events Format + + + This event is produced by each of the Forecasting methods, and is used by the Prediction Orchestrator to determine the final prediction value for the particular metric. + + + * Topic: intermediate_prediction.[forecasting_method].[metric_name] + * (e.g. intermediate_prediction.ESHybrid.MaxCPULoad) + * We note that any component will be able to subscribe to topics like: + * intermediate_prediction.*.MaxCPULoad → gets MaxCPULoad predictions produced by all forecasting methods or + * intermediate_prediction.ESHybrid.* → gets all metrics predictions from ESHybrid method + * We consider that each forecasting method publishes a static (but configurable) number m of predicted values (under the same timestamp) for time points into the future. These time points into the future are relevant to the reconfiguration time that it is needed (and can also be updated). + * For example if we configure m=5 predictions into the future and the reconfiguration time needed is TR=10 minutes, then at t0 a forecasting method publishes 5 events with the same timestamp and prediction times t0+10, t0+20, t0+30, t0+40, t0+50. + + + + { + "metricValue": 12.34, + + "level": 3, + + "timestamp": 143532341251, + + "probability": 0.98, + + "confidence_interval " : [8,15] + + "predictionTime": 143532342, + + "refersTo": "MySQL_12345", + + "cloud": "AWS-Dublin", + + "provider": "AWS" + + } + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + """ + + _match="intermediate_prediction." + + METRICVALUE = "metricValue" + '''Predicted metric value (more than one such events will be produced for different time points into the future – this can be valuable to the Prediction Orchestrator in certain situations e.g., forecasting method is unreachable for a time period)''' + + LEVEL = "level" + '''Level of VM where prediction occurred or refers''' + + TIMESTAMP = "timestamp" + '''Prediction creation date/time from epoch''' + + PROBABILITY = "probability" + '''Probability of the predicted metric value (range 0..1)''' + + CONFIDENCE_INTERVAL = "confidence_interval" + '''the probability-confidence interval for the prediction''' + + PREDICTION_TIME = "predictionTime" + '''This refers to time point in the imminent future (that is relative to the time that is needed for reconfiguration) for which the predicted value is considered valid/accurate (in UNIX Epoch)''' + + REFERS_TO = "refersTo" + '''The id of the application or component or (VM) host for which the prediction refers to''' + + CLOUD = "cloud" + '''Cloud provider of the VM (with location)''' + + PROVIDER = "provider" + '''Cloud provider name''' + + + +class Prediction(enumerate): + """ + + [5] Prediction Orchestrator – to – Severity-based SLO Violation Detector Events Format + + + This event is used by the Prediction Orchestrator to inform the SLO Violation Detector about the current values of a metric, which can possibly lead to an SLO Violation detection. + + * Topic: prediction.[metric_name] + * (e.g. prediction.MaxCPULoad) + + + { + "metricValue": 12.34, + + "level": 1, + + "timestamp": 143532341251, + + "probability": 0.98, + + "confidence_interval " : [8,15] + + "predictionTime": 143532342, + + "refersTo": "MySQL_12345", + + "cloud": "AWS-Dublin", + + "provider": "AWS" + + } + + + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + + """ + + _match = "prediction." + + METRICVALUE = "metricValue" + '''Predicted metric value''' + + LEVEL = "level" + '''Level of VM where prediction occurred or refers''' + + TIMESTAMP = "timestamp" + '''Prediction creation date/time from epoch''' + + PROBABILITY = "probability" + '''Probability of the predicted metric value (range 0..1)''' + + CONFIDENCE_INTERVAL = "confidence_interval" + '''the probability-confidence interval for the prediction''' + + PREDICTIONTIME = "predictionTime" + '''This refers to time point in the imminent future (that is relative to the time that is needed for reconfiguration) for which the predicted value is considered valid/accurate (in UNIX Epoch)''' + + REFERSTO = "refersTo" + '''The id of the application or component or (VM) host for which the prediction refers to''' + + CLOUD = "cloud" + '''Cloud provider of the VM (with location)''' + + PROVIDER = "provider" + '''Cloud provider name''' + + +class StopForecasting(enumerate): + """ + [6] Prediction Orchestrator – to – Forecasting Methods Events Format + + + This event is used by the Prediction Orchestrator to instruct a forecasting method to stop producing predicted values for a selection of metrics. + + + * Topic: stop_forecasting.[forecasting_method] + * Each component that implements a specific forecasting method it should subscribe to its relevant topic (e.g. the ES-Hybrid component should subscribe to stop_forecasting.eshybrid topic) + + + { + "metrics": ["MaxCPULoad","MinCPULoad"], + "timestamp": 143532341251, + } + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + + """ + + _match="stop_forecasting." + + METRICS = "metrics" + '''metrics for which a certain method should stop producing predictions (because of poor results)''' + TIMESTAMP = "timestamp" + '''date/time of the command of the orchestrator''' + + +class StartForecasting(enumerate): + """ + + [7] Prediction Orchestrator – to – Forecasting Methods Events Format + + This event is used by the Prediction Orchestrator to instruct a forecasting method to start producing predicted values for a selection of metrics. + + + * Topic: start_forecasting.[forecasting_method] + * Each component that implements a specific forecasting method it should subscribe to its relevant topic (e.g. the ES-Hybrid component should subscribe to start_forecasting.eshybrid topic) + * We consider that each forecasting method should publish a static (but configurable) number m of predicted values (under the same timestamp) for time points into the future. These time points into the future are relevant to the reconfiguration time that it is needed (and can also be updated). + * For example if we configure m=5 predictions into the future and the reconfiguration time needed is TR=10 minutes, then at t0 a forecasting method publishes 5 events with the same timestamp and prediction times t0+10, t0+20, t0+30, t0+40, t0+50. + + + + + { + "metrics": ["MaxCPULoad","MinCPULoad"], + + "timestamp": 143532341251, + + "epoch_start": 143532341252, + + "number_of_forward_predictions": 5, + + "prediction_horizon": 600 + + } + + https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication + + + """ + + _match="start_forecasting." + + METRICS = "metrics" + '''metrics for which a certain method should start producing predictions''' + TIMESTAMP = "timestamp" + '''date/time of the command of the orchestrator''' + EPOCH_START = "epoch_start" + '''this time refers to the start time after which all predictions will be considered (i.e. t0)''' + NUMBER_OF_FORWARD_PREDICTIONS = "number_of_forward_predictions" + ''' this is a number that indicates how many time points into the future do we need predictions for.''' + PREDICTION_HORIZON = "prediction_horizon" + '''This time equals to the time (in seconds) that is needed for the platform to implement an application reconfiguration (i.e. TR).''' \ No newline at end of file diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/MorphemicConnection.py b/morphemic-persistent-storage/database/inputapi/src/amq_client/MorphemicConnection.py new file mode 100644 index 0000000000000000000000000000000000000000..9fd61b18c35ed6f26bd3197c53d5209aaa77ef41 --- /dev/null +++ b/morphemic-persistent-storage/database/inputapi/src/amq_client/MorphemicConnection.py @@ -0,0 +1,71 @@ +import stomp +import logging +import json + +from stomp.listener import PrintingListener + +class Connection: + + subscriptions = [] + + def __init__(self, username, password, + host='localhost', + port=61613, + debug=False): + self.username = username + self.password = password + self.hosts = [(host, port)] + self.conn = stomp.Connection(host_and_ports=self.hosts, auto_content_length=False) + + if debug: + logging.debug("Enabling debug") + self.conn.set_listener('print', PrintingListener()) + + def _build_id(self,topic,id): + return "id.%s-%s" % (topic,id) + + def set_listener(self, id, listener): + if self.conn: + self.conn.set_listener(id,listener) + + def subscribe(self,destination, id, ack='auto'): + if not self.conn: + raise RuntimeError('You need to connect first') + + self.conn.subscribe(destination, id, ack) + + def topic(self,destination, id, ack='auto'): + self.subscribe("/topic/%s" % destination ,self._build_id(destination,id),ack) + + def queue(self,destination, id, ack='auto'): + self.subscribe("/queue/%s" % destination ,self._build_id(destination,id),ack) + + def unsubscribe(self, topic,id): + + if not self.conn: + return + self.conn.unsubscribe(self._build_id(topic,id)) + + + def connect(self, wait=True): + + if not self.conn: + return + + self.conn.connect(self.username, self.password, wait=wait) + + def disconnect(self): + self.conn.disconnect() + + def send_to_topic(self,destination, body, headers={}, **kwargs): + + if not self.conn: + logging.error("Connect first") + return + + str = json.dumps(body) + + self.conn.send(destination="/topic/%s" % destination, + body= str, + content_type="application/json", + headers=headers, **kwargs) diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/MorphemicListener.py b/morphemic-persistent-storage/database/inputapi/src/amq_client/MorphemicListener.py new file mode 100644 index 0000000000000000000000000000000000000000..35e2eeeb41cf864f2ca4d47dded4ee75e3c51721 --- /dev/null +++ b/morphemic-persistent-storage/database/inputapi/src/amq_client/MorphemicListener.py @@ -0,0 +1,48 @@ +from json import JSONDecodeError + +from stomp.listener import ConnectionListener +import logging +import json +from slugify import slugify + +class MorphemicListener(ConnectionListener): + def is_topic(self,headers, event): + if not hasattr(event,"_match"): + return False + match = getattr(event,'_match') + return headers.get('destination').startswith(match) + + + def has_topic_name(self,headers, string): + return headers.get('destination').startswith(string) + + def get_topic_name(self,headers): + return headers.get('destination').replace('/topic/','') + + + def has_topic_name(self,headers, string): + return headers.get('destination').startswith(string) + + def get_topic_name(self,headers): + return headers.get('destination').replace('/topic/','') + + + def on(self,headers, res): + logging.debug("Unknown message %s %s ",headers, res) + pass + + def on_message(self, headers, body): + + logging.debug("Headers %s",headers) + logging.debug(" %s",body) + + try: + res = json.loads(body) + func_name='on_%s' % slugify(headers.get('destination').replace('/topic/',''), separator='_',) + if hasattr(self,func_name): + func = getattr(self, func_name) + func(res) + else: + self.on(headers,res) + except JSONDecodeError: + logging.error("Error decoding %s", body) \ No newline at end of file diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/Payloads.py b/morphemic-persistent-storage/database/inputapi/src/amq_client/Payloads.py new file mode 100644 index 0000000000000000000000000000000000000000..5de1adc844e289db185d74f7c7d76bca0045d686 --- /dev/null +++ b/morphemic-persistent-storage/database/inputapi/src/amq_client/Payloads.py @@ -0,0 +1,10 @@ + +class MetricsToPredict: + + + def load(self,body): + self.metrics = body["metrics"] + self.timestamp = body["timestamp"] + self.epoch_start = body["epoch_start"] + self.number_of_forward_predictions = body["number_of_forward_predictions"] + self.prediction_horizon = body["prediction_horizon"] diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/__init__.py b/morphemic-persistent-storage/database/inputapi/src/amq_client/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..45fe25b1772cc225e9ec364aa83620bd07f4e3a7 --- /dev/null +++ b/morphemic-persistent-storage/database/inputapi/src/amq_client/__init__.py @@ -0,0 +1,5 @@ + +from . import MorphemicConnection as morphemic +from . import MorphemicListener as listener +from . import Event as events +from . import Payloads as payloads \ No newline at end of file diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/Event.cpython-36.pyc b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/Event.cpython-36.pyc new file mode 100644 index 0000000000000000000000000000000000000000..506d8457fd4e3d365eeed89da1e927ff6e9709a0 Binary files /dev/null and b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/Event.cpython-36.pyc differ diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/MorphemicConnection.cpython-36.pyc b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/MorphemicConnection.cpython-36.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8b06e3807a31297d0c3bfca458bd3b5c586b5c10 Binary files /dev/null and b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/MorphemicConnection.cpython-36.pyc differ diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/MorphemicListener.cpython-36.pyc b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/MorphemicListener.cpython-36.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a6a0c4f95cec6936c9deb635bb19477f7bd3e9be Binary files /dev/null and b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/MorphemicListener.cpython-36.pyc differ diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/Payloads.cpython-36.pyc b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/Payloads.cpython-36.pyc new file mode 100644 index 0000000000000000000000000000000000000000..1f1a799bde9d88212253b4d049e50f8f68799e6f Binary files /dev/null and b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/Payloads.cpython-36.pyc differ diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/__init__.cpython-36.pyc b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/__init__.cpython-36.pyc new file mode 100644 index 0000000000000000000000000000000000000000..a317efdfae353ad4c2716a589dfdd263b7df7e83 Binary files /dev/null and b/morphemic-persistent-storage/database/inputapi/src/amq_client/__pycache__/__init__.cpython-36.pyc differ diff --git a/morphemic-persistent-storage/database/inputapi/src/amq_client/requirements.txt b/morphemic-persistent-storage/database/inputapi/src/amq_client/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..2b4d2df0ef555812200771260aea6cd66308bdd8 --- /dev/null +++ b/morphemic-persistent-storage/database/inputapi/src/amq_client/requirements.txt @@ -0,0 +1,28 @@ +setuptools +stomp.py + +python-slugify + +pandas +tensorflow +numpy>=1.9.0 +scipy>=0.14.1 + +joblib>=0.14.1 +scikit-learn>=0.24.0,<0.25.0 + +dask +distributed>=2.2.0 +pyyaml +pandas>=1.0 +liac-arff + +ConfigSpace>=0.4.14,<0.5 +pynisher>=0.6.3 +pyrfr>=0.7,<0.9 +smac>=0.13.1,<0.14 +emcee +pyDOE +scikit-optimize +sklearn +matplotlib diff --git a/morphemic-persistent-storage/database/inputapi/src/app.py b/morphemic-persistent-storage/database/inputapi/src/app.py index 2d9fcacd8ccb22bba808e7a6dbdfd34f53dc8433..a1048d6d326a18514f9939997b5827589680d0e5 100644 --- a/morphemic-persistent-storage/database/inputapi/src/app.py +++ b/morphemic-persistent-storage/database/inputapi/src/app.py @@ -1,6 +1,6 @@ -import json, time, os, requests, stomp -from flask import Flask, request, Response -from activemqlistermanager import ActiveMQManager +import json, time, os, requests +from amq_client.MorphemicConnection import Connection +from activemqlistermanager import ActiveMQManager, Worker from influxdb import InfluxDBClient from threading import Thread @@ -22,14 +22,17 @@ influxdb_username = os.environ.get("INFLUXDB_USERNAME", "morphemic") influxdb_password = os.environ.get("INFLUXDB_PASSWORD", "password") influxdb_dbname = os.environ.get("INFLUXDB_DBNAME", "morphemic") ps_management_queue = os.environ.get("PS_MANAGEMENT_QUEUE", "persistent_storage") +metrics_to_consume_topic = os.environ.get("METRICS_TO_CONSUME","/topic/metrics_to_predict") # "hostname": "localhost", "port": 61610, "topic": "static-topic-1", "metric": "somekey","username":"aaa","password": "111" -activemq_hostname = os.environ.get("ACTIVEMQ_HOST", "localhost") -activemq_port = int(os.environ.get("ACTIVEMQ_PORT", "61613")) -activemq_topic = os.environ.get("ACTIVEMQ_TOPIC", "static-topic-1") +activemq_hostname = os.environ.get("ACTIVEMQ_HOST", "147.102.17.76") +activemq_port = int(os.environ.get("ACTIVEMQ_PORT", "61610")) +activemq_topic = os.environ.get("ACTIVEMQ_TOPIC", "AAAA") activemq_subs_key = os.environ.get("ACTIVEMQ_SUBS_KEY", "subs-1") activemq_username = os.environ.get("ACTIVEMQ_USERNAME", "aaa") activemq_password = os.environ.get("ACTIVEMQ_PASSWORD", "111") +# +metric_to_predict_queue = os.environ.get("METRIC_TO_PREDICT","/topic/metrics_to_predict") class Publisher(Thread): @@ -44,8 +47,8 @@ class Publisher(Thread): def connect(self): try: - self.conn = stomp.Connection(host_and_ports=[(self.host, self.port)]) - self.conn.connect(login=self.username, passcode=self.password) + self.conn = Connection(username=self.username, password=self.password, host=self.hostname,port=self.port, debug=False) + self.conn.connect() print("Publisher is connected to ActiveMQ") except Exception as e: @@ -60,11 +63,8 @@ class Publisher(Thread): if len(self.queue) > 0: try: data, destination = self.queue.pop(0) - self.conn.send( - body=json.dumps(data), - destination=destination, - persistent="false", - ) + #self.conn.send(body=json.dumps(data),destination=destination,persistent="false",) + self.client.send_to_topic(destination, data) except Exception as e: print("An exception occured while publishing") print(e) @@ -325,7 +325,7 @@ class Subscription: self.setHostname(_json["hostname"]) self.setPort(_json["port"]) self.setTopic(_json["topic"]) - self.setMetric(_json["key"]) + self.setMetric(_json["metric"]) self.setUsername(_json["username"]) self.setPassword(_json["password"]) except Exception as e: @@ -364,15 +364,15 @@ class Ingestor(Thread): # {'timestamp': time.time(),'name':metric,'labels':{'hostname':'localhost','application':'application-1'},'value': measurement} pass - def addToList(self, content): - self.list_content.append(content) + def addToList(self, content, topic): + self.list_content.append((content,topic)) def run(self): self.connect() while True: size = len(self.list_content) - for content in self.list_content: - if self.insert(content): + for _couple in self.list_content: + if self.insert(_couple): self.data_points_inserted += 1 else: self.data_points_error += 1 @@ -410,22 +410,27 @@ class Ingestor(Thread): print(e) time.sleep(5) - def insert(self, content): + def insert(self, _couple): # {'timestamp': time.time(),'name':metric,'labels':{'hostname':'localhost','application':'application-1'},'value': measurement} # {“metricName”: “name”, “metricValue”: value, “timestamp”: time, “application”: “applicationName”, “level”: 1…} fields = None + topic = None try: + content, topic = _couple[0], _couple[1] fields = json.loads(content) except Exception as e: print("Cannot decode json") #print("content", content) return False # self.tolerance_manager.addTime(fields["application"], fields["timestamp"]) - application = fields[metric_name_field_application] - timestamp = fields[metric_name_field_timestamp] - metric = fields[metric_name_field_name] + application = None + if not metric_name_field_application in fields: + application = "default_application" + else: + application = fields[metric_name_field_application] + timestamp = int(fields[metric_name_field_timestamp]/1000) #time in sec + metric = topic[topic.rindex('/')+1:] value = fields[metric_name_field_value] - backet = self.backer_manager.getBacketBasedOnTime(application, timestamp) if backet != None: @@ -436,9 +441,10 @@ class Ingestor(Thread): # backet.setTolerance(tolerance) else: backet = Backet(application, 2, timestamp) - self.backer_manager.addBacket(application, backet) + backet.addLabels({"application": application, "level": fields["level"]}) backet.insert(metric, value, timestamp) - + self.backer_manager.addBacket(application, backet) + backet = self.backer_manager.popBacket(application) if backet != None: metrics = backet.getBacketSeries() @@ -453,6 +459,7 @@ class Ingestor(Thread): # {"hostname": "localhost", "application": "my_first_app"}, # "metrics": { "response_time": 42, "cpu_usage": 3 , "memory": 74 } # } + print(data) point = { "measurement": data["labels"]["application"], "fields": data["metrics"], @@ -471,67 +478,18 @@ class Ingestor(Thread): class InputApi: - def __init__( - self, - influxdb_hostname, - influxdb_port, - influxdb_username, - influxdb_password, - influxdb_database, - ): + def __init__(self,influxdb_hostname,influxdb_port,influxdb_username,influxdb_password,influxdb_database,): self.consumer_manager = None self.subscriptions = {} self.consumers = {} - self.ingestor = Ingestor( - influxdb_hostname, - influxdb_port, - influxdb_username, - influxdb_password, - influxdb_database, - ) + self.ingestor = Ingestor(influxdb_hostname,influxdb_port,influxdb_username,influxdb_password,influxdb_database,) self.data_points = 0 self.last_evaluation = time.time() self.consumer_controller = ConsumerManager() self.evaluation_interval = 5 + self.list_metrics = [] self.publisher = Publisher() - def getActiveMQParameters(self): - conns = [ - { - "hostname": activemq_hostname, - "port": activemq_port, - "topic": activemq_topic, - "key": activemq_subs_key, - "username": activemq_username, - "password": activemq_password, - }, - { - "hostname": activemq_hostname, - "port": activemq_port, - "topic": activemq_topic, - "key": activemq_subs_key + "-2", - "username": activemq_username, - "password": activemq_password, - }, - { - "hostname": activemq_hostname, - "port": activemq_port, - "topic": activemq_topic, - "key": activemq_subs_key + "-3", - "username": activemq_username, - "password": activemq_password, - }, - { - "hostname": activemq_hostname, - "port": activemq_port, - "topic": ps_management_queue, - "key": "management", - "username": activemq_username, - "password": activemq_password, - }, - ] - return conns - def saveSubscriptions(self, data): if type(data) != type([]): print("Error data type") @@ -540,11 +498,7 @@ class InputApi: for _json in data: subs = Subscription() subs.load(_json) - print( - "Subscription hostname : {0}, port : {1}, topic: {2}, metric: {3} added".format( - _json["hostname"], _json["port"], _json["topic"], _json["key"] - ) - ) + print("Subscription hostname : {0}, port : {1}, topic: {2}, metric: {3} added".format(_json["hostname"], _json["port"], _json["topic"], _json["metric"])) self.subscriptions[subs.getMetric()] = subs def prepareDatasetRequest(self, _json): @@ -557,14 +511,7 @@ class InputApi: for key in self.subscriptions.keys(): if not key in self.consumers: subs = self.subscriptions[key] - self.consumer_manager.startWorker( - subs.getHostname(), - subs.getPort(), - subs.getUsername(), - subs.getPassword(), - subs.getTopic(), - key, - ) + self.consumer_manager.startWorker(subs.getHostname(),subs.getPort(),subs.getUsername(),subs.getPassword(),subs.getTopic(),key) def handleSubscriptions(self, data): try: @@ -577,6 +524,32 @@ class InputApi: print("Error in handle subscription") print(e) + def subscribToOrchestrator(self): + subs = Worker(activemq_hostname, activemq_port, activemq_username, activemq_password, metric_to_predict_queue, self.metricToPredict, 10, 100) + subs.start() + + def metricToPredict(self, data): + print("Metric to consume event received") + try: + _json = json.loads(data) + #metrics = ["served_request","request_rate","response_time","performance","cpu_usage","memory"] + result = [] + for group in _json: + conn = { + "hostname": activemq_hostname, + "port": activemq_port, + "metric": group['metric'], + "topic": "/topic/{0}".format(group['metric']), #activemq_topic, + "key": activemq_subs_key, + "username": activemq_username, + "password": activemq_password, + } + result.append(conn) + self.saveSubscriptions(result) + self.startConsuming() + except Exception as e: + print(e) + def handleRequest(self, _json): if _json["request"] == "subscribe": if not "name" in _json: @@ -614,13 +587,19 @@ class InputApi: self.publisher.start() self.ingestor.start() self.consumer_manager = ActiveMQManager(self.getData) - self.saveSubscriptions(self.getActiveMQParameters()) - self.startConsuming() + #self.saveSubscriptions(self.getActiveMQParameters()) + #self.startConsuming() + self.subscribToOrchestrator() + #data = [ + # {"refersTo": "default", "level":3, "metric": "avgResponseTime", "publish_rate":3000}, + # {"refersTo": "default", "level":3, "metric": "memory", "publish_rate":3000} + #] + #self.metricToPredict(json.dumps(data)) def getSubscriberSize(self): return len(self.subscriptions.keys()) - def getData(self, data): + def getData(self, data, topic): try: _json = json.loads(data) if "request" in _json: @@ -630,7 +609,7 @@ class InputApi: except Exception as e: print("Non JSON content received") return None - self.ingestor.addToList(data) + self.ingestor.addToList(data, topic) self.data_points += 1 if time.time() - self.last_evaluation > self.evaluation_interval: rate = int(self.data_points / self.evaluation_interval) @@ -640,33 +619,6 @@ class InputApi: self.handleSubscriptions(data) -class EndpointAction(object): - def __init__(self, action): - self.action = action - - def __call__(self, *args): - response = self.action() - return Response(response, status=200, mimetype="application/json") - - -class FlaskAppWrapper(Thread): - app = None - - def __init__(self, name): - self.app = Flask(name) - super(FlaskAppWrapper, self).__init__() - - def run(self): - self.app.run() - - def add_endpoint(self, endpoint=None, endpoint_name=None, handler=None): - self.app.add_url_rule(endpoint, endpoint_name, EndpointAction(handler)) - - -def home(): - return "Welcome to Input API WS" - - if __name__ == "__main__": api = InputApi( influxdb_hostname, diff --git a/morphemic-persistent-storage/docker-compose.yaml b/morphemic-persistent-storage/docker-compose.yaml index e38593b623294f74483a4cd7413b37b6b62fe2bf..f01e8ab94d97c9a166a3e33c0067a3c476b99e14 100644 --- a/morphemic-persistent-storage/docker-compose.yaml +++ b/morphemic-persistent-storage/docker-compose.yaml @@ -2,8 +2,7 @@ version: '2' services: database: - build: - context: ./database + image: jdtotow/persistent_storage container_name: database restart: always env_file: @@ -12,6 +11,10 @@ services: - "./database/data:/var/lib/influxdb" ports: - 8086:8086 + environment: + - "ACTIVEMQ_PORT=61610" + - "ACTIVEMQ_TOPIC=AAAA" + - "ACTIVEMQ_HOST=147.102.17.76" publisher: image: jdtotow/publisher container_name: publisher diff --git a/morphemic-persistent-storage/example/ems/broker-client.jar b/morphemic-persistent-storage/example/ems/broker-client.jar new file mode 100644 index 0000000000000000000000000000000000000000..433351ae63597ee1a1b4274bdb15ede36f874940 Binary files /dev/null and b/morphemic-persistent-storage/example/ems/broker-client.jar differ diff --git a/morphemic-persistent-storage/example/ems/client.sh b/morphemic-persistent-storage/example/ems/client.sh new file mode 100755 index 0000000000000000000000000000000000000000..8e1865e6f1a8b11ee1c702e4ada6bc6893746ea0 --- /dev/null +++ b/morphemic-persistent-storage/example/ems/client.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# +# Copyright (C) 2017-2022 Institute of Communication and Computer Systems (imu.iccs.gr) +# +# This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless +# Esper library is used, in which case it is subject to the terms of General Public License v2.0. +# If a copy of the MPL was not distributed with this file, you can obtain one at +# https://www.mozilla.org/en-US/MPL/2.0/ +# + +MELODIC_CONFIG_DIR=. + +JAVA_OPTS=-Djavax.net.ssl.trustStore=./broker-truststore.p12\ -Djavax.net.ssl.trustStorePassword=melodic\ -Djavax.net.ssl.trustStoreType=pkcs12 +# -Djavax.net.debug=all +# -Djavax.net.debug=ssl,handshake,record + +java $JAVA_OPTS -jar broker-client.jar $*