Commit 0c62933e authored by Jean-Didier Totow's avatar Jean-Didier Totow

performance model, persistent storage

parent 766ef22d
FROM python:3.7
RUN pip install --upgrade pip
RUN pip install stomp.py requests
RUN mkdir /app
ADD ./src /app/
WORKDIR /app/src
CMD ["python","-u","app.py"]
import os, time, stomp, pickle, requests, json, math
from os import path
from threading import Thread
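# Evaluator service: consumes performance-model predictions and real
# measurements from ActiveMQ over STOMP, matches them by feature similarity,
# and requests a model retrain when precision drops below the threshold.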
path_ml_model = os.environ.get("MLMODELPATH",".")
#////////////////////////////////////////////////////////////////////////////
activemq_username = os.getenv("ACTIVEMQ_USER","aaa")
activemq_password = os.getenv("ACTIVEMQ_PASSWORD","111")
activemq_hostname = os.getenv("ACTIVEMQ_HOST","localhost")
activemq_port = int(os.getenv("ACTIVEMQ_PORT","61613"))
persistence_storage_queue = "/queue/persistent_storage"
subscription_topic = 'performance_model_evaluator'
ps_management_queue = os.environ.get("PS_MANAGEMENT_QUEUE","persistent_storage")
#/////////////////////////////////////////////////////////////////////////////
tolerated_error = float(os.environ.get("TOLERATED_COMPARISON_ERROR","5"))
prediction_precision = int(os.environ.get("PREDICTION_PRECISION","90")) #90%
#/////////////////////////////////////////////////////////////////////////////
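# endpoint of the performance-model service used to request a retrain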
performance_model_train_url = os.environ.get("PERFORMANCE_MODEL_URL","http://localhost:8766/api/v1/train")
class EvaluationCandidate():
def __init__(self, application, target, features, prediction,variant):
self.application = application
self.target = target
self.features = features
self.prediction = prediction
self.variant = variant
self.real_value = None
self.time = time.time()
def getApplication(self):
return self.application
def getTarget(self):
return self.target
def getFeatures(self):
return self.features
def getVariant(self):
return self.variant
def getPrediction(self):
return self.prediction
    def computeError(self):
        # relative prediction error in percent; needs a known, non-zero real value
        if self.real_value is not None and self.real_value != 0:
            return (abs(self.real_value - self.prediction) / self.real_value) * 100
    def setRealValue(self, _value):
        self.real_value = _value
    def match(self, features):
        # compare the candidate's own features against the given ones
        for key, _value in features.items():
            if key not in self.features or int(self.features[key]) != int(_value):
                return False
        return True
class Listener(object):
def __init__(self, conn,handler):
self.conn = conn
self.handler = handler
def on_error(self, headers, message):
print('received an error %s' % message)
def on_message(self, headers, message):
self.handler(message)
class Evaluation(Thread):
def __init__(self):
self.candidates = []
self.stop = False
self.subscriptions = []
self.max_candidates_size = 200
self.real_measurement = []
self.mean_squared_error_map = {}
self.evaluation_period = 60*10
self.last_evaluation = time.time()
self.tolerated_error = tolerated_error
self.readCandidatesFile()
super(Evaluation,self).__init__()
def createSubscription(self, application):
conn = stomp.Connection(host_and_ports = [(activemq_hostname, activemq_port)])
conn.connect(login=activemq_username,passcode=activemq_password)
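        # ask the persistent storage service to forward this application's
        # metrics to our evaluation topic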
data = {'request':'subscribe','application':application,'metrics':[],'queue': subscription_topic,'name': 'performance_model'}
conn.send(body=json.dumps(data), destination=persistence_storage_queue, persistent='false')
print("Subscription request sent for application {0}".format(application))
return True
def stopEvaluator(self):
self.stop = True
self.saveCandidates()
def handler(self, data):
try:
_json = json.loads(data)
            if isinstance(_json, list):
for candidate in _json:
self.addCandidate(candidate['application'],candidate['target'],candidate['features'],candidate['prediction'], candidate['variant'])
print("{0} predictions have been added".format(len(_json)))
else:
if "metrics" in _json:
self.real_measurement.append(_json)
if time.time() - self.last_evaluation > self.evaluation_period:
self.evaluatePrecision()
self.last_evaluation = time.time()
except Exception as e:
print("An error occured while handling data from queue")
print(e)
def getFeaturesFromRealMeasurment(self,_json):
features = _json['metrics']
features.update(_json['labels'])
return features
    def isClosed(self, _value1, _value2):
        # two values are considered equal if they differ by at most the tolerated error
        return abs(float(_value1) - float(_value2)) <= self.tolerated_error
def equalFeatues(self, real_features, prediction_features):
for key, value in prediction_features.items():
if not key in real_features:
return False
if not self.isClosed(real_features[key],value):
return False
return True
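    # Distance between a real measurement and a prediction: the Euclidean
    # distance over shared numeric features, paired with the relative
    # prediction error in percent.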
def computeDistance(self,real_feature, predict):
predict_feature = predict.getFeatures()
real_prediction = real_feature[predict.getTarget()]
prediction = predict.getPrediction()
f_sum = 0
for field, _value in real_feature.items():
if not field in predict_feature:
continue
if type(predict_feature[field]) == type(""):
continue
f_sum += (float(_value) - float(predict_feature[field]))**2
d_f = math.sqrt(f_sum)
d_precision = (abs(real_prediction - float(prediction))/real_prediction)*100
return (d_f,d_precision)
def selectByApplicationName(self,data,application, _type):
result = []
if _type == "real":
for real in self.real_measurement:
if real['labels']['application'] == application:
result.append(real)
else:
for pred in self.candidates:
if pred.getApplication() == application:
result.append(pred)
return result
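    # Find the prediction whose features lie closest to a real measurement and
    # treat its precision as the model's current precision estimate.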
def evaluatePrecision(self):
if len(self.real_measurement) == 0:
if len(self.candidates) > 0:
del self.subscriptions[:]
for candidate in self.candidates:
if not candidate.getApplication() in self.subscriptions:
self.createSubscription(candidate.getApplication())
self.subscriptions.append(candidate.getApplication())
self.saveCandidates()
print("No real data found")
return False
for application in self.subscriptions:
distance_map = {}
self.mean_squared_error_map[application] = []
list_real = self.selectByApplicationName(self.real_measurement,application,"real")
list_pred = self.selectByApplicationName(self.candidates,application,"predict")
for real in list_real:
real_features = self.getFeaturesFromRealMeasurment(real)
for predict in list_pred:
d_f, error = self.computeDistance(real_features,predict)
distance_map[d_f] = 100 - int(error)
distance_map = dict(sorted(distance_map.items()))
            # the prediction with the smallest feature distance is the best candidate
print("Best candidate")
k = list(distance_map.keys())[0]
print("Distance : {0}".format(k))
print("Precision in percentage : {0}%".format(distance_map[k]))
            # retrain when the best candidate's precision falls below the threshold
            if distance_map[k] < prediction_precision:
#retrain request
features = list(list_pred[0].getFeatures().keys())
target = list_pred[0].getTarget()
variant = list_pred[0].getVariant()
application = list_pred[0].getApplication()
_post = {'url_file': "", 'application': application,'target':target,'features': features, 'variant': variant}
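                # illustrative payload (values hypothetical):
                # {"url_file": "", "application": "app1", "target": "latency",
                #  "features": ["cpu", "memory"], "variant": "vm"}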
try:
response = requests.post(performance_model_train_url, data=json.dumps(_post),headers={'Content-Type':'application/json'})
except Exception as e:
print("An error occured while sending retrain request")
else:
del self.real_measurement[:]
del self.candidates[:]
def listen(self):
conn = None
status = False
try:
print('Subscribe to the topic {0}'.format(subscription_topic))
conn = stomp.Connection(host_and_ports = [(activemq_hostname, activemq_port)])
conn.connect(login=activemq_username,passcode=activemq_password)
conn.set_listener('', Listener(conn, self.handler))
conn.subscribe(destination=subscription_topic, id=1, ack='auto')
status = True
except Exception as e:
print("Could not subscribe")
print(e)
status = False
if not status:
time.sleep(10)
self.listen()
while not self.stop:
time.sleep(5)
conn.disconnect()
self.stop = True
def getStatus(self):
return not self.stop
def addCandidate(self,application, target, features, prediction, variant):
candidate = EvaluationCandidate(application,target,features,prediction,variant)
self.candidates.append(candidate)
if len(self.candidates) > self.max_candidates_size:
self.candidates.pop(0)
if not application in self.subscriptions:
self.createSubscription(application)
self.subscriptions.append(application)
self.saveCandidates()
def readCandidatesFile(self):
if path.exists(path_ml_model+"/subscriptions.obj"):
self.subscriptions = pickle.load(open(path_ml_model+"/subscriptions.obj", 'rb'))
for application in self.subscriptions:
self.createSubscription(application)
    def saveCandidates(self):
        # keep the filename in sync with readCandidatesFile()
        pickle.dump(self.subscriptions, open(path_ml_model + "/subscriptions.obj", "wb"))
        print("Subscriptions saved")
def restart(self):
self.stopEvaluator()
print("Restart in 10s")
time.sleep(10)
self.readCandidatesFile()
self.run()
def run(self):
print("Evaluator started ...")
self.listen()
evaluation = Evaluation()
evaluation.start()
## Directory *experimentation/*
It's our playground where we experiment with ML technologies and techniques, and the right place for our research efforts. Experimentation tends to get messy, so we follow some common rules and structure to limit the mess.
Here we will develop and test some of our novel ideas and solutions before we refactor and include some of them in the *ml_code/* directory.
### Structure, Rules & Comments
Structure:
* *code/* - Here we push our notebooks, python or r scripts. We should keep descriptive titles.
* *visualizations/* - Here we push our visualizations under the proper sub-directory with proper notes in a README.
Rules & Comments:
* We store datasets in the *data/* directory of this repository.
* We try to use clear and descriptive titles and notes. Furthermore, we try to follow clean code principles; a nice reference: [Clean Code concepts adapted for machine learning and data science](https://github.com/davified/clean-code-ml). A minimal sketch of that style follows below.
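
As a loose illustration of those clean-code principles (not project code; the dataframe and column names are hypothetical), extracting an opaque inline transformation into a small, named, testable function:

```python
import pandas as pd

# before: an opaque inline transformation
# df["rt_norm"] = (df["rt"] - df["rt"].mean()) / df["rt"].std()

# after: a small, named, testable step
def standardize(series: pd.Series) -> pd.Series:
    """Scale a feature to zero mean and unit variance."""
    return (series - series.mean()) / series.std()

df = pd.DataFrame({"rt": [120.0, 95.0, 143.0]})  # toy data
df["rt_norm"] = standardize(df["rt"])
```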
## Directory *ml_code/*
It's an ultra-clean and lean place where we maintain only the code that is absolutely necessary for the runtime.
### Next steps & Comments
We must create some scripts for the following jobs:
* we should define them...
#!/bin/bash
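# Generate gRPC stubs, then build and push the two service images.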
python3 -m grpc_tools.protoc -I./src/protos --python_out=. --grpc_python_out=. ./src/protos/service.proto
sudo docker build -t jdtotow/performance_model -f ./deployment/Dockerfile .
sudo docker push jdtotow/performance_model
cd ../evaluator
sudo docker build -t jdtotow/evaluator .
sudo docker push jdtotow/evaluator
datasetlib @ c9c6d3c9
Subproject commit c9c6d3c954b57f9dd3b5109514bd033da00c95db
FROM tiangolo/uwsgi-nginx-flask:python3.8
RUN python3 -m pip install --upgrade pip
RUN apt update &&\
apt upgrade -y && \
apt install -y swig
COPY ./src/requirements.txt /
RUN pip install -r /requirements.txt
ENV NGINX_WORKER_PROCESSES auto
ENV NGINX_WORKER_CONNECTIONS 1024
ENV NGINX_WORKER_OPEN_FILES 1024
ENV LISTEN_PORT 8766
ADD ./datasetlib /app/datasetlib
RUN pip install /app/datasetlib
EXPOSE 8766
EXPOSE 8767
ENV UWSGI_CHEAPER 1
RUN mkdir -p /var/performance_model
RUN mkdir -p /var/performance_model/datasets &&\
mkdir -p /var/performance_model/models &&\
mkdir -p /var/performance_model/train &&\
mkdir -p /var/performance_model/logs &&\
mkdir -p /var/performance_model/db
COPY ./src /app
COPY ./deployment/uwsgi.ini /app
#COPY ./src/db/prediction.db /var/performance_model/db
RUN chown www-data:www-data /app/db/prediction.db
RUN chmod a+rw /app/db/ /app/db/*
RUN chown -R www-data:www-data /var/performance_model
#!/bin/bash
set -e
# if the running user is an Arbitrary User ID
if ! whoami &> /dev/null; then
# make sure we have read/write access to /etc/passwd
if [ -w /etc/passwd ]; then
# write a line in /etc/passwd for the Arbitrary User ID in the 'root' group
echo "${USER_NAME:-default}:x:$(id -u):0:${USER_NAME:-default} user:${HOME}:/sbin/nologin" >> /etc/passwd
fi
fi
if [ "$1" = 'supervisord' ]; then
exec /usr/bin/supervisord
fi
exec "$@"
pid /run/nginx.pid;
error_log /var/log/nginx/error.log;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
sendfile on;
tcp_nopush on;
client_body_temp_path /spool/nginx/client_temp 1 2;
fastcgi_temp_path /spool/nginx/fastcgi_temp 1 2;
proxy_temp_path /spool/nginx/proxy_temp 1 2;
scgi_temp_path /spool/nginx/scgi_temp 1 2;
uwsgi_temp_path /spool/nginx/uwsgi_temp 1 2;
server {
listen 8766;
server_name localhost;
access_log /var/log/nginx/access.log;
location / {
try_files $uri @app;
}
location @app {
include uwsgi_params;
uwsgi_pass unix:/run/uwsgi.sock;
}
location /static {
alias /opt/repo/src/static;
expires 1d;
}
}
}
[unix_http_server]
file=/run/supervisor.sock
chmod=0770
[supervisord]
nodaemon=true
pidfile=/run/pid/supervisord.pid
logfile=/var/log/supervisor/supervisord.log
childlogdir=/var/log/supervisor
logfile_maxbytes=50MB
logfile_backups=1
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///run/supervisor.sock
[program:nginx]
command=/usr/sbin/nginx -g "daemon off;" -c /etc/nginx/nginx.conf
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
[program:uwsgi]
command=/usr/local/bin/uwsgi --ini /etc/uwsgi/apps-enabled/uwsgi.ini
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0