Commit 75668521 authored by Marta Różańska's avatar Marta Różańska

Merge branch 'prophet-gluonmachines' into 'morphemic-rc1.5'

Adding the code of prophet and gluonmachines forecasters

See merge request !189
parents 25f7def1 337a0e44
Pipeline #16795 passed with stages in 16 minutes and 57 seconds
from morphemic.dataset import DatasetMaker
import os
from filelock import FileLock
"""Script for preparing csv data downloaded form InfluxDB database, data"""
class CSVData(object):
def __init__(self, name, start_collection=None):
self.name = name
self.config = {
"hostname": os.environ.get("INFLUXDB_HOSTNAME", "localhost"),
"port": int(os.environ.get("INFLUXDB_PORT", "8086")),
"username": os.environ.get("INFLUXDB_USERNAME", "morphemic"),
"password": os.environ.get("INFLUXDB_PASSWORD", "password"),
"dbname": os.environ.get("INFLUXDB_DBNAME", "morphemic"),
"path_dataset": os.environ.get("DATA_PATH", "./"),
}
self.start_collection = start_collection
    def prepare_csv(self):
        # The dataset CSV lives under path_dataset; a sibling ".lock" file
        # guards concurrent access to an already-existing CSV
        csv_path = os.path.join(self.config["path_dataset"], f"{self.name}.csv")
        lock = FileLock(csv_path + ".lock")
        if os.path.isfile(csv_path):
            with lock:
                datasetmaker = DatasetMaker(self.name, self.start_collection, self.config)
                datasetmaker.make()
        else:
            datasetmaker = DatasetMaker(self.name, self.start_collection, self.config)
            datasetmaker.make()
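
# A minimal usage sketch (hypothetical, not part of the original file): it
# assumes the InfluxDB variables above resolve to a reachable database and
# that "demo" is a valid application name with collected data.
if __name__ == "__main__":
    data = CSVData("demo", start_collection="2h")
    data.prepare_csv()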
FROM python:3.8.10
WORKDIR /morphemic_project
RUN apt-get clean && apt-get update -y -qq
RUN apt-get install -y --no-install-recommends apt-utils
RUN python -m pip install --upgrade pip
RUN apt-get install -y \
build-essential \
cmake \
pkg-config \
wget \
swig \
git \
curl \
unzip \
libaio1 \
nano \
freetds-dev \
unixodbc \
unixodbc-dev \
libjpeg-dev \
libtiff5-dev \
libpng-dev \
libgtk2.0-dev \
libavcodec-dev \
libavformat-dev \
libswscale-dev \
libv4l-dev \
libatlas-base-dev \
gfortran \
libhdf5-dev \
libtbb2 \
libtbb-dev \
libgl1-mesa-glx
COPY forecasting_gluonts/docker_image/requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
ADD https://gitlab.ow2.org/melodic/morphemic-preprocessor/-/archive/morphemic-rc1.5/morphemic-preprocessor-morphemic-rc1.5.tar.gz /var/lib/morphemic/
COPY forecasting_gluonts/ ./
RUN cd /var/lib/morphemic/ \
&& tar -zxf morphemic-preprocessor-morphemic-rc1.5.tar.gz \
&& cd morphemic-preprocessor-morphemic-rc1.5 \
&& cd morphemic-datasetmaker && python3 setup.py install \
&& cd ../.. \
&& rm -rf /var/lib/morphemic \
&& mkdir -p /logs && mkdir /morphemic_project/models
CMD python /morphemic_project/main.py > /logs/gluonts.log 2>&1
numpy
stomp.py
influxdb
filelock
python-slugify
scikit-learn
pandas
scipy
tqdm
matplotlib
tensorflow
keras
PyYAML
Timeloop
mxnet
gluonts
# Remove all images
sudo docker rmi -f $(sudo docker images -q)
# Build the image from the Dockerfile with the latest version of the code
sudo docker build . -t gitlab.ow2.org:4567/melodic/morphemic-preprocessor/gluonmachines:morphemic-rc1.5
# Test the image
#sudo docker run -it --env-file variables.env gitlab.ow2.org:4567/melodic/morphemic-preprocessor/gluonmachines:morphemic-rc1.5
# Push the image to gitlab
sudo docker login gitlab.ow2.org:4567
sudo docker push gitlab.ow2.org:4567/melodic/morphemic-preprocessor/gluonmachines:morphemic-rc1.5
docker run -t --env-file=variables.env --network=bridge $image_name
ACTIVEMQ_USER=admin
ACTIVEMQ_PASSWORD=admin
ACTIVEMQ_PORT=61613
ACTIVEMQ_HOSTNAME=172.17.0.1
APP_NAME=default_application
METHOD=gluonmachines
DATA_PATH=/morphemic_project/forecasting_gluonts/
INFLUXDB_HOSTNAME=persistentstorage
INFLUXDB_PORT=8086
INFLUXDB_USERNAME=morphemic
INFLUXDB_PASSWORD=password
INFLUXDB_DBNAME=morphemic
import ast
import itertools
import json
import logging
import math
import os
import pickle
import statistics
from datetime import datetime, timedelta
from math import exp, log
from time import time

import matplotlib.pyplot as plt
import mxnet as mx
import numpy as np
import pandas as pd
from mxnet import gluon
from pandas import Timestamp
from sklearn import preprocessing
from sklearn.model_selection import ParameterGrid

from gluonts.dataset.common import ListDataset
from gluonts.dataset.field_names import FieldName
from gluonts.dataset.util import to_pandas
from gluonts.evaluation import Evaluator
from gluonts.evaluation.backtest import make_evaluation_predictions
from gluonts.model.deepar import DeepAREstimator
from gluonts.model.forecast import SampleForecast
from gluonts.model.simple_feedforward import SimpleFeedForwardEstimator
from gluonts.mx.trainer import Trainer

pd.set_option('display.max_rows', 500)
directory_path = "/morphemic_project/"
def train(metric):
    data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv')
    dataset = pd.read_csv(data_file_path)
    # Rename the attributes into the ds/y format and coerce the target to numeric
    gluonts_dataset = pd.DataFrame()
    gluonts_dataset['y'] = pd.to_numeric(dataset[metric], errors='coerce')
    # ems_time holds epoch seconds; convert the first sample, then force a
    # regular one-minute grid anchored there (matching freq='1min' below)
    first_ts = datetime.fromtimestamp(dataset["ems_time"][0])
    gluonts_dataset['ds'] = pd.date_range(start=first_ts, periods=len(gluonts_dataset), freq='1min')
    # Replace missing values with the column mean
    gluonts_dataset['y'] = gluonts_dataset['y'].fillna(gluonts_dataset['y'].mean())
    size = len(gluonts_dataset)
    logging.debug("STARTED TRAINING FOR: " + metric)
    # Hold out the last 20% of the series for validation
    test_percentage = 0.2
    training_window_size = int(size - size * test_percentage)
    train_time = gluonts_dataset['ds'][training_window_size - 1]
    validation_time = gluonts_dataset['ds'][size - 1]
    freq = '1min'
    gluonts_dataset = gluonts_dataset.set_index('ds')
    train_ds = ListDataset([{"start": gluonts_dataset.index[0], "target": gluonts_dataset.y[:train_time]}], freq=freq)
    validation_ds = ListDataset([{"start": gluonts_dataset.index[0], "target": gluonts_dataset.y[:validation_time]}], freq=freq)
    train_series = to_pandas(next(iter(train_ds)))
    validation_series = to_pandas(next(iter(validation_ds)))
    prediction_length = len(validation_series) - len(train_series)
    # Hyperparameter tuning via a grid search evaluated on the validation
    # window (each list currently holds a single value, i.e. one combination)
    param_grid = {
        'batch_size': [75],
        'epochs': [5],
        'num_batches_per_epoch': [10],
        'learning_rate': [1e-3],
        'context_length': [5],
    }
    all_params = list(ParameterGrid(param_grid))
    agg_metrics_all = list()
    item_metrics_all = list()
    for params in all_params:
        estimator = DeepAREstimator(
            prediction_length=prediction_length,
            context_length=params['context_length'],
            freq=freq,
            trainer=Trainer(
                ctx="cpu",
                epochs=params['epochs'],
                learning_rate=params['learning_rate'],
                num_batches_per_epoch=params['num_batches_per_epoch'],
            ),
        )
        predictor = estimator.train(training_data=train_ds)
        forecast_it, ts_it = make_evaluation_predictions(
            dataset=validation_ds,  # validation dataset
            predictor=predictor,    # trained predictor
            num_samples=20,         # number of sample paths drawn for evaluation
        )
        forecasts = list(forecast_it)
        tss = list(ts_it)
        evaluator = Evaluator(quantiles=[0.1, 0.5, 0.9])
        agg_metrics, item_metrics = evaluator(iter(tss), iter(forecasts), num_series=len(validation_ds))
        # Record the hyperparameters alongside the aggregate metrics
        agg_metrics['epochs'] = params['epochs']
        agg_metrics['learning_rate'] = params['learning_rate']
        agg_metrics['num_batches_per_epoch'] = params['num_batches_per_epoch']
        agg_metrics['context_length'] = params['context_length']
        agg_metrics['forecast'] = forecasts
        agg_metrics_all.append(agg_metrics)
        item_metrics_all.append(item_metrics)
    # Pick the combination with the lowest MAPE
    dataframe = pd.DataFrame(agg_metrics_all)
    sorted1 = dataframe.sort_values(by=['MAPE']).reset_index(drop=True)
    # Retrain with the best hyperparameters on the full series
    estimator1 = DeepAREstimator(
        prediction_length=prediction_length,
        context_length=sorted1['context_length'][0],
        freq=freq,
        trainer=Trainer(
            ctx="cpu",
            epochs=sorted1['epochs'][0],
            learning_rate=sorted1['learning_rate'][0],
            num_batches_per_epoch=sorted1['num_batches_per_epoch'][0],
        ),
    )
    predictor1 = estimator1.train(training_data=validation_ds)
    # Update the per-metric probability file (created on first use);
    # np.save opens and writes the file itself
    prob = 0.8
    npy_filename = directory_path + 'prob_file.npy'
    if os.path.isfile(npy_filename):
        probs = np.load(npy_filename, allow_pickle='TRUE').item()
    else:
        probs = dict()
    probs[metric] = prob
    np.save(npy_filename, probs)
    return predictor1
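
# Note (sketch, not in the original code): instead of pickling the returned
# predictor as the listener does, GluonTS predictors can also be persisted via
# their native serialize/deserialize API, which writes a directory of
# artifacts; the path and metric name below are hypothetical.
#
#   from pathlib import Path
#   from gluonts.model.predictor import Predictor
#
#   model_dir = Path(directory_path) / "models" / "gluonts_cpu_usage"
#   model_dir.mkdir(parents=True, exist_ok=True)
#   train("cpu_usage").serialize(model_dir)
#   restored = Predictor.deserialize(model_dir)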
def predict(model, number_of_forward_predictions, prediction_horizon, epoch_start, metric):
    data_file_path = os.path.join(os.environ.get("DATA_PATH", "./"), f'{os.environ.get("APP_NAME", "demo")}.csv')
    dataset = pd.read_csv(data_file_path)
    gluonts_dataset = pd.DataFrame()
    gluonts_dataset['y'] = pd.to_numeric(dataset[metric], errors='coerce')
    # Convert epoch seconds and force a regular one-minute grid, as in train()
    first_ts = datetime.fromtimestamp(dataset["ems_time"][0])
    gluonts_dataset['ds'] = pd.date_range(start=first_ts, periods=len(gluonts_dataset), freq='1min')
    gluonts_dataset['y'] = gluonts_dataset['y'].fillna(gluonts_dataset['y'].mean())
    # Build one future timestamp per requested forward prediction
    future = list()
    for i in range(1, number_of_forward_predictions + 1):
        dateInSec = epoch_start + i * prediction_horizon * 60
        future.append(datetime.fromtimestamp(dateInSec))
    future = pd.DataFrame(future)
    future.columns = ['ds']
    # Extend the series with placeholder targets at the future timestamps
    # (DataFrame.append was removed in pandas 2.0, so use concat)
    target = list(gluonts_dataset.y[-number_of_forward_predictions:])
    extension = pd.DataFrame({'ds': list(future['ds']), 'y': target})
    gluonts_dataset = pd.concat([gluonts_dataset, extension], ignore_index=True)
    gluonts_dataset = gluonts_dataset.set_index('ds')
    test_time = list(future['ds'])[-1]
    test_ds = ListDataset([{"start": gluonts_dataset.index[0], "target": gluonts_dataset.y[:test_time]}], freq='1min')
    forecast_it, ts_it = make_evaluation_predictions(
        dataset=test_ds,   # test dataset
        predictor=model,   # trained predictor
        num_samples=20,    # number of sample paths drawn for evaluation
    )
    forecasts = list(forecast_it)
    forecast_entry = forecasts[0]
    logging.debug(forecast_entry)
    predictions = forecast_entry.samples
    # Summarise the sample paths: min, max and mean per forward step
    mins = list()
    maxs = list()
    values = list()
    for i in range(0, number_of_forward_predictions):
        mylist = [line[i] for line in predictions]
        mins.append(min(mylist))
        maxs.append(max(mylist))
        values.append(statistics.mean(mylist))
    returnDict = {'mins': mins, 'maxs': maxs, 'values': values}
    return returnDict
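
# A minimal end-to-end sketch (hypothetical, only runnable inside the container
# environment with a CSV prepared by dataset_maker): "cpu_usage" is an example
# metric name, and the horizon is in minutes, matching the epoch arithmetic in
# predict() above.
if __name__ == "__main__":
    predictor = train("cpu_usage")
    out = predict(predictor, number_of_forward_predictions=5,
                  prediction_horizon=1, epoch_start=int(time()), metric="cpu_usage")
    print(out['values'], out['mins'], out['maxs'])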
import messaging
import morphemic
import gluonts_forecaster
from time import time
import logging
import signal
import threading
import numpy as np
# Libraries required for training and prediction
import os
import json
import pickle
import ast
from time import sleep
from multiprocessing import Process
from dataset_maker import CSVData
APP_NAME = os.environ.get("APP_NAME")
ACTIVEMQ_USER = os.environ.get("ACTIVEMQ_USER")
ACTIVEMQ_PASSWORD = os.environ.get("ACTIVEMQ_PASSWORD")
ACTIVEMQ_HOSTNAME = os.environ.get("ACTIVEMQ_HOSTNAME")
ACTIVEMQ_PORT = os.environ.get("ACTIVEMQ_PORT")
predictionTimes = dict()
models = dict()
metrics_processes = dict()
metrics = set()
directory_path = "/morphemic_project/"
def worker(self, body, metric):
    prediction_horizon = body["prediction_horizon"]
    number_of_forward_predictions = body["number_of_forward_predictions"]
    epoch_start = body["epoch_start"]
    predictionTimes[metric] = epoch_start
    while True:
        # Load the trained model dumped by on_metrics_to_predict, if present
        if os.path.isfile(directory_path + 'models/gluonts_' + metric + ".pkl"):
            logging.debug("Loading the trained model for metric: " + metric)
            with open(directory_path + "models/gluonts_" + metric + ".pkl", 'rb') as f:
                models[metric] = pickle.load(f)
            timestamp = int(time())
            if timestamp >= predictionTimes[metric]:
                logging.debug("Start the prediction for metric: " + metric)
                predictions = gluonts_forecaster.predict(models[metric], number_of_forward_predictions, prediction_horizon, epoch_start, metric)
                logging.debug(predictions)
                yhats = predictions['values']
                yhat_lowers = predictions['mins']
                yhat_uppers = predictions['maxs']
                prediction_time = epoch_start + prediction_horizon
                timestamp = int(time())
                # Read the per-metric probabilities written during training
                probs = np.load(directory_path + 'prob_file.npy', allow_pickle='TRUE').item()
                logging.debug("Sending predictions for metric: " + metric)
                for k in range(0, len(predictions['values'])):
                    yhat = yhats[k]
                    yhat_lower = yhat_lowers[k]
                    yhat_upper = yhat_uppers[k]
                    self.connector.send_to_topic('intermediate_prediction.gluonmachines.' + metric,
                        {
                            "metricValue": float(yhat),
                            "level": 3,
                            "timestamp": timestamp,
                            "probability": probs[metric],
                            "confidence_interval": [float(yhat_lower), float(yhat_upper)],
                            "horizon": prediction_horizon,
                            "predictionTime": int(prediction_time),
                            "refersTo": "todo",
                            "cloud": "todo",
                            "provider": "todo"
                        })
                    prediction_time = prediction_time + prediction_horizon
                epoch_start = epoch_start + prediction_horizon
        sleep(prediction_horizon - 5)
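
# For reference: the worker above reads prediction_horizon,
# number_of_forward_predictions and epoch_start from the incoming
# "start_forecasting" payload, and the listener below additionally reads
# "metrics". A hypothetical example body (field names taken from the code,
# values purely illustrative):
#
#   {
#       "metrics": ["cpu_usage"],
#       "timestamp": 1623242615,
#       "epoch_start": 1623242700,
#       "number_of_forward_predictions": 8,
#       "prediction_horizon": 120
#   }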
class Gluonts(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener):
    id = "gluonmachines"

    def __init__(self):
        self._run = False
        self.connector = messaging.morphemic.Connection(ACTIVEMQ_USER, ACTIVEMQ_PASSWORD, host=ACTIVEMQ_HOSTNAME, port=ACTIVEMQ_PORT)
def run(self):
self.connector.connect()
self.connector.set_listener(self.id, self)
self.connector.topic("start_forecasting.gluonmachines", self.id)
self.connector.topic("stop_forecasting.gluonmachines", self.id)
self.connector.topic("metrics_to_predict", self.id)
    def reconnect(self):
        self.connector.disconnect()
        self.run()
    def on_start_forecasting_gluonmachines(self, body):
        logging.debug("Gluonts Start Forecasting the following metrics:")
        sent_metrics = body["metrics"]
        logging.debug(sent_metrics)
        for metric in sent_metrics:
            if metric not in metrics:
                metrics.add(metric)
            if metric not in metrics_processes:
                # One forecasting worker process per metric
                metrics_processes[metric] = Process(target=worker, args=(self, body, metric,))
                metrics_processes[metric].start()
    def on_metrics_to_predict(self, body):
        dataset_preprocessor = CSVData(APP_NAME, start_collection='2h')
        dataset_preprocessor.prepare_csv()
        logging.debug("DATASET DOWNLOADED")
        for r in body:
            metric = r['metric']
            if not os.path.isfile(directory_path + 'models/gluonts_' + metric + ".pkl"):
                logging.debug("Training a GluonTS model for metric: " + metric)
                model = gluonts_forecaster.train(metric)
                pkl_path = directory_path + "models/gluonts_" + metric + ".pkl"
                with open(pkl_path, "wb") as f:
                    pickle.dump(model, f)
                logging.debug("Model dumped")
                metrics.add(metric)
        self.connector.send_to_topic("training_models",
            {
                "metrics": list(metrics),
                "forecasting_method": "gluonmachines",
                "timestamp": int(time())
            })
    def on_stop_forecasting_gluonmachines(self, body):
        logging.debug("Gluonts Stop Forecasting the following metrics:")
        logging.debug(body["metrics"])
        for metric in body["metrics"]:
            if metric in metrics:
                metrics_processes[metric].terminate()
                metrics.remove(metric)
                metrics_processes.pop(metric)
    def start(self):
        logging.debug("Starting Gluonts Forecaster")
        self.run()
        self._run = True
def on_disconnected(self):
print('Disconnected from ActiveMQ')
self.reconnect()
import logging
import gluonts_listener
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
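
# The snippet above only configures logging; presumably the entry point should
# also instantiate and start the listener so the container's CMD has a running
# process. A minimal sketch under that assumption (Gluonts.start() connects to
# ActiveMQ and subscribes to the forecasting topics):
e = gluonts_listener.Gluonts()
e.start()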