diff --git a/morphemic-forecasting-eshybrid/.gitignore b/morphemic-forecasting-eshybrid/.gitignore
index 301d16046518e2a1c712a0c3a62b7d5d3827c39d..08148e4add67a38533704f4870ccf654bc230889 100644
--- a/morphemic-forecasting-eshybrid/.gitignore
+++ b/morphemic-forecasting-eshybrid/.gitignore
@@ -1 +1,5 @@
-/local
\ No newline at end of file
+/local
+
+sync.cfg
+sync.cfg.local
+sync.cfg.production
\ No newline at end of file
diff --git a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py
index 416c38180f947a4eb1949c32c2b55aea593b4b14..2d554beb61475342fc1a5cb7e13cb788c0b87517 100644
--- a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py
+++ b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py
@@ -1,29 +1,52 @@
 import messaging
 import morphemic
-
+import morphemic.dataset
+import os
 import time
 import logging
 import signal
-
+import datetime
 
 class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
-    id="eshybrid"
     metrics= set()
     forecast_config = dict()
     scheduler = False
+    application ='default_application'
 
-    def __init__(self):
+    def __init__(self,config):
         self._run=False
-        self.connector = messaging.morphemic.Connection('admin', 'admin')
+        self.id = (config['listener'] or {'id':'eshybrid'} )['id']
+        self.connector = messaging.morphemic.Connection(
+            config['messaging']['username'],
+            config['messaging']['password'],
+            host=config['messaging']['host'],
+            port=config['messaging']['port']
+        )
         self.model = morphemic.model.Model(self)
+        self.application = config['persistence']['application']
+        os.makedirs(config['persistence']['path_dataset'], exist_ok=True)
+        influx = {'hostname': config['persistence']['host'],
+                  'port': config['persistence']['port'],
+                  'username': config['persistence']['username'],
+                  'password': config['persistence']['password'],
+                  'dbname': config['persistence']['dbname'],
+                  'path_dataset': config['persistence']['path_dataset']
+                  }
+
+        self.dataset = morphemic.dataset.DatasetMaker(
+            application=self.application,
+            start=None,
+            configs=influx
+
+        )
 
 
     def run(self):
         logging.debug("setting up")
         self.connector.set_listener(self.id, self)
         self.connector.connect()
-        self.connector.topic("start_forecasting.es_hybrid", self.id)
-        self.connector.topic("stop_forecasting.es_hybrid", self.id)
+        self.connector.topic("start_forecasting.%s" % self.id, self.id)
+        self.connector.topic("stop_forecasting.%s" % self.id, self.id)
         self.connector.topic("metrics_to_predict", self.id)
 
     def topic(self,topic):
@@ -56,7 +79,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
         signal.signal(signal.SIGINT, self.signal_handler)
         signal.signal(signal.SIGTERM, self.signal_handler)
         signal.signal(signal.SIGHUP, self.signal_handler)
-        self.run()
 
         self._run=True
         while self._run:
@@ -65,27 +87,53 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
             time.sleep(1)
 
     def on_schedule(self, times):
-        predictions = self.model.predict(self.metrics,times)
+
         for m in self.metrics:
-            for p in predictions[m]:
+            # predictions = self.model.predict(self.application, m, times)
+            # if not predictions:
+            #     continue
+
+            for t in times:
+                logging.debug("Sending prediction for time %s(%s) " % (datetime.datetime.fromtimestamp(t), t))
                 self.connector.send_to_topic(
                     "intermediate_prediction.eshybrid.%s" % m,
                     {
-                        "metricValue": p['x'],
-                        "timestamp": p['y'],
-                        "predictionTime": int(time.time()),
+                        "metricValue": 12.43,
+                        "timestamp": int(time.time()),
+                        "probability": 0.98,
+                        "confidence_interval": [float(8),float(15)],
+                        "predictionTime":t,
                     }
                 )
+            # for p in predictions[m['metric']]:
 
     def on_train(self):
         self.connector.send_to_topic("training_models",
                                      {
-                                         "metrics": [x for x in self.metrics],
+                                         "metrics": self.metrics,
                                          "forecasting_method": self.id,
                                          "timestamp": int(time.time() * 1000)
                                      })
 
-    def on_stop_forecasting_es_hybrid(self,res):
+
+    def _train_model(self):
+
+        self.dataset.make()
+        data = self.dataset.getData()
+        self.model.train(self.metrics, data)
+
+    def on_metrics_to_predict(self,res):
+
+        logging.debug("[6] Metrics to predics %s " % res)
+        for metric in self.metrics:
+            logging.debug("Un-subscribing from %s " % metric)
+            self.connector.unsubscribe(metric.metric,self.id)
+            self.metrics.remove(metric)
+
+        self.metrics = [x['metric'] for x in res]
+
+
+    def on_stop_forecasting_eshybrid(self,res):
         logging.debug("[6] Stop Subscribing %s " % res)
 
         for metric in res[messaging.events.StopForecasting.METRICS]:
@@ -99,30 +147,25 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
         self.scheduler = False
 
-    def on_start_forecasting_es_hybrid(self,res):
+    def on_start_forecasting_eshybrid(self,res):
         logging.debug("[6] Start Forecasting %s " % res)
+        if not self.metrics:
+            logging.error("Start forecasting before metrics to predict ")
+            return
+
         for metric in res[messaging.events.StartForecasting.METRICS]:
             if metric not in self.metrics:
                 logging.debug("Subscribing to %s " % metric)
                 self.topic(metric)
-                self.metrics.add(metric)
 
-        self.model.train()
+
+        self._train_model()
         self.scheduler = morphemic.scheduler.Scheduler(
             epoch_start=res[messaging.events.StartForecasting.EPOCH_START],
             forward_predictons= res[messaging.events.StartForecasting.NUMBER_OF_FORWARD_PREDICTIONS],
             horizon= res[messaging.events.StartForecasting.PREDICTION_HORIZON]
         )
 
-    def on(self,headers,res):
-        metric = self.get_topic_name(headers)
-        if self.get_topic_name(headers) in self.metrics:
-            predictions = self.model.predict()
-            for p in predictions:
-                self.connector.send_to_topic(
-                    'intermediate_prediction.%s.%s' % ( self.id, metric),
-                    p
-                )
 
     def on_error(self, headers, body):
         logging.error("Headers %s",headers)
@@ -131,3 +174,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
     def on_disconnected(self):
         print('disconnected')
         self.reconnect()
+
+
+    def on(self, headers, res):
+        pass
\ No newline at end of file
diff --git a/morphemic-forecasting-eshybrid/main.py b/morphemic-forecasting-eshybrid/main.py
index dfa9ab8d0b8e2eb8f93367aefe5856d5b586f8c4..99d0136e215ca231624988254270fa4562f0cb9c 100644
--- a/morphemic-forecasting-eshybrid/main.py
+++ b/morphemic-forecasting-eshybrid/main.py
@@ -1,11 +1,18 @@
 import logging
-
+import configparser
+import os
 from forecasting import eshybrid
 
 logger = logging.getLogger()
 logger.setLevel(logging.DEBUG)
 
-e = eshybrid.ESHybrid()
+
+config_file = "%s/%s" % (os.getcwd(), "sync.cfg")
+print("Config file %s ", config_file)
+config = configparser.RawConfigParser()
+config.read(config_file)
+config_dict = dict(config)
+e = eshybrid.ESHybrid(config)
 
 
 try:
diff --git a/morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py b/morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py
index db2c96f242c1efdde2a86de5515b6455ce02b0f7..fbc651b7b731b4e213fbcd24a87f15847fbbec89 100644
--- a/morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py
+++ b/morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py
@@ -73,13 +73,16 @@ class DatasetMaker():
     def __init__(self, application, start, configs):
         self.application = application
         self.start_filter = start
-        self.influxdb = InfluxDBClient(host=configs['hostname'], port=configs['port'], username=configs['username'], password=configs['password'], database=configs['dbname'])
+        if configs:
+            self.influxdb = InfluxDBClient(host=configs['hostname'], port=configs['port'], username=configs['username'], password=configs['password'], database=configs['dbname'])
         self.dataset = Dataset()
         self.tolerance = 5
+
         global url_path_dataset
-        url_path_dataset = configs['path_dataset']
-        if url_path_dataset[-1] != "/":
-            url_path_dataset += "/"
+        if configs:
+            url_path_dataset = configs['path_dataset']
+            if url_path_dataset[-1] != "/":
+                url_path_dataset += "/"
 
     def getIndex(self, columns, name):
         return columns.index(name)
diff --git a/morphemic-forecasting-eshybrid/morphemic/model.py b/morphemic-forecasting-eshybrid/morphemic/model.py
index 1ce0ffbc09853e4f670d99642e61a41462779152..2562291e0c9bde7b1a0e708772334778b20832e8 100644
--- a/morphemic-forecasting-eshybrid/morphemic/model.py
+++ b/morphemic-forecasting-eshybrid/morphemic/model.py
@@ -1,8 +1,9 @@
 import logging
 import threading
-import time
 import uuid
+
 import pandas as pd
+
 from ESRNN import ESRNN
 
 from . import configuration
@@ -11,36 +12,42 @@ _logger = logging.getLogger(__name__)
 
 class UUIDModel:
     esrnn = False
-
+    config = {}
 
     def __init__(self, id, config) -> None:
         self.id = id
-        self.esrnn = ESRNN(max_epochs=config['train_parameters']['max_epochs'],
-                           batch_size=config['train_parameters']['batch_size'],
-                           freq_of_test=config['train_parameters']['freq_of_test'],
-                           learning_rate=float(config['train_parameters']['learning_rate']),
-                           lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'],
-                           lr_decay=config['train_parameters']['lr_decay'],
-                           per_series_lr_multip=config['train_parameters']['per_series_lr_multip'],
-                           gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'],
-                           rnn_weight_decay=config['train_parameters']['rnn_weight_decay'],
-                           noise_std=config['train_parameters']['noise_std'],
-                           level_variability_penalty=config['train_parameters']['level_variability_penalty'],
-                           testing_percentile=config['train_parameters']['testing_percentile'],
-                           training_percentile=config['train_parameters']['training_percentile'],
-                           ensemble=config['train_parameters']['ensemble'],
-                           max_periods=config['data_parameters']['max_periods'],
-                           seasonality=config['data_parameters']['seasonality'],
-                           input_size=config['data_parameters']['input_size'],
-                           output_size=config['data_parameters']['output_size'],
-                           frequency=config['data_parameters']['frequency'],
-                           cell_type=config['model_parameters']['cell_type'],
-                           state_hsize=config['model_parameters']['state_hsize'],
-                           dilations=config['model_parameters']['dilations'],
-                           add_nl_layer=config['model_parameters']['add_nl_layer'],
-                           random_seed=config['model_parameters']['random_seed'],
-                           device='cpu')
-
+        self.config = config
+        self.model_by_metric={}
+
+    def model_for_metric(self,metric):
+        config = self.config
+        if not metric in self.model_by_metric:
+            self.model_by_metric[metric] = ESRNN(max_epochs=config['train_parameters']['max_epochs'],
+                                                 batch_size=config['train_parameters']['batch_size'],
+                                                 freq_of_test=config['train_parameters']['freq_of_test'],
+                                                 learning_rate=float(config['train_parameters']['learning_rate']),
+                                                 lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'],
+                                                 lr_decay=config['train_parameters']['lr_decay'],
+                                                 per_series_lr_multip=config['train_parameters']['per_series_lr_multip'],
+                                                 gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'],
+                                                 rnn_weight_decay=config['train_parameters']['rnn_weight_decay'],
+                                                 noise_std=config['train_parameters']['noise_std'],
+                                                 level_variability_penalty=config['train_parameters']['level_variability_penalty'],
+                                                 testing_percentile=config['train_parameters']['testing_percentile'],
+                                                 training_percentile=config['train_parameters']['training_percentile'],
+                                                 ensemble=config['train_parameters']['ensemble'],
+                                                 max_periods=config['data_parameters']['max_periods'],
+                                                 seasonality=config['data_parameters']['seasonality'],
+                                                 input_size=config['data_parameters']['input_size'],
+                                                 output_size=config['data_parameters']['output_size'],
+                                                 frequency=config['data_parameters']['frequency'],
+                                                 cell_type=config['model_parameters']['cell_type'],
+                                                 state_hsize=config['model_parameters']['state_hsize'],
+                                                 dilations=config['model_parameters']['dilations'],
+                                                 add_nl_layer=config['model_parameters']['add_nl_layer'],
+                                                 random_seed=config['model_parameters']['random_seed'],
+                                                 device='cpu')
+        return self.model_by_metric[metric]
 
 class ModelStatus(enumerate):
     IDLE="IDLE"
@@ -48,6 +55,7 @@
     PREDICTING="PREDICTING"
 
 lock = threading.Lock()
+
 
 class Model:
     status = ModelStatus.IDLE
@@ -57,42 +65,79 @@ class Model:
         # integrated here
         self._model = False
 
-    def _new_model(self, X_train_df, y_train_df, X_test_df, y_naive_df) -> UUIDModel:
+    def _new_model(self) -> UUIDModel:
         _logger.debug("Training new model")
-        config = configuration.get_config('Custom')
+        config = configuration.get_config('Hourly')
         model = UUIDModel(uuid.uuid4(),config)
-        model.esrnn.fit(X_train_df, y_train_df, X_test_df, y_naive_df)
         return model
 
+    def _transform(self,m,data):
+
+        def beautify(p):
+            p = p.sort_values('time')
+            p = p.rename(columns = {'application':'unique_id','time': 'ds'}, inplace = False)
+            return p
+
+        m_data = [ d for d in data[1] if d[m] ]
+        m_pd = pd.DataFrame(data=m_data)
+        m_pd = m_pd[['application','time',m]]
+        m_pd = m_pd.rename(columns = {m:'y'}, inplace = False)
+
+
+        rows = m_pd.shape[0]
+        m_pd_train_y= m_pd.iloc[0: int(rows * 0.70)]
+        m_pd_test_y= m_pd.iloc[m_pd_train_y.shape[0]-1: rows]
+
+        m_pd_train_x = m_pd_train_y[['application','time']]
+        m_pd_train_x['x'] = m
+
+        m_pd_test_x= m_pd_test_y[['application','time']]
+        m_pd_test_x['x'] = m
+
+
+        return beautify(m_pd_train_x),beautify(m_pd_train_y), beautify(m_pd_test_x) , None #beautify(m_pd_test_y)
+
+    def _retrain(self, metrics, data):
 
-    def _retrain(self):
         with lock:
+
             self.status = ModelStatus.TRAINNING
-            time.sleep(10)
+
+            _logger.debug("Starting training model")
+
+            model = self._new_model()
+            for m in metrics:
+                args = self._transform( m, data)
+                model.model_for_metric(m).fit(*args)
+
             _logger.debug("Model training finished")
-            #dataset maker integration
-            # self._model = self._new_model()
+            self._model = model
             _logger.debug("set new model")
             if self._handler:
                 self._handler.on_train()
+
             self.status = ModelStatus.IDLE
 
-    def train(self):
+    def train(self,metrics=[], data=[]):
         if self.status == ModelStatus.IDLE:
-            t = threading.Thread(target=self._retrain)
+            t = threading.Thread(target=self._retrain, args=(metrics, data))
            t.start()
         else:
             logging.warning("Already Training")
 
-    def predict(self, metrics, times):
-        prediction = {}
-        for m in metrics:
-            prediction[m] = []
-            if self._model:
-                prediction[m] = self._model.esrnn.predict(pd.DataFrame(data=times))
-            else:
-                _logger.error("No model trainged yest")
+    def predict(self, application, metric, times):
+
+        if not self._model:
+            _logger.error("No model trained yet")
+            return
+
+        m_pd = pd.DataFrame(data=times, columns=['ds'])
+        m_pd.insert(0,'unique_id',application)
+        m_pd.insert(1,'x',metric)
+
+        ret= self._model.model_for_metric(metric).predict(m_pd)
+
 
-        return prediction
\ No newline at end of file
+        return ret
\ No newline at end of file
diff --git a/morphemic-forecasting-eshybrid/morphemic/scheduler.py b/morphemic-forecasting-eshybrid/morphemic/scheduler.py
index 165832721d56f4552c54a2aae1e0542a55d60a80..d5975b385d8c9439c0d2b1011085050d9d8b2b1a 100644
--- a/morphemic-forecasting-eshybrid/morphemic/scheduler.py
+++ b/morphemic-forecasting-eshybrid/morphemic/scheduler.py
@@ -1,7 +1,7 @@
 import datetime
-import time
 import logging
+import time
 
 
 logging.basicConfig(level=logging.DEBUG)
 
@@ -14,7 +14,7 @@ class Scheduler:
         :param forward_predictons: The number of forward predictions requested
         :param horizon: The time horizon in seconds
         """
-        self._epoch_start = epoch_start/1000
+        self._epoch_start = epoch_start
        self._forward_predictions = forward_predictons
         self._horizon = horizon
         self._next_time = self.compute_next()
@@ -27,7 +27,7 @@ class Scheduler:
             Next: %s
             Now: %s
 
-        """ % (datetime.datetime.fromtimestamp(self._epoch_start/1000),
+        """ % (datetime.datetime.fromtimestamp(self._epoch_start),
               horizon,
               forward_predictons,
               datetime.datetime.fromtimestamp(self._next_time),
@@ -37,22 +37,26 @@
         )
 
     def compute_next(self):
-        step = int((time.time() - self._epoch_start) / self._horizon)
-        return self._epoch_start + ((step + 1) * self._horizon)
+
+        t = time.time()
+        if self._epoch_start > t:
+            return self._epoch_start
+
+        step = int( (t-self._epoch_start) / self._horizon)
+        return self._epoch_start + int((step+1) * self._horizon)
 
 
     def check(self, handler):
         t = int(time.time())
-        # logging.debug("Checking t = %s(%s) > next_time %s(%s) " % (datetime.datetime.fromtimestamp(t), t, datetime.datetime.fromtimestamp(self._next_time), self._next_time))
+        logging.debug("Checking t = %s(%s) > next_time %s(%s) " % (datetime.datetime.fromtimestamp(t), t, datetime.datetime.fromtimestamp(self._next_time), self._next_time))
         times = []
         if t > self._next_time:
             for i in range(0, self._forward_predictions):
-                # logging.info(" t%s %s ", i, datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) ) )
-                times.append(datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) ) )
+                logging.info(" t%s %s ", i, datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) ) )
+                times.append( int( datetime.datetime.timestamp(datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) )) ))
             self._next_time = self.compute_next()
-
-            if handler:
-                handler.on_schedule(times)
+        if handler:
+            handler.on_schedule(times)
 
 
 class Handler:
diff --git a/morphemic-forecasting-eshybrid/requirements.txt b/morphemic-forecasting-eshybrid/requirements.txt
index ced1896f1a4b3b12025164d7a5ef0aaea884cbf4..c804ec414b18bc66606911b2e149e4cc7aec4373 100644
--- a/morphemic-forecasting-eshybrid/requirements.txt
+++ b/morphemic-forecasting-eshybrid/requirements.txt
@@ -7,4 +7,5 @@ seaborn
 psutil
 stomp.py
 influxdb
-python-daemon
\ No newline at end of file
+python-daemon
+configparser
\ No newline at end of file
diff --git a/morphemic-forecasting-eshybrid/sender.py b/morphemic-forecasting-eshybrid/sender.py
new file mode 100644
index 0000000000000000000000000000000000000000..c6057c6f8a6c188bf9001ed06461c53fae5aea1b
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/sender.py
@@ -0,0 +1,14 @@
+import logging
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
+
+
+import time
+import messaging
+
+
+connector = messaging.morphemic.Connection('aaa', '111',host="147.102.17.76",port=61610)
+connector.connect()
+
+connector.send_to_topic("start_forecasting.eshybrid",{'metrics': ['AvgResponseTime'], 'timestamp': 1623245014409, 'epoch_start': 1623245214016, 'number_of_forward_predictions': 8, 'prediction_horizon': 30} )
+
diff --git a/morphemic-forecasting-eshybrid/sync.cfg.template b/morphemic-forecasting-eshybrid/sync.cfg.template
new file mode 100644
index 0000000000000000000000000000000000000000..49b70b98e333d19795f74a44d47e934ba96cb95c
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/sync.cfg.template
@@ -0,0 +1,13 @@
+[persistance]
+host=
+port=
+username=
+password=
+dbname=
+path_dataset=
+
+[messaging]
+host=
+port=
+username=
+password=
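
For reference, a filled-in sync.cfg consistent with the lookups in this patch might look like the sketch below: ESHybrid.__init__ reads config['persistence'][...] (including an application key), config['messaging'][...] and config['listener']['id'], while the template above names its first section [persistance] and carries no application key or [listener] section. Every value shown here is a placeholder, not a setting taken from the source.

# example sync.cfg -- placeholder values only
[persistence]
host=localhost
port=8086
username=influx_user
password=influx_password
dbname=morphemic
path_dataset=./local/dataset/
application=default_application

[messaging]
host=localhost
port=61613
username=admin
password=admin

[listener]
id=eshybrid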