Commit 225272cc authored by Andreas Tsagkaropoulos

Merge branch 'slo-severity-calculator-master' into amq-message-java-library

# Conflicts:
#	amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerSubscriber.java
parents 6900789e c4859c94
# Gitlab CI/CD script for the Morphemic Preprocessor project
variables:
MAVEN_IMAGE: "maven:3.5.2-jdk-8"
MAVEN_IMAGE: "maven:3.8.1-jdk-11"
MAVEN_IMAGE_JDK_8: "maven:3.5.2-jdk-8"
LOCAL_REPO: "127.0.0.1:5000"
DOCKER_REPO: "gitlab.ow2.org:4567"
@@ -10,7 +11,9 @@ variables:
DOCKER_DRIVER: overlay
DOCKER_TLS_CERTDIR: "/certs"
SCHEDULING_ABSTRACTION_LAYER_CLI: "mvn -DskipTests --batch-mode -f scheduling-abstraction-layer/pom.xml"
SCHEDULING_ABSTRACTION_LAYER_CLI: "mvn --batch-mode -f scheduling-abstraction-layer/pom.xml"
AMQ_MESSAGE_JAVA_LIBRARY_CLI: "mvn --batch-mode -f amq-message-java-library/pom.xml"
SLO_SEVERITY_CALCULATOR_CLI: "mvn --batch-mode -f morphemic-slo-severity-calculator/pom.xml"
cache:
paths:
@@ -34,13 +37,31 @@ stages:
build:scheduling-abstraction-layer:
stage: build
image: $MAVEN_IMAGE
image: $MAVEN_IMAGE_JDK_8
script:
- $SCHEDULING_ABSTRACTION_LAYER_CLI clean install
artifacts:
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/org/activeeon/scheduling-abstraction-layer/
build:amq-message-java-library:
stage: build
image: $MAVEN_IMAGE
script:
- $AMQ_MESSAGE_JAVA_LIBRARY_CLI clean install
artifacts:
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/gr/ntua/imu/morphemic/amq-message-java-library/
build:slo-severity-calculator:
stage: build
image: $MAVEN_IMAGE
script:
- $SLO_SEVERITY_CALCULATOR_CLI clean install
artifacts:
paths:
- /builds/melodic/morphemic-preprocessor/maven_repo/gr/ntua/imu/morphemic/SLOSeverityCalculator/
deploy:performance-model:
stage: deploy
image: $DOCKER_DIND_IMAGE
@@ -54,12 +75,44 @@ deploy:performance-model:
- docker build -t performance_model -f ./deployment/Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag $LOCAL_REPO/performance_model:unknown $CI_REGISTRY_IMAGE/performance_model:$CI_COMMIT_BRANCH
- docker tag performance_model:latest $CI_REGISTRY_IMAGE/performance_model:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/performance_model:$CI_COMMIT_BRANCH
deploy:persistent-storage-database:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- cd morphemic-persistent-storage/database
- docker build -t persistent_storage_database -f ./Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag persistent_storage_database:latest $CI_REGISTRY_IMAGE/persistent_storage_database:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/persistent_storage_database:$CI_COMMIT_BRANCH
deploy:persistent-storage-activemq:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- cd morphemic-persistent-storage/example
- docker build -t persistent_storage_activemq -f ./Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag persistent_storage_activemq:latest $CI_REGISTRY_IMAGE/persistent_storage_activemq:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/persistent_storage_activemq:$CI_COMMIT_BRANCH
deploy:scheduling-abstraction-layer:
stage: deploy
image: $MAVEN_IMAGE
image: $MAVEN_IMAGE_JDK_8
only:
- master
- morphemic-rc1.5
@@ -67,3 +120,60 @@ deploy:scheduling-abstraction-layer:
- build:scheduling-abstraction-layer
script:
- $SCHEDULING_ABSTRACTION_LAYER_CLI deploy
deploy:amq-message-java-library:
stage: deploy
image: $MAVEN_IMAGE
only:
- master
- morphemic-rc1.5
dependencies:
- build:amq-message-java-library
script:
- $AMQ_MESSAGE_JAVA_LIBRARY_CLI deploy
deploy:slo-severity-calculator:
stage: deploy
image: $MAVEN_IMAGE
only:
- master
- morphemic-rc1.5
dependencies:
- build:slo-severity-calculator
script:
- cd morphemic-slo-severity-calculator
- docker build -t slo_severity_calculator -f ./Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag slo_severity_calculator:latest $CI_REGISTRY_IMAGE/slo_severity_calculator:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/slo_severity_calculator:$CI_COMMIT_BRANCH
deploy:nbeats:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- docker build -t nbeats -f ./deployment/nbeats/Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag nbeats:latest $CI_REGISTRY_IMAGE/nbeats:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/nbeats:$CI_COMMIT_BRANCH
deploy:tft:
stage: deploy
image: $DOCKER_DIND_IMAGE
only:
- master
- morphemic-rc1.5
services:
- $DOCKER_DIND_SERVICE
script:
- docker build -t tft -f ./deployment/tft/Dockerfile .
- docker image ls
- echo "$K8S_SECRET_DOCKER_PASSWORD" | docker login $CI_REGISTRY -u $K8S_SECRET_DOCKER_USER --password-stdin
- docker tag tft:latest $CI_REGISTRY_IMAGE/tft:$CI_COMMIT_BRANCH
- docker push $CI_REGISTRY_IMAGE/tft:$CI_COMMIT_BRANCH
@@ -19,7 +19,7 @@
<artifactId>amq-message-java-library</artifactId>
<name>AMQ message Java library</name>
<groupId>gr.ntua.imu.morphemic</groupId>
<version>1.0.0</version>
<version>1.1.0-SNAPSHOT</version>
<dependencies>
@@ -46,8 +46,31 @@
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>maven-central</id>
<url>https://repo1.maven.org/maven2/</url>
</repository>
<repository>
<id>eu.7bulls</id>
<name>Melodic 7bulls repository</name>
<url>https://nexus.7bulls.eu:8443/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<distributionManagement>
<snapshotRepository>
<id>eu.7bulls</id>
@@ -76,8 +99,25 @@
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.1</version>
<executions>
<execution>
<id>default-deploy</id>
<phase>deploy</phase>
<goals>
<goal>deploy</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
@@ -7,6 +7,15 @@ import javax.jms.*;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import static java.util.logging.Level.INFO;
@Slf4j
@@ -38,7 +47,18 @@ public class BrokerSubscriber {
client.receiveEvents(url, topic, stop_signal,message -> {
try {
if (message!=null) {
function.apply(topic, ((TextMessage) message).getText());
if (message instanceof TextMessage){
function.apply(topic, ((TextMessage) message).getText());
}else if (message instanceof ActiveMQBytesMessage) {
try {
String json_string = (((ActiveMQBytesMessage) message).readUTF());
Logger.getAnonymousLogger().log(INFO, json_string);
function.apply(topic, json_string);
}
catch (Exception e){
e.printStackTrace();
}
}
}
} catch (JMSException j) {
log.info("Shutting down subscriber...");
......
@@ -13,48 +13,39 @@ class Connection:
host='localhost',
port=61613,
debug=True):
self.hosts = [(host, port)]
self.username = username
self.password = password
self.conn = stomp.Connection(host_and_ports=self.hosts)
if debug:
print("Enabling debug")
logging.debug("Enabling debug")
self.conn.set_listener('print', PrintingListener())
def _build_id(self,topic,id):
return "id.%s-%s" % (topic,id)
def set_listener(self, id, listener):
if self.conn:
self.conn.set_listener(id,listener)
def subscribe(self,destination, id, ack='auto'):
if not self.conn:
raise RuntimeError('You need to connect first')
ref = next((item for item in self.subscriptions if item['destination'] == destination and item['id'] == id), None)
if not ref:
self.subscriptions.append(
{
'id': id,
'destination': destination,
'ack': ack,
}
)
self.conn.subscribe(destination, id, ack)
def topic(self,destination, id, ack='auto'):
self.subscribe("/topic/%s" % destination ,id,ack)
self.subscribe("/topic/%s" % destination ,self._build_id(destination,id),ack)
def queue(self,destination, id, ack='auto'):
self.subscribe("/queue/%s" % destination,id,ack)
self.subscribe("/queue/%s" % destination ,self._build_id(destination,id),ack)
def unsubscribe(self, topic,id):
def unsubscribe(self, id):
if not self.conn:
return
if not self.conn.running:
return
self.conn.unsubscribe(id)
self.conn.unsubscribe(self._build_id(topic,id))
def connect(self, wait=True):
@@ -63,15 +54,8 @@ class Connection:
return
self.conn.connect(self.username, self.password, wait=wait)
for s in self.subscriptions:
self.conn.subscribe(s['destination'], s['id'], s['ack'])
def disconnect(self):
for s in self.subscriptions:
self.unsubscribe(s['id'])
self.conn.disconnect()
......
@@ -2,6 +2,7 @@
from stomp.listener import ConnectionListener
import logging
import json
from slugify import slugify
class MorphemicListener(ConnectionListener):
@@ -11,19 +12,38 @@ class MorphemicListener(ConnectionListener):
match = getattr(event,'_match')
return headers.get('destination').startswith(match)
def _unknown_message(self,body):
logging.debug("Unknown message %s ",body)
def has_topic_name(self,headers, string):
return headers.get('destination').startswith(string)
def get_topic_name(self,headers):
return headers.get('destination').replace('/topic/','')
def on(self,headers, res):
logging.debug("Unknown message %s ",res)
pass
def on_message(self, headers, body):
logging.debug("Headers %s",headers)
logging.debug(" %s",body)
func_name='on_%s' % headers.get('destination').replace('/topic/','')
res = json.loads(body)
func_name='on_%s' % slugify(headers.get('destination').replace('/topic/',''), separator='_',)
if hasattr(self,func_name):
func = getattr(self, func_name)
func(json.loads(body))
func(res)
else:
self._unknown_message(body)
self.on(headers,res)
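The rewritten on_message dispatch above slugifies the destination before building the handler name, so topics containing dots (e.g. start_forecasting.tft) resolve to valid Python method names, and messages on topics without a matching handler fall back to on(). As a hedged illustration only (the subclass, handler names, and import path below are assumptions, not part of this commit), a subscriber built on this listener could look like:

```python
# Illustrative subclass of MorphemicListener; import path and class name are assumptions.
from amq_message_python_library import MorphemicListener

class ForecastingListener(MorphemicListener):
    def on_metrics_to_predict(self, res):
        # dispatched for messages on /topic/metrics_to_predict
        print("metrics to predict:", res)

    def on_start_forecasting_tft(self, res):
        # /topic/start_forecasting.tft -> slugify(..., separator='_') -> start_forecasting_tft
        print("start forecasting:", res)

    def on(self, headers, res):
        # fallback for any topic without a dedicated handler
        print("unhandled topic:", headers.get('destination'))
```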
# Model deployment
Model deployment code for the tft and nbeats models. Each module consists of:
- Dockerfile
  - Runs the main.py script after installing all dependencies
  - env (a file with environment variables) is used as a parameter
  - docker_run.sh (docker compose will be used later)
  - TODO: build docker-compose, decide which volumes will be used (e.g. data path, model path, app config path)
- main.py
  - Starts the scripts after receiving the message from AMQ on the metrics_to_predict topic
  - Runs two scripts independently (as separate processes): one for training (retrain.py), the other for prediction (predict.py); a sketch of how these scripts receive their input follows this list
- retrain.py
  - Trains the models (currently as a cyclic job with a fixed 10-minute frequency)
  - Currently one model per metric is used, although that may change for tft
  - After model training, messages are sent to the training_models topic
  - TODO: change the retraining conditions (to be discussed), add AutoML hyperparameter optimization
- predict.py
  - Sends predictions (currently a cyclic event with a configurable number of forward predictions, which may be changed via the start_forecasting message); predictions for all metrics are currently published at the same frequency (but, according to the examples in the Python stomp.py library, the prediction frequency may differ across metrics, so this may change in the future)
  - Until the first model is trained, no prediction is sent
  - The m predictions, where m is the number of forward predictions for a given metric, are currently sent as separate messages; this could be replaced by a single message containing lists
  - TODO: verify which parameters are stable and which may be changed by AMQ messages (prediction_horizon, publish_rate), parallelize the predictions, cache models, properly fill fields such as 'refersTo' and 'provider', and calculate a proper confidence interval (nbeats)
- src
  - Folder with all helpers
  - TODO: install the dataset-maker package in a cleaner way
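retrain.py and predict.py are not included in this commit; the sketch below only illustrates the contract implied by run_process() in main.py, which invokes each script as `python <script> '<json message>'` with the start_forecasting payload enriched by publish_rate and all_metrics. Variable names and field handling here are assumptions.

```python
# Hypothetical skeleton for retrain.py / predict.py, based on how main.py invokes them;
# the real scripts are not shown in this commit, so the structure below is an assumption.
import json
import sys

if __name__ == "__main__":
    # main.py runs: python retrain.py '<json message>'
    message = json.loads(sys.argv[1])
    metrics = message["metrics"]                # e.g. ["cpu_usage"]
    horizon = message["prediction_horizon"]     # from the start_forecasting message
    publish_rate = message["publish_rate"]      # injected by main.py from metrics_to_predict
    all_metrics = message["all_metrics"]        # full metrics_to_predict payload, also injected
    for metric in metrics:
        print(f"would handle {metric} (horizon={horizon}, publish_rate={publish_rate})")
```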
## How to test?
Ensure that there are no stale Docker images or containers, and check that the ports for InfluxDB and AMQ are not already in use.
From morphemic-persistent-storage:
docker-compose up -d
From this directory:
cd tft
docker build -t stomp_app -f Dockerfile ../..
./docker_run.sh
From the AMQ web console (http://localhost:8161/admin), a message in JSON format such as:
[{"metric": "cpu_usage", "level": 3, "publish_rate": 60}]
may be sent to the metrics_to_predict topic, followed by:
{
"metrics": ["cpu_usage"],
"timestamp": 143532341251,
"epoch_start": 143532341252,
"number_of_forward_predictions": 5,
"prediction_horizon": 120
}
sent to the topic start_forecasting.[METHOD NAME], e.g. start_forecasting.tft.
Until the first model is trained, predictions will not be sent.
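As an alternative to the web console, the same two messages can be published programmatically. A minimal sketch with stomp.py, assuming the broker defaults used elsewhere in this repository (localhost:61613, user/password admin/admin):

```python
# Minimal sketch: publish the two test messages with stomp.py instead of the AMQ web
# console. Host, port and credentials follow the env file defaults; adjust as needed.
import json
import stomp

conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.connect("admin", "admin", wait=True)

# 1) Announce the metrics to predict
conn.send(destination="/topic/metrics_to_predict",
          body=json.dumps([{"metric": "cpu_usage", "level": 3, "publish_rate": 60}]))

# 2) Start forecasting with the chosen method (tft in this example)
conn.send(destination="/topic/start_forecasting.tft",
          body=json.dumps({
              "metrics": ["cpu_usage"],
              "timestamp": 143532341251,
              "epoch_start": 143532341252,
              "number_of_forward_predictions": 5,
              "prediction_horizon": 120,
          }))

conn.disconnect()
```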
FROM python:3.8-slim-buster
# Install Python dependencies.
WORKDIR /wd
COPY deployment/tft/requirements.txt .
RUN pip3 install -r requirements.txt
RUN mkdir models
# Copy the rest of the codebase into the image
COPY deployment/tft ./
COPY morphemic-datasetmaker ./morphemic-datasetmaker
COPY amq-message-python-library ./amq-message-python-library
RUN cd morphemic-datasetmaker && python3 setup.py install && cd ..
RUN rm -r morphemic-datasetmaker
RUN mv amq-message-python-library amq_message_python_library
CMD ["python3", "main.py"]
docker run -t --env-file=env --network=host stomp_app
AMQ_HOSTNAME=localhost
AMQ_USER=admin
AMQ_PASSWORD=admin
AMQ_PORT=61613
APP_NAME=demo
METHOD=nbeats
DATA_PATH=./
INFLUXDB_HOSTNAME=localhost
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
import os
from multiprocessing import Pool
import stomp
import json
from amq_message_python_library import * # python amq-message-python-library
import logging
AMQ_USER = os.environ.get("AMQ_USER", "admin")
AMQ_PASSWORD = os.environ.get("AMQ_PASSWORD", "admin")
START_APP_TOPIC = "metrics_to_predict"
METHOD = os.environ.get("METHOD", "tft")
START_TOPIC = f"start_forecasting.{METHOD}"
def run_process(args):
print("running")
os.system(f"python {args[0]} '{args[1]}'")
def start_training(metrics_to_predict):
processes = (("retrain.py", metrics_to_predict), ("predict.py", metrics_to_predict))
pool = Pool(processes=2)
pool.map(run_process, processes)
class StartListener(stomp.ConnectionListener):
"""Custom listener, parameters:
- conn (stomp connector)
- topic_name, name of topic to subscribe"""
def __init__(self, conn, topic_name):
self.conn = conn
self.topic_name = topic_name
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
print(self.topic_name)
logging.debug(f" Body: {frame.body}")
message = json.loads(frame.body)
global publish_rate, all_metrics
publish_rate = message[0]["publish_rate"]
all_metrics = message
class StartForecastingListener(stomp.ConnectionListener):
"""Custom listener, parameters:
- conn (stomp connector)
- topic_name, name of topic to subscribe"""
def __init__(self, conn, topic_name):
self.conn = conn
self.topic_name = topic_name
def on_error(self, frame):
print('received an error "%s"' % frame.body)
def on_message(self, frame):
message = json.loads(frame.body)
message["publish_rate"] = publish_rate
message["all_metrics"] = all_metrics
message = json.dumps(message)
start_training(message)
self.conn.disconnect()
class Msg(object):
def __init__(self):
self.body = None
def main():
logging.getLogger().setLevel(logging.DEBUG)
print("start")
print()
start_app_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_app_conn.connect()
start_conn = morphemic.Connection(AMQ_USER, AMQ_PASSWORD)
start_conn.connect()
start_conn.conn.subscribe(f"/topic/{START_APP_TOPIC}", "1", ack="auto")
start_app_conn.conn.subscribe(f"/topic/{START_TOPIC}", "2", ack="auto")
start_conn.conn.set_listener("1", StartListener(start_conn.conn, START_APP_TOPIC))
start_app_conn.conn.set_listener(
"2", StartForecastingListener(start_conn.conn, START_TOPIC)
)
# print("start")
# msg1 = Msg()
# msg1.body = '[{"metric": "cpu_usage", "level": 3, "publish_rate": 60}]'
# msg2 = Msg()
# msg2.body = """{
# "metrics": ["cpu_usage"],
# "timestamp": 1623057648907,
# "epoch_start": 1623057698298,
# "number_of_forward_predictions": 5,
# "prediction_horizon": 120}"""
# StartListener(start_conn.conn, START_APP_TOPIC).on_message(msg1)
# StartForecastingListener(start_conn.conn, START_APP_TOPIC).on_message(msg2)
while True:
pass
if __name__ == "__main__":
publish_rate = 0
all_metrics = {}
main()
data:
csv_path: demo.csv
training:
bs: 8