Commit 4cb7ef9e authored by maciek riedl

Merge branch 'iccs-eshybrid-2.0' into 'morphemic-rc2.0'

Iccs eshybrid 2.0

See merge request !291
parents e5b3f5ba 8dd95e99
Pipeline #20885 failed with stages in 29 minutes and 14 seconds
@@ -2,3 +2,5 @@
sync.cfg.local
sync.cfg.production
sync.cfg.docker
sync.cfg.persistance
publisher.py
@@ -7,6 +7,7 @@ import signal
import time
import socket
class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
metrics= set()
@@ -19,6 +20,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def __init__(self,config):
self._run=False
self._force_train = False
self._interval_count =1
self.id = (config['listener'] or {'id':'eshybrid'} )['id']
self.connector = messaging.morphemic.Connection(
@@ -50,7 +52,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
)
def wait_for_port(self, port, host='localhost', retries=10000000, timeout=5):
"""Wait until a port starts accepting TCP connections.
Args:
@@ -113,17 +114,17 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def start(self):
logging.debug("Starting ESHybrid")
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGHUP, self.signal_handler)
self._run = self.run()
if self._run:
logging.info("ESHYBRID_STARTED")
while self._run:
time.sleep(1)
if self.state=='forecasting':
self.scheduler.check(self)
continue
@@ -136,7 +137,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
logging.info("Waiting for start_forecasting")
continue
self.connector.disconnect()
@@ -146,10 +146,11 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
m,
times
)
if predictions == None or not len(predictions):
if predictions.empty:
logging.debug("No prediction available yet for metric[%s]",m)
continue
logging.debug("Got metric[%s] predictions %s",(m,predictions))
for index,row in predictions.iterrows():
t = row['ds']
payload = {
@@ -157,31 +158,30 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
"timestamp": int(time.time()),
"probability": 0.98,
"confidence_interval": [float(8),float(15)],
"predictionTime": t,
"predictionTime": t.strftime('%s'),
}
logging.debug("Sending prediction for time %s => %s " % (t, payload) )
self.connector.send_to_topic(
"intermediate_prediction.eshybrid.%s" % m,
payload
)
#adding simple method to retrain the model
if self._last_training_time and time.time() - self._last_training_time > 3000:
if self._force_train or ( self._last_training_time and time.time() - self._last_training_time > 3000):
self._train_model()
def _train_model(self):
self.dataset.make()
if not os.path.exists("%s/%s.csv" % (self.data_set_path,self.application)):
dataset_results = self.dataset.make()
if not dataset_results.get('status',False):
logging.error("**** NO DATA FROM DATASET MAKER ****")
self._force_train = True
return
self.model.train("%s/%s.csv" % (self.data_set_path,self.application),self.metrics)
self._last_training_time = time.time()
self._force_train = False
self.model.train(dataset_results.get('url'),self.metrics)
def on_train(self,model):
self._last_training_time = time.time()
self._force_train = False
self.connector.send_to_topic("training_models",
{
"metrics": self.metrics,
@@ -189,6 +189,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
"timestamp": int(time.time() * 1000)
})
def on_metrics_to_predict(self,res):
logging.debug("[2] Metrics to predict %s " % res)
......
import logging
import threading
import time
import uuid
from datetime import datetime, timedelta
import pandas as pd
@@ -58,6 +59,7 @@ class UUIDModel:
class ModelStatus(enumerate):
IDLE = "IDLE"
TRAINNING = "TRAINNING"
TRAINED = "TRAINED"
PREDICTING = "PREDICTING"
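
The hunk above adds a TRAINED state to ModelStatus, which the reworked training loop below uses as its exit condition. ModelStatus subclasses the builtin enumerate; the string constants work either way, but an enum.Enum base would be the idiomatic form. A sketch of that alternative only, not what the merge contains:

from enum import Enum

class ModelStatus(str, Enum):
    IDLE = "IDLE"
    TRAINNING = "TRAINNING"  # spelling kept as in the source for compatibility
    TRAINED = "TRAINED"
    PREDICTING = "PREDICTING"
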
@@ -126,34 +128,39 @@ class Model:
args = self._dataHandler.to_train(metric, path)
_logger.info("Retraining for %s - %s rows " % (metric, args[0].shape[0]))
model.model_for_metric(metric).fit(args[0], args[1], verbose=False)
model.model_for_metric(metric).fit(args[0], args[1], verbose=True)
def _retrain(self, metrics, path):
with lock:
self.status = ModelStatus.TRAINNING
while self.status != ModelStatus.TRAINED:
try:
_logger.debug("Starting training model")
_logger.debug("Starting training model")
model = self._new_model()
model = self._new_model()
for m in metrics:
self._retrain_for_metric(model, m,path)
_logger.debug("Model training succesful for %s ",m)
for m in metrics:
self._retrain_for_metric(model, m,path)
_logger.debug("Model training finished")
self._model = model
self._model = model
_logger.debug("set new model")
self.status = ModelStatus.TRAINED
if self._handler:
self._handler.on_train(self)
except:
_logger.error("Not enough data - not training")
_logger.info("Waiting for next training loop")
time.sleep(30)
_logger.debug("set new model")
if self._handler:
self._handler.on_train(self)
self.status = ModelStatus.IDLE
def train(self, dataset_path, metrics):
_logger.info("Start training for %s in %s " % (metrics,dataset_path,))
if self.status == ModelStatus.IDLE:
t = threading.Thread(target=self._retrain,
args=(metrics, dataset_path))
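
Read together, the interleaved old and new lines above turn _retrain into a loop that keeps retrying until a model is trained, backing off for 30 seconds whenever the dataset is not yet usable, and move the on_train callback into the successful branch. A self-contained sketch of that retry pattern; the names train_once and retry_delay are illustrative, not from the diff:

import logging
import time

_logger = logging.getLogger(__name__)

def retrain_until_ready(train_once, retry_delay=30):
    # Keep calling train_once() until it returns a model; sleep between failed attempts.
    while True:
        try:
            model = train_once()  # may raise while the dataset is still missing
            _logger.debug("Model training finished")
            return model
        except Exception:
            # the diff uses a bare except:; catching Exception at least
            # avoids swallowing KeyboardInterrupt and SystemExit
            _logger.error("Not enough data - not training")
            _logger.info("Waiting for next training loop")
            time.sleep(retry_delay)

Because train() only spawns the worker thread while the status is IDLE, a training run that is still looping cannot be started a second time.
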
@@ -166,7 +173,7 @@ class Model:
_logger.debug("Request prediction for %s @ %s " % (metric,times,))
if not self._model:
_logger.error("No model trained yet")
return
return pd.DataFrame()
ret = self._model.model_for_metric(metric).predict(
self._dataHandler.to_predict(metric,times)
......
import time
import logging
import messaging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
import messaging
connector = messaging.morphemic.Connection('aaa', '111',host="localhost",port=61613)
connector.connect()
while True:
connector.send_to_topic("metrics_to_predict", [{'metric':'latency'},{'metric':'response_time'}] )
time.sleep(5)
connector = messaging.morphemic.Connection('aaa', '111',host="localhost",port=61610)
connector.connect()
connector.send_to_topic("start_forecasting.eshybrid",{'version':1, 'metrics': ['latency','response_time'], 'timestamp': 1638866003, 'epoch_start': 1638866228, 'number_of_forward_predictions': 8, 'prediction_horizon': 30} )
time.sleep(30)
connector.send_to_topic("start_forecasting.gluonmachines",{'metrics': ['ETPercentile_Ctx','SimulationElapsedTime_Ctx','SimulationLeftNumber_Ctx','WillFinishTooSoonContext','NotFinishedOnTimeContext'], 'timestamp': 1638866003, 'epoch_start': 1638866228, 'number_of_forward_predictions': 8, 'prediction_horizon': 30} )