Commit 5bc51745 authored by Marta Różańska

Merge branch 'gluonts-prophet' into 'morphemic-rc2.0'

Fixing the FCR prophet error and code refactoring

See merge request !253
parents 0bff53d7 3daf3779
Pipeline #19147 passed with stages in 38 minutes and 46 seconds
@@ -5,11 +5,8 @@ import itertools
 from sklearn import preprocessing
 from math import log
 from math import exp
-pd.set_option('display.max_row', 500)
-import itertools
 from sklearn.model_selection import ParameterGrid
-# from dataset_maker import CSVData
 from time import time
 from time import sleep
 from datetime import datetime
@@ -36,19 +33,20 @@ from sklearn.model_selection import ParameterGrid
 import statistics
 import math
+pd.set_option('display.max_row', 500)
 directory_path = "/morphemic_project/"
 pd.options.mode.chained_assignment = None
-def data_preprocessing(dataset,metric):
+def data_preprocessing(dataset, metric):
     gluonts_dataset = pd.DataFrame(columns=['ds', 'y'])
     gluonts_dataset['y'] = dataset[metric]
     gluonts_dataset['ds'] = dataset['ems_time']
     i = 0
-    while (i<len(gluonts_dataset)):
-        if (gluonts_dataset['y'][i] == "None"):
+    while i<len(gluonts_dataset):
+        if gluonts_dataset['y'][i] == "None":
             gluonts_dataset['y'][i] = np.nan
         i += 1
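Note: the hunk above only reformats the None-handling loop; it still assigns element by element. A vectorized equivalent (a sketch assuming the same pandas frame layout, not code from this merge request) would be:

import numpy as np
import pandas as pd

def data_preprocessing(dataset, metric):
    # Same two-column frame as above, built in one constructor call.
    gluonts_dataset = pd.DataFrame({'ds': dataset['ems_time'], 'y': dataset[metric]})
    # Replace the literal string "None" with NaN in a single vectorized call
    # instead of the element-wise while loop.
    gluonts_dataset['y'] = gluonts_dataset['y'].replace('None', np.nan)
    return gluonts_dataset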
@@ -71,24 +69,17 @@ def train(metric):
     dataset = pd.read_csv(data_file_path)
     while len(dataset) < 50:
         dataset = pd.read_csv(data_file_path)
-        logging.debug("Waiting for dataset to be loaded")
+        logging.debug(f"Waiting for dataset to be loaded")
         sleep(30)
     gluonts_dataset = data_preprocessing(dataset,metric)
     size = len(gluonts_dataset)
-    logging.debug("STARTED TRAINING FOR: " + metric)
+    logging.debug(f"STARTED TRAINING FOR: {metric}")
     # splitting to train and test
     test_percentage = 0.2
     training_window_size = int(len(gluonts_dataset) - (len(gluonts_dataset) * test_percentage))
-    train = gluonts_dataset[:training_window_size]
-    validation = gluonts_dataset[training_window_size:]
-    train_time = train['ds'][training_window_size - 1]
+    training_dataset = gluonts_dataset[:training_window_size]
+    train_time = training_dataset['ds'][training_window_size - 1]
     validation_time = gluonts_dataset['ds'][size - 1]
     freq = '1min'
@@ -142,7 +133,7 @@ def train(metric):
     )
     predictor = estimator.train(training_data=train_ds)
     forecast_it, ts_it = make_evaluation_predictions(
-        dataset=validation_ds,  # validationdataset
+        dataset=validation_ds,  # validation dataset
         predictor=predictor,  # predictor
         num_samples=20,  # number of sample paths we want for evaluation
     )
@@ -178,7 +169,7 @@ def train(metric):
     # checking if probabilities file exist
     prob = 0.8
-    if (os.path.isfile(directory_path + 'prob_file.npy')):
+    if os.path.isfile(directory_path + 'prob_file.npy'):
         probs = np.load(directory_path + 'prob_file.npy', allow_pickle='TRUE').item()
         probs[metric] = prob
     else:
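For context, prob_file.npy persists a plain dict of per-metric probabilities via NumPy's pickle support. A minimal round-trip sketch (path from the diff; the example metric names are assumed):

import numpy as np

probs = {'avgResponseTime': 0.8}                    # assumed example content
np.save('/morphemic_project/prob_file.npy', probs)  # the dict is pickled into the .npy
loaded = np.load('/morphemic_project/prob_file.npy',
                 allow_pickle='TRUE').item()        # .item() unwraps the 0-d object array
loaded['memory'] = 0.8                              # update another metric, as train() does above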
@@ -200,8 +191,8 @@ def predict(model, number_of_forward_predictions, prediction_horizon, epoch_start):
     future = list()
     for i in range(1, number_of_forward_predictions + 1):
-        dateInSec = epoch_start + i * prediction_horizon * 60
-        date = datetime.fromtimestamp(dateInSec)
+        date_in_sec = epoch_start + i * prediction_horizon * 60
+        date = datetime.fromtimestamp(date_in_sec)
         future.append(date)
     future = pd.DataFrame(future)
     future.columns = ['ds']
@@ -224,18 +215,14 @@ def predict(model, number_of_forward_predictions, prediction_horizon, epoch_start):
     )
     forecasts = list(forecast_it)
     forecast_entry = forecasts[0]
-    logging.debug(forecast_entry)
     predictions = forecast_entry.samples
     mins = list()
     maxs = list()
     values = list()
-    returnDict = dict()
     for i in range(0, number_of_forward_predictions):
         mylist = list()
         for line in predictions:
@@ -248,7 +235,5 @@ def predict(model, number_of_forward_predictions, prediction_horizon, epoch_start):
         mins.append(mini)
         maxs.append(maxi)
         values.append(value)
-    returnDict = {'mins': mins, 'maxs': maxs, 'values': values}
-    return returnDict
+    return_dict = {'mins': mins, 'maxs': maxs, 'values': values}
+    return return_dict
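The per-step reduction that fills mini, maxi, and value is elided above; judging from forecast_entry.samples and num_samples=20, each column of the sample matrix is reduced to a bound and a point estimate. A standalone sketch of that shape handling (stand-in data, not necessarily the file's exact reduction):

import numpy as np

# Stand-in for forecast_entry.samples: 20 sample paths x 5 forward steps.
samples = np.random.default_rng(0).normal(size=(20, 5))
mins = samples.min(axis=0).tolist()     # per-step lower bound across sample paths
maxs = samples.max(axis=0).tolist()     # per-step upper bound
values = samples.mean(axis=0).tolist()  # per-step point forecast
return_dict = {'mins': mins, 'maxs': maxs, 'values': values}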
@@ -6,8 +6,6 @@ import logging
 import signal
 import threading
 import numpy as np
-# Libraries required for training and prediction
 import os
 import json
@@ -25,8 +23,6 @@ ACTIVEMQ_PORT = os.environ.get("ACTIVEMQ_PORT")
 predictionTimes = dict()
 models = dict()
-# flags = {'avgResponseTime':0 , 'memory': 0}
 stop_thread = dict()
 metrics = set()
 metrics_threads = set()
@@ -35,50 +31,38 @@ directory_path = "/morphemic_project/"
 def worker(self, body, metric):
     stop_thread[metric] = False
     timestamp = body['timestamp']
     prediction_horizon = body["prediction_horizon"]
     number_of_forward_predictions = body["number_of_forward_predictions"]
     epoch_start = body["epoch_start"]
     predictionTimes[metric] = epoch_start
-    while (not os.path.isfile(directory_path + 'models/gluonts_' + metric + ".pkl")):
+    while not os.path.isfile(directory_path + 'models/gluonts_' + metric + ".pkl"):
         sleep(30)
-        logging.debug("Waiting for the trained model for metric: " + metric)
-    while (True):
-        if (stop_thread[metric] == True):
+        logging.debug(f"Waiting for the trained model for metric: {metric}")
+    while True:
+        if stop_thread[metric] == True:
             break
         if os.path.isfile(directory_path + 'models/gluonts_' + metric + ".pkl"):
-            logging.debug("Loading the trained model for metric: " + metric)
+            logging.debug(f"Loading the trained model for metric: {metric}")
             with open(directory_path + "models/gluonts_" + metric + ".pkl", 'rb') as f:
                 models[metric] = pickle.load(f)
         timestamp = int(time())
-        if (timestamp >= predictionTimes[metric]):
-            logging.debug("Start the prediction for metric: " + metric)
+        if timestamp >= predictionTimes[metric]:
+            logging.debug(f"Start the prediction for metric: {metric}")
             predictions = gluonts_forecaster.predict(models[metric], number_of_forward_predictions,
                                                      prediction_horizon, epoch_start, metric)
-            # logging.debug(predictions)
             yhats = predictions['values']
             yhat_lowers = predictions['mins']
             yhat_uppers = predictions['maxs']
             prediction_time = epoch_start + prediction_horizon
             timestamp = int(time())
             # read probabilities file
             probs = np.load(directory_path + 'prob_file.npy', allow_pickle='TRUE').item()
-            logging.debug("Sending predictions for metric: " + metric)
+            logging.debug(f"Sending predictions for metric: {metric}")
             for k in range(0, len(predictions['values'])):
                 yhat = yhats[k]
                 yhat_lower = yhat_lowers[k]
                 yhat_upper = yhat_uppers[k]
                 self.connector.send_to_topic('intermediate_prediction.gluonmachines.' + metric,
                     {
                         "metricValue": float(yhat),
                         "level": 3,
@@ -91,7 +75,6 @@ def worker(self, body, metric):
                         "cloud": "todo",
                         "provider": "todo"
                     })
-            prediction_time = prediction_time + prediction_horizon
             epoch_start = epoch_start + prediction_horizon
         sleep(prediction_horizon - 5)
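The worker cancels cooperatively: on_stop_forecasting_gluonmachines flips stop_thread[metric], and the loop checks the flag once per cycle. A minimal standalone sketch of that pattern (names from the diff, bodies reduced to the mechanism):

import threading
from time import sleep

stop_thread = {}

def worker(metric):
    stop_thread[metric] = False
    while True:
        if stop_thread[metric]:  # polled once per cycle, so shutdown can
            break                # lag by up to one sleep interval
        # ... load model, predict, publish ...
        sleep(1)

t = threading.Thread(target=worker, args=('avgResponseTime',))
t.start()
sleep(3)
stop_thread['avgResponseTime'] = True  # what the stop handler does
t.join()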
@@ -118,9 +101,8 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
         pass
     def on_start_forecasting_gluonmachines(self, body):
-        logging.debug("Gluonts Start Forecasting the following metrics :")
         sent_metrics = body["metrics"]
-        logging.debug(sent_metrics)
+        logging.debug(f"Gluonts Start Forecasting the following metrics: {sent_metrics}")
         for metric in sent_metrics:
             if metric not in metrics:
                 metrics.add(metric)
@@ -129,59 +111,46 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
                 x = threading.Thread(target=worker, args=(self, body, metric,))
                 x.start()
     def on_metrics_to_predict(self, body):
         dataset_preprocessor = CSVData(APP_NAME)
         data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv')
         dataset_preprocessor.prepare_csv()
-        while (not os.path.isfile(data_file_path)):
-            logging.debug("Waiting for dataset to be loaded")
+        while not os.path.isfile(data_file_path):
+            logging.debug(f"Waiting for dataset to be loaded")
             sleep(30)
             dataset_preprocessor.prepare_csv()
         logging.debug("DATASET DOWNLOADED")
         for r in body:
             metric = r['metric']
             if not os.path.isfile(directory_path + 'models/gluonts_' + metric + ".pkl"):
-                logging.debug("Training a GluonTS model for metric : " + metric)
+                logging.debug(f"Training a GluonTS model for metric: {metric}")
                 model = gluonts_forecaster.train(metric)
                 pkl_path = directory_path + "models/gluonts_" + metric + ".pkl"
                 with open(pkl_path, "wb") as f:
                     pickle.dump(model, f)
-                logging.debug("Model Dumped")
+                logging.debug(f"Model Dumped")
                 metrics.add(metric)
         self.connector.send_to_topic("training_models",
             {
                 "metrics": list(metrics),
                 "forecasting_method": "gluonmachines",
                 "timestamp": int(time())
-            }
-            )
+            })
     def on_stop_forecasting_gluonmachines(self, body):
-        logging.debug("Gluonts Stop Forecasting the following metrics :")
-        logging.debug(body["metrics"])
+        logging.debug(f"Gluonts Stop Forecasting the following metrics: {body['metrics']}")
         for metric in body["metrics"]:
             if metric in metrics:
                 metrics_threads.remove(metric)
                 stop_thread[metric] = True
     def start(self):
-        logging.debug("Staring Gluonts Forecaster")
+        logging.debug(f"Staring Gluonts Forecaster")
         self.run()
         self._run = True
     def on_disconnected(self):
-        print('Disconnected from ActiveMQ')
+        logging.debug(f'Disconnected from ActiveMQ')
         self.reconnect()
@@ -4,12 +4,7 @@ import time
 logger = logging.getLogger()
 logger.setLevel(logging.DEBUG)
 e = gluonts_listener.Gluonts()
-#try:
-#    e.start()
-#except KeyboardInterrupt:
-#    e.stop()
 e.start()
 while True:
     time.sleep(60)
@@ -5,15 +5,12 @@ import time
 def main():
     e = prophet_listener.Prophet()
-    #try:
-    #    e.start()
-    #except KeyboardInterrupt:
-    #    e.stop()
     e.start()
     while True:
         time.sleep(60)
         pass
 if __name__ == '__main__':
     logging.config.fileConfig('/morphemic_project/logging.ini', disable_existing_loggers=False)
     main()
@@ -12,8 +12,6 @@ from scipy.stats import boxcox
 from scipy.special import inv_boxcox
 from math import log
 from math import exp
-pd.set_option('display.max_row', 500)
-import itertools
 from sklearn.model_selection import ParameterGrid
 from time import time
@@ -24,31 +22,31 @@ import json
 import os
 import math
 import logging.config
 from time import sleep
+pd.set_option('display.max_row', 500)
 directory_path = "/morphemic_project/"
 pd.options.mode.chained_assignment = None
-# logging.config.fileConfig('logging.ini', disable_existing_loggers=False)
 def train(metric):
     data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv')
     init = 0
     h = 0
     p = 0
-    while (init <= 0 or h <= 0 or p <= 0):
+    while init <= 0 or h <= 0 or p <= 0:
         dataset = pd.read_csv(data_file_path)
         # hyperparameter tuning and cross validation
         # should be generic
         for i in range(0, len(dataset['ems_time'])):
             dataset['ems_time'][i] = datetime.fromtimestamp(dataset['ems_time'][i])
         t1 = int(dataset['ems_time'][0].strftime('%s'))
         t2 = int(dataset['ems_time'][len(dataset) - 1].strftime('%s'))
-        timeDiffInSec = int(t2 - t1)
-        timeDiffInMin = timeDiffInSec / 60
-        init = int(timeDiffInMin / 3)
+        time_diff_in_sec = int(t2 - t1)
+        time_diff_in_min = time_diff_in_sec / 60
+        init = int(time_diff_in_min / 3)
         h = int(init / 3)
         p = int(h / 2)
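The cross-validation windows are derived from the dataset span: initial is a third of the span in minutes, horizon a third of initial, period half of horizon, and the loop re-reads the CSV until all three are positive. A worked example (an assumed 90-minute span):

time_diff_in_min = 90
init = int(time_diff_in_min / 3)  # 30 -> initial = "30 minutes"
h = int(init / 3)                 # 10 -> horizon = "10 minutes"
p = int(h / 2)                    # 5  -> period  = "5 minutes"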
@@ -56,21 +54,22 @@ def train(metric):
     horizon = str(h) + " minutes"
     period = str(p) + " minutes"
-    logging.debug("initial : ", initial)
-    logging.debug("horizon : ", horizon)
-    logging.debug("period : ", period)
-    if (init <= 0 or h <= 0 or p <= 0):
-        logging.debug("Dataset is not enough for training the model")
+    logging.debug(f"period: {period}")
+    logging.debug(f"horizon: {horizon}")
+    logging.debug(f"initial: {initial}")
+    if init <= 0 or h <= 0 or p <= 0:
+        logging.debug(f"Dataset is not enough for training the model")
         sleep(30)
-    #Data preprocessing
+    # Data preprocessing
     prophet_dataset = pd.DataFrame(columns=['ds', 'y'])
     prophet_dataset['y'] = dataset[metric]
     prophet_dataset['ds'] = dataset['ems_time']
     i = 0
-    while (i<len(prophet_dataset)):
-        if (prophet_dataset['y'][i] == "None"):
+    while i < len(prophet_dataset):
+        if prophet_dataset['y'][i] == "None":
             prophet_dataset['y'][i] = np.nan
         i += 1
@@ -78,11 +77,11 @@ def train(metric):
     prophet_dataset = prophet_dataset.ffill(axis ='rows')
     prophet_dataset = prophet_dataset.fillna(0)
-    logging.debug("Cleaned prophet_dataset : ", prophet_dataset)
+    logging.debug(f"Cleaned prophet_dataset: {prophet_dataset}")
-    #Model trainig
-    logging.debug("STARTED TRAINING FOR: " + metric)
-    train = prophet_dataset[:len(prophet_dataset)]
+    # Model training
+    logging.debug(f"STARTED TRAINING FOR: {metric}")
+    training_dataset = prophet_dataset[:len(prophet_dataset)]
     '''
     changepoint_prior_scale = [0.1,0.2,0.3,0.4,0.5]
@@ -109,7 +108,7 @@ def train(metric):
     grid = ParameterGrid(param_grid)
     cnt = 0
     for p in grid:
-        cnt = cnt + 1
+        cnt += 1
     all_params = [dict(zip(param_grid.keys(), v)) for v in itertools.product(*param_grid.values())]
     rmses = []  # Store the RMSEs for each params here
@@ -119,7 +118,7 @@ def train(metric):
     # use cross validation to evaluate all parameters
     for params in all_params:
-        m = Prophet(**params).fit(train)  # Fit model with given params
+        m = Prophet(**params).fit(training_dataset)  # Fit model with given params
         df_cv = cross_validation(m, initial=initial, period=period, horizon=horizon)
         df_p = performance_metrics(df_cv, rolling_window=1)
         rmses.append(df_p['rmse'].values[0])
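How the winning parameters are picked from rmses sits outside the shown hunks; the usual pattern with all_params and rmses (a sketch, not necessarily this file's exact code) is:

import numpy as np

# Stand-ins for the grid-search outputs above.
all_params = [{'changepoint_prior_scale': 0.1}, {'changepoint_prior_scale': 0.5}]
rmses = [12.4, 9.7]
best_params = all_params[int(np.argmin(rmses))]  # params with the lowest cross-validated RMSE
print(best_params)  # {'changepoint_prior_scale': 0.5}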
@@ -159,9 +158,9 @@ def train(metric):
         # holidays_prior_scale = parameters['holidays_prior_scale'][0],
         # mcmc_samples = parameters['mcmc_samples'][0]
     )
-    final_model.fit(train)
+    final_model.fit(training_dataset)
     # checking if probabilities file exist
-    if (os.path.isfile(directory_path + 'prob_file.npy')):
+    if os.path.isfile(directory_path + 'prob_file.npy'):
         probs = np.load(directory_path + 'prob_file.npy', allow_pickle='TRUE').item()
         probs[metric] = prob
@@ -180,8 +179,8 @@ def train(metric):
 def predict(model, number_of_forward_predictions, prediction_horizon, epoch_start):
     future = list()
     for i in range(1, number_of_forward_predictions + 1):
-        dateInSec = epoch_start + i * prediction_horizon
-        date = datetime.fromtimestamp(dateInSec)
+        date_in_sec = epoch_start + i * prediction_horizon
+        date = datetime.fromtimestamp(date_in_sec)
         future.append(date)
     future = pd.DataFrame(future)
     future.columns = ['ds']
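predict presumably passes this future frame to the fitted model; the yhat, yhat_lower, and yhat_upper columns consumed by the listener below are Prophet's standard predict output. A self-contained sketch (toy data; the import path varies between fbprophet and prophet releases):

import numpy as np
import pandas as pd
from prophet import Prophet  # older releases: from fbprophet import Prophet

# Toy history in the same 'ds'/'y' layout as prophet_dataset above.
history = pd.DataFrame({
    'ds': pd.date_range('2022-01-01', periods=60, freq='1min'),
    'y': np.sin(np.arange(60) / 5.0),
})
model = Prophet().fit(history)
future = pd.DataFrame({'ds': pd.date_range('2022-01-01 01:00', periods=3, freq='1min')})
forecast = model.predict(future)
print(forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']])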
@@ -19,10 +19,6 @@ from time import sleep
 from dataset_maker import CSVData
 from multiprocessing import Process
-#logging.config.fileConfig('logging.ini', disable_existing_loggers=False)
 APP_NAME = os.environ.get("APP_NAME")
 ACTIVEMQ_USER = os.environ.get("ACTIVEMQ_USER")
 ACTIVEMQ_PASSWORD = os.environ.get("ACTIVEMQ_PASSWORD")
@@ -31,84 +27,69 @@ ACTIVEMQ_PORT = os.environ.get("ACTIVEMQ_PORT")
 predictionTimes = dict()
 models = dict()
-#flags = {'avgResponseTime':0 , 'memory': 0}
-metrics_processes=dict()
+metrics_processes = dict()
 metrics = set()
 directory_path = "/morphemic_project/"
-def worker(self,body,metric):
+def worker(self, body, metric):
     timestamp = body['timestamp']
     prediction_horizon = body["prediction_horizon"]
     number_of_forward_predictions = body["number_of_forward_predictions"]
-    epoch_start= body["epoch_start"]
+    epoch_start = body["epoch_start"]
     predictionTimes[metric] = epoch_start
-    messages=list()
-    f=0
-    while (not os.path.isfile(directory_path+'models/prophet_'+metric+".pkl")):
+    messages = list()
+    f = 0
+    while not os.path.isfile(directory_path + 'models/prophet_' + metric + ".pkl"):
         sleep(30)
-        logging.debug("Waiting for the trained model for metric: " + metric)
+        logging.debug(f"Waiting for the trained model for metric: {metric}")
-    while(True):
-        #if flags[metric] == 0:
-        #epoch_start = predictionTimes[metric]
-        #flags[metric] = 1
-        #load the model
+    while True:
         with open(directory_path+"models/prophet_"+metric+".pkl", 'rb') as f:
             models[metric] = pickle.load(f)
         timestamp = int(time())
-        if (timestamp >= predictionTimes[metric]):
-            predictions=prophet_forecaster.predict(models[metric] , number_of_forward_predictions , prediction_horizon , epoch_start)
+        if timestamp >= predictionTimes[metric]:
+            predictions = prophet_forecaster.predict(models[metric], number_of_forward_predictions, prediction_horizon, epoch_start)
             yhats = predictions['yhat'].values.tolist()
             yhat_lowers = predictions['yhat_lower'].values.tolist()
             yhat_uppers = predictions['yhat_upper'].values.tolist()
-            prediction_time= epoch_start+ prediction_horizon
-            # change it to the time of the start_forecasting was sent
-            #read probabilities file
-            probs = np.load(directory_path+'prob_file.npy' , allow_pickle='TRUE').item()
+            prediction_time = epoch_start + prediction_horizon
+            # change it to the time of the start_forecasting was sent
+            # read probabilities file
+            probs = np.load(directory_path+'prob_file.npy', allow_pickle='TRUE').item()
             for k in range(0,len(predictions['yhat'].values.tolist())):
                 yhat = yhats[k]
                 yhat_lower = yhat_lowers[k]
                 yhat_upper = yhat_uppers[k]
-                #wait until epoch_start to send
+                # wait until epoch_start to send
                 message = {
                     "metricValue": yhat,
                     "level": 3,
                     "timestamp": timestamp,
                     "probability": probs[metric],
-                    "confidence_interval" : [yhat_lower,yhat_upper],
+                    "confidence_interval": [yhat_lower, yhat_upper],
                     "horizon": prediction_horizon,
-                    "predictionTime" : int(prediction_time), #
+                    "predictionTime": int(prediction_time), #
                     "refersTo": "todo",
                     "cloud": "todo",
                     "provider": "todo"
                 }
                 self.connector.send_to_topic('intermediate_prediction.prophet.'+metric, message)
-            prediction_time=prediction_time + prediction_horizon
-            epoch_start = epoch_start+ prediction_horizon
+            prediction_time = prediction_time + prediction_horizon
+            epoch_start = epoch_start + prediction_horizon
         sleep(prediction_horizon)
-class Prophet(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener):
+class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
     id = "prophet"
     metrics = set()
     def __init__(self):
         self._run = False
         self.connector = messaging.morphemic.Connection(ACTIVEMQ_USER,ACTIVEMQ_PASSWORD, host=ACTIVEMQ_HOSTNAME, port=ACTIVEMQ_PORT)
     def run(self):
-        #logging.debug("setting up")
-        #sleep(180)
-        #logging.debug("starting the connection to ActiveMQ")
         self.connector.connect()
         self.connector.set_listener(self.id, self)
         self.connector.topic("start_forecasting.prophet", self.id)
@@ -116,71 +97,65 @@ class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
         self.connector.topic("metrics_to_predict", self.id)
     def reconnect(self):
-        print('Reconnecting to ActiveMQ')
+        logging.debug(f"Reconnecting to ActiveMQ")
         self.connector.disconnect()
         self.run()
         pass
     def on_start_forecasting_prophet(self, body):
-        logging.debug("Prophet Start Forecasting the following metrics :")
+        logging.debug(f"Prophet Start Forecasting the following metrics:")
         sent_metrics = body["metrics"]
         logging.debug(sent_metrics)
         for metric in sent_metrics:
             if metric not in metrics:
                 metrics.add(metric)
-            #thread = threading.Thread(target=worker , args=(self, body, metric,))
             if metric not in metrics_processes:
                 metrics_processes[metric] = Process(target=worker, args=(self, body, metric,))
                 metrics_processes[metric] .start()
     def on_metrics_to_predict(self, body):
-        #getting data from datasetmaker
+        # getting data from dataset maker
         dataset_preprocessor = CSVData(APP_NAME)
         data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv')
         dataset_preprocessor.prepare_csv()
-        while (not os.path.isfile(data_file_path)):
logging.