Commit 729e2d14 authored by Marta Różańska's avatar Marta Różańska

Merge branch 'iccs-eshybrid' into 'morphemic-rc1.5'

Iccs eshybrid

See merge request !151
parents ca060769 d48c3008
/local
\ No newline at end of file
/local
sync.cfg
sync.cfg.local
sync.cfg.production
\ No newline at end of file
import messaging
import morphemic
import morphemic.dataset
import os
import time
import logging
import signal
import datetime
class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
id="eshybrid"
metrics= set()
forecast_config = dict()
scheduler = False
application ='default_application'
def __init__(self):
def __init__(self,config):
self._run=False
self.connector = messaging.morphemic.Connection('admin', 'admin')
self.id = (config['listener'] or {'id':'eshybrid'} )['id']
self.connector = messaging.morphemic.Connection(
config['messaging']['username'],
config['messaging']['password'],
host=config['messaging']['host'],
port=config['messaging']['port']
)
self.model = morphemic.model.Model(self)
self.application = config['persistence']['application']
os.makedirs(config['persistence']['path_dataset'], exist_ok=True)
influx = {'hostname': config['persistence']['host'],
'port': config['persistence']['port'],
'username': config['persistence']['username'],
'password': config['persistence']['password'],
'dbname': config['persistence']['dbname'],
'path_dataset': config['persistence']['path_dataset']
}
self.dataset = morphemic.dataset.DatasetMaker(
application=self.application,
start=None,
configs=influx
)
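# Hedged sketch (not part of the commit): the nested mapping this
# constructor reads, with every value an illustrative placeholder.
# Section and key names mirror the sync.cfg template at the end of
# this diff plus the optional [listener] section accessed above:
# config = {
#     'listener': {'id': 'eshybrid'},
#     'messaging': {'username': 'admin', 'password': 'admin',
#                   'host': 'localhost', 'port': 61613},
#     'persistence': {'application': 'demo_app',
#                     'host': 'localhost', 'port': 8086,
#                     'username': 'user', 'password': 'secret',
#                     'dbname': 'morphemic',
#                     'path_dataset': '/tmp/datasets'},
# }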
def run(self):
logging.debug("setting up")
self.connector.set_listener(self.id, self)
self.connector.connect()
self.connector.topic("start_forecasting.es_hybrid", self.id)
self.connector.topic("stop_forecasting.es_hybrid", self.id)
self.connector.topic("start_forecasting.%s" % self.id, self.id)
self.connector.topic("stop_forecasting.%s" % self.id, self.id)
self.connector.topic("metrics_to_predict", self.id)
def topic(self,topic):
@@ -56,7 +79,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGHUP, self.signal_handler)
self.run()
self._run=True
while self._run:
@@ -65,27 +87,53 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
time.sleep(1)
def on_schedule(self, times):
predictions = self.model.predict(self.metrics,times)
for m in self.metrics:
for p in predictions[m]:
# predictions = self.model.predict(self.application, m, times)
# if not predictions:
# continue
for t in times:
logging.debug("Sending prediction for time %s(%s) " % (datetime.datetime.fromtimestamp(t), t))
self.connector.send_to_topic(
"intermediate_prediction.eshybrid.%s" % m,
{
"metricValue": p['x'],
"timestamp": p['y'],
"predictionTime": int(time.time()),
"metricValue": 12.43,
"timestamp": int(time.time()),
"probability": 0.98,
"confidence_interval": [float(8),float(15)],
"predictionTime":t,
}
)
# for p in predictions[m['metric']]:
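# NOTE: metricValue, timestamp, probability and confidence_interval are
# hard-coded placeholder values in this revision; the commented-out
# predict() call above suggests they are meant to be replaced by real
# per-point model output once the ESRNN integration is wired through.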
def on_train(self):
self.connector.send_to_topic("training_models",
{
"metrics": [x for x in self.metrics],
"metrics": self.metrics,
"forecasting_method": self.id,
"timestamp": int(time.time() * 1000)
})
def on_stop_forecasting_es_hybrid(self,res):
def _train_model(self):
self.dataset.make()
data = self.dataset.getData()
self.model.train(self.metrics, data)
def on_metrics_to_predict(self,res):
logging.debug("[6] Metrics to predics %s " % res)
# iterate over a copy since we mutate self.metrics inside the loop
for metric in list(self.metrics):
logging.debug("Un-subscribing from %s " % metric)
self.connector.unsubscribe(metric, self.id)
self.metrics.remove(metric)
self.metrics = [x['metric'] for x in res]
def on_stop_forecasting_eshybrid(self,res):
logging.debug("[6] Stop Subscribing %s " % res)
for metric in res[messaging.events.StopForecasting.METRICS]:
@@ -99,30 +147,25 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self.scheduler = False
def on_start_forecasting_es_hybrid(self,res):
def on_start_forecasting_eshybrid(self,res):
logging.debug("[6] Start Forecasting %s " % res)
if not self.metrics:
logging.error("Start forecasting before metrics to predict ")
return
for metric in res[messaging.events.StartForecasting.METRICS]:
if metric not in self.metrics:
logging.debug("Subscribing to %s " % metric)
self.topic(metric)
self.metrics.append(metric)  # self.metrics is a list after on_metrics_to_predict
self.model.train()
self._train_model()
self.scheduler = morphemic.scheduler.Scheduler(
epoch_start=res[messaging.events.StartForecasting.EPOCH_START],
forward_predictons= res[messaging.events.StartForecasting.NUMBER_OF_FORWARD_PREDICTIONS],
horizon= res[messaging.events.StartForecasting.PREDICTION_HORIZON]
)
def on(self,headers,res):
metric = self.get_topic_name(headers)
if self.get_topic_name(headers) in self.metrics:
predictions = self.model.predict()
for p in predictions:
self.connector.send_to_topic(
'intermediate_prediction.%s.%s' % ( self.id, metric),
p
)
def on_error(self, headers, body):
logging.error("Headers %s",headers)
@@ -131,3 +174,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_disconnected(self):
print('disconnected')
self.reconnect()
def on(self, headers, res):
pass
\ No newline at end of file
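For orientation, a minimal driver for the handler above could look like the sketch below. The metrics_to_predict payload shape (a list of {'metric': ...} dicts) is inferred from on_metrics_to_predict, the start_forecasting fields come from the test script at the end of this diff, and epoch_start is given in seconds because the revised Scheduler no longer divides it by 1000; broker address and credentials are placeholders.

import time
import messaging

connector = messaging.morphemic.Connection('admin', 'admin', host='localhost', port=61613)
connector.connect()
# announce the metrics the forecaster should track ...
connector.send_to_topic("metrics_to_predict", [{'metric': 'AvgResponseTime'}])
# ... then start forecasting them: 8 predictions ahead, 30 s apart
connector.send_to_topic("start_forecasting.eshybrid", {
    'metrics': ['AvgResponseTime'],
    'timestamp': int(time.time() * 1000),
    'epoch_start': int(time.time()) + 60,
    'number_of_forward_predictions': 8,
    'prediction_horizon': 30,
})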
import logging
import configparser
import os
from forecasting import eshybrid
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
e = eshybrid.ESHybrid()
config_file = "%s/%s" % (os.getcwd(), "sync.cfg")
print("Config file %s ", config_file)
config = configparser.RawConfigParser()
config.read(config_file)
config_dict = dict(config)
e = eshybrid.ESHybrid(config)
try:
......
@@ -73,13 +73,16 @@ class DatasetMaker():
def __init__(self, application, start, configs):
self.application = application
self.start_filter = start
self.influxdb = InfluxDBClient(host=configs['hostname'], port=configs['port'], username=configs['username'], password=configs['password'], database=configs['dbname'])
if configs:
self.influxdb = InfluxDBClient(host=configs['hostname'], port=configs['port'], username=configs['username'], password=configs['password'], database=configs['dbname'])
self.dataset = Dataset()
self.tolerance = 5
global url_path_dataset
url_path_dataset = configs['path_dataset']
if url_path_dataset[-1] != "/":
url_path_dataset += "/"
if configs:
url_path_dataset = configs['path_dataset']
if url_path_dataset[-1] != "/":
url_path_dataset += "/"
def getIndex(self, columns, name):
return columns.index(name)
......
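The new `if configs:` guards make the InfluxDB client and the dataset path optional, so a DatasetMaker can be constructed without a backend. A hedged usage sketch, mirroring how ESHybrid wires it up above (host and credentials are placeholders):

influx = {'hostname': 'localhost', 'port': 8086,
          'username': 'user', 'password': 'secret',
          'dbname': 'morphemic', 'path_dataset': '/tmp/datasets'}
maker = DatasetMaker(application='demo_app', start=None, configs=influx)
maker.make()            # pull the application's series from InfluxDB
data = maker.getData()  # consumed by ESHybrid._train_model() above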
import logging
import threading
import time
import uuid
import pandas as pd
from ESRNN import ESRNN
from . import configuration
@@ -11,36 +12,42 @@ _logger = logging.getLogger(__name__)
class UUIDModel:
esrnn = False
config = {}
def __init__(self, id, config) -> None:
self.id = id
self.esrnn = ESRNN(max_epochs=config['train_parameters']['max_epochs'],
batch_size=config['train_parameters']['batch_size'],
freq_of_test=config['train_parameters']['freq_of_test'],
learning_rate=float(config['train_parameters']['learning_rate']),
lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'],
lr_decay=config['train_parameters']['lr_decay'],
per_series_lr_multip=config['train_parameters']['per_series_lr_multip'],
gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'],
rnn_weight_decay=config['train_parameters']['rnn_weight_decay'],
noise_std=config['train_parameters']['noise_std'],
level_variability_penalty=config['train_parameters']['level_variability_penalty'],
testing_percentile=config['train_parameters']['testing_percentile'],
training_percentile=config['train_parameters']['training_percentile'],
ensemble=config['train_parameters']['ensemble'],
max_periods=config['data_parameters']['max_periods'],
seasonality=config['data_parameters']['seasonality'],
input_size=config['data_parameters']['input_size'],
output_size=config['data_parameters']['output_size'],
frequency=config['data_parameters']['frequency'],
cell_type=config['model_parameters']['cell_type'],
state_hsize=config['model_parameters']['state_hsize'],
dilations=config['model_parameters']['dilations'],
add_nl_layer=config['model_parameters']['add_nl_layer'],
random_seed=config['model_parameters']['random_seed'],
device='cpu')
self.config = config
self.model_by_metric={}
def model_for_metric(self,metric):
config = self.config
if metric not in self.model_by_metric:
self.model_by_metric[metric] = ESRNN(max_epochs=config['train_parameters']['max_epochs'],
batch_size=config['train_parameters']['batch_size'],
freq_of_test=config['train_parameters']['freq_of_test'],
learning_rate=float(config['train_parameters']['learning_rate']),
lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'],
lr_decay=config['train_parameters']['lr_decay'],
per_series_lr_multip=config['train_parameters']['per_series_lr_multip'],
gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'],
rnn_weight_decay=config['train_parameters']['rnn_weight_decay'],
noise_std=config['train_parameters']['noise_std'],
level_variability_penalty=config['train_parameters']['level_variability_penalty'],
testing_percentile=config['train_parameters']['testing_percentile'],
training_percentile=config['train_parameters']['training_percentile'],
ensemble=config['train_parameters']['ensemble'],
max_periods=config['data_parameters']['max_periods'],
seasonality=config['data_parameters']['seasonality'],
input_size=config['data_parameters']['input_size'],
output_size=config['data_parameters']['output_size'],
frequency=config['data_parameters']['frequency'],
cell_type=config['model_parameters']['cell_type'],
state_hsize=config['model_parameters']['state_hsize'],
dilations=config['model_parameters']['dilations'],
add_nl_layer=config['model_parameters']['add_nl_layer'],
random_seed=config['model_parameters']['random_seed'],
device='cpu')
return self.model_by_metric[metric]
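# Design note: instead of the single shared esrnn instance removed above,
# models are now created lazily, one ESRNN per metric, and cached in
# model_by_metric, so each metric keeps its own fit/predict state.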
class ModelStatus(enumerate):
IDLE="IDLE"
@@ -48,6 +55,7 @@ class ModelStatus(enumerate):
PREDICTING="PREDICTING"
lock = threading.Lock()
class Model:
status = ModelStatus.IDLE
@@ -57,42 +65,79 @@ class Model:
# integrated here
self._model = False
def _new_model(self, X_train_df, y_train_df, X_test_df, y_naive_df) -> UUIDModel:
def _new_model(self) -> UUIDModel:
_logger.debug("Training new model")
config = configuration.get_config('Custom')
config = configuration.get_config('Hourly')
model = UUIDModel(uuid.uuid4(),config)
model.esrnn.fit(X_train_df, y_train_df, X_test_df, y_naive_df)
return model
def _transform(self,m,data):
def beautify(p):
p = p.sort_values('time')
p = p.rename(columns = {'application':'unique_id','time': 'ds'}, inplace = False)
return p
m_data = [d for d in data[1] if d.get(m) is not None]  # skip rows that lack metric m
m_pd = pd.DataFrame(data=m_data)
m_pd = m_pd[['application','time',m]]
m_pd = m_pd.rename(columns = {m:'y'}, inplace = False)
rows = m_pd.shape[0]
m_pd_train_y= m_pd.iloc[0: int(rows * 0.70)]
m_pd_test_y= m_pd.iloc[m_pd_train_y.shape[0]-1: rows]
m_pd_train_x = m_pd_train_y[['application','time']].copy()  # copy to avoid SettingWithCopyWarning
m_pd_train_x['x'] = m
m_pd_test_x = m_pd_test_y[['application','time']].copy()
m_pd_test_x['x'] = m
return beautify(m_pd_train_x),beautify(m_pd_train_y), beautify(m_pd_test_x) , None #beautify(m_pd_test_y)
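# Illustrative shape of the frames _transform() returns for a metric m
# (column names follow the renames in beautify(); values are made up):
#   train_y: unique_id          ds     y      train_x: unique_id          ds  x
#            demo_app   1623245214  0.42               demo_app   1623245214  m
# i.e. ESRNN's exogenous/target frame convention, split 70/30 in time order,
# with the metric name carried in the 'x' column and the test target elided.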
def _retrain(self, metrics, data):
def _retrain(self):
with lock:
self.status = ModelStatus.TRAINNING
time.sleep(10)
_logger.debug("Starting training model")
model = self._new_model()
for m in metrics:
args = self._transform( m, data)
model.model_for_metric(m).fit(*args)
_logger.debug("Model training finished")
#dataset maker integration
# self._model = self._new_model()
self._model = model
_logger.debug("set new model")
if self._handler:
self._handler.on_train()
self.status = ModelStatus.IDLE
def train(self):
def train(self,metrics=[], data=[]):
if self.status == ModelStatus.IDLE:
t = threading.Thread(target=self._retrain)
t = threading.Thread(target=self._retrain, args=(metrics, data))
t.start()
else:
logging.warning("Already Training")
def predict(self, metrics, times):
prediction = {}
for m in metrics:
prediction[m] = []
if self._model:
prediction[m] = self._model.esrnn.predict(pd.DataFrame(data=times))
else:
_logger.error("No model trainged yest")
def predict(self, application, metric, times):
if not self._model:
_logger.error("No model trained yet")
return
m_pd = pd.DataFrame(data=times, columns=['ds'])
m_pd.insert(0,'unique_id',application)
m_pd.insert(1,'x',metric)
ret= self._model.model_for_metric(metric).predict(m_pd)
return prediction
\ No newline at end of file
return ret
\ No newline at end of file
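Taken together, the revised Model API is train(metrics, data) followed by per-metric predict(application, metric, times). A hedged usage sketch, where `handler` (the on_train() callback target, as in ESHybrid.__init__ above) and `data` (the DatasetMaker.getData() result) are assumed given:

import time
import morphemic.model

model = morphemic.model.Model(handler)
model.train(metrics=['AvgResponseTime'], data=data)  # spawns the _retrain thread
# later, once status is back to ModelStatus.IDLE:
t0 = int(time.time())
points = model.predict('demo_app', 'AvgResponseTime', [t0, t0 + 30, t0 + 60])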
import datetime
import time
import logging
import time
logging.basicConfig(level=logging.DEBUG)
@@ -14,7 +14,7 @@ class Scheduler:
:param forward_predictons: The number of forward predictions requested
:param horizon: The time horizon in seconds
"""
self._epoch_start = epoch_start/1000
self._epoch_start = epoch_start
self._forward_predictions = forward_predictons
self._horizon = horizon
self._next_time = self.compute_next()
@@ -27,7 +27,7 @@ class Scheduler:
Next: %s
Now: %s
""" % (datetime.datetime.fromtimestamp(self._epoch_start/1000),
""" % (datetime.datetime.fromtimestamp(self._epoch_start),
horizon,
forward_predictons,
datetime.datetime.fromtimestamp(self._next_time),
@@ -37,22 +37,26 @@ class Scheduler:
)
def compute_next(self):
step = int((time.time() - self._epoch_start) / self._horizon)
return self._epoch_start + ((step + 1) * self._horizon)
t = time.time()
if self._epoch_start > t:
return self._epoch_start
step = int( (t-self._epoch_start) / self._horizon)
return self._epoch_start + int((step+1) * self._horizon)
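# Worked example (illustrative): with _epoch_start = 1000 and _horizon = 30,
# a call at t = 1095 gives step = int((1095 - 1000) / 30) = 3, so the next
# tick is 1000 + (3 + 1) * 30 = 1120; ticks therefore stay aligned to the
# epoch_start grid even when check() runs late or whole intervals are skipped.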
def check(self, handler):
t = int(time.time())
# logging.debug("Checking t = %s(%s) > next_time %s(%s) " % (datetime.datetime.fromtimestamp(t), t, datetime.datetime.fromtimestamp(self._next_time), self._next_time))
logging.debug("Checking t = %s(%s) > next_time %s(%s) " % (datetime.datetime.fromtimestamp(t), t, datetime.datetime.fromtimestamp(self._next_time), self._next_time))
times = []
if t > self._next_time:
for i in range(0, self._forward_predictions):
# logging.info(" t%s %s ", i, datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) ) )
times.append(datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) ) )
logging.info(" t%s %s ", i, datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) ) )
times.append( int( datetime.datetime.timestamp(datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) )) ))
self._next_time = self.compute_next()
if handler:
handler.on_schedule(times)
if handler:
handler.on_schedule(times)
class Handler:
......
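An end-to-end sketch of the scheduler contract as ESHybrid uses it above; the one-second poll loop mirrors ESHybrid.run(), the kwarg spelling forward_predictons is as in the source, and the start offset is illustrative:

import time
import morphemic.scheduler

class PrintHandler(morphemic.scheduler.Handler):
    def on_schedule(self, times):
        print("predict for", times)

s = morphemic.scheduler.Scheduler(epoch_start=int(time.time()) + 5,
                                  forward_predictons=4, horizon=30)
handler = PrintHandler()
while True:
    s.check(handler)  # fires handler.on_schedule(times) once per horizon tick
    time.sleep(1)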
@@ -7,4 +7,5 @@ seaborn
psutil
stomp.py
influxdb
python-daemon
\ No newline at end of file
python-daemon
configparser
\ No newline at end of file
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
import time
import messaging
connector = messaging.morphemic.Connection('aaa', '111',host="147.102.17.76",port=61610)
connector.connect()
connector.send_to_topic("start_forecasting.eshybrid",{'metrics': ['AvgResponseTime'], 'timestamp': 1623245014409, 'epoch_start': 1623245214016, 'number_of_forward_predictions': 8, 'prediction_horizon': 30} )
[persistence]
host=
port=
username=
password=
dbname=
path_dataset=
[messaging]
host=
port=
username=
password=
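A filled-in example of this template; every value is an illustrative placeholder, only the section and key names come from the code above:

[persistence]
host=localhost
port=8086
username=user
password=secret
dbname=morphemic
path_dataset=/var/lib/morphemic/datasets

[messaging]
host=localhost
port=61613
username=admin
password=admin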