Commit 92f6f0b5 authored by maciek riedl

Merge branch 'enhancements' into 'morphemic-rc2.0'

Fix and Enhance Prophet and Gluonts forecasters

See merge request !315
parents f350aa61 a9f525e9
Pipeline #21745 passed with stages in 42 minutes and 25 seconds
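The central timing fix in both workers is to subtract the loop's own execution time from the per-cycle sleep, so prediction publication does not drift. Below is a minimal sketch of that pattern, assuming a caller supplies a prediction_horizon in seconds and a do_predictions callable; paced_loop and do_predictions are illustrative names, not taken from the MR.

from time import time, sleep

def paced_loop(prediction_horizon, do_predictions):
    # run the prediction work once per prediction_horizon seconds,
    # compensating for however long the work itself took
    while True:
        cycle_start = int(time())
        do_predictions()
        execution_time = int(time()) - cycle_start
        # clamp at zero so a slow cycle never asks for a negative sleep
        sleep(max(prediction_horizon - execution_time, 0))

The GluonTS worker additionally subtracts a fixed 5 seconds of slack from the sleep; the zero clamp above is an extra safeguard that the MR itself does not include.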
......@@ -46,7 +46,8 @@ def worker(self, body, metric):
with open(directory_path + "models/gluonts_" + metric + ".pkl", 'rb') as f:
models[metric] = pickle.load(f)
timestamp = int(time())
if timestamp >= predictionTimes[metric]:
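# start forecasting once the end of the forecast window (now plus prediction_horizon * number_of_forward_predictions) reaches the scheduled prediction time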
timestamp_horizon = timestamp + (prediction_horizon * number_of_forward_predictions)
if timestamp_horizon >= predictionTimes[metric]:
logging.debug(f"Start the prediction for metric: {metric}")
predictions = gluonts_forecaster.predict(models[metric], number_of_forward_predictions,
prediction_horizon, epoch_start, metric)
......@@ -77,8 +78,9 @@ def worker(self, body, metric):
})
prediction_time = prediction_time + prediction_horizon
epoch_start = epoch_start + prediction_horizon
sleep(prediction_horizon - 5)
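# subtract the time spent producing and publishing predictions so each cycle stays aligned to prediction_horizon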
execution_time = int(time()) - timestamp
sleep_time = prediction_horizon - execution_time - 5
sleep(sleep_time)
class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
id = "gluonts"
......@@ -89,7 +91,16 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
port=ACTIVEMQ_PORT)
def run(self):
self.connector.connect()
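# keep retrying the ActiveMQ connection until it succeeds; on success retries is set to 1, which ends the loop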
retries = 11
while retries > 10:
try:
self.connector.connect()
retries = 1
logging.debug(f"GluonTS successefuly connected to ActiveMQ")
except Exception:
logging.debug(f"GluonTS failed to connect to ActiveMQ")
sleep(5)
retries = retries + 1
self.connector.set_listener(self.id, self)
self.connector.topic("start_forecasting.gluonts", self.id)
self.connector.topic("stop_forecasting.gluonts", self.id)
......@@ -153,4 +164,3 @@ class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
def on_disconnected(self):
logging.debug(f'Disconnected from ActiveMQ')
self.reconnect()
......@@ -14,8 +14,8 @@ import pickle
import ast
import logging.config
from stomp import exception
from time import time
from time import sleep
from time import time
from dataset_maker import CSVData
from multiprocessing import Process
......@@ -28,42 +28,40 @@ ACTIVEMQ_PORT = os.environ.get("ACTIVEMQ_PORT")
predictionTimes = dict()
models = dict()
metrics_processes = dict()
metrics = set()
directory_path = "/morphemic_project/"
def worker(self, body, metric):
timestamp = body['timestamp']
prediction_horizon = body["prediction_horizon"]
number_of_forward_predictions = body["number_of_forward_predictions"]
number_of_forward_predictions = body["number_of_forward_predictions"]
epoch_start = body["epoch_start"]
predictionTimes[metric] = epoch_start
messages = list()
f = 0
while not os.path.isfile(directory_path + 'models/prophet_' + metric + ".pkl"):
sleep(30)
logging.debug(f"Waiting for the trained model for metric: {metric}")
while True:
with open(directory_path+"models/prophet_"+metric+".pkl", 'rb') as f:
with open(directory_path + "models/prophet_" + metric + ".pkl", 'rb') as f:
models[metric] = pickle.load(f)
timestamp = int(time())
if timestamp >= predictionTimes[metric]:
predictions = prophet_forecaster.predict(models[metric], number_of_forward_predictions, prediction_horizon, epoch_start)
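# same look-ahead window check as in the GluonTS worker above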
timestamp = int(time())
timestamp_horizon = timestamp + (prediction_horizon * number_of_forward_predictions)
if timestamp_horizon >= predictionTimes[metric]:
predictions = prophet_forecaster.predict(models[metric], number_of_forward_predictions, prediction_horizon,
epoch_start)
yhats = predictions['yhat'].values.tolist()
yhat_lowers = predictions['yhat_lower'].values.tolist()
yhat_uppers = predictions['yhat_upper'].values.tolist()
prediction_time = epoch_start + prediction_horizon
# change it to the time of the start_forecasting was sent
# read probabilities file
probs = np.load(directory_path+'prob_file.npy', allow_pickle='TRUE').item()
for k in range(0,len(predictions['yhat'].values.tolist())):
probs = np.load(directory_path + 'prob_file.npy', allow_pickle='TRUE').item()
for k in range(0, len(predictions['yhat'].values.tolist())):
yhat = yhats[k]
yhat_lower = yhat_lowers[k]
yhat_upper = yhat_uppers[k]
# wait until epoch_start to send
start_sending_time = time.time()
message = {
"metricValue": yhat,
"level": 3,
......@@ -71,28 +69,41 @@ def worker(self, body, metric):
"probability": probs[metric],
"confidence_interval": [yhat_lower, yhat_upper],
"horizon": prediction_horizon,
"predictionTime": int(prediction_time), #
"predictionTime": int(prediction_time),
"refersTo": "todo",
"cloud": "todo",
"provider": "todo"
}
self.connector.send_to_topic('intermediate_prediction.prophet.'+metric, message)
"provider": "todo"
}
self.connector.send_to_topic('intermediate_prediction.prophet.' + metric, message)
prediction_time = prediction_time + prediction_horizon
epoch_start = epoch_start + prediction_horizon
execution_time = time.time() - start_sending_time
sleep(prediction_horizon - execution_time)
execution_time = int(time()) - timestamp
sleep_time = prediction_horizon - execution_time
sleep(sleep_time)
class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
id = "prophet"
metrics = set()
last_training_time = 0
deployed_version = 1
def __init__(self):
self._run = False
self.connector = messaging.morphemic.Connection(ACTIVEMQ_USER,ACTIVEMQ_PASSWORD, host=ACTIVEMQ_HOSTNAME, port=ACTIVEMQ_PORT)
self.connector = messaging.morphemic.Connection(ACTIVEMQ_USER, ACTIVEMQ_PASSWORD, host=ACTIVEMQ_HOSTNAME,
port=ACTIVEMQ_PORT)
def run(self):
self.connector.connect()
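# same connect-with-retry pattern as in the GluonTS forecaster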
retries = 11
while retries > 10:
try:
self.connector.connect()
retries = 1
logging.debug(f"Prophet connected to ActiveMQ")
except Exception:
logging.debug(f"Prophet failed to connect to ActiveMQ")
sleep(5)
retries += 1
self.connector.set_listener(self.id, self)
self.connector.topic("start_forecasting.prophet", self.id)
self.connector.topic("stop_forecasting.prophet", self.id)
......@@ -104,45 +115,68 @@ class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
self.run()
pass
def retrain(self):
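# every 300 seconds, rebuild the CSV dataset and retrain a new model version for each metric that already has a deployed model file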
while True:
sleep(300)
dataset_preprocessor = CSVData(APP_NAME)
data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"),
f'{os.environ.get("APP_NAME", "demo")}.csv')
dataset_preprocessor.prepare_csv()
while not os.path.isfile(data_file_path):
logging.debug(f"Waiting for dataset to be loaded")
sleep(30)
dataset_preprocessor.prepare_csv()
for metric in self.metrics:
if os.path.isfile(directory_path + 'models/prophet_' + metric + "_" + str(self.deployed_version) + ".pkl"):
logging.debug(f"Retraining the Prophet model for metric: {metric}")
model = prophet_forecaster.train(metric)
self.deployed_version += 1
new_version_pkl_path = directory_path + "models/prophet_" + metric + "_" + str(self.deployed_version) + ".pkl"
with open(new_version_pkl_path, "wb") as f:
pickle.dump(model, f)
def on_start_forecasting_prophet(self, body):
logging.debug(f"Prophet Start Forecasting the following metrics:")
sent_metrics = body["metrics"]
logging.debug(sent_metrics)
for metric in sent_metrics:
if metric not in metrics:
metrics.add(metric)
if metric not in self.metrics:
self.metrics.add(metric)
if metric not in metrics_processes:
metrics_processes[metric] = Process(target=worker, args=(self, body, metric,))
metrics_processes[metric] .start()
metrics_processes[metric] = Process(target=worker, args=(self, body, metric))
metrics_processes[metric].start()
#self.retrain(sent_metrics)
def on_metrics_to_predict(self, body):
# getting data from dataset maker
dataset_preprocessor = CSVData(APP_NAME)
data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv')
dataset_preprocessor.prepare_csv()
while not os.path.isfile(data_file_path):
logging.debug(f"Waiting for dataset to be loaded")
sleep(30)
dataset_preprocessor.prepare_csv()
for r in body:
metric = r['metric']
if not os.path.isfile(directory_path+'models/prophet_'+metric+".pkl"):
if not os.path.isfile(directory_path + 'models/prophet_' + metric + ".pkl"):
logging.debug(f"Training a Prophet model for metric: {metric}")
model = prophet_forecaster.train(metric)
pkl_path = directory_path+"models/prophet_"+metric+".pkl"
last_training_time = int(time())
pkl_path = directory_path + "models/prophet_" + metric + ".pkl"
with open(pkl_path, "wb") as f:
pickle.dump(model, f)
metrics.add(metric)
self.metrics.add(metric)
self.connector.send_to_topic("training_models",
{
"metrics": list(metrics),
"forecasting_method": "Prophet",
"timestamp": int(time())
}
)
{
"metrics": list(self.metrics),
"forecasting_method": "Prophet",
"timestamp": int(time())
}
)
def on_stop_forecasting_prophet(self, body):
logging.debug(f"Prophet Stop Forecasting the following metrics: {body['metrics']}")
......@@ -156,7 +190,7 @@ class Prophet(morphemic.handler.ModelHandler, messaging.listener.MorphemicListen
def start(self):
logging.debug(f"Staring Prophet Forecaster")
self.run()
self._run = True
self._run = True
def on_disconnected(self):
logging.debug(f"Disconnected from ActiveMQ")
......