Commit d3dfb013 authored by Anna Warno

Merge branch 'morphemic-rc2.0' into 'tft_nbeats'

# Conflicts:
#   .gitlab-ci.yml
parents 7718a8c7 3e2ecefa
......@@ -19,7 +19,7 @@
<artifactId>amq-message-java-library</artifactId>
<name>AMQ message Java library</name>
<groupId>gr.ntua.imu.morphemic</groupId>
<version>1.1.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<dependencies>
......@@ -46,6 +46,11 @@
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<repositories>
......
......@@ -248,8 +248,15 @@ public class BrokerClient {
boolean final_closeConn = _closeConn;
Thread monitor_consumer_stop = new Thread(() -> {
while (!stop_signal.get()) {
//System.out.println("Busy-waiting");
synchronized (stop_signal) {
while (!stop_signal.get()) {
//System.out.println("Busy-waiting");
try {
stop_signal.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
try {
......@@ -329,12 +336,12 @@ public class BrokerClient {
}
// ------------------------------------------------------------------------
/*
public synchronized void openConnection() throws JMSException {
checkProperties();
openConnection(properties.getBrokerUrl(), null, null);
}
*/
public synchronized void openConnection(String connectionString) throws JMSException {
openConnection(connectionString, null, null);
}
......
......@@ -7,6 +7,15 @@ import javax.jms.*;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import static java.util.logging.Level.INFO;
@Slf4j
......@@ -38,7 +47,18 @@ public class BrokerSubscriber {
client.receiveEvents(url, topic, stop_signal,message -> {
try {
if (message!=null) {
function.apply(topic, ((TextMessage) message).getText());
if (message instanceof TextMessage){
function.apply(topic, ((TextMessage) message).getText());
}else if (message instanceof ActiveMQBytesMessage) {
try {
String json_string = (((ActiveMQBytesMessage) message).readUTF());
Logger.getAnonymousLogger().log(INFO, json_string);
function.apply(topic, json_string);
}
catch (Exception e){
e.printStackTrace();
}
}
}
} catch (JMSException j) {
log.info("Shutting down subscriber...");
......
......@@ -56,11 +56,14 @@ public class EventFields {
public static final String prediction_time = "predictionTime";
}
/**
* This event is used to initiate forecasting of one or more monitoring metrics
* This event is used to initiate forecasting of one or more monitoring metrics. Epoch start is the time point at which the first forecast should be sent, and the number of forward predictions is the number of future time points to be forecasted, one every prediction horizon seconds
*/
public static class PredictionOrchestratorToForecastingMethodsStartForecastingEventFields{
public static final String metrics = "metrics";
public static final String timestamp = "timestamp";
public static final String epoch_start = "epoch_start";
public static final String number_of_forward_predictions = "number_of_forward_predictions";
public static final String prediction_horizon = "prediction_horizon";
}
/**
* This event is used to stop forecasting of one or more monitoring metrics
......
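To illustrate the timing scheme the StartForecasting javadoc above describes, here is a minimal Python sketch (illustrative only; it assumes epoch_start is in UNIX epoch seconds, prediction_horizon is in seconds and number_of_forward_predictions is an integer count):

# Illustrative sketch only: the future time points implied by the fields
# documented above, under one reading where the first forecasted time point
# coincides with epoch_start (shift by one horizon if it is meant to be later).
def forecast_time_points(epoch_start, prediction_horizon, number_of_forward_predictions):
    return [epoch_start + k * prediction_horizon
            for k in range(number_of_forward_predictions)]

# e.g. epoch_start=1623053000, prediction_horizon=120, number_of_forward_predictions=5
# -> 1623053000, 1623053120, 1623053240, 1623053360, 1623053480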
......@@ -52,12 +52,18 @@ public class TopicNames {
public static String lstm(String metric_name) {
return "intermediate_prediction.lstm."+metric_name;
}
public static String gluon_machines(String metric_name) {
return "intermediate_prediction.gluonmachines."+metric_name;
public static String gluonts(String metric_name) {
return "intermediate_prediction.gluonts."+metric_name;
}
public static String prophet(String metric_name){
return "intermediate_prediction.prophet."+metric_name;
}
public static String cnn(String metric_name) {
return "intermediate_prediction.cnn."+metric_name;
}
public static String tft(String metric_name) {
return "intermediate_prediction.tft."+metric_name;
}
}
......@@ -71,8 +77,10 @@ public class TopicNames {
public static final String tsetlin_machines = "start_forecasting.tsetlinmachines";
public static final String exponential_smoothing = "start_forecasting.exponentialsmoothing";
public static final String lstm = "start_forecasting.lstm";
public static final String gluon_machines = "start_forecasting.gluonmachines";
public static final String gluonts = "start_forecasting.gluonts";
public static final String prophet = "start_forecasting.prophet";
public static final String tft = "start_forecasting.tft";
public static final String cnn = "start_forecasting.cnn";
}
/**
......@@ -85,7 +93,9 @@ public class TopicNames {
public static final String tsetlin_machines = "stop_forecasting.tsetlinmachines";
public static final String exponential_smoothing = "stop_forecasting.exponentialsmoothing";
public static final String lstm = "stop_forecasting.lstm";
public static final String gluon_machines = "stop_forecasting.gluonmachines";
public static final String gluonts = "stop_forecasting.gluonts";
public static final String prophet = "stop_forecasting.prophet";
public static final String tft = "stop_forecasting.tft";
public static final String cnn = "stop_forecasting.cnn";
}
}
......@@ -408,6 +408,7 @@ class StartForecasting(enumerate):
_match="start_forecasting."
VERSION = "version"
METRICS = "metrics"
'''metrics for which a certain method should start producing predictions'''
TIMESTAMP = "timestamp"
......
......@@ -12,11 +12,13 @@ class Connection:
def __init__(self, username, password,
host='localhost',
port=61613,
debug=True):
self.hosts = [(host, port)]
debug=False,
**kwargs):
self.username = username
self.password = password
self.conn = stomp.Connection(host_and_ports=self.hosts)
self.hosts = [(host, port)]
self.conn = stomp.Connection(host_and_ports=self.hosts, auto_content_length=False,
timeout=kwargs.get('timeout',180000),keepalive=kwargs.get('keepalive', True))
if debug:
logging.debug("Enabling debug")
......@@ -58,11 +60,15 @@ class Connection:
def disconnect(self):
self.conn.disconnect()
def send_to_topic(self,destination, body, headers={}, **kwargs):
if not self.conn:
logging.error("Connect first")
return
self.conn.send(destination="/topic/%s" % destination,body=json.dumps(body),content_type="application/json",headers=headers, **kwargs)
str = json.dumps(body)
self.conn.send(destination="/topic/%s" % destination,
body= str,
content_type="application/json",
headers=headers, **kwargs)
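A hedged usage sketch of the Connection wrapper changed above; credentials, host, topic and payload are placeholders, and whatever connect/subscribe step the class performs before sending is assumed and not shown in this hunk:

# Hypothetical usage of the Connection class diffed above. Establishing the
# underlying STOMP session before sending is assumed to happen elsewhere.
conn = Connection("user", "password", host="localhost", port=61613,
                  timeout=180000, keepalive=True)  # extra kwargs forwarded to stomp.Connection
conn.send_to_topic("training_models", {
    "metrics": ["MaxCPULoad", "MinCPULoad"],
    "forecasting_method": "tft",
    "timestamp": 1623053000,
})
conn.disconnect()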
from json import JSONDecodeError
from stomp.listener import ConnectionListener
import logging
......@@ -20,10 +21,6 @@ class MorphemicListener(ConnectionListener):
return headers.get('destination').replace('/topic/','')
def on(self,headers, res):
logging.debug("Unknown message %s ",res)
pass
def has_topic_name(self,headers, string):
return headers.get('destination').startswith(string)
......@@ -32,7 +29,7 @@ class MorphemicListener(ConnectionListener):
def on(self,headers, res):
logging.debug("Unknown message %s ",res)
logging.debug("Unknown message %s %s ",headers, res)
pass
def on_message(self, headers, body):
......@@ -40,10 +37,13 @@ class MorphemicListener(ConnectionListener):
logging.debug("Headers %s",headers)
logging.debug(" %s",body)
res = json.loads(body)
func_name='on_%s' % slugify(headers.get('destination').replace('/topic/',''), separator='_',)
if hasattr(self,func_name):
func = getattr(self, func_name)
func(res)
else:
self.on(headers,res)
try:
res = json.loads(body)
func_name='on_%s' % slugify(headers.get('destination').replace('/topic/',''), separator='_',)
if hasattr(self,func_name):
func = getattr(self, func_name)
func(res)
else:
self.on(headers,res)
except JSONDecodeError:
logging.error("Error decoding %s", body)
\ No newline at end of file
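A minimal sketch of how the dispatch in on_message above is typically used: a destination such as '/topic/start_forecasting.tft' (topic chosen only as an example) is slugified to 'start_forecasting_tft', so a handler named on_start_forecasting_tft is called when it exists, with on() as the fallback:

# Hypothetical subclass relying on the slugified-destination dispatch above.
import logging
from amq_message_python_library import MorphemicListener  # import path is an assumption

class TftListener(MorphemicListener):

    def on_start_forecasting_tft(self, res):
        # called for messages arriving on /topic/start_forecasting.tft
        logging.info("Start forecasting requested for metrics %s", res.get("metrics"))

    def on(self, headers, res):
        # fallback for topics without a dedicated on_<slugified_topic> handler
        logging.debug("Unhandled message on %s: %s", headers.get('destination'), res)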
......@@ -10,17 +10,17 @@ RUN pip install --no-cache-dir --upgrade pip \
\
&& poetry install --no-dev && mkdir models
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc2.0/morphemic-preprocessor-morphemic-rc2.0.tar.gz /var/lib/morphemic/
# Copy the rest of the codebase into the image
COPY deployment/arima ./
RUN cd /var/lib/morphemic/ \
&& tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc1.5 \
&& tar -zxf morphemic-preprocessor-morphemic-rc2.0.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc2.0 \
&& cd morphemic-datasetmaker && python3 setup.py install \
&& cd ../.. \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc1.5/amq-message-python-library /wd/amq_message_python_library \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc2.0/amq-message-python-library /wd/amq_message_python_library \
&& rm -rf /var/lib/morphemic
CMD ["poetry", "run" ,"python3", "main.py"]
......
......@@ -6,17 +6,17 @@ WORKDIR /wd
COPY deployment/nbeats/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc2.0/morphemic-preprocessor-morphemic-rc2.0.tar.gz /var/lib/morphemic/
# Copy the rest of the codebase into the image
COPY deployment/nbeats/ ./
RUN cd /var/lib/morphemic/ \
&& tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc1.5 \
&& tar -zxf morphemic-preprocessor-morphemic-rc2.0.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc2.0 \
&& cd morphemic-datasetmaker && python3 setup.py install \
&& cd ../.. \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc1.5/amq-message-python-library /wd/amq_message_python_library \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc2.0/amq-message-python-library /wd/amq_message_python_library \
&& rm -rf /var/lib/morphemic \
&& mkdir -p /wd/logs
......
......@@ -6,17 +6,17 @@ WORKDIR /wd
COPY deployment/tft/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc2.0/morphemic-preprocessor-morphemic-rc2.0.tar.gz /var/lib/morphemic/
# Copy the rest of the codebase into the image
COPY deployment/tft/ ./
RUN cd /var/lib/morphemic/ \
&& tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc1.5 \
&& tar -zxf morphemic-preprocessor-morphemic-rc2.0.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc2.0 \
&& cd morphemic-datasetmaker && python3 setup.py install \
&& cd ../.. \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc1.5/amq-message-python-library /wd/amq_message_python_library \
&& cp -R /var/lib/morphemic/morphemic-preprocessor-morphemic-rc2.0/amq-message-python-library /wd/amq_message_python_library \
&& rm -rf /var/lib/morphemic \
&& mkdir -p /wd/logs
......
......@@ -7,7 +7,7 @@ import time
pd.options.mode.chained_assignment = None
"""Script for preparingself.time_columnseries dataset from pythorch-forecasting package
"""Script for preparingself.time_columnseries dataset from pythorch-forecasting package
TODO: add checking whether data consists of multiple series, handle nans values"""
......
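A hedged sketch of the kind of checks the TODO above refers to, assuming a pandas DataFrame with a time column and an optional series-id column (column names here are placeholders, not the script's actual ones):

import pandas as pd

def validate_dataset(df: pd.DataFrame, time_column: str, series_column: str = None):
    # reject (or later: split) data that actually contains multiple series
    if series_column is not None and df[series_column].nunique() > 1:
        raise ValueError("Dataset contains multiple series; expected a single one")
    # simple NaN handling: forward-fill, then drop rows that are still incomplete
    df = df.sort_values(time_column).ffill().dropna()
    return df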
FROM python:3.7
RUN pip install --upgrade pip
RUN mkdir /app
RUN mkdir -p /app/log
ADD . /app
RUN pip install /app/lib
RUN pip install -r /app/amq_client/requirements.txt
WORKDIR /app
CMD ["python","-u","app.py"]
\ No newline at end of file
class Metric(enumerate):
"""
[0] (current/detected) Metrics & SLOs Events Format:
This event is aggregated by EMS and persisted in InfluxDB. Moreover,
the Prediction Orchestrator subscribes to it and receives the current metrics in order to
evaluate the forecasting methods according to the defined KPIs (e.g., MAPE)
* Topic: [metric_name]
> (e.g. MaxCPULoad)
{
"metricValue": 12.34,
"level": 1,
"timestamp": 143532341251,
"refersTo": "MySQL_12345",
"cloud": "AWS-Dublin",
"provider": "AWS"
}
https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication
"""
TIMESTAMP = "timestamp"
METRIC_VALUE = "metricValue"
REFERS_TO = "refersTo"
CLOUD = "cloud"
PROVIDER = "provider"
class PredictionMetric(enumerate):
"""
[1] Predicted Metrics & SLOs Events Format
This event is produced by the Prediction Orchestrator and reflects the final predicted value for a metric.
- Topic: prediction.[metric_name]
> (e.g. prediction.MaxCPULoad)
{
"metricValue": 12.34,
"level": 1,
"timestamp": 143532341251,
"probability": 0.98,
"confidence_interval " : [8,15]
"predictionTime": 143532342,
"refersTo": "MySQL_12345",
"cloud": "AWS-Dublin",
"provider": "AWS"
}
https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication
"""
_match = "prediction."
METRICVALUE= "metricValue"
'''Predicted metric value'''
LEVEL= "level"
'''Level of VM where prediction occurred or refers'''
TIMESTAMP= "timestamp"
'''Prediction creation date/time from epoch'''
PROBABILITY= "probability"
'''Probability of the predicted metric value (range 0..1)'''
CONFIDENCE_INTERVAL= "confidence_interval"
'''the probability-confidence interval for the prediction'''
PREDICTION_TIME= "predictionTime"
'''This refers to a time point in the imminent future (relative to the time
needed for reconfiguration) for which the predicted value is considered
valid/accurate (in UNIX Epoch)'''
REFERSTO= "refersTo"
'''The id of the application, component or (VM) host to which the prediction refers'''
CLOUD= "cloud"
'''Cloud provider of the VM (with location)'''
PROVIDER= "provider"
'''Cloud provider name'''
class MetricsToPredict(enumerate):
"""
[2] Translator – to – Forecasting Methods/Prediction Orchestrator Events Format
This event is produced by the translator, to:
inform the Dataset Maker which metrics it should subscribe to in order to aggregate the appropriate training dataset in the time-series DB.
instruct each of the Forecasting methods to predict the values of one or more monitoring metrics
inform the Prediction Orchestrator for the metrics which will be forecasted
* Topic: metrics_to_predict
*Note:* This event could be communicated through Mule
[
{
"metric": "MaxCPULoad",
"level": 3,
"publish_rate": 60000,
},
{
"metric": "MinCPULoad",
"level": 3,
"publish_rate": 50000,
}
]
https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication
"""
_match = "metrics_to_predict"
METRIC = "metric"
'''name of the metric to predict'''
LEVEL = "level"
'''Level of monitoring topology where this metric may be produced/found'''
PUBLISH_RATE = "publish_rate"
'''expected rate for datapoints regarding the specific metric (according to CAMEL)'''
class TraningModels(enumerate):
"""
[3] Forecasting Methods – to – Prediction Orchestrator Events Format
This event is produced by each of the Forecasting methods, to inform the
Prediction Orchestrator that the method has (re-)trained its model for one or more metrics.
* Topic: training_models
{
"metrics": ["MaxCPULoad","MinCPULoad"]",
"forecasting_method": "ESHybrid",
"timestamp": 143532341251,
}
https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication
"""
_match = "training_models"
METRICS = "metrics"
'''metrics for which a certain forecasting method has successfully trained or re-trained its model'''
FORECASTING_METHOD = "forecasting_method"
'''the method that is currently re-training its models'''
TIMESTAMP = "timestamp"
'''date/time of model(s) (re-)training'''
class IntermediatePrediction(enumerate):
"""
[4] Forecasting Methods – to – Prediction Orchestrator Events Format
This event is produced by each of the Forecasting methods, and is used by the Prediction Orchestrator to determine the final prediction value for the particular metric.
* Topic: intermediate_prediction.[forecasting_method].[metric_name]
* (e.g. intermediate_prediction.ESHybrid.MaxCPULoad)
* We note that any component will be able to subscribe to topics like:
* intermediate_prediction.*.MaxCPULoad → gets MaxCPULoad predictions produced by all forecasting methods or
* intermediate_prediction.ESHybrid.* → gets all metrics predictions from ESHybrid method
* We consider that each forecasting method publishes a static (but configurable) number m of predicted values (under the same timestamp) for time points into the future. These time points into the future are relevant to the reconfiguration time that is needed (and can also be updated).
* For example if we configure m=5 predictions into the future and the reconfiguration time needed is TR=10 minutes, then at t0 a forecasting method publishes 5 events with the same timestamp and prediction times t0+10, t0+20, t0+30, t0+40, t0+50.
{
"metricValue": 12.34,
"level": 3,
"timestamp": 143532341251,
"probability": 0.98,
"confidence_interval " : [8,15]
"predictionTime": 143532342,
"refersTo": "MySQL_12345",
"cloud": "AWS-Dublin",
"provider": "AWS"
}
https://confluence.7bulls.eu/display/MOR/Forecasting+Mechanism+Sub-components+Communication
"""
_match="intermediate_prediction."
METRICVALUE = "metricValue"
'''Predicted metric value (more than one such event will be produced for different time points into the future – this can be valuable to the Prediction Orchestrator in certain situations, e.g. when a forecasting method is unreachable for a time period)'''
LEVEL = "level"
'''Level of VM where prediction occurred or refers'''
TIMESTAMP = "timestamp"
'''Prediction creation date/time from epoch'''
PROBABILITY = "probability"
'''Probability of the predicted metric value (range 0..1)'''
CONFIDENCE_INTERVAL = "confidence_interval"
'''the probability-confidence interval for the prediction'''
PREDICTION_TIME = "predictionTime"
'''This refers to a time point in the imminent future (relative to the time needed for reconfiguration) for which the predicted value is considered valid/accurate (in UNIX Epoch)'''
REFERS_TO = "refersTo"
'''The id of the application, component or (VM) host to which the prediction refers'''
CLOUD = "cloud"
'''Cloud provider of the VM (with location)'''
PROVIDER = "provider"
'''Cloud provider name'''
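A hedged sketch of the publication scheme described in the IntermediatePrediction docstring above: at t0 a forecasting method publishes m events under the same timestamp with prediction times t0 + k*horizon. Method name, metric, level and the probability/confidence values are placeholders, and conn is assumed to be a Connection instance as diffed earlier:

import time

def publish_intermediate_predictions(conn, method, metric, predicted_values, horizon):
    # one event per forecasted time point, all sharing the same creation timestamp
    t0 = int(time.time())
    for k, value in enumerate(predicted_values, start=1):
        conn.send_to_topic("intermediate_prediction.%s.%s" % (method, metric), {
            "metricValue": value,
            "level": 3,                          # placeholder level
            "timestamp": t0,
            "probability": 0.95,                 # placeholder probability
            "confidence_interval": [value - 1, value + 1],  # placeholder interval
            "predictionTime": t0 + k * horizon,  # t0+h, t0+2h, ..., t0+m*h
            "refersTo": "MySQL_12345",
            "cloud": "AWS-Dublin",
            "provider": "AWS",
        })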