From 95caf215219cfff36bc35224f46e4c308543e875 Mon Sep 17 00:00:00 2001 From: dianajlailaty Date: Fri, 28 Jan 2022 10:48:07 +0100 Subject: [PATCH] Fixing the FCR prophet error and code refactoring --- forecasting_gluonts/gluonts_forecaster.py | 45 ++++------ forecasting_gluonts/gluonts_listener.py | 65 ++++---------- forecasting_gluonts/main.py | 5 -- forecasting_prophet/main.py | 5 +- forecasting_prophet/prophet_forecaster.py | 53 ++++++----- forecasting_prophet/prophet_listener.py | 103 ++++++++-------------- 6 files changed, 98 insertions(+), 178 deletions(-) diff --git a/forecasting_gluonts/gluonts_forecaster.py b/forecasting_gluonts/gluonts_forecaster.py index 99feb763..8589be9d 100644 --- a/forecasting_gluonts/gluonts_forecaster.py +++ b/forecasting_gluonts/gluonts_forecaster.py @@ -5,11 +5,8 @@ import itertools from sklearn import preprocessing from math import log from math import exp - -pd.set_option('display.max_row', 500) import itertools from sklearn.model_selection import ParameterGrid -# from dataset_maker import CSVData from time import time from time import sleep from datetime import datetime @@ -36,19 +33,20 @@ from sklearn.model_selection import ParameterGrid import statistics import math +pd.set_option('display.max_row', 500) directory_path = "/morphemic_project/" -pd.options.mode.chained_assignment = None +pd.options.mode.chained_assignment = None -def data_preprocessing(dataset,metric): +def data_preprocessing(dataset, metric): gluonts_dataset = pd.DataFrame(columns=['ds', 'y']) gluonts_dataset['y'] = dataset[metric] gluonts_dataset['ds'] = dataset['ems_time'] i = 0 - while (i= predictionTimes[metric]): - logging.debug("Start the prediction for metric: " + metric) + if timestamp >= predictionTimes[metric]: + logging.debug(f"Start the prediction for metric: {metric}") predictions = gluonts_forecaster.predict(models[metric], number_of_forward_predictions, prediction_horizon, epoch_start, metric) - # logging.debug(predictions) yhats = predictions['values'] yhat_lowers = predictions['mins'] yhat_uppers = predictions['maxs'] - prediction_time = epoch_start + prediction_horizon timestamp = int(time()) - # read probabilities file probs = np.load(directory_path + 'prob_file.npy', allow_pickle='TRUE').item() - - logging.debug("Sending predictions for metric: " + metric) - + logging.debug(f"Sending predictions for metric: {metric}") for k in range(0, len(predictions['values'])): yhat = yhats[k] yhat_lower = yhat_lowers[k] yhat_upper = yhat_uppers[k] - self.connector.send_to_topic('intermediate_prediction.gluonmachines.' + metric, - { "metricValue": float(yhat), "level": 3, @@ -91,7 +75,6 @@ def worker(self, body, metric): "cloud": "todo", "provider": "todo" }) - prediction_time = prediction_time + prediction_horizon epoch_start = epoch_start + prediction_horizon sleep(prediction_horizon - 5) @@ -118,9 +101,8 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen pass def on_start_forecasting_gluonmachines(self, body): - logging.debug("Gluonts Start Forecasting the following metrics :") sent_metrics = body["metrics"] - logging.debug(sent_metrics) + logging.debug(f"Gluonts Start Forecasting the following metrics: {sent_metrics}") for metric in sent_metrics: if metric not in metrics: metrics.add(metric) @@ -129,59 +111,46 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen x = threading.Thread(target=worker, args=(self, body, metric,)) x.start() - - def on_metrics_to_predict(self, body): dataset_preprocessor = CSVData(APP_NAME) data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv') dataset_preprocessor.prepare_csv() - while (not os.path.isfile(data_file_path)): - logging.debug("Waiting for dataset to be loaded") + while not os.path.isfile(data_file_path): + logging.debug(f"Waiting for dataset to be loaded") sleep(30) dataset_preprocessor.prepare_csv() - logging.debug("DATASET DOWNLOADED") - for r in body: metric = r['metric'] if not os.path.isfile(directory_path + 'models/gluonts_' + metric + ".pkl"): - logging.debug("Training a GluonTS model for metric : " + metric) + logging.debug(f"Training a GluonTS model for metric: {metric}") model = gluonts_forecaster.train(metric) pkl_path = directory_path + "models/gluonts_" + metric + ".pkl" with open(pkl_path, "wb") as f: pickle.dump(model, f) - logging.debug("Model Dumped") + logging.debug(f"Model Dumped") metrics.add(metric) - self.connector.send_to_topic("training_models", { - "metrics": list(metrics), - "forecasting_method": "gluonmachines", - "timestamp": int(time()) - } - ) - + }) def on_stop_forecasting_gluonmachines(self, body): - logging.debug("Gluonts Stop Forecasting the following metrics :") - logging.debug(body["metrics"]) + logging.debug(f"Gluonts Stop Forecasting the following metrics: {body['metrics']}") for metric in body["metrics"]: if metric in metrics: metrics_threads.remove(metric) stop_thread[metric] = True - def start(self): - logging.debug("Staring Gluonts Forecaster") + logging.debug(f"Staring Gluonts Forecaster") self.run() self._run = True - def on_disconnected(self): - print('Disconnected from ActiveMQ') + logging.debug(f'Disconnected from ActiveMQ') self.reconnect() diff --git a/forecasting_gluonts/main.py b/forecasting_gluonts/main.py index dad326fb..5615f198 100644 --- a/forecasting_gluonts/main.py +++ b/forecasting_gluonts/main.py @@ -4,12 +4,7 @@ import time logger = logging.getLogger() logger.setLevel(logging.DEBUG) - e = gluonts_listener.Gluonts() -#try: -# e.start() -#except KeyboardInterrupt: -# e.stop() e.start() while True: time.sleep(60) diff --git a/forecasting_prophet/main.py b/forecasting_prophet/main.py index 712dd378..2b38dab1 100755 --- a/forecasting_prophet/main.py +++ b/forecasting_prophet/main.py @@ -5,15 +5,12 @@ import time def main(): e = prophet_listener.Prophet() - #try: - # e.start() - #except KeyboardInterrupt: - # e.stop() e.start() while True: time.sleep(60) pass + if __name__ == '__main__': logging.config.fileConfig('/morphemic_project/logging.ini', disable_existing_loggers=False) main() diff --git a/forecasting_prophet/prophet_forecaster.py b/forecasting_prophet/prophet_forecaster.py index 2f2ee749..91d2b531 100644 --- a/forecasting_prophet/prophet_forecaster.py +++ b/forecasting_prophet/prophet_forecaster.py @@ -12,8 +12,6 @@ from scipy.stats import boxcox from scipy.special import inv_boxcox from math import log from math import exp - -pd.set_option('display.max_row', 500) import itertools from sklearn.model_selection import ParameterGrid from time import time @@ -24,31 +22,31 @@ import json import os import math import logging.config -from time import sleep +from time import sleep +pd.set_option('display.max_row', 500) directory_path = "/morphemic_project/" pd.options.mode.chained_assignment = None # logging.config.fileConfig('logging.ini', disable_existing_loggers=False) + def train(metric): data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv') init = 0 h = 0 p = 0 - while (init <= 0 or h <= 0 or p <= 0): + while init <= 0 or h <= 0 or p <= 0: dataset = pd.read_csv(data_file_path) - # hyperparameter tuning and cross validation - # should be generic for i in range(0, len(dataset['ems_time'])): dataset['ems_time'][i] = datetime.fromtimestamp(dataset['ems_time'][i]) t1 = int(dataset['ems_time'][0].strftime('%s')) t2 = int(dataset['ems_time'][len(dataset) - 1].strftime('%s')) - timeDiffInSec = int(t2 - t1) - timeDiffInMin = timeDiffInSec / 60 - init = int(timeDiffInMin / 3) + time_diff_in_sec = int(t2 - t1) + time_diff_in_min = time_diff_in_sec / 60 + init = int(time_diff_in_min / 3) h = int(init / 3) p = int(h / 2) @@ -56,21 +54,22 @@ def train(metric): horizon = str(h) + " minutes" period = str(p) + " minutes" - logging.debug("initial : ", initial) - logging.debug("horizon : ", horizon) - logging.debug("period : ", period) - if (init <= 0 or h <= 0 or p <= 0): - logging.debug("Dataset is not enough for training the model") + logging.debug(f"period: {period}") + logging.debug(f"horizon: {horizon}") + logging.debug(f"initial: {initial}") + + if init <= 0 or h <= 0 or p <= 0: + logging.debug(f"Dataset is not enough for training the model") sleep(30) - #Data preprocessing + # Data preprocessing prophet_dataset = pd.DataFrame(columns=['ds', 'y']) prophet_dataset['y'] = dataset[metric] prophet_dataset['ds'] = dataset['ems_time'] i = 0 - while (i= predictionTimes[metric]): - predictions=prophet_forecaster.predict(models[metric] , number_of_forward_predictions , prediction_horizon , epoch_start) + if timestamp >= predictionTimes[metric]: + predictions = prophet_forecaster.predict(models[metric], number_of_forward_predictions, prediction_horizon, epoch_start) yhats = predictions['yhat'].values.tolist() yhat_lowers = predictions['yhat_lower'].values.tolist() yhat_uppers = predictions['yhat_upper'].values.tolist() - - prediction_time= epoch_start+ prediction_horizon - # change it to the time of the start_forecasting was sent - - #read probabilities file - probs = np.load(directory_path+'prob_file.npy' , allow_pickle='TRUE').item() + prediction_time = epoch_start + prediction_horizon + # change it to the time of the start_forecasting was sent + # read probabilities file + probs = np.load(directory_path+'prob_file.npy', allow_pickle='TRUE').item() for k in range(0,len(predictions['yhat'].values.tolist())): yhat = yhats[k] yhat_lower = yhat_lowers[k] yhat_upper = yhat_uppers[k] - - #wait until epoch_start to send - + # wait until epoch_start to send message = { "metricValue": yhat, "level": 3, "timestamp": timestamp, "probability": probs[metric], - "confidence_interval" : [yhat_lower,yhat_upper], + "confidence_interval": [yhat_lower, yhat_upper], "horizon": prediction_horizon, - "predictionTime" : int(prediction_time), # + "predictionTime": int(prediction_time), # "refersTo": "todo", "cloud": "todo", "provider": "todo" } - - - self.connector.send_to_topic('intermediate_prediction.prophet.'+metric, message) - prediction_time=prediction_time + prediction_horizon - epoch_start = epoch_start+ prediction_horizon + prediction_time = prediction_time + prediction_horizon + epoch_start = epoch_start + prediction_horizon sleep(prediction_horizon) -class Prophet(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener): +class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener): id = "prophet" metrics = set() def __init__(self): - self._run = False + self._run = False self.connector = messaging.morphemic.Connection(ACTIVEMQ_USER,ACTIVEMQ_PASSWORD, host=ACTIVEMQ_HOSTNAME, port=ACTIVEMQ_PORT) def run(self): - #logging.debug("setting up") - #sleep(180) - #logging.debug("starting the connection to ActiveMQ") self.connector.connect() self.connector.set_listener(self.id, self) self.connector.topic("start_forecasting.prophet", self.id) @@ -116,71 +97,65 @@ class Prophet(morphemic.handler.ModelHandler,messaging.listener.MorphemicListene self.connector.topic("metrics_to_predict", self.id) def reconnect(self): - print('Reconnecting to ActiveMQ') + logging.debug(f"Reconnecting to ActiveMQ") self.connector.disconnect() self.run() pass def on_start_forecasting_prophet(self, body): - logging.debug("Prophet Start Forecasting the following metrics :") + logging.debug(f"Prophet Start Forecasting the following metrics:") sent_metrics = body["metrics"] logging.debug(sent_metrics) for metric in sent_metrics: if metric not in metrics: metrics.add(metric) - #thread = threading.Thread(target=worker , args=(self, body, metric,)) - if metric not in metrics_processes: + if metric not in metrics_processes: metrics_processes[metric] = Process(target=worker, args=(self, body, metric,)) metrics_processes[metric] .start() def on_metrics_to_predict(self, body): - #getting data from datasetmaker + # getting data from dataset maker dataset_preprocessor = CSVData(APP_NAME) data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv') dataset_preprocessor.prepare_csv() - - while (not os.path.isfile(data_file_path)): - logging.debug("Waiting for dataset to be loaded") + + while not os.path.isfile(data_file_path): + logging.debug(f"Waiting for dataset to be loaded") sleep(30) dataset_preprocessor.prepare_csv() for r in body: metric = r['metric'] - #for metric in metrics: if not os.path.isfile(directory_path+'models/prophet_'+metric+".pkl"): - logging.debug("Training a Prophet model for metric : " + metric) - model=prophet_forecaster.train(metric) + logging.debug(f"Training a Prophet model for metric: {metric}") + model = prophet_forecaster.train(metric) pkl_path = directory_path+"models/prophet_"+metric+".pkl" with open(pkl_path, "wb") as f: pickle.dump(model, f) metrics.add(metric) - self.connector .send_to_topic("training_models", + self.connector.send_to_topic("training_models", { - - "metrics": list(metrics), - - "forecasting_method": "Prophet", - - "timestamp": int(time()) + "metrics": list(metrics), + "forecasting_method": "Prophet", + "timestamp": int(time()) } ) def on_stop_forecasting_prophet(self, body): - logging.debug("Prophet Stop Forecasting the following metrics :") - logging.debug(body["metrics"]) + logging.debug(f"Prophet Stop Forecasting the following metrics: {body['metrics']}") for metric in body["metrics"]: if metric in metrics: - metrics_processes[metric] .terminate() + metrics_processes[metric].terminate() metrics.remove(metric) metrics_processes.pop(metric) - logging.debug(metrics) + logging.debug(f"Metrics: {metrics}") def start(self): - logging.debug("Staring Prophet Forecaster") + logging.debug(f"Staring Prophet Forecaster") self.run() self._run = True def on_disconnected(self): - print('Disconnected from ActiveMQ') + logging.debug(f"Disconnected from ActiveMQ") self.reconnect() -- GitLab