Commit 3ed89bc5 authored by Mohamed Khalil Labidi

Merge branch 'proactive-dev' into 'byon-ns'

# Conflicts:
#   scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java
#   scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Deployment.java
parents 9eda6b26 a0926ee0
# Gitlab CI/CD script for the Morphemic Preprocessor project
variables:
MAVEN_IMAGE: "maven:3.5.2-jdk-8"
MAVEN_IMAGE: "maven:3.8.1-jdk-11"
MAVEN_IMAGE_JDK_8: "maven:3.5.2-jdk-8"
LOCAL_REPO: "127.0.0.1:5000"
DOCKER_REPO: "gitlab.ow2.org:4567"
......@@ -10,8 +11,11 @@ variables:
DOCKER_DRIVER: overlay
DOCKER_TLS_CERTDIR: "/certs"
DOCKER_CLI: "docker run --rm -v /var/run/docker.sock:/var/run/docker.sock -v $CI_PROJECT_DIR/maven_repo:/root/.m2 -w /usr/src/mymaven -v $CI_PROJECT_DIR:/usr/src/mymaven $MAVEN_IMAGE"
SCHEDULING_ABSTRACTION_LAYER_CLI: "mvn --batch-mode -f scheduling-abstraction-layer/pom.xml"
AMQ_MESSAGE_JAVA_LIBRARY_CLI: "mvn --batch-mode -f amq-message-java-library/pom.xml"
SLO_SEVERITY_CALCULATOR_CLI: "mvn --batch-mode -f morphemic-slo-severity-calculator/pom.xml"
PREDICTON_ORCHESTRATOR_CLI: "mvn --batch-mode -N -f prediction_orchestrator/pom.xml"
cache:
paths:
......@@ -30,12 +34,13 @@ before_script:
</settings>' > $HOME/.m2/settings.xml
stages:
- deployLibrary
- build
- deploy
build:scheduling-abstraction-layer:
stage: build
image: $MAVEN_IMAGE
image: $MAVEN_IMAGE_JDK_8
script:
- $SCHEDULING_ABSTRACTION_LAYER_CLI clean install
artifacts:
......@@ -43,7 +48,7 @@ build:scheduling-abstraction-layer:
- /builds/melodic/morphemic-preprocessor/maven_repo/org/activeeon/scheduling-abstraction-layer/
build:amq-message-java-library:
stage: build
stage: deployLibrary
image: $MAVEN_IMAGE
script:
- $AMQ_MESSAGE_JAVA_LIBRARY_CLI clean install
......@@ -51,13 +56,30 @@ build:amq-message-java-library:
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/gr/ntua/imu/morphemic/amq-message-java-library/
build:prediction_orchestrator:
stage: build
image: $MAVEN_IMAGE
script:
- $PREDICTON_ORCHESTRATOR_CLI -Pwithout-docker clean install
artifacts:
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/eu/morphemic/prediction_orchestrator/
build:slo-severity-calculator:
stage: build
image: $MAVEN_IMAGE
script:
- $SLO_SEVERITY_CALCULATOR_CLI -Dtest=!UnboundedMonitoringAttributeTests clean install
artifacts:
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/gr/ntua/imu/morphemic/SLOSeverityCalculator/
deploy:performance-model:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
- proactive-dev
services:
- $DOCKER_DIND_SERVICE
script:
......@@ -68,9 +90,41 @@ deploy:performance-model:
- docker tag performance_model:latest $CI_REGISTRY_IMAGE/performance_model:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/performance_model:$CI_COMMIT_BRANCH
deploy:persistent-storage-database:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- cd morphemic-persistent-storage/database
- docker build -t persistent_storage_database -f ./Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag persistent_storage_database:latest $CI_REGISTRY_IMAGE/persistent_storage_database:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/persistent_storage_database:$CI_COMMIT_BRANCH
deploy:persistent-storage-activemq:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- cd morphemic-persistent-storage/example
- docker build -t persistent_storage_activemq -f ./Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag persistent_storage_activemq:latest $CI_REGISTRY_IMAGE/persistent_storage_activemq:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/persistent_storage_activemq:$CI_COMMIT_BRANCH
deploy:scheduling-abstraction-layer:
stage: deploy
image: $MAVEN_IMAGE
image: $MAVEN_IMAGE_JDK_8
only:
- master
- morphemic-rc1.5
......@@ -90,3 +144,70 @@ deploy:amq-message-java-library:
- build:amq-message-java-library
script:
- $AMQ_MESSAGE_JAVA_LIBRARY_CLI deploy
deploy:prediction_orchestrator:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
- predictionOrchestrator
services:
- $DOCKER_DIND_SERVICE
dependencies:
- build:prediction_orchestrator
script:
- $DOCKER_CLI $PREDICTON_ORCHESTRATOR_CLI deploy
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag $LOCAL_REPO/morphemic/prediction-orchestrator:latest $CI_REGISTRY_IMAGE/prediction-orchestrator:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/prediction-orchestrator:$CI_COMMIT_BRANCH
deploy:slo-severity-calculator:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
dependencies:
- build:slo-severity-calculator
services:
- $DOCKER_DIND_SERVICE
script:
- cd morphemic-slo-severity-calculator
- mkdir -p target/
- cp /builds/melodic/morphemic-preprocessor/maven_repo/gr/ntua/imu/morphemic/SLOSeverityCalculator/1.0-SNAPSHOT/SLOSeverityCalculator-1.0-SNAPSHOT.jar target/
- docker build -t slo_severity_calculator -f ./Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag slo_severity_calculator:latest $CI_REGISTRY_IMAGE/slo_severity_calculator:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/slo_severity_calculator:$CI_COMMIT_BRANCH
deploy:nbeats:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- docker build -t nbeats -f ./deployment/nbeats/Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag nbeats:latest $CI_REGISTRY_IMAGE/nbeats:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/nbeats:$CI_COMMIT_BRANCH
deploy:tft:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- docker build -t tft -f ./deployment/tft/Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag tft:latest $CI_REGISTRY_IMAGE/tft:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/tft:$CI_COMMIT_BRANCH
......@@ -19,7 +19,7 @@
<artifactId>amq-message-java-library</artifactId>
<name>AMQ message Java library</name>
<groupId>gr.ntua.imu.morphemic</groupId>
<version>1.1.0-SNAPSHOT</version>
<version>1.1.1-SNAPSHOT</version>
<dependencies>
......@@ -46,6 +46,11 @@
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<repositories>
......
......@@ -248,8 +248,15 @@ public class BrokerClient {
boolean final_closeConn = _closeConn;
Thread monitor_consumer_stop = new Thread(() -> {
while (!stop_signal.get()) {
//System.out.println("Busy-waiting");
synchronized (stop_signal) {
while (!stop_signal.get()) {
//System.out.println("Busy-waiting");
try {
stop_signal.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
try {
......
......@@ -7,6 +7,15 @@ import javax.jms.*;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import static java.util.logging.Level.INFO;
@Slf4j
......@@ -38,7 +47,18 @@ public class BrokerSubscriber {
client.receiveEvents(url, topic, stop_signal,message -> {
try {
if (message!=null) {
function.apply(topic, ((TextMessage) message).getText());
if (message instanceof TextMessage){
function.apply(topic, ((TextMessage) message).getText());
}else if (message instanceof ActiveMQBytesMessage) {
try {
String json_string = (((ActiveMQBytesMessage) message).readUTF());
Logger.getAnonymousLogger().log(INFO, json_string);
function.apply(topic, json_string);
}
catch (Exception e){
e.printStackTrace();
}
}
}
} catch (JMSException j) {
log.info("Shutting down subscriber...");
......
......@@ -56,11 +56,14 @@ public class EventFields {
public static final String prediction_time = "predictionTime";
}
/**
* This event is used to initiate forecasting of one or more monitoring metrics
* This event is used to initiate forecasting of one or more monitoring metrics. The epoch start is the time point at which the first forecast should be sent, and the number of forward predictions is the number of future time points to be forecasted, one every prediction horizon seconds
*/
public static class PredictionOrchestratorToForecastingMethodsStartForecastingEventFields{
public static final String metrics = "metrics";
public static final String timestamp = "timestamp";
public static final String epoch_start = "epoch_start";
public static final String number_of_forward_predictions = "number_of_forward_predictions";
public static final String prediction_horizon = "prediction_horizon";
}
/**
* This event is used to stop forecasting of one or more monitoring metrics
......
......@@ -58,6 +58,12 @@ public class TopicNames {
public static String prophet(String metric_name){
return "intermediate_prediction.prophet."+metric_name;
}
public static String cnn(String metric_name) {
return "intermediate_prediction.cnn."+metric_name;
}
public static String tft(String metric_name) {
return "intermediate_prediction.tft."+metric_name;
}
}
......@@ -73,6 +79,8 @@ public class TopicNames {
public static final String lstm = "start_forecasting.lstm";
public static final String gluon_machines = "start_forecasting.gluonmachines";
public static final String prophet = "start_forecasting.prophet";
public static final String tft = "start_forecasting.tft";
public static final String cnn = "start_forecasting.cnn";
}
/**
......@@ -87,5 +95,7 @@ public class TopicNames {
public static final String lstm = "stop_forecasting.lstm";
public static final String gluon_machines = "stop_forecasting.gluonmachines";
public static final String prophet = "stop_forecasting.prophet";
public static final String tft = "stop_forecasting.tft";
public static final String cnn = "stop_forecasting.cnn";
}
}
......@@ -12,11 +12,11 @@ class Connection:
def __init__(self, username, password,
host='localhost',
port=61613,
debug=True):
self.hosts = [(host, port)]
debug=False):
self.username = username
self.password = password
self.conn = stomp.Connection(host_and_ports=self.hosts)
self.hosts = [(host, port)]
self.conn = stomp.Connection(host_and_ports=self.hosts, auto_content_length=False)
if debug:
logging.debug("Enabling debug")
......@@ -58,11 +58,15 @@ class Connection:
def disconnect(self):
self.conn.disconnect()
def send_to_topic(self,destination, body, headers={}, **kwargs):
if not self.conn:
logging.error("Connect first")
return
self.conn.send(destination="/topic/%s" % destination,body=json.dumps(body),content_type="application/json",headers=headers, **kwargs)
str = json.dumps(body)
self.conn.send(destination="/topic/%s" % destination,
body= str,
content_type="application/json",
headers=headers, **kwargs)
from json import JSONDecodeError
from stomp.listener import ConnectionListener
import logging
......@@ -20,10 +21,6 @@ class MorphemicListener(ConnectionListener):
return headers.get('destination').replace('/topic/','')
def on(self,headers, res):
logging.debug("Unknown message %s ",res)
pass
def has_topic_name(self,headers, string):
return headers.get('destination').startswith(string)
......@@ -32,7 +29,7 @@ class MorphemicListener(ConnectionListener):
def on(self,headers, res):
logging.debug("Unknown message %s ",res)
logging.debug("Unknown message %s %s ",headers, res)
pass
def on_message(self, headers, body):
......@@ -40,10 +37,13 @@ class MorphemicListener(ConnectionListener):
logging.debug("Headers %s",headers)
logging.debug(" %s",body)
res = json.loads(body)
func_name='on_%s' % slugify(headers.get('destination').replace('/topic/',''), separator='_',)
if hasattr(self,func_name):
func = getattr(self, func_name)
func(res)
else:
self.on(headers,res)
try:
res = json.loads(body)
func_name='on_%s' % slugify(headers.get('destination').replace('/topic/',''), separator='_',)
if hasattr(self,func_name):
func = getattr(self, func_name)
func(res)
else:
self.on(headers,res)
except JSONDecodeError:
logging.error("Error decoding %s", body)
\ No newline at end of file
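For reference, a minimal sketch of how a consumer would rely on the dynamic dispatch shown above: on_message slugifies the topic name and, if the listener defines a matching on_<slugified topic> method, calls it with the decoded JSON body, otherwise it falls back to on(). The class, handler, and topic names below are illustrative, not part of the library:

```python
import logging

# MorphemicListener is the class defined in the diff above; import it from
# wherever amq-message-python-library is installed (the exact module path is
# not shown in this commit). Handler and topic names below are illustrative.
class ExampleListener(MorphemicListener):
    def on_metrics_to_predict(self, res):
        # Called for messages arriving on /topic/metrics_to_predict
        logging.info("metrics to predict: %s", res)

    def on_start_forecasting_tft(self, res):
        # Called for messages arriving on /topic/start_forecasting.tft
        logging.info("start forecasting request: %s", res)
```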
# Model deployment
Model deployment code for the tft and nbeats models. Each module consists of:
- Dockerfile
  - After installing all dependencies, runs the main.py script
  - The env file (environment variables) is used as a parameter
- docker_run.sh (docker compose will be used later)
  - TODO: build docker-compose, decide which volumes will be used (e.g. data path, model path, app config path)
- main.py
  - Starts running the scripts after the message from AMQ on the metrics_to_predict topic
  - Runs two scripts independently (as separate processes): one for training (retrain.py), the other for prediction (predict.py)
- retrain.py
  - Trains models (at the moment as a cyclic job with a fixed 10-minute frequency)
  - Currently one model per metric is used; however, that can be changed for tft
  - After each model training, a message is sent to the training_models topic
  - TODO: change the retraining conditions (needs to be discussed), add AutoML hyperparameter optimization
- predict.py
  - Sends predictions (at the moment as a cyclic event with a configurable number of forward predictions, which may be changed with the start_forecasting message); currently each metric's predictions are published with the same frequency (but, according to the examples in the Python stomp.py library, the prediction frequency may differ across metrics, so this may change in the future); see the sketch after this list for the timing scheme
  - Until the first model is trained, no prediction is sent
  - The m predictions, where m is the number of forward predictions for a given metric, are currently sent as separate messages; this could be replaced by a single message carrying lists
  - TODO: verify which parameters are stable and which may be changed via AMQ messages (prediction_horizon, publish_rate), parallelize the predictions, cache models, properly fill fields like 'refersTo' and 'provider', calculate a proper confidence interval (nbeats)
- src
  - Folder with all the helpers
  - TODO: install the dataset-maker package in a cleaner way
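A minimal sketch of the timing scheme described for predict.py above (an illustration under assumptions, not the actual predict.py code; it assumes the first forecasted time point lies one prediction horizon after epoch_start):

```python
def forecast_time_points(epoch_start, prediction_horizon, number_of_forward_predictions):
    """Future time points covered by one forecasting cycle (illustrative helper).

    Each point is forecasted and published as a separate message on the
    intermediate_prediction.<method>.<metric> topic.
    """
    return [epoch_start + k * prediction_horizon
            for k in range(1, number_of_forward_predictions + 1)]


# Example: with epoch_start=143532341252, prediction_horizon=120 and
# number_of_forward_predictions=5 (the values used in "How to test?" below),
# this yields five time points, 120 seconds apart.
```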
## How to test?
Ensure that there are no conflicting Docker images or containers, and check that the ports for InfluxDB and AMQ are not already in use.
From morphemic-persistent-storage:
docker-compose up -d
From this directory:
cd tft
docker build -t stomp_app -f Dockerfile ../..
./docker_run.sh
From the AMQ web console (http://localhost:8161/admin), a message (JSON format) such as:
[{"metric": "cpu_usage", "level": 3, "publish_rate": 60}]
can be sent to the topic metrics_to_predict.
Then a message like:
{
"metrics": ["cpu_usage"],
"timestamp": 143532341251,
"epoch_start": 143532341252,
"number_of_forward_predictions": 5,
"prediction_horizon": 120
}
can be sent to the topic start_forecasting.[METHOD NAME], e.g. start_forecasting.tft.
Until the first model is trained, predictions will not be sent.
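As an alternative to the web console, the same two messages can be published from Python with the amq-message-python-library; a minimal sketch, assuming the morphemic/morphemic credentials and port 61613 from the env file further down (adjust as needed):

```python
import time
from amq_message_python_library import *  # exposes morphemic.Connection (see main.py below)

# Assumed broker settings; they match the env file shipped with this deployment.
conn = morphemic.Connection("morphemic", "morphemic", host="localhost", port=61613)
conn.connect()

# 1) Announce the metric(s) to predict.
conn.send_to_topic("metrics_to_predict",
                   [{"metric": "cpu_usage", "level": 3, "publish_rate": 60}])

# 2) Ask the chosen forecaster (here tft) to start forecasting.
now = int(time.time())
conn.send_to_topic("start_forecasting.tft", {
    "metrics": ["cpu_usage"],
    "timestamp": now,
    "epoch_start": now + 30,
    "number_of_forward_predictions": 5,
    "prediction_horizon": 120,
})

conn.disconnect()
```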
FROM python:3.8-slim-buster
# Install Python dependencies.
WORKDIR /wd
COPY deployment/tft/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt && mkdir models
# Copy the rest of the codebase into the image
COPY deployment/tft ./
COPY morphemic-datasetmaker ./morphemic-datasetmaker
COPY amq-message-python-library ./amq-message-python-library
RUN cd morphemic-datasetmaker && python3 setup.py install && cd .. && rm -r morphemic-datasetmaker && mv amq-message-python-library amq_message_python_library
CMD ["python3", "main.py"]
docker run -t --env-file=env --network=host stomp_app
AMQ_HOSTNAME=localhost
AMQ_USER=morphemic
AMQ_PASSWORD=morphemic
AMQ_PORT=61613
APP_NAME=demo
METHOD=nbeats
DATA_PATH=./
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
import os
from multiprocessing import Pool
import stomp
import json
from amq_message_python_library import * # python amq-message-python-library
import logging
import time
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
AMQ_HOST = os.environ.get("AMQ_HOST", "localhost")
AMQ_PORT_BROKER = os.environ.get("AMQ_PORT_BROKER", "61613")
START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "tft")
START_TOPIC = f"start_forecasting.{METHOD}"
def run_process(args):
print("running")
os.system(f"python {args[0]} '{args[1]}'")
def start_training(metrics_to_predict):
processes = (("retrain.py", metrics_to_predict), ("predict.py", metrics_to_predict))
pool = Pool(processes=2)
pool.map(run_process, processes)
class StartListener(stomp.ConnectionListener):
"""Custom listener, parameters:
- conn (stomp connector)
- topic_name, name of topic to subscribe"""
def __init__(self, conn, topic_name):
self.conn = conn
self.topic_name = topic_name
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print(self.topic_name)
logging.debug(f" Body: {frame.body}")
message = json.loads(frame.body)
global publish_rate, all_metrics
publish_rate = message[0]["publish_rate"]
all_metrics = message
class StartForecastingListener(stomp.ConnectionListener):
"""Custom listener, parameters:
- conn (stomp connector)
- topic_name, name of topic to subscribe"""
def __init__(self, conn, topic_name):
self.conn = conn
self.topic_name = topic_name
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
message = json.loads(frame.body)
message["publish_rate"] = publish_rate
message["all_metrics"] = all_metrics
message = json.dumps(message)
start_training(message)
self.conn.disconnect()
class Msg(object):
def __init__(self):
self.body = None
def main():
logging.getLogger().setLevel(logging.DEBUG)
start_app_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_app_conn.connect()
start_conn = morphemic.Connection(
AMQ_USER, AMQ_PASSWORD, host=AMQ_HOST, port=AMQ_PORT_BROKER
)
start_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_APP_TOPIC}", "1", ack="auto")
start_app_conn.conn.subscribe(f"/topic/{START_TOPIC}", "2", ack="auto")
start_conn.conn.set_listener("1", StartListener(start_conn.conn, START_APP_TOPIC))
start_app_conn.conn.set_listener(
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
# msg1 = Msg()
# msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60000}]'
# msg2 = Msg()
# msg2.body = (
# "{"
# + f'"metrics": ["cpu_usage"],"timestamp": {int(time.time())}, "epoch_start": {int(time.time()) + 30}, "number_of_forward_predictions": 8,"prediction_horizon": 60'
# + "}"
# )
# StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
# StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
pass
if __name__ == "__main__":
publish_rate = 0
all_metrics = {}
main()