Commit b19b0dc3 authored by Fotis Paraskevopoulos's avatar Fotis Paraskevopoulos
Browse files

refactored during hackathon

parent 9b9418d2
import messaging
import morphemic
import morphemic.dataset
import os
import datetime
import logging
import math
import os
import signal
import time
import socket
import datetime, pytz, time
import math
import time
import messaging
import morphemic
import morphemic.dataset
_logger = logging.getLogger(__name__)
_logger.setLevel(level=logging.INFO)
class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
class ESHybrid(morphemic.model_manager.ModelHandler, messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
metrics= set()
forecast_config = dict()
......@@ -32,7 +37,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
timeout=6000,
keepalive=True
)
self.model = morphemic.model.Model(config['persistence']['application'],self)
self.model = morphemic.model_manager.ModelManager(config['persistence']['application'],self)
self.application = config['persistence']['application']
self.data_set_path =config['persistence']['path_dataset']
os.makedirs(self.data_set_path, exist_ok=True)
......@@ -68,20 +73,20 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
with socket.create_connection((host, port), timeout=timeout):
break
except OSError as ex:
logging.debug("Failed to connect to %s:%s waiting for 5 seconds " % (host,port))
_logger.error("Failed to connect to %s:%s waiting for 5 seconds " % (host,port))
retries = retries-1
time.sleep(5.00)
if retries <=0:
logging.error("Failed to connect aborting")
_logger.error("Failed to connect aborting")
return retries >0
def run(self):
logging.debug("setting up")
_logger.debug("setting up")
if not self.wait_for_port(self.connector.hosts[0][1],self.connector.hosts[0][0]):
logging.debug("couldn't connect to host")
_logger.warning("couldn't connect to host")
return False
self.connector.set_listener(self.id, self)
......@@ -104,17 +109,17 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def signal_handler(self, signum, frame):
logging.debug("SIGHUP")
_logger.debug("SIGHUP")
self.stop()
def stop(self):
logging.debug("Stopping...")
_logger.debug("Stopping...")
self._run = False
def start(self):
logging.debug("Starting ESHybrid")
_logger.info("Starting ESHYBRID")
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
......@@ -122,7 +127,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self._run = self.run()
if self._run:
logging.info("ESHYBRID_STARTED")
_logger.info("ESHYBRID_STARTED")
while self._run:
time.sleep(1)
......@@ -131,19 +136,19 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
continue
if self.state=='pending_metrics':
logging.info("Waiting for metrics_to_predict")
_logger.debug("Waiting for metrics_to_predict")
continue
if self.state=='pending_forecast':
logging.info("Waiting for start_forecasting")
_logger.debug("Waiting for start_forecasting")
continue
self.connector.disconnect()
def on_schedule(self, times):
for m in self.metrics:
for m in self.metrics:
times_in_utc = [
int(datetime.datetime.utcfromtimestamp(x).strftime('%s')) for x in times
]
......@@ -152,12 +157,14 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
m,
times_in_utc
)
predictions = predictions.dropna()
if predictions.empty:
logging.debug("No prediction available yet for metric[%s]",m)
_logger.warning("No prediction available yet for metric[%s] and ds=",(m,times_in_utc))
continue
logging.debug("Got metric[%s] predictions",(m))
predictions.head()
_logger.debug("Got metric[%s] predictions \n %s" % (m, predictions.head()))
for index,row in predictions.iterrows():
t = row['ds']
......@@ -176,7 +183,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
"cloud":"todo",
"provider":"todo"
}
logging.debug("Sending prediction for time %s => %s " % (t, payload) )
_logger.info("Sending prediction for time %s => %s " % (t, payload) )
self.connector.send_to_topic(
"intermediate_prediction.eshybrid.%s" % m,
payload
......@@ -186,14 +193,23 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self._train_model()
def _train_model(self):
if not self.model.is_idle():
_logger.warning("Model currently training")
return
# ~/melodic-tests/forecastingModule/morphemic/influxHandlingScripts$
# ./downloadMetricFromInflux.sh [metric] morphemic
# ./downloadMetricFromInflux.sh default_application morphemic
# downloaded/morphemic/default_application.csv
dataset_results = self.dataset.make()
if not dataset_results.get('status',False):
logging.error("**** NO DATA FROM DATASET MAKER ****")
_logger.error("**** NO DATA FROM DATASET MAKER ****")
self._force_train = True
return
self._force_train = False
self.model.train(dataset_results.get('url'),self.metrics)
self.model.train(dataset_results.get('url'),self.metrics,output_size=self.scheduler.compute_output_size())
def on_train(self,model):
self._last_training_time = time.time()
......@@ -208,28 +224,28 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_metrics_to_predict(self,res):
logging.debug("[2] Metrics to predict %s " % res)
_logger.debug("[2] Metrics to predict %s " % res)
if self.state != 'pending_metrics':
logging.warning("[2] We already have metrics >> %s " % self.metrics)
_logger.warning("[2] We already have metrics >> %s " % self.metrics)
return
for metric in self.metrics:
logging.debug("Un-subscribing from %s " % metric)
_logger.debug("Un-subscribing from %s " % metric)
self.metrics.remove(metric)
self.metrics = [x['metric'] for x in res]
self.metrics = ['MinimumCoresContext'] # [x['metric'] for x in res]
self.state = 'pending_forecast'
def on_stop_forecasting_eshybrid(self,res):
logging.debug("[6] Stop Subscribing %s " % res)
_logger.debug("[6] Stop Subscribing %s " % res)
if self.state !='forecasting':
return
for metric in res[messaging.events.StopForecasting.METRICS]:
if metric in self.metrics:
logging.debug("Un-subscribing from %s " % metric)
_logger.debug("Un-subscribing from %s " % metric)
self.connector.unsubscribe(metric,self.id)
self.metrics.remove(metric)
......@@ -240,38 +256,38 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_start_forecasting_eshybrid(self,res):
logging.debug("[7] Start Forecasting %s " % res)
_logger.debug("[7] Start Forecasting %s " % res)
if self.state == 'forecasting':
logging.warning("Already forecasting")
_logger.warning("Already forecasting")
return
self.forecasting_version = res[messaging.events.StartForecasting.VERSION]
if not self.metrics:
logging.error("Start forecasting before metrics to predict ")
_logger.error("Start forecasting before metrics to predict ")
return
for metric in res[messaging.events.StartForecasting.METRICS]:
if metric not in self.metrics:
logging.debug("Subscribing to %s " % metric)
_logger.debug("Subscribing to %s " % metric)
self.topic(metric)
self._train_model()
self.scheduler = morphemic.scheduler.Scheduler(
epoch_start=res[messaging.events.StartForecasting.EPOCH_START],
forward_predictons= res[messaging.events.StartForecasting.NUMBER_OF_FORWARD_PREDICTIONS],
horizon= res[messaging.events.StartForecasting.PREDICTION_HORIZON]
)
self._train_model()
self.state='forecasting'
def on_error(self, headers, body):
logging.error("Headers %s",headers)
logging.error(" %s",body)
_logger.error("Headers %s",headers)
_logger.error(" %s",body)
def on_disconnected(self):
logging.error('disconnected')
_logger.error('disconnected')
if self._run:
self.reconnect()
......
import logging
import configparser
import os
import argparse
import logging
from forecasting import eshybrid
logging.basicConfig(level=logging.ERROR)
def main():
parser = argparse.ArgumentParser(description='Run eshybrid forecaster')
......
from . import model
from . import uuid_model
from . import model_manager
from . import configuration
from . import handler
from . import data
from . import scheduler
......@@ -31,7 +31,7 @@ def get_config(dataset_name):
'level_variability_penalty': 30,
'testing_percentile': 40,
'training_percentile': 60,
'ensemble': False
'ensemble': True
},
'data_parameters': {
'max_periods': 120,
......
import time
import pandas as pd
import numpy as np
import logging
from datetime import datetime
_logger = logging.getLogger(__name__)
_logger.setLevel(level=logging.DEBUG)
class Satinizer:
    """Prepares application metric data for ESRNN training and prediction.

    NOTE(review): the name is presumably a misspelling of "Sanitizer";
    kept as-is because callers reference ``data.Satinizer``.
    """

    def __init__(self, application):
        # Application name; used as the series' unique_id in the frames below.
        self._application = application

    def to_train(self, metric, path):
        """Build training frames for one metric from a CSV dump.

        Reads the CSV at ``path``, keeps only the 'ems_time' and ``metric``
        columns, drops missing samples, resamples onto a 1-second grid and
        shifts the series forward so its newest sample lines up with "now".

        Returns:
            (train_x, train_y, first_ds, last_ds): the exogenous and target
            frames plus the raw epoch-second bounds of the cleaned series.
        """
        df = pd.read_csv(path)
        df = df[['ems_time', metric]]
        # The exporter writes the literal string 'None' for missing samples.
        df = df.replace('None', np.nan)
        df.dropna(inplace=True)
        df.reset_index(drop=True, inplace=True)
        df.rename(columns={'ems_time': 'ds', metric: 'y'}, inplace=True)

        t = time.time()
        first_ds = df['ds'].iloc[0]
        last_ds = df['ds'].iloc[-1]
        # Seconds between the newest sample and now; the series is shifted
        # forward by this amount so the training data ends at "now".
        output_size = int(t - last_ds)

        df['ds'] = pd.to_datetime(df['ds'], unit='s')
        df['y'] = df['y'].astype(float)
        df = df.set_index('ds').resample('1S').asfreq()
        df = df.shift(periods=output_size, freq='1S')
        df = df.interpolate(method='linear')
        df.reset_index(level=0, inplace=True)

        train_y = df.copy()
        train_y.insert(0, column='unique_id', value=self._application)
        # Fix: .copy() so the column assignment below writes to an owned
        # frame instead of a slice view (SettingWithCopyWarning / silent
        # no-op under pandas copy-on-write).
        train_x = train_y[['unique_id', 'ds']].copy()
        train_x['x'] = metric
        _logger.info("Data to train for metric %s [%s - %s] required output %s" % (metric, first_ds,last_ds,output_size))
        return train_x, train_y, first_ds, last_ds

    def to_predict(self, metric, times):
        """Build the prediction frame for ``times`` (epoch seconds)."""
        m_pd = pd.DataFrame(data=[datetime.fromtimestamp(x) for x in times], columns=['ds'])
        m_pd.insert(1, 'unique_id', self._application)
        m_pd.insert(2, 'x', metric)
        return m_pd
class ModelHandler:
    """Callback interface for training notifications.

    Subclass and override :meth:`on_train` to be told when a model
    finishes a training pass; the default implementation ignores it.
    """

    def on_train(self, model):
        """Called after ``model`` completes a training pass; no-op here."""
import logging
import threading
import time
import uuid
import pandas as pd
from . import configuration
from . import data
from .uuid_model import UUIDModel
_logger = logging.getLogger(__name__)
_logger.setLevel(level=logging.DEBUG)
class ModelHandler:
    """Default training-event receiver for :class:`ModelManager`.

    Override :meth:`on_train` in a subclass to react when training ends;
    this base implementation deliberately does nothing.
    """

    def on_train(self, model):
        """Hook invoked with the trained ``model``; base class ignores it."""
class ModelStatus:
    """String constants for the model manager's lifecycle state.

    Fix: the original subclassed the ``enumerate`` builtin, which provides
    nothing here (the class is never instantiated) and is misleading; a
    plain class keeps attribute access and string comparisons identical.
    The "TRAINNING" spelling is kept because other code references it.
    """
    IDLE = "IDLE"
    TRAINNING = "TRAINNING"
    TRAINED = "TRAINED"
    PREDICTING = "PREDICTING"
# Module-level lock serializing training passes across ModelManager threads.
lock = threading.Lock()
class ModelManager:
    """Owns the ESRNN model lifecycle: background (re)training and prediction.

    A training pass runs on a worker thread guarded by the module-level
    ``lock``; ``status`` tracks progress through :class:`ModelStatus`.
    """

    status = ModelStatus.IDLE
    # NOTE(review): appears unused by this class (UUIDModel keeps its own
    # per-metric map); retained for backward compatibility.
    model_by_metric = {}

    def __init__(self, application, handler=ModelHandler()) -> None:
        self._handler = handler
        self._model = None
        self._dataHandler = data.Satinizer(application)

    def is_idle(self):
        """Return True while no training pass is in flight."""
        return self.status == ModelStatus.IDLE

    def train(self, dataset_path, metrics, output_size=60):
        """Kick off asynchronous training for ``metrics`` from ``dataset_path``.

        No-op (with a warning) when a training pass is already running.
        """
        _logger.info("Start training for %s in %s " % (metrics,dataset_path,))
        self._output_size = output_size
        if self.status == ModelStatus.IDLE:
            t = threading.Thread(target=self._retrain,
                                 args=(metrics, output_size, dataset_path))
            t.start()
        else:
            # Fix: was logging.warning — keep output on the module logger
            # like every other message in this file.
            _logger.warning("Already Training")

    def _new_model(self, **kwargs) -> UUIDModel:
        """Create a fresh UUIDModel, overlaying ``kwargs`` onto the config.

        Each keyword must name a top-level config section (e.g.
        ``data_parameters``); its dict value is merged into that section.
        """
        _logger.debug("Training new model")
        config = configuration.get_config('Morphemic')
        for section, overrides in kwargs.items():
            # Fix: merge into the named section instead of the top level
            # (the original updated config itself, misplacing the keys).
            if section in config and isinstance(config[section], dict):
                config[section].update(overrides)
        return UUIDModel(uuid.uuid4(), config)

    def _retrain(self, metrics, output_size, path):
        """Worker-thread body: train one model per metric, then go idle."""
        with lock:
            self.status = ModelStatus.TRAINNING
            while self.status != ModelStatus.TRAINED:
                _logger.debug("Starting training model")
                # Fix: keyword was misspelled 'data_paremeters', so the
                # output_size override never reached the model config.
                model = self._new_model(data_parameters={
                    'output_size': output_size
                })
                for m in metrics:
                    try:
                        self._retrain_for_metric(model, m, path)
                        _logger.debug("Model training successful for %s ", m)
                    except ValueError as e:
                        # Fix: pass the args separately — a single tuple
                        # cannot satisfy two %s placeholders.
                        _logger.error("Not enough data for metric %s - not training \n\n%s", m, e)
                self._model = model
                _logger.debug("Updating trained model")
                self.status = ModelStatus.TRAINED
                if self._handler:
                    self._handler.on_train(self)
            _logger.info("Waiting for next training loop")
            time.sleep(15)
            self.status = ModelStatus.IDLE

    def _retrain_for_metric(self, model, metric, path):
        """Fit the per-metric ESRNN on the frames prepared by the data handler."""
        try:
            # args = (train_x, train_y, first_ds, last_ds)
            args = self._dataHandler.to_train(metric, path)
            # Fix: args[3] is last_ds, not the output size — label it honestly.
            _logger.info("Retraining for %s - %s rows, data up to %s " % (metric, args[0].shape[0], args[3]))
            model.model_for_metric(metric).fit(args[0], args[1], verbose=True)
        except ValueError as e:
            _logger.error("Couldn't prepare data for metric %s\n%s" % (metric,e))

    def predict(self, metric, times):
        """Return a predictions DataFrame for ``metric`` at epoch ``times``.

        Returns an empty DataFrame when no model has been trained yet or
        when the underlying model cannot produce predictions.
        """
        _logger.debug("Request prediction for %s @ %s " % (metric,times,))
        ret = pd.DataFrame()
        if not self._model:
            _logger.warning("No model trained yet")
            return ret
        try:
            ret = self._model.model_for_metric(metric).predict(
                self._dataHandler.to_predict(metric, times)
            )
        except AssertionError as e:
            _logger.error("Model not fitted yet \n\n%s\n\n ", e)
        except ValueError as e:
            # Fix: separate arguments for the two %s placeholders.
            _logger.error("Couldn't fit predictions for %s \n\n%s\n\n ", metric, e)
        except Exception:
            # Fix: bare except also swallowed SystemExit/KeyboardInterrupt;
            # log the traceback instead of only the metric name.
            _logger.exception("Unknown error for predictions for %s", metric)
        return ret
......@@ -3,7 +3,10 @@ import datetime
import logging
import time
logging.basicConfig(level=logging.DEBUG)
_logger = logging.getLogger(__name__)
_logger.setLevel(level=logging.INFO)
class Scheduler:
......@@ -19,7 +22,7 @@ class Scheduler:
self._forward_predictions = forward_predictons
self._horizon = horizon
self._next_time = self.compute_next()
logging.debug(
_logger.debug(
"""
Epoch: %s
Horizon: %s
......@@ -44,14 +47,17 @@ class Scheduler:
step = int( (t-self._epoch_start) / self._horizon)
return self._epoch_start + int((step+1) * self._horizon)
def compute_output_size(self):
return self._forward_predictions*self._horizon
def check(self, handler):
t = int(time.time())
logging.debug("Checking t = %s(%s) > next_time %s(%s) " % (datetime.datetime.fromtimestamp(t), t, datetime.datetime.fromtimestamp(self._next_time), self._next_time))
# logging.debug("Checking t = %s(%s) > next_time %s(%s) " % (datetime.datetime.fromtimestamp(t), t, datetime.datetime.fromtimestamp(self._next_time), self._next_time))
times = []
if t > self._next_time:
for i in range(0, self._forward_predictions):
_t = self._next_time + ( i * self._horizon)
logging.info(" step-t%s %s %s ", i, _t, datetime.datetime.fromtimestamp(_t ))
_logger.info(" step-t%s %s %s ", i, _t, datetime.datetime.fromtimestamp(_t ))
times.append(_t)
self._next_time = self.compute_next()
......
import logging
import threading
import time
import uuid
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from ESRNN import ESRNN
from . import configuration
_logger = logging.getLogger(__name__)
class UUIDModel:
esrnn = False
config = {}
model_by_metric = {}
def __init__(self, id, config) -> None:
self.id = id
......@@ -55,131 +44,3 @@ class UUIDModel:
device='cpu')
return self.model_by_metric[metric]
class ModelStatus:
    """Lifecycle-state string constants for the model.

    Fix: dropped the accidental ``enumerate`` base class — the builtin
    contributes nothing (the class is never instantiated) and misleads
    readers; plain class attributes behave identically.  The "TRAINNING"
    spelling is preserved because other code references it by name.
    """
    IDLE = "IDLE"
    TRAINNING = "TRAINNING"
    TRAINED = "TRAINED"
    PREDICTING = "PREDICTING"
class DataHandler:
    """Turns a raw metric CSV into ESRNN training/prediction frames."""

    def __init__(self, application):
        # Application name; used as the series' unique_id in the frames below.
        self._application = application

    def to_train(self, metric, path):
        """Build (train_x, train_y) for one metric from the CSV at ``path``.

        Keeps only 'ems_time' and ``metric``, drops missing samples
        (written as the literal string 'None'), resamples onto a 1-second
        grid and linearly interpolates the gaps.
        """
        df = pd.read_csv(path)
        df = df[['ems_time', metric]]
        df = df.replace('None', np.nan)
        df.dropna(inplace=True)
        df.reset_index(drop=True, inplace=True)
        df.rename(columns={'ems_time': 'ds', metric: 'y'}, inplace=True)
        df['ds'] = pd.to_datetime(df['ds'], unit='s')
        df['y'] = df['y'].astype(float)
        df = df.set_index('ds').resample('1S').asfreq()
        df = df.interpolate(method='linear')
        df.reset_index(level=0, inplace=True)
        train_y = df.copy()
        train_y.insert(0, column='unique_id', value=self._application)
        # Fix: .copy() so the assignment below writes to an owned frame
        # instead of a slice view (SettingWithCopyWarning / silent no-op
        # under pandas copy-on-write).
        train_x = train_y[['unique_id', 'ds']].copy()
        train_x['x'] = metric
        return train_x, train_y

    def to_predict(self, metric, times):
        """Build the prediction frame for ``times`` (epoch seconds)."""
        m_pd = pd.DataFrame(data=[datetime.fromtimestamp(x) for x in times], columns=['ds'])
        m_pd.insert(1, 'unique_id', self._application)
        m_pd.insert(2, 'x', metric)
        return m_pd
lock = threading.Lock()
class Model: