Commit 3d67c58c authored by Jean-Didier Totow's avatar Jean-Didier Totow
Browse files

auto builder

parent 9e3caa20
[{"configs": "docker"}, {"languages": ["python", "go", "java"]}]
\ No newline at end of file
...@@ -5,3 +5,5 @@ This is an experimental repository in which we try to predict the performance of ...@@ -5,3 +5,5 @@ This is an experimental repository in which we try to predict the performance of
![](images/Morphemic-Page-2.png) ![](images/Morphemic-Page-2.png)
The performance model image can be built using the script /ml_code/build-performance-model.sh
\ No newline at end of file
#!/bin/bash
python3 -m grpc_tools.protoc -I./src/protos --python_out=. --grpc_python_out=. ./src/protos/service.proto
sudo docker build -t jdtotow/performance_model -f ./deployment/Dockerfile .
sudo docker push jdtotow/performance_model
cd ../evaluator
sudo docker build -t jdtotow/evaluator .
sudo docker push jdtotow/evaluator
FROM python:3.7
RUN pip install --upgrade pip
COPY ./src/requirements.txt /
RUN pip install -r /requirements.txt
ADD ./datasetlib /app/datasetlib
RUN pip install /app/datasetlib
EXPOSE 8766 8767
RUN apt-get update
RUN mkdir -p /run/pid
RUN mkdir -p /var/log/supervisor
RUN apt-get install -y supervisor
ADD ./src /app/
COPY ./deployment/supervisord.conf /etc/supervisor/conf.d/supervisord.conf
WORKDIR /app
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]
# Performance-model service image: Flask app served by the uwsgi+nginx base.
FROM tiangolo/uwsgi-nginx-flask:python3.8

# Keep pip current before installing requirements.
RUN python3 -m pip install --no-cache-dir --upgrade pip

# swig is required to build some of the Python requirements.
# Use apt-get (stable CLI for scripts, hadolint DL3027), skip recommends,
# and clean the package lists in the same layer to keep the image small.
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install -y --no-install-recommends swig && \
    rm -rf /var/lib/apt/lists/*

# Install Python dependencies before copying the source so this layer is
# cached until requirements.txt changes.
COPY ./src/requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt

# nginx / uwsgi tuning understood by the base image (key=value form).
ENV NGINX_WORKER_PROCESSES=auto \
    NGINX_WORKER_CONNECTIONS=1024 \
    NGINX_WORKER_OPEN_FILES=1024 \
    LISTEN_PORT=8766 \
    UWSGI_CHEAPER=1

# Local dataset library (plain directory: COPY, not ADD).
COPY ./datasetlib /app/datasetlib
RUN pip install --no-cache-dir /app/datasetlib

# 8766: HTTP API, 8767: gRPC. EXPOSE takes plain port numbers; the old
# "8766:8766" host:container form belongs to `docker run -p`, not EXPOSE.
EXPOSE 8766 8767

# Writable state directories used by the application.
RUN mkdir -p /var/performance_model/datasets \
             /var/performance_model/models \
             /var/performance_model/train \
             /var/performance_model/logs \
             /var/performance_model/db

COPY ./src /app
COPY ./deployment/uwsgi.ini /app
#COPY ./src/db/prediction.db /var/performance_model/db
# Give the nginx/uwsgi worker user (www-data) access to the SQLite DB and
# model state, in a single layer.
RUN chown www-data:www-data /app/db/prediction.db && \
    chmod a+rw /app/db/ /app/db/* && \
    chown -R www-data:www-data /var/performance_model
...@@ -16,15 +16,29 @@ supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface ...@@ -16,15 +16,29 @@ supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[supervisorctl]
serverurl=unix:///run/supervisor.sock

[program:evaluator]
command=/app/start_evaluator.sh
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

[program:rpc_server]
command=/app/start_rpc.sh
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

[program:db_checker]
command=/app/start_dbchecker.sh
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

[program:manager]
command=/app/start_manager.sh
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
......
...@@ -6,28 +6,32 @@ url = "http://localhost:8766" ...@@ -6,28 +6,32 @@ url = "http://localhost:8766"
url_file = '/home/jean-didier/Projects/morphemic/performance-model/ml_code/example/dataset.csv'
url_file_3 = '/home/jean-didier/Projects/morphemic/performance-model/ml_code/example/all.csv'
url_file_4 = '/var/performance_model/datasets/all.csv'
#features = ['time','served_request','request_rate','response_time','performance','cpu_usage','memory']
#features_3 = ['number','served_request','request_rate','number_instances','response_time','performance','cpu_usage','cpu_alloc','memory','memory_alloc']
#features_2 = ['cpu_usage','memory','level','response_time','latency']
features_4 = ["EstimatedRemainingTimeContext","SimulationLeftNumber","SimulationElapsedTime","NotFinishedOnTime","MinimumCoresContext","NotFinished","WillFinishTooSoonContext","NotFinishedOnTimeContext","MinimumCores","ETPercentile","RemainingSimulationTimeMetric","TotalCores"]
#post_data = {'url_file': url_file, 'application': 'application-1','target':'performance','features': features}
#post_data_2 = {'url_file': "", 'application': 'demo','target':'response_time','features': features_2, "variant": "vm"}
#post_data_3 = {'url_file': url_file_3, 'application': 'application-3','target':'performance','features': features_3}
post_data_4 = {'url_file': url_file_4, 'application': 'genome','target':'EstimatedRemainingTimeContext','features': features_4,'variant':'vm'}
#print("Get model")
#response = requests.post(url+"/api/v1/model", data='{"application":"application-2"}', headers={'Content-Type':'application/json'})
#print(response.text)
response = requests.post(url+"/api/v1/model/train", data=json.dumps(post_data_4),headers={'Content-Type':'application/json'}).text
print("Training phase")
print(response)
#time.sleep(5)
#prediction request
#features = {'cpu_alloc': 1 ,'memory_alloc': 64,'number_instances':4, "memory": 51086677.3333}
#features = {'cpu_usage': 31, "memory": 4500.23, 'latency': 2.1, 'level': 1}
#post_data = {'application': 'fcr','target':'response_time','features':features}
#response = requests.post(url+"/api/v1/model/predict", data=json.dumps(post_data),headers={'Content-Type':'application/json'}).text
#print(response)
This diff is collapsed.
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
import service_pb2 as service__pb2
class PredictStub(object):
    """Client-side stub for the proto.Predict gRPC service.

    Generated-code shape (grpc_tools.protoc): one callable attribute per
    unary-unary RPC, wired to the given channel.
    """

    def __init__(self, channel):
        """Constructor.

        Args:
            channel: A grpc.Channel.
        """
        self.PredictPerformance = channel.unary_unary(
            '/proto.Predict/PredictPerformance',
            request_serializer=service__pb2.PredictRequest.SerializeToString,
            response_deserializer=service__pb2.PredictReply.FromString,
        )
        self.getModel = channel.unary_unary(
            '/proto.Predict/getModel',
            request_serializer=service__pb2.ModelRequest.SerializeToString,
            response_deserializer=service__pb2.ModelReply.FromString,
        )
        self.trainModel = channel.unary_unary(
            '/proto.Predict/trainModel',
            request_serializer=service__pb2.TrainRequest.SerializeToString,
            response_deserializer=service__pb2.TrainReply.FromString,
        )
class PredictServicer(object):
    """Server-side handler interface for the proto.Predict service.

    Subclass and override the RPC methods; these generated defaults
    report UNIMPLEMENTED to the caller.
    """

    def PredictPerformance(self, request, context):
        # Default generated stub: not implemented.
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def getModel(self, request, context):
        # Default generated stub: not implemented.
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')

    def trainModel(self, request, context):
        # Default generated stub: not implemented.
        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
        context.set_details('Method not implemented!')
        raise NotImplementedError('Method not implemented!')
def add_PredictServicer_to_server(servicer, server):
    """Register a PredictServicer implementation with a grpc.Server.

    Builds one unary-unary handler per RPC and attaches them under the
    'proto.Predict' service name.
    """
    rpc_method_handlers = {
        'PredictPerformance': grpc.unary_unary_rpc_method_handler(
            servicer.PredictPerformance,
            request_deserializer=service__pb2.PredictRequest.FromString,
            response_serializer=service__pb2.PredictReply.SerializeToString,
        ),
        'getModel': grpc.unary_unary_rpc_method_handler(
            servicer.getModel,
            request_deserializer=service__pb2.ModelRequest.FromString,
            response_serializer=service__pb2.ModelReply.SerializeToString,
        ),
        'trainModel': grpc.unary_unary_rpc_method_handler(
            servicer.trainModel,
            request_deserializer=service__pb2.TrainRequest.FromString,
            response_serializer=service__pb2.TrainReply.SerializeToString,
        ),
    }
    generic_handler = grpc.method_handlers_generic_handler(
        'proto.Predict', rpc_method_handlers)
    server.add_generic_rpc_handlers((generic_handler,))
import time, os, sqlite3, stomp, json

# Periodically drains locally stored predictions from SQLite and publishes
# them to an ActiveMQ topic for the evaluator to consume.

db_last_evaluation = time.time()
db_evaluation_period = 60*2  # seconds between DB sweeps (2 minutes)
subscription_topic = 'performance_model_evaluator'
activemq_username = os.getenv("ACTIVEMQ_USER", "aaa")
activemq_password = os.getenv("ACTIVEMQ_PASSWORD", "111")
activemq_hostname = os.getenv("ACTIVEMQ_HOST", "localhost")
activemq_port = int(os.getenv("ACTIVEMQ_PORT", "61613"))
local_database_path = os.environ.get("LOCAL_DATABASE_PATH", "./db/")


def DBEvaluationRoutine():
    """Run forever: every `db_evaluation_period` seconds, read all rows from
    the Prediction table, push them to the ActiveMQ topic, then delete them.

    Best-effort: any failure is printed and retried on the next sweep.
    """
    print("DB Evaluator routine started")
    global db_last_evaluation
    while True:
        if time.time() - db_last_evaluation >= db_evaluation_period:
            try:
                conn = sqlite3.connect(local_database_path + "prediction.db")
                cursor = conn.cursor()
                data_to_send = []
                for row in cursor.execute("SELECT * FROM Prediction"):
                    # NOTE(review): row[4] is used for both 'features' and
                    # 'variant' — one of these indexes looks wrong; confirm
                    # against the Prediction table schema before changing.
                    _json = {'application': row[1], 'target': row[2], 'features': json.loads(row[4]), 'prediction': row[3], 'variant': row[4]}
                    data_to_send.append(_json)
                conn.close()
                if data_to_send != []:
                    conn = stomp.Connection(host_and_ports=[(activemq_hostname, activemq_port)])
                    conn.connect(login=activemq_username, passcode=activemq_password)
                    time.sleep(2)  # give the STOMP connection time to settle
                    conn.send(body=json.dumps(data_to_send), destination=subscription_topic, persistent='false')
                    conn.disconnect()
                    print("Messages pushed to activemq")
                    print("Removing message to Local DB")
                    # Re-open the DB to purge the rows we just published.
                    conn = sqlite3.connect(local_database_path + "prediction.db")
                    cursor = conn.cursor()
                    cursor.execute("DELETE FROM Prediction")
                    conn.commit()
                    conn.close()
                else:
                    print("Nothing found")
            except Exception as e:
                # Typo fix: "occured" -> "occurred".
                print("An error occurred")
                print(e)
            db_last_evaluation = time.time()
        time.sleep(5)
    print("DB Evaluation stopped")


DBEvaluationRoutine()
import os, time, stomp, pickle, requests, json, math
from os import path
from threading import Thread
path_ml_model = os.environ.get("MLMODELPATH",".")
#////////////////////////////////////////////////////////////////////////////
activemq_username = os.getenv("ACTIVEMQ_USER","aaa")
activemq_password = os.getenv("ACTIVEMQ_PASSWORD","111")
activemq_hostname = os.getenv("ACTIVEMQ_HOST","localhost")
activemq_port = int(os.getenv("ACTIVEMQ_PORT","61613"))
persistence_storage_queue = "/queue/persistent_storage"
subscription_topic = 'performance_model_evaluator'
ps_management_queue = os.environ.get("PS_MANAGEMENT_QUEUE","persistent_storage")
#/////////////////////////////////////////////////////////////////////////////
tolerated_error = float(os.environ.get("TOLERATED_COMPARISON_ERROR","5"))
prediction_precision = int(os.environ.get("PREDICTION_PRECISION","90")) #90%
#/////////////////////////////////////////////////////////////////////////////
performance_model_train_url = os.environ.get("PERFORMANCE_MODEL_URL","http://localhost:8766/api/v1/train")
class EvaluationCandidate():
    """One prediction awaiting comparison against a real measurement.

    Holds the prediction request (application, target, features, variant),
    the predicted value, and — once observed — the real value.
    """

    def __init__(self, application, target, features, prediction, variant):
        self.application = application
        self.target = target
        self.features = features      # dict: feature name -> value used for the prediction
        self.prediction = prediction  # predicted value for `target`
        self.variant = variant
        self.real_value = None        # filled in later via setRealValue()
        self.time = time.time()       # creation timestamp

    def getApplication(self):
        return self.application

    def getTarget(self):
        return self.target

    def getFeatures(self):
        return self.features

    def getVariant(self):
        return self.variant

    def getPrediction(self):
        return self.prediction

    def computeError(self):
        """Relative error (%) between the real and predicted value.

        Returns None until setRealValue() has been called.
        NOTE(review): raises ZeroDivisionError if the real value is 0.
        """
        if self.real_value != None:
            return (abs(self.real_value - self.prediction) / self.real_value) * 100

    def setRealValue(self, _value):
        self.real_value = _value

    def match(self, features):
        """True when every given feature equals this candidate's stored
        feature value (integer comparison).

        Bug fix: the original compared the incoming dict against itself
        (`features[key]` vs its own `_value`), so it always returned True.
        It now compares against self.features; a key absent from the stored
        features counts as a mismatch.
        """
        for key, _value in features.items():
            if key not in self.features or int(self.features[key]) != int(_value):
                return False
        return True
class Listener(object):
    """Minimal STOMP listener that forwards every message body to a callback."""

    def __init__(self, conn, handler):
        # The connection is kept only for reference; payloads go to `handler`.
        self.conn = conn
        self.handler = handler

    def on_error(self, headers, message):
        # Broker-side errors are only logged; there is nothing to recover here.
        print('received an error %s' % message)

    def on_message(self, headers, message):
        # Delegate the raw message body to the configured callback.
        self.handler(message)
class Evaluation(Thread):
# Collects prediction candidates and real measurements from ActiveMQ and
# periodically evaluates prediction precision, requesting a retrain when
# precision is too low. (Class continues beyond this excerpt.)
def __init__(self):
self.candidates = []
# Set by stopEvaluator(); checked by listen().
self.stop = False
# Application names we asked persistent storage to publish metrics for.
self.subscriptions = []
# Oldest candidates are dropped beyond this bound (see addCandidate).
self.max_candidates_size = 200
self.real_measurement = []
self.mean_squared_error_map = {}
# Evaluate precision at most once per 10 minutes (see handler()).
self.evaluation_period = 60*10
self.last_evaluation = time.time()
self.tolerated_error = tolerated_error
# NOTE(review): readCandidatesFile() is defined outside this excerpt;
# presumably restores persisted candidates — confirm.
self.readCandidatesFile()
super(Evaluation,self).__init__()
def createSubscription(self, application):
# Ask the persistent storage service (via ActiveMQ) to start publishing
# this application's metrics to our subscription topic. Always returns True;
# connection errors propagate to the caller.
conn = stomp.Connection(host_and_ports = [(activemq_hostname, activemq_port)])
conn.connect(login=activemq_username,passcode=activemq_password)
data = {'request':'subscribe','application':application,'metrics':[],'queue': subscription_topic,'name': 'performance_model'}
conn.send(body=json.dumps(data), destination=persistence_storage_queue, persistent='false')
print("Subscription request sent for application {0}".format(application))
return True
def stopEvaluator(self):
# Signal listen() to exit and persist the current candidates.
self.stop = True
# NOTE(review): saveCandidates() is defined outside this excerpt.
self.saveCandidates()
def handler(self, data):
# Callback for every ActiveMQ message: either a batch of predictions
# (a JSON list) or a real measurement (a dict containing a "metrics" key).
# Any parse/processing error is logged and swallowed (best-effort).
try:
_json = json.loads(data)
if type(_json) == type([]):
for candidate in _json:
self.addCandidate(candidate['application'],candidate['target'],candidate['features'],candidate['prediction'], candidate['variant'])
print("{0} predictions have been added".format(len(_json)))
else:
if "metrics" in _json:
self.real_measurement.append(_json)
# Throttle: run a precision evaluation at most once per evaluation_period.
if time.time() - self.last_evaluation > self.evaluation_period:
self.evaluatePrecision()
self.last_evaluation = time.time()
except Exception as e:
print("An error occured while handling data from queue")
print(e)
def getFeaturesFromRealMeasurment(self,_json):
    """Merge a measurement's 'metrics' and 'labels' dicts into one feature dict.

    NOTE(review): like the original, this mutates _json['metrics'] in place
    (labels are folded into the metrics dict) and returns that same dict.
    """
    merged = _json['metrics']
    merged.update(_json['labels'])
    return merged
def isClosed(self, _value1, _value2):
# True when the two values differ by at most the tolerated error (absolute difference).
return abs(float(_value1) - float(_value2)) <= self.tolerated_error
def equalFeatues(self, real_features, prediction_features):
    """True when every prediction feature exists in the real features and is
    within the tolerated error of it (see isClosed).

    (Name typo 'Featues' kept for caller compatibility.)
    """
    for name, predicted in prediction_features.items():
        if name not in real_features:
            return False
        if not self.isClosed(real_features[name], predicted):
            return False
    return True
def computeDistance(self,real_feature, predict):
# Returns a (feature-space euclidean distance, precision error %) pair
# between a real measurement's features and one EvaluationCandidate.
predict_feature = predict.getFeatures()
# Real observed value of the candidate's target metric.
real_prediction = real_feature[predict.getTarget()]
prediction = predict.getPrediction()
f_sum = 0
for field, _value in real_feature.items():
if not field in predict_feature:
continue
# Skip string-valued features; only numeric ones enter the distance.
if type(predict_feature[field]) == type(""):
continue
f_sum += (float(_value) - float(predict_feature[field]))**2
d_f = math.sqrt(f_sum)
# NOTE(review): raises ZeroDivisionError when the real target value is 0.
d_precision = (abs(real_prediction - float(prediction))/real_prediction)*100
return (d_f,d_precision)
def selectByApplicationName(self, data, application, _type):
    """Filter `data` down to entries belonging to `application`.

    _type == "real": entries are measurement dicts carrying a 'labels' dict;
    otherwise entries are EvaluationCandidate objects.

    Fix: the original ignored the `data` parameter and always read
    self.real_measurement / self.candidates. It now filters the list it was
    given; all visible callers pass exactly those attributes, so behavior
    for them is unchanged.
    """
    result = []
    if _type == "real":
        for real in data:
            if real['labels']['application'] == application:
                result.append(real)
    else:
        for pred in data:
            if pred.getApplication() == application:
                result.append(pred)
    return result
def evaluatePrecision(self):
# Compare accumulated real measurements with prediction candidates; if the
# best match's precision is too low, POST a retrain request to the model API.
if len(self.real_measurement) == 0:
if len(self.candidates) > 0:
# No real data yet: rebuild subscriptions so measurements start flowing.
del self.subscriptions[:]
for candidate in self.candidates:
if not candidate.getApplication() in self.subscriptions:
self.createSubscription(candidate.getApplication())
self.subscriptions.append(candidate.getApplication())
# NOTE(review): saveCandidates() is defined outside this excerpt.
self.saveCandidates()
print("No real data found")
return False
for application in self.subscriptions:
distance_map = {}
self.mean_squared_error_map[application] = []
list_real = self.selectByApplicationName(self.real_measurement,application,"real")
list_pred = self.selectByApplicationName(self.candidates,application,"predict")
for real in list_real:
real_features = self.getFeaturesFromRealMeasurment(real)
for predict in list_pred:
d_f, error = self.computeDistance(real_features,predict)
# Map feature distance -> precision percentage (equal distances overwrite).
distance_map[d_f] = 100 - int(error)
distance_map = dict(sorted(distance_map.items()))
#select the 10
print("Best candidate")
# NOTE(review): raises IndexError when distance_map is empty (no real/pred pairs).
k = list(distance_map.keys())[0]
print("Distance : {0}".format(k))
print("Precision in percentage : {0}%".format(distance_map[k]))
# NOTE(review): this compares the feature *distance* k against
# prediction_precision (a percentage threshold); distance_map[k] <
# prediction_precision looks like the intended condition — confirm.
if k < prediction_precision:
#retrain request
features = list(list_pred[0].getFeatures().keys())
target = list_pred[0].getTarget()
variant = list_pred[0].getVariant()
application = list_pred[0].getApplication()
_post = {'url_file': "", 'application': application,'target':target,'features': features, 'variant': variant}
try:
response = requests.post(performance_model_train_url, data=json.dumps(_post),headers={'Content-Type':'application/json'})
except Exception as e:
print("An error occured while sending retrain request")
else:
# Precision acceptable: drop the evaluated data.
del self.real_measurement[:]
del self.candidates[:]
def listen(self):
# Connect to ActiveMQ and subscribe to the evaluator topic, retrying every
# 30s until it succeeds; then block until stopEvaluator() sets self.stop.
conn = None
status = False
while not status:
try:
print('Subscribe to the topic {0}'.format(subscription_topic))
conn = stomp.Connection(host_and_ports = [(activemq_hostname, activemq_port)])
conn.connect(login=activemq_username,passcode=activemq_password)
conn.set_listener('', Listener(conn, self.handler))
conn.subscribe(destination=subscription_topic, id=1, ack='auto')
status = True
except Exception as e:
print("Could not subscribe")
print(e)
status = False
time.sleep(30)
# NOTE(review): this branch is unreachable — the loop above only exits
# once status is True, so the recursive retry below never runs.
if not status:
time.sleep(10)
self.listen()
# Idle until asked to stop, then disconnect.
while not self.stop:
time.sleep(5)
conn.disconnect()
self.stop = True
def getStatus(self):
    """True while the evaluator is still running (stop flag not set)."""
    return False if self.stop else True
def addCandidate(self,application, target, features, prediction, variant):
# Register a new prediction for later evaluation. Candidates form a bounded
# FIFO of max_candidates_size entries (oldest dropped first).
candidate = EvaluationCandidate(application,target,features,prediction,variant)
self.candidates.append(candidate)
if len(self.candidates) > self.max_candidates_size:
self.candidates.pop(0)
if not application in self.subscriptions:
# NOTE(review): the application is not appended to self.subscriptions here,
# so a subscribe request may be re-sent for every new candidate — confirm
# whether an append is expected (evaluatePrecision does append).
self.createSubscription(application)