Commit e5905f44 authored by Marta Różańska

Merge branch 'iccs-eshybrid' into 'morphemic-rc1.5'

Iccs eshybrid

See merge request !247
parents 3046d101 580fda80
Pipeline #19237 passed with stages in 27 minutes and 10 seconds
import os
from six.moves import urllib
import subprocess
import numpy as np
import pandas as pd
from ESRNN.utils_evaluation import Naive2
seas_dict = {'Hourly': {'seasonality': 24, 'input_size': 24,
                        'output_size': 48, 'freq': 'H'},
             'Daily': {'seasonality': 7, 'input_size': 7,
                       'output_size': 14, 'freq': 'D'},
             'Weekly': {'seasonality': 52, 'input_size': 52,
                        'output_size': 13, 'freq': 'W'},
             'Monthly': {'seasonality': 12, 'input_size': 12,
                         'output_size': 18, 'freq': 'M'},
             'Quarterly': {'seasonality': 4, 'input_size': 4,
                           'output_size': 8, 'freq': 'Q'},
             'Yearly': {'seasonality': 1, 'input_size': 4,
                        'output_size': 6, 'freq': 'D'}}
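# Illustrative lookup into the table above: for the 'Hourly' subset the model
# reads 24 past observations and forecasts the official M4 horizon of 48 steps.
#
#     params = seas_dict['Hourly']
#     # params['input_size'] -> 24, params['output_size'] -> 48, params['freq'] -> 'H'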
SOURCE_URL = 'https://raw.githubusercontent.com/Mcompetitions/M4-methods/master/Dataset/'
def maybe_download(filename, directory):
    """
    Download the data from M4's website, unless it's already here.

    Parameters
    ----------
    filename: str
        Filename of M4 data with format /Type/Frequency.csv. Example: /Test/Daily-train.csv
    directory: str
        Custom directory where data will be downloaded.
    """
    data_directory = directory + "/m4"
    train_directory = data_directory + "/Train/"
    test_directory = data_directory + "/Test/"

    if not os.path.exists(data_directory):
        os.mkdir(data_directory)
    if not os.path.exists(train_directory):
        os.mkdir(train_directory)
    if not os.path.exists(test_directory):
        os.mkdir(test_directory)

    filepath = os.path.join(data_directory, filename)
    if not os.path.exists(filepath):
        filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename, filepath)
        size = os.path.getsize(filepath)
        print('Successfully downloaded', filename, size, 'bytes.')
    return filepath
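# Minimal usage sketch ('./data' is an illustrative path; the file is fetched
# from SOURCE_URL only if it is not already on disk):
#
#     path = maybe_download('Train/Daily-train.csv', './data')
#     # -> './data/m4/Train/Daily-train.csv'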
def m4_parser(dataset_name, directory, num_obs=1000000):
    """
    Transform M4 data into a panel.

    Parameters
    ----------
    dataset_name: str
        Frequency of the data. Example: 'Yearly'.
    directory: str
        Custom directory where data will be saved.
    num_obs: int
        Number of time series to return.
    """
    data_directory = directory + "/m4"
    train_directory = data_directory + "/Train/"
    test_directory = data_directory + "/Test/"
    freq = seas_dict[dataset_name]['freq']

    m4_info = pd.read_csv(data_directory+'/M4-info.csv', usecols=['M4id','category'])
    m4_info = m4_info[m4_info['M4id'].str.startswith(dataset_name[0])].reset_index(drop=True)

    # Train data
    train_path = '{}{}-train.csv'.format(train_directory, dataset_name)
    train_df = pd.read_csv(train_path, nrows=num_obs)
    train_df = train_df.rename(columns={'V1':'unique_id'})
    train_df = pd.wide_to_long(train_df, stubnames=["V"], i="unique_id", j="ds").reset_index()
    train_df = train_df.rename(columns={'V':'y'})
    train_df = train_df.dropna()
    train_df['split'] = 'train'
    train_df['ds'] = train_df['ds'] - 1

    # Get len of series per unique_id
    len_series = train_df.groupby('unique_id').agg({'ds': 'max'}).reset_index()
    len_series.columns = ['unique_id', 'len_serie']

    # Test data
    test_path = '{}{}-test.csv'.format(test_directory, dataset_name)
    test_df = pd.read_csv(test_path, nrows=num_obs)
    test_df = test_df.rename(columns={'V1':'unique_id'})
    test_df = pd.wide_to_long(test_df, stubnames=["V"], i="unique_id", j="ds").reset_index()
    test_df = test_df.rename(columns={'V':'y'})
    test_df = test_df.dropna()
    test_df['split'] = 'test'
    test_df = test_df.merge(len_series, on='unique_id')
    test_df['ds'] = test_df['ds'] + test_df['len_serie'] - 1
    test_df = test_df[['unique_id','ds','y','split']]

    df = pd.concat((train_df, test_df))
    df = df.sort_values(by=['unique_id', 'ds']).reset_index(drop=True)

    # Create column with dates with freq of dataset
    len_series = df.groupby('unique_id').agg({'ds': 'max'}).reset_index()
    dates = []
    for i in range(len(len_series)):
        len_serie = len_series.iloc[i,1]
        ranges = pd.date_range(start='1970/01/01', periods=len_serie, freq=freq)
        dates += list(ranges)
    df.loc[:,'ds'] = dates

    df = df.merge(m4_info, left_on=['unique_id'], right_on=['M4id'])
    df.drop(columns=['M4id'], inplace=True)
    df = df.rename(columns={'category': 'x'})

    X_train_df = df[df['split']=='train'].filter(items=['unique_id', 'ds', 'x'])
    y_train_df = df[df['split']=='train'].filter(items=['unique_id', 'ds', 'y'])
    X_test_df = df[df['split']=='test'].filter(items=['unique_id', 'ds', 'x'])
    y_test_df = df[df['split']=='test'].filter(items=['unique_id', 'ds', 'y'])

    X_train_df = X_train_df.reset_index(drop=True)
    y_train_df = y_train_df.reset_index(drop=True)
    X_test_df = X_test_df.reset_index(drop=True)
    y_test_df = y_test_df.reset_index(drop=True)

    return X_train_df, y_train_df, X_test_df, y_test_df
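# Usage sketch (assumes the CSVs were downloaded first, e.g. via maybe_download):
# parse the first 100 Yearly series into long-format panels with columns
# [unique_id, ds, x] / [unique_id, ds, y], where 'ds' holds synthetic dates
# starting 1970/01/01 at the dataset frequency.
#
#     X_train, y_train, X_test, y_test = m4_parser('Yearly', './data', num_obs=100)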
def naive2_predictions(dataset_name, directory, num_obs, y_train_df=None, y_test_df=None):
    """
    Computes Naive2 predictions.

    Parameters
    ----------
    dataset_name: str or dict
        Frequency of the data (e.g. 'Yearly'), or a manual configuration dict
        with the same keys as the seas_dict entries.
    directory: str
        Custom directory where data will be saved.
    num_obs: int
        Number of time series to return.
    y_train_df: DataFrame
        Y train set returned by m4_parser.
    y_test_df: DataFrame
        Y test set returned by m4_parser.
    """
    # Read train and test data
    if (y_train_df is None) or (y_test_df is None):
        _, y_train_df, _, y_test_df = m4_parser(dataset_name, directory, num_obs)

    config_dict = seas_dict
    if isinstance(dataset_name, dict):
        config_dict['manual'] = dataset_name
        dataset_name = 'manual'

    seasonality = config_dict[dataset_name]['seasonality']
    input_size = config_dict[dataset_name]['input_size']
    output_size = config_dict[dataset_name]['output_size']
    freq = config_dict[dataset_name]['freq']

    print('Preparing {} dataset'.format(dataset_name))
    print('Preparing Naive2 {} dataset predictions'.format(dataset_name))

    # Naive2
    y_naive2_df = pd.DataFrame(columns=['unique_id', 'ds', 'y_hat'])

    # Sort y by unique_id for faster loop
    y_train_df = y_train_df.sort_values(by=['unique_id', 'ds'])
    # List of unique ids
    unique_ids = y_train_df['unique_id'].unique()

    # Panel of fitted models
    for unique_id in unique_ids:
        # Fast filter y by id
        top_row = np.asscalar(y_train_df['unique_id'].searchsorted(unique_id, 'left'))
        bottom_row = np.asscalar(y_train_df['unique_id'].searchsorted(unique_id, 'right'))
        y_id = y_train_df[top_row:bottom_row]

        y_naive2 = pd.DataFrame(columns=['unique_id', 'ds', 'y_hat'])
        y_naive2['ds'] = pd.date_range(start=y_id.ds.max(),
                                       periods=output_size+1, freq=freq)[1:]
        y_naive2['unique_id'] = unique_id
        y_naive2['y_hat'] = Naive2(seasonality).fit(y_id.y.to_numpy()).predict(output_size)
        y_naive2_df = y_naive2_df.append(y_naive2)

    y_naive2_df = y_test_df.merge(y_naive2_df, on=['unique_id', 'ds'], how='left')
    y_naive2_df.rename(columns={'y_hat': 'y_hat_naive2'}, inplace=True)

    results_dir = directory + '/results'
    naive2_file = results_dir + '/{}-naive2predictions_{}.csv'.format(dataset_name, num_obs)
    y_naive2_df.to_csv(naive2_file, encoding='utf-8', index=None)

    return y_naive2_df
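# Usage sketch (assumes './data/results' already exists; this function writes
# its CSV there without creating the directory, which prepare_m4_data below does):
#
#     y_naive2 = naive2_predictions('Hourly', './data', num_obs=100)
#     # columns: unique_id, ds, y, y_hat_naive2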
def prepare_m4_data(dataset_name, directory, num_obs):
    """
    Pipeline that obtains M4 time series, transforms them and gets Naive2 predictions.

    Parameters
    ----------
    dataset_name: str
        Frequency of the data. Example: 'Yearly'.
    directory: str
        Custom directory where data will be saved.
    num_obs: int
        Number of time series to return.
    """
    m4info_filename = maybe_download('M4-info.csv', directory)

    dailytrain_filename = maybe_download('Train/Daily-train.csv', directory)
    hourlytrain_filename = maybe_download('Train/Hourly-train.csv', directory)
    monthlytrain_filename = maybe_download('Train/Monthly-train.csv', directory)
    quarterlytrain_filename = maybe_download('Train/Quarterly-train.csv', directory)
    weeklytrain_filename = maybe_download('Train/Weekly-train.csv', directory)
    yearlytrain_filename = maybe_download('Train/Yearly-train.csv', directory)

    dailytest_filename = maybe_download('Test/Daily-test.csv', directory)
    hourlytest_filename = maybe_download('Test/Hourly-test.csv', directory)
    monthlytest_filename = maybe_download('Test/Monthly-test.csv', directory)
    quarterlytest_filename = maybe_download('Test/Quarterly-test.csv', directory)
    weeklytest_filename = maybe_download('Test/Weekly-test.csv', directory)
    yearlytest_filename = maybe_download('Test/Yearly-test.csv', directory)
    print('\n')

    X_train_df, y_train_df, X_test_df, y_test_df = m4_parser(dataset_name, directory, num_obs)

    results_dir = directory + '/results'
    if not os.path.exists(results_dir):
        os.mkdir(results_dir)

    naive2_file = results_dir + '/{}-naive2predictions_{}.csv'
    naive2_file = naive2_file.format(dataset_name, num_obs)
    if not os.path.exists(naive2_file):
        y_naive2_df = naive2_predictions(dataset_name, directory, num_obs, y_train_df, y_test_df)
    else:
        y_naive2_df = pd.read_csv(naive2_file)
        y_naive2_df['ds'] = pd.to_datetime(y_naive2_df['ds'])

    return X_train_df, y_train_df, X_test_df, y_naive2_df
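# End-to-end usage sketch for this module (illustrative path and sample size):
# download the M4 files, build the panels, and compute or load cached Naive2
# forecasts, which the evaluation step later uses as the OWA benchmark.
#
#     X_train, y_train, X_test, y_naive2 = prepare_m4_data('Hourly', './data', num_obs=100)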
import os
import argparse
import itertools
import ast
import pickle
import time

import numpy as np
import pandas as pd
from ESRNN.m4_data import prepare_m4_data
from ESRNN.utils_evaluation import evaluate_prediction_owa
from ESRNN.utils_configs import get_config
from ESRNN import ESRNN
import torch
def main(args):
    config = get_config(args.dataset)
    if config['data_parameters']['frequency'] == 'Y':
        config['data_parameters']['frequency'] = None

    # Setting needed parameters
    os.environ['CUDA_VISIBLE_DEVICES'] = str(args.gpu_id)

    if args.num_obs:
        num_obs = args.num_obs
    else:
        num_obs = 100000

    if args.use_cpu == 1:
        config['device'] = 'cpu'
    else:
        assert torch.cuda.is_available(), 'No cuda devices detected. You can try using CPU instead.'

    # Reading data
    print('Reading data')
    X_train_df, y_train_df, X_test_df, y_test_df = prepare_m4_data(dataset_name=args.dataset,
                                                                   directory=args.results_directory,
                                                                   num_obs=num_obs)

    # Instantiate model
    model = ESRNN(max_epochs=config['train_parameters']['max_epochs'],
                  batch_size=config['train_parameters']['batch_size'],
                  freq_of_test=config['train_parameters']['freq_of_test'],
                  learning_rate=float(config['train_parameters']['learning_rate']),
                  lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'],
                  lr_decay=config['train_parameters']['lr_decay'],
                  per_series_lr_multip=config['train_parameters']['per_series_lr_multip'],
                  gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'],
                  rnn_weight_decay=config['train_parameters']['rnn_weight_decay'],
                  noise_std=config['train_parameters']['noise_std'],
                  level_variability_penalty=config['train_parameters']['level_variability_penalty'],
                  testing_percentile=config['train_parameters']['testing_percentile'],
                  training_percentile=config['train_parameters']['training_percentile'],
                  ensemble=config['train_parameters']['ensemble'],
                  max_periods=config['data_parameters']['max_periods'],
                  seasonality=config['data_parameters']['seasonality'],
                  input_size=config['data_parameters']['input_size'],
                  output_size=config['data_parameters']['output_size'],
                  frequency=config['data_parameters']['frequency'],
                  cell_type=config['model_parameters']['cell_type'],
                  state_hsize=config['model_parameters']['state_hsize'],
                  dilations=config['model_parameters']['dilations'],
                  add_nl_layer=config['model_parameters']['add_nl_layer'],
                  random_seed=config['model_parameters']['random_seed'],
                  device=config['device'])

    if args.test == 1:
        model = ESRNN(max_epochs=1,
                      batch_size=20,
                      seasonality=config['data_parameters']['seasonality'],
                      input_size=config['data_parameters']['input_size'],
                      output_size=config['data_parameters']['output_size'],
                      frequency=config['data_parameters']['frequency'],
                      device=config['device'])

    # Fit model
    # If y_test_df is provided, the model evaluates predictions on this set every freq_of_test epochs
    model.fit(X_train_df, y_train_df, X_test_df, y_test_df)

    # Predict on test set
    print('\nForecasting')
    y_hat_df = model.predict(X_test_df)

    # Evaluate predictions
    print(15*'=', ' Final evaluation ', 14*'=')
    seasonality = config['data_parameters']['seasonality']
    if not seasonality:
        seasonality = 1
    else:
        seasonality = seasonality[0]
    final_owa, final_mase, final_smape = evaluate_prediction_owa(y_hat_df, y_train_df,
                                                                 X_test_df, y_test_df,
                                                                 naive2_seasonality=seasonality)
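    # For reference, the OWA reported above is the M4 competition's measure:
    # the model's sMAPE and MASE are each divided by the Naive2 benchmark's
    # values and averaged:
    #     OWA = ((sMAPE / sMAPE_naive2) + (MASE / MASE_naive2)) / 2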
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Replicate M4 results for the ESRNN model')
    parser.add_argument("--dataset", required=True, type=str,
                        choices=['Yearly', 'Quarterly', 'Monthly', 'Weekly', 'Hourly', 'Daily'],
                        help="set of M4 time series to be tested")
    parser.add_argument("--results_directory", required=True, type=str,
                        help="directory where M4 data will be downloaded")
    parser.add_argument("--gpu_id", required=False, type=int,
                        help="an integer that specifies which GPU will be used")
    parser.add_argument("--use_cpu", required=False, type=int,
                        help="1 to use CPU instead of GPU (uses GPU by default)")
    parser.add_argument("--num_obs", required=False, type=int,
                        help="number of M4 time series to be tested (uses all data by default)")
    parser.add_argument("--test", required=False, type=int,
                        help="run fast for tests (no test by default)")
    args = parser.parse_args()

    main(args)
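# Example invocation, mirroring the arguments defined above (CPU, small sample):
#
#     python -m ESRNN.m4_run --dataset Hourly --results_directory ./data \
#         --gpu_id 0 --use_cpu 1 --num_obs 100 --test 1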
# Testing ESRNN
import runpy
import os

print('\n')
print(10*'='+'TEST ESRNN'+10*'=')
print('\n')
def test_esrnn_hourly():
    if not os.path.exists('./data'):
        os.mkdir('./data')
    print('\n')
    print(10*'='+'HOURLY'+10*'=')
    print('\n')
    exec_str = 'python -m ESRNN.m4_run --dataset Hourly '
    exec_str += '--results_directory ./data --gpu_id 0 '
    exec_str += '--use_cpu 1 --num_obs 100 --test 1'
    results = os.system(exec_str)
    if results == 0:
        print('Test completed')
    else:
        raise Exception('Something went wrong')

def test_esrnn_weekly():
    if not os.path.exists('./data'):
        os.mkdir('./data')
    print('\n')
    print(10*'='+'WEEKLY'+10*'=')
    print('\n')
    exec_str = 'python -m ESRNN.m4_run --dataset Weekly '
    exec_str += '--results_directory ./data --gpu_id 0 '
    exec_str += '--use_cpu 1 --num_obs 100 --test 1'
    results = os.system(exec_str)
    if results == 0:
        print('Test completed')
    else:
        raise Exception('Something went wrong')

def test_esrnn_daily():
    if not os.path.exists('./data'):
        os.mkdir('./data')
    print('\n')
    print(10*'='+'DAILY'+10*'=')
    print('\n')
    exec_str = 'python -m ESRNN.m4_run --dataset Daily '
    exec_str += '--results_directory ./data --gpu_id 0 '
    exec_str += '--use_cpu 1 --num_obs 100 --test 1'
    results = os.system(exec_str)
    if results == 0:
        print('Test completed')
    else:
        raise Exception('Something went wrong')

def test_esrnn_monthly():
    if not os.path.exists('./data'):
        os.mkdir('./data')
    print('\n')
    print(10*'='+'MONTHLY'+10*'=')
    print('\n')
    exec_str = 'python -m ESRNN.m4_run --dataset Monthly '
    exec_str += '--results_directory ./data --gpu_id 0 '
    exec_str += '--use_cpu 1 --num_obs 100 --test 1'
    results = os.system(exec_str)
    if results == 0:
        print('Test completed')
    else:
        raise Exception('Something went wrong')

def test_esrnn_quarterly():
    if not os.path.exists('./data'):
        os.mkdir('./data')
    print('\n')
    print(10*'='+'QUARTERLY'+10*'=')
    print('\n')
    exec_str = 'python -m ESRNN.m4_run --dataset Quarterly '
    exec_str += '--results_directory ./data --gpu_id 0 '
    exec_str += '--use_cpu 1 --num_obs 100 --test 1'
    results = os.system(exec_str)
    if results == 0:
        print('Test completed')
    else:
        raise Exception('Something went wrong')

def test_esrnn_yearly():
    if not os.path.exists('./data'):
        os.mkdir('./data')
    print('\n')
    print(10*'='+'YEARLY'+10*'=')
    print('\n')
    exec_str = 'python -m ESRNN.m4_run --dataset Yearly '
    exec_str += '--results_directory ./data --gpu_id 0 '
    exec_str += '--use_cpu 1 --num_obs 100 --test 1'
    results = os.system(exec_str)
    if results == 0:
        print('Test completed')
    else:
        raise Exception('Something went wrong')
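# These tests shell out to the CLI above; assuming pytest is installed and this
# file matches its test discovery pattern (e.g. a file named test_esrnn.py, an
# illustrative name), they can be run with:
#
#     pytest -s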
@@ -2,10 +2,10 @@ import messaging
 import morphemic
 import morphemic.dataset
 import os
-import time
 import logging
 import signal
 import datetime
+import time
 import socket

 class ESHybrid(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
@@ -16,6 +16,7 @@ class ESHybrid(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
     def __init__(self, config):
         self._run = False
         self._interval_count = 1
+        self.id = (config['listener'] or {'id': 'eshybrid'})['id']
         self.connector = messaging.morphemic.Connection(
             config['messaging']['username'],
@@ -25,9 +26,11 @@ class ESHybrid(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
             timeout=6000,
             keepalive=True
         )
-        self.model = morphemic.model.Model(self)
+        self.model = morphemic.model.Model(config['persistence']['application'], self)
+        self.application = config['persistence']['application']
+        os.makedirs(config['persistence']['path_dataset'], exist_ok=True)
         self.data_set_path = config['persistence']['path_dataset']
         os.makedirs(self.data_set_path, exist_ok=True)
         influx = {'hostname': config['persistence']['host'],
                   'port': config['persistence']['port'],
                   'username': config['persistence']['username'],
@@ -43,13 +46,45 @@ class ESHybrid(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
        )

    def wait_for_port(self, port, host='localhost', retries=5.0, timeout=5):
        """Wait until a port starts accepting TCP connections.

        Args:
            port (int): Port number.
            host (str): Host address on which the port should exist.
            retries (float): Number of connection attempts before giving up.
            timeout (float): In seconds. How long to wait for each connection attempt.

        Returns:
            bool: True if the port accepted a connection, False if all retries failed.
        """
        while retries > 0:
            try:
                with socket.create_connection((host, port), timeout=timeout):
                    break
            except OSError:
                logging.debug("Failed to connect to %s:%s, waiting for 5 seconds" % (host, port))
                retries = retries - 1
                time.sleep(5.00)
        if retries <= 0:
            logging.error("Failed to connect, aborting")
        return retries > 0
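    # Usage sketch: self.wait_for_port(61613, 'activemq') would block until a
    # broker's STOMP port accepts TCP or five retries elapse (host and port
    # here are illustrative, not values taken from this configuration).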
    def run(self):
        logging.debug("setting up")
        if not self.wait_for_port(self.connector.hosts[0][1], self.connector.hosts[0][0]):
            logging.debug("couldn't connect to host")
            return False
        self.connector.set_listener(self.id, self)
        self.connector.connect()
        self.connector.topic("start_forecasting.%s" % self.id, self.id)
        self.connector.topic("stop_forecasting.%s" % self.id, self.id)
        self.connector.topic("metrics_to_predict", self.id)
        return True

    def topic(self, topic):
        self.connector.topic(topic, self.id)
@@ -71,29 +106,40 @@ class ESHybrid(morphemic.handler.ModelHandler, messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
        logging.debug("Stopping...")
        self._run = False

    def start(self):
        logging.debug("Starting ESHybrid")
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGHUP, self.signal_handler)
        self.run()
        self._run = True
        while self._run:
            if self.scheduler:
                self.scheduler.check(self)
            time.sleep(1)
        self.connector.disconnect()
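# For orientation, the configuration keys read above imply a structure roughly
# like the following sketch (hypothetical values; only the key names are taken
# from this code):
#
#     listener:
#       id: eshybrid
#     messaging:
#       username: ...
#       ...
#     persistence:
#       application: my_app
#       path_dataset: ./datasets
#       host: influx-host
#       port: 8086
#       username: ...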