From 3c6a638c8f8e15008459512403f2977f015e2355 Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Wed, 26 May 2021 13:09:11 +0300 Subject: [PATCH 1/2] Uploading eshybrid forecasting module --- morphemic-forecasting-eshybrid/.gitignore | 1 + morphemic-forecasting-eshybrid/ESRNN/ESRNN.py | 650 ++++++++++++++++++ .../ESRNN/ESRNNensemble.py | 436 ++++++++++++ .../ESRNN/__init__.py | 1 + .../ESRNN/m4_data.py | 245 +++++++ .../ESRNN/m4_run.py | 117 ++++ .../ESRNN/tests/test_esrnn.py | 120 ++++ .../ESRNN/utils/DRNN.py | 288 ++++++++ .../ESRNN/utils/ESRNN.py | 289 ++++++++ .../ESRNN/utils/__init__.py | 0 .../ESRNN/utils/config.py | 53 ++ .../ESRNN/utils/data.py | 150 ++++ .../ESRNN/utils/losses.py | 124 ++++ .../ESRNN/utils_configs.py | 232 +++++++ .../ESRNN/utils_evaluation.py | 400 +++++++++++ .../ESRNN/utils_visualization.py | 145 ++++ morphemic-forecasting-eshybrid/__init__.py | 4 + .../forecasting/__init__.py | 2 + .../forecasting/eshybrid.py | 133 ++++ morphemic-forecasting-eshybrid/main.py | 15 + morphemic-forecasting-eshybrid/messaging | 1 + .../morphemic/__init__.py | 6 + .../morphemic/configuration.py | 232 +++++++ .../morphemic/dataset.py | 33 + .../morphemic/dataset/__init__.py | 154 +++++ .../morphemic/handler.py | 5 + .../morphemic/model.py | 98 +++ .../morphemic/scheduler.py | 61 ++ .../requirements.txt | 10 + 29 files changed, 4005 insertions(+) create mode 100644 morphemic-forecasting-eshybrid/.gitignore create mode 100644 morphemic-forecasting-eshybrid/ESRNN/ESRNN.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/ESRNNensemble.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/__init__.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/m4_data.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/m4_run.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/tests/test_esrnn.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils/DRNN.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils/ESRNN.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils/__init__.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils/config.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils/data.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils/losses.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils_configs.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils_evaluation.py create mode 100644 morphemic-forecasting-eshybrid/ESRNN/utils_visualization.py create mode 100644 morphemic-forecasting-eshybrid/__init__.py create mode 100644 morphemic-forecasting-eshybrid/forecasting/__init__.py create mode 100644 morphemic-forecasting-eshybrid/forecasting/eshybrid.py create mode 100644 morphemic-forecasting-eshybrid/main.py create mode 120000 morphemic-forecasting-eshybrid/messaging create mode 100644 morphemic-forecasting-eshybrid/morphemic/__init__.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/configuration.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/dataset.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/handler.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/model.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/scheduler.py create mode 100644 morphemic-forecasting-eshybrid/requirements.txt diff --git a/morphemic-forecasting-eshybrid/.gitignore b/morphemic-forecasting-eshybrid/.gitignore new file mode 100644 index 
00000000..139b3d7b
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/.gitignore
@@ -0,0 +1 @@
+./local
\ No newline at end of file
diff --git a/morphemic-forecasting-eshybrid/ESRNN/ESRNN.py b/morphemic-forecasting-eshybrid/ESRNN/ESRNN.py
new file mode 100644
index 00000000..85e0a8db
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/ESRNN/ESRNN.py
@@ -0,0 +1,650 @@
+import os
+import time
+
+import numpy as np
+import pandas as pd
+
+import torch
+import torch.nn as nn
+import torch.optim as optim
+from torch.optim.lr_scheduler import StepLR
+
+from pathlib import Path
+from copy import deepcopy
+
+from ESRNN.utils.config import ModelConfig
+from ESRNN.utils.ESRNN import _ESRNN
+from ESRNN.utils.losses import SmylLoss, PinballLoss
+from ESRNN.utils.data import Iterator
+
+from ESRNN.utils_evaluation import owa
+
+
+class ESRNN(object):
+    """ Exponential Smoothing Recurrent Neural Network
+
+    Pytorch implementation of the M4 time series forecasting competition winner.
+    Proposed by Smyl. The model uses a hybrid approach of machine learning and
+    statistical methods: recurrent neural networks model a common trend with
+    parameters shared across series, combined with multiplicative Holt-Winters
+    exponential smoothing.
+
+    Parameters
+    ----------
+    max_epochs: int
+        maximum number of complete passes over the train data during fit
+    freq_of_test: int
+        period for the diagnostic evaluation of the model.
+    learning_rate: float
+        size of the stochastic gradient descent steps
+    lr_scheduler_step_size: int
+        this step_size is the period for each learning rate decay
+    per_series_lr_multip: float
+        multiplier for per-series parameters smoothing and initial
+        seasonalities learning rate (default 1.0)
+    gradient_eps: float
+        term added to the Adam optimizer denominator to improve
+        numerical stability (default: 1e-8)
+    gradient_clipping_threshold: float
+        max norm of gradient vector, with all parameters treated
+        as a single vector
+    rnn_weight_decay: float
+        parameter to control classic L2/Tikhonov regularization
+        of the rnn parameters
+    noise_std: float
+        standard deviation of white noise added to input during
+        fit to prevent the model from memorizing the train data
+    level_variability_penalty: float
+        this parameter controls the strength of the penalization
+        of the wiggliness of the level vector, inducing smoothness
+        in the output
+    testing_percentile: float
+        This value is only for diagnostic evaluation.
+        For percentile predictions this parameter controls the
+        percentile being predicted; when forecasting a point value
+        the forecast is the median, so percentile=50.
+    training_percentile: float
+        To reduce the model's tendency to overestimate, the
+        training_percentile can be set to fit a smaller value
+        through the Pinball Loss.
+    batch_size: int
+        number of training examples for the stochastic gradient steps
+    seasonality: int list
+        list of seasonalities of the time series
+        Hourly [24, 168], Daily [7], Weekly [52], Monthly [12],
+        Quarterly [4], Yearly [].
+    input_size: int
+        input size of the recurrent neural network, usually a
+        multiple of seasonality
+    output_size: int
+        output_size or forecast horizon of the recurrent neural
+        network, usually a multiple of seasonality
+    random_seed: int
+        random_seed for pseudo random pytorch initializer and
+        numpy random generator
+    exogenous_size: int
+        size of one hot encoded categorical variable, invariant
+        per time series of the panel
+    min_inp_seq_length: int
+        minimum length of the input sequences
+    max_periods: int
+        Parameter to chop longer series to their last max_periods
+        observations, e.g. the last 40 years
+    cell_type: str
+        Type of RNN cell, available GRU, LSTM, RNN, ResidualLSTM.
+    state_hsize: int
+        dimension of hidden state of the recurrent neural network
+    dilations: int list
+        each list represents one chunk of dilated LSTMs, connected in
+        standard ResNet fashion
+    add_nl_layer: bool
+        whether to insert a tanh() layer between the RNN stack and the
+        linear adaptor (output) layers
+    device: str
+        pytorch device either 'cpu' or 'cuda'
+    Notes
+    -----
+    **References:**
+    `M4 Competition Conclusions
+    `__
+    `Original Dynet Implementation of ESRNN
+    `__
+    """
+    def __init__(self, max_epochs=15, batch_size=1, batch_size_test=64, freq_of_test=-1,
+                 learning_rate=1e-3, lr_scheduler_step_size=9, lr_decay=0.9,
+                 per_series_lr_multip=1.0, gradient_eps=1e-8, gradient_clipping_threshold=20,
+                 rnn_weight_decay=0, noise_std=0.001,
+                 level_variability_penalty=80,
+                 testing_percentile=50, training_percentile=50, ensemble=False,
+                 cell_type='LSTM',
+                 state_hsize=40, dilations=[[1, 2], [4, 8]],
+                 add_nl_layer=False, seasonality=[4], input_size=4, output_size=8,
+                 frequency=None, max_periods=20, random_seed=1,
+                 device='cpu', root_dir='./'):
+        super(ESRNN, self).__init__()
+        self.mc = ModelConfig(max_epochs=max_epochs, batch_size=batch_size, batch_size_test=batch_size_test,
+                              freq_of_test=freq_of_test, learning_rate=learning_rate,
+                              lr_scheduler_step_size=lr_scheduler_step_size, lr_decay=lr_decay,
+                              per_series_lr_multip=per_series_lr_multip,
+                              gradient_eps=gradient_eps, gradient_clipping_threshold=gradient_clipping_threshold,
+                              rnn_weight_decay=rnn_weight_decay, noise_std=noise_std,
+                              level_variability_penalty=level_variability_penalty,
+                              testing_percentile=testing_percentile, training_percentile=training_percentile,
+                              ensemble=ensemble,
+                              cell_type=cell_type,
+                              state_hsize=state_hsize, dilations=dilations, add_nl_layer=add_nl_layer,
+                              seasonality=seasonality, input_size=input_size, output_size=output_size,
+                              frequency=frequency, max_periods=max_periods, random_seed=random_seed,
+                              device=device, root_dir=root_dir)
+        self._fitted = False
+
+    def train(self, dataloader, max_epochs,
+              warm_start=False, shuffle=True, verbose=True):
+        """
+        Auxiliary function, pytorch train procedure for the ESRNN model.
+
+        Parameters
+        ----------
+        dataloader: pytorch dataloader
+        max_epochs: int
+        warm_start: bool
+        shuffle: bool
+        verbose: bool
+
+        Returns
+        -------
+        self : returns an instance of self.
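+
+        Notes
+        -----
+        This method is normally driven by ``fit``; a direct call is mainly
+        useful to continue training an already fitted model. A minimal,
+        illustrative sketch, assuming ``model.fit(X_df, y_df)`` already ran
+        so the internal dataloader, network and optimizers exist:
+
+        >>> model.train(dataloader=model.train_dataloader, max_epochs=5,
+        ...             warm_start=True, shuffle=True, verbose=False)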
+ """ + + if self.mc.ensemble: + self.esrnn_ensemble = [deepcopy(self.esrnn).to(self.mc.device)] * 5 + + if verbose: print(15*'='+' Training ESRNN ' + 15*'=' + '\n') + + # Optimizers + if not warm_start: + self.es_optimizer = optim.Adam(params=self.esrnn.es.parameters(), + lr=self.mc.learning_rate*self.mc.per_series_lr_multip, + betas=(0.9, 0.999), eps=self.mc.gradient_eps) + + self.es_scheduler = StepLR(optimizer=self.es_optimizer, + step_size=self.mc.lr_scheduler_step_size, + gamma=0.9) + + self.rnn_optimizer = optim.Adam(params=self.esrnn.rnn.parameters(), + lr=self.mc.learning_rate, + betas=(0.9, 0.999), eps=self.mc.gradient_eps, + weight_decay=self.mc.rnn_weight_decay) + + self.rnn_scheduler = StepLR(optimizer=self.rnn_optimizer, + step_size=self.mc.lr_scheduler_step_size, + gamma=self.mc.lr_decay) + + # Loss Functions + train_tau = self.mc.training_percentile / 100 + train_loss = SmylLoss(tau=train_tau, + level_variability_penalty=self.mc.level_variability_penalty) + + eval_tau = self.mc.testing_percentile / 100 + eval_loss = PinballLoss(tau=eval_tau) + + for epoch in range(max_epochs): + self.esrnn.train() + start = time.time() + if shuffle: + dataloader.shuffle_dataset(random_seed=epoch) + losses = [] + for j in range(dataloader.n_batches): + self.es_optimizer.zero_grad() + self.rnn_optimizer.zero_grad() + + batch = dataloader.get_batch() + windows_y, windows_y_hat, levels = self.esrnn(batch) + + # Pinball loss on normalized values + loss = train_loss(windows_y, windows_y_hat, levels) + losses.append(loss.data.cpu().numpy()) + #print("loss", loss) + + loss.backward() + + torch.nn.utils.clip_grad_norm_(self.esrnn.rnn.parameters(), + self.mc.gradient_clipping_threshold) + torch.nn.utils.clip_grad_norm_(self.esrnn.es.parameters(), + self.mc.gradient_clipping_threshold) + self.rnn_optimizer.step() + self.es_optimizer.step() + + # Decay learning rate + self.es_scheduler.step() + self.rnn_scheduler.step() + + if self.mc.ensemble: + copy_esrnn = deepcopy(self.esrnn) + copy_esrnn.eval() + self.esrnn_ensemble.pop(0) + self.esrnn_ensemble.append(copy_esrnn) + + + # Evaluation + self.train_loss = np.mean(losses) + if verbose: + print("========= Epoch {} finished =========".format(epoch)) + print("Training time: {}".format(round(time.time()-start, 5))) + print("Training loss ({} prc): {:.5f}".format(self.mc.training_percentile, + self.train_loss)) + + if (epoch % self.mc.freq_of_test == 0) and (self.mc.freq_of_test > 0): + if self.y_test_df is not None: + self.test_loss = self.model_evaluation(dataloader, eval_loss) + print("Testing loss ({} prc): {:.5f}".format(self.mc.testing_percentile, + self.test_loss)) + self.evaluate_model_prediction(self.y_train_df, self.X_test_df, + self.y_test_df, self.y_hat_benchmark, epoch=epoch) + self.esrnn.train() + + if verbose: print('Train finished! \n') + + def per_series_evaluation(self, dataloader, criterion): + """ + Auxiliary function, evaluate ESRNN model for training + procedure supervision. 
+ + Parameters + ---------- + dataloader: pytorch dataloader + criterion: pytorch test criterion + """ + + with torch.no_grad(): + # Create fast dataloader + if self.mc.n_series < self.mc.batch_size_test: new_batch_size = self.mc.n_series + else: new_batch_size = self.mc.batch_size_test + dataloader.update_batch_size(new_batch_size) + + per_series_losses = [] + for j in range(dataloader.n_batches): + batch = dataloader.get_batch() + windows_y, windows_y_hat, _ = self.esrnn(batch) + loss = criterion(windows_y, windows_y_hat) + per_series_losses += loss.data.cpu().numpy().tolist() + + dataloader.update_batch_size(self.mc.batch_size) + return per_series_losses + + def model_evaluation(self, dataloader, criterion): + """ + Auxiliary function, evaluate ESRNN model for training + procedure supervision. + + Parameters + ---------- + dataloader: pytorch dataloader + criterion: pytorch test criterion + + Returns + ------- + model_loss: float + loss for train supervision purpose. + """ + + with torch.no_grad(): + # Create fast dataloader + if self.mc.n_series < self.mc.batch_size_test: new_batch_size = self.mc.n_series + else: new_batch_size = self.mc.batch_size_test + dataloader.update_batch_size(new_batch_size) + + model_loss = 0.0 + for j in range(dataloader.n_batches): + batch = dataloader.get_batch() + windows_y, windows_y_hat, _ = self.esrnn(batch) + loss = criterion(windows_y, windows_y_hat) + model_loss += loss.data.cpu().numpy() + + model_loss /= dataloader.n_batches + dataloader.update_batch_size(self.mc.batch_size) + return model_loss + + def evaluate_model_prediction(self, y_train_df, X_test_df, y_test_df, y_hat_benchmark='y_hat_naive2', epoch=None): + """ + Evaluate ESRNN model against benchmark in y_test_df + + Parameters + ---------- + y_train_df: pandas dataframe + panel with columns 'unique_id', 'ds', 'y' + X_test_df: pandas dataframe + panel with columns 'unique_id', 'ds', 'x' + y_test_df: pandas dataframe + panel with columns 'unique_id', 'ds', 'y' and a column + y_hat_benchmark identifying benchmark predictions + y_hat_benchmark: str + column name of benchmark predictions, default y_hat_naive2 + + Returns + ------- + model_owa : float + relative improvement of model with respect to benchmark, measured with + the M4 overall weighted average. + smape: float + relative improvement of model with respect to benchmark, measured with + the symmetric mean absolute percentage error. + mase: float + relative improvement of model with respect to benchmark, measured with + the M4 mean absolute scaled error. + """ + + assert self._fitted, "Model not fitted yet" + + y_panel = y_test_df.filter(['unique_id', 'ds', 'y']) + y_benchmark_panel = y_test_df.filter(['unique_id', 'ds', y_hat_benchmark]) + y_benchmark_panel.rename(columns={y_hat_benchmark: 'y_hat'}, inplace=True) + y_hat_panel = self.predict(X_test_df) + y_insample = y_train_df.filter(['unique_id', 'ds', 'y']) + + model_owa, model_mase, model_smape = owa(y_panel, y_hat_panel, + y_benchmark_panel, y_insample, + seasonality=self.mc.naive_seasonality) + + if self.min_owa > model_owa: + self.min_owa = model_owa + if epoch is not None: + self.min_epoch = epoch + + print('OWA: {} '.format(np.round(model_owa, 3))) + print('SMAPE: {} '.format(np.round(model_smape, 3))) + print('MASE: {} '.format(np.round(model_mase, 3))) + + return model_owa, model_mase, model_smape + + def fit(self, X_df, y_df, X_test_df=None, y_test_df=None, y_hat_benchmark='y_hat_naive2', + warm_start=False, shuffle=True, verbose=True): + """ + Fit ESRNN model. 
+ + Parameters + ---------- + X_df : pandas dataframe + Train dataframe in long format with columns 'unique_id', 'ds' + and 'x'. + - 'unique_id' an identifier of each independent time series. + - 'ds' is a datetime column + - 'x' is a single exogenous variable + y_df : pandas dataframe + Train dataframe in long format with columns 'unique_id', 'ds' and 'y'. + - 'unique_id' an identifier of each independent time series. + - 'ds' is a datetime column + - 'y' is the column with the target values + X_test_df: pandas dataframe + Optional test dataframe with columns 'unique_id', 'ds' and 'x'. + If provided the fit procedure will evaluate the intermediate + performance within training epochs. + y_test_df: pandas dataframe + Optional test dataframe with columns 'unique_id', 'ds' and 'x' and + y_hat_benchmark column. + If provided the fit procedure will evaluate the intermediate + performance within training epochs. + y_hat_benchmark: str + Name of the benchmark model for the comparison of the relative + improvement of the model. + + Returns + ------- + self : returns an instance of self. + """ + + # Transform long dfs to wide numpy + assert type(X_df) == pd.core.frame.DataFrame + assert type(y_df) == pd.core.frame.DataFrame + assert all([(col in X_df) for col in ['unique_id', 'ds', 'x']]) + assert all([(col in y_df) for col in ['unique_id', 'ds', 'y']]) + if y_test_df is not None: + assert y_hat_benchmark in y_test_df.columns, 'benchmark is not present in y_test_df, use y_hat_benchmark to define it' + + # Storing dfs for OWA evaluation, initializing min_owa + self.y_train_df = y_df + self.X_test_df = X_test_df + self.y_test_df = y_test_df + self.min_owa = 4.0 + self.min_epoch = 0 + + self.int_ds = isinstance(self.y_train_df['ds'][0], (int, np.int, np.int64)) + + self.y_hat_benchmark = y_hat_benchmark + + X, y = self.long_to_wide(X_df, y_df) + assert len(X)==len(y) + assert X.shape[1]>=3 + + # Exogenous variables + unique_categories = np.unique(X[:, 1]) + self.mc.category_to_idx = dict((word, index) for index, word in enumerate(unique_categories)) + exogenous_size = len(unique_categories) + + # Create batches (device in mc) + self.train_dataloader = Iterator(mc=self.mc, X=X, y=y) + + # Random Seeds (model initialization) + torch.manual_seed(self.mc.random_seed) + np.random.seed(self.mc.random_seed) + + # Initialize model + n_series = self.train_dataloader.n_series + self.instantiate_esrnn(exogenous_size, n_series) + + # Validating frequencies + X_train_frequency = pd.infer_freq(X_df.head()['ds']) + y_train_frequency = pd.infer_freq(y_df.head()['ds']) + self.frequencies = [X_train_frequency, y_train_frequency] + + if (X_test_df is not None) and (y_test_df is not None): + X_test_frequency = pd.infer_freq(X_test_df.head()['ds']) + y_test_frequency = pd.infer_freq(y_test_df.head()['ds']) + self.frequencies += [X_test_frequency, y_test_frequency] + + assert len(set(self.frequencies)) <= 1, \ + "Match the frequencies of the dataframes {}".format(self.frequencies) + + self.mc.frequency = self.frequencies[0] + print("Infered frequency: {}".format(self.mc.frequency)) + + # Train model + self._fitted = True + self.train(dataloader=self.train_dataloader, max_epochs=self.mc.max_epochs, + warm_start=warm_start, shuffle=shuffle, verbose=verbose) + + def instantiate_esrnn(self, exogenous_size, n_series): + """Auxiliary function used at beginning of train to instantiate ESRNN""" + + self.mc.exogenous_size = exogenous_size + self.mc.n_series = n_series + self.esrnn = _ESRNN(self.mc).to(self.mc.device) + + def 
predict(self, X_df, decomposition=False): + """ + Predict using the ESRNN model. + + Parameters + ---------- + X_df : pandas dataframe + Dataframe in LONG format with columns 'unique_id', 'ds' + and 'x'. + - 'unique_id' an identifier of each independent time series. + - 'ds' is a datetime column + - 'x' is a single exogenous variable + + Returns + ------- + Y_hat_panel : pandas dataframe + Dataframe in LONG format with columns 'unique_id', 'ds' + and 'x'. + - 'unique_id' an identifier of each independent time series. + - 'ds' datetime columnn that matches the dates in X_df + - 'y_hat' is the column with the predicted target values + """ + + #print(9*'='+' Predicting ESRNN ' + 9*'=' + '\n') + assert type(X_df) == pd.core.frame.DataFrame + assert 'unique_id' in X_df + assert self._fitted, "Model not fitted yet" + + self.esrnn.eval() + + # Create fast dataloader + if self.mc.n_series < self.mc.batch_size_test: new_batch_size = self.mc.n_series + else: new_batch_size = self.mc.batch_size_test + self.train_dataloader.update_batch_size(new_batch_size) + dataloader = self.train_dataloader + + # Create Y_hat_panel placeholders + output_size = self.mc.output_size + n_unique_id = len(dataloader.sort_key['unique_id']) + panel_unique_id = pd.Series(dataloader.sort_key['unique_id']).repeat(output_size) + + #access column with last train date + panel_last_ds = pd.Series(dataloader.X[:, 2]) + panel_ds = [] + for i in range(len(panel_last_ds)): + ranges = pd.date_range(start=panel_last_ds[i], periods=output_size+1, freq=self.mc.frequency) + panel_ds += list(ranges[1:]) + + panel_y_hat= np.zeros((output_size * n_unique_id)) + + # Predict + count = 0 + for j in range(dataloader.n_batches): + batch = dataloader.get_batch() + batch_size = batch.y.shape[0] + + if self.mc.ensemble: + y_hat = torch.zeros((5,batch_size,output_size)) + for i in range(5): + y_hat[i,:,:] = self.esrnn_ensemble[i].predict(batch) + y_hat = torch.mean(y_hat,0) + else: + y_hat = self.esrnn.predict(batch) + + y_hat = y_hat.data.cpu().numpy() + + panel_y_hat[count:count+output_size*batch_size] = y_hat.flatten() + count += output_size*batch_size + + Y_hat_panel_dict = {'unique_id': panel_unique_id, + 'ds': panel_ds, + 'y_hat': panel_y_hat} + + assert len(panel_ds) == len(panel_y_hat) == len(panel_unique_id) + + Y_hat_panel = pd.DataFrame.from_dict(Y_hat_panel_dict) + + if 'ds' in X_df: + Y_hat_panel = X_df.merge(Y_hat_panel, on=['unique_id', 'ds'], how='left') + else: + Y_hat_panel = X_df.merge(Y_hat_panel, on=['unique_id'], how='left') + + self.train_dataloader.update_batch_size(self.mc.batch_size) + return Y_hat_panel + + def long_to_wide(self, X_df, y_df): + """ + Auxiliary function to wrangle LONG format dataframes + to a wide format compatible with ESRNN inputs. + + Parameters + ---------- + X_df : pandas dataframe + Dataframe in long format with columns 'unique_id', 'ds' + and 'x'. + - 'unique_id' an identifier of each independent time series. + - 'ds' is a datetime column + - 'x' is a single exogenous variable + y_df : pandas dataframe + Dataframe in long format with columns 'unique_id', 'ds' and 'y'. + - 'unique_id' an identifier of each independent time series. 
+ - 'ds' is a datetime column + - 'y' is the column with the target values + + Returns + ------- + X: numpy array, shape (n_unique_ids, n_time) + y: numpy array, shape (n_unique_ids, n_time) + """ + data = X_df.copy() + data['y'] = y_df['y'].copy() + sorted_ds = np.sort(data['ds'].unique()) + ds_map = {} + for dmap, t in enumerate(sorted_ds): + ds_map[t] = dmap + data['ds_map'] = data['ds'].map(ds_map) + data = data.sort_values(by=['ds_map','unique_id']) + df_wide = data.pivot(index='unique_id', columns='ds_map')['y'] + + x_unique = data[['unique_id', 'x']].groupby('unique_id').first() + last_ds = data[['unique_id', 'ds']].groupby('unique_id').last() + assert len(x_unique)==len(data.unique_id.unique()) + df_wide['x'] = x_unique + df_wide['last_ds'] = last_ds + df_wide = df_wide.reset_index().rename_axis(None, axis=1) + + ds_cols = data.ds_map.unique().tolist() + X = df_wide.filter(items=['unique_id', 'x', 'last_ds']).values + y = df_wide.filter(items=ds_cols).values + + return X, y + + def get_dir_name(self, root_dir=None): + """Auxiliary function to save ESRNN model""" + if not root_dir: + assert self.mc.root_dir + root_dir = self.mc.root_dir + + data_dir = self.mc.dataset_name + model_parent_dir = os.path.join(root_dir, data_dir) + model_path = ['esrnn_{}'.format(str(self.mc.copy))] + model_dir = os.path.join(model_parent_dir, '_'.join(model_path)) + return model_dir + + def save(self, model_dir=None, copy=None): + """Auxiliary function to save ESRNN model""" + if copy is not None: + self.mc.copy = copy + + if not model_dir: + assert self.mc.root_dir + model_dir = self.get_dir_name() + + if not os.path.exists(model_dir): + os.makedirs(model_dir) + + rnn_filepath = os.path.join(model_dir, "rnn.model") + es_filepath = os.path.join(model_dir, "es.model") + + print('Saving model to:\n {}'.format(model_dir)+'\n') + torch.save({'model_state_dict': self.es.state_dict()}, es_filepath) + torch.save({'model_state_dict': self.rnn.state_dict()}, rnn_filepath) + + def load(self, model_dir=None, copy=None): + """Auxiliary function to load ESRNN model""" + if copy is not None: + self.mc.copy = copy + + if not model_dir: + assert self.mc.root_dir + model_dir = self.get_dir_name() + + rnn_filepath = os.path.join(model_dir, "rnn.model") + es_filepath = os.path.join(model_dir, "es.model") + path = Path(es_filepath) + + if path.is_file(): + print('Loading model from:\n {}'.format(model_dir)+'\n') + + checkpoint = torch.load(es_filepath, map_location=self.mc.device) + self.es.load_state_dict(checkpoint['model_state_dict']) + self.es.to(self.mc.device) + + checkpoint = torch.load(rnn_filepath, map_location=self.mc.device) + self.rnn.load_state_dict(checkpoint['model_state_dict']) + self.rnn.to(self.mc.device) + else: + print('Model path {} does not exist'.format(path)) diff --git a/morphemic-forecasting-eshybrid/ESRNN/ESRNNensemble.py b/morphemic-forecasting-eshybrid/ESRNN/ESRNNensemble.py new file mode 100644 index 00000000..2a742dd7 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/ESRNNensemble.py @@ -0,0 +1,436 @@ +import os +import time +import random + +import numpy as np +import pandas as pd + +import torch + +from pathlib import Path + +from ESRNN.utils.config import ModelConfig +from ESRNN.utils.losses import DisaggregatedPinballLoss +from ESRNN.utils.data import Iterator + +from ESRNN.ESRNN import ESRNN + +from ESRNN.utils_evaluation import owa + +class ESRNNensemble(object): + """ Exponential Smoothing Recurrent Neural Network Ensemble + + Pytorch Implementation of the M4 time series 
forecasting competition winner.
+    Proposed by Smyl. The model ensembles multiple ESRNNs that use a hybrid approach
+    of machine learning and statistical methods: recurrent neural networks model
+    a common trend with parameters shared across series, combined with
+    multiplicative Holt-Winters exponential smoothing.
+
+    Parameters
+    ----------
+    n_models: int
+        the number of ESRNNs in the ensemble.
+    n_top: int
+        the number of ESRNNs from the n_models pool, to which a
+        particular time series gets assigned during the fitting procedure.
+    max_epochs: int
+        maximum number of complete passes over the train data during fit
+    freq_of_test: int
+        period for the diagnostic evaluation of the model.
+    learning_rate: float
+        size of the stochastic gradient descent steps
+    lr_scheduler_step_size: int
+        this step_size is the period for each learning rate decay
+    per_series_lr_multip: float
+        multiplier for per-series parameters smoothing and initial
+        seasonalities learning rate (default 1.0)
+    gradient_eps: float
+        term added to the Adam optimizer denominator to improve
+        numerical stability (default: 1e-8)
+    gradient_clipping_threshold: float
+        max norm of gradient vector, with all parameters treated
+        as a single vector
+    rnn_weight_decay: float
+        parameter to control classic L2/Tikhonov regularization
+        of the rnn parameters
+    noise_std: float
+        standard deviation of white noise added to input during
+        fit to prevent the model from memorizing the train data
+    level_variability_penalty: float
+        this parameter controls the strength of the penalization
+        of the wiggliness of the level vector, inducing smoothness
+        in the output
+    testing_percentile: float
+        This value is only for diagnostic evaluation.
+        For percentile predictions this parameter controls the
+        percentile being predicted; when forecasting a point value
+        the forecast is the median, so percentile=50.
+    training_percentile: float
+        To reduce the model's tendency to overestimate, the
+        training_percentile can be set to fit a smaller value
+        through the Pinball Loss.
+    batch_size: int
+        number of training examples for the stochastic gradient steps
+    seasonality: int list
+        list of seasonalities of the time series
+        Hourly [24, 168], Daily [7], Weekly [52], Monthly [12],
+        Quarterly [4], Yearly [].
+    input_size: int
+        input size of the recurrent neural network, usually a
+        multiple of seasonality
+    output_size: int
+        output_size or forecast horizon of the recurrent neural
+        network, usually a multiple of seasonality
+    random_seed: int
+        random_seed for pseudo random pytorch initializer and
+        numpy random generator
+    exogenous_size: int
+        size of one hot encoded categorical variable, invariant
+        per time series of the panel
+    min_inp_seq_length: int
+        minimum length of the input sequences
+    max_periods: int
+        Parameter to chop longer series to their last max_periods
+        observations, e.g. the last 40 years
+    cell_type: str
+        Type of RNN cell, available GRU, LSTM, RNN, ResidualLSTM.
+    state_hsize: int
+        dimension of hidden state of the recurrent neural network
+    dilations: int list
+        each list represents one chunk of dilated LSTMs, connected in
+        standard ResNet fashion
+    add_nl_layer: bool
+        whether to insert a tanh() layer between the RNN stack and the
+        linear adaptor (output) layers
+    device: str
+        pytorch device either 'cpu' or 'cuda'
+    Notes
+    -----
+    **References:**
+    `M4 Competition Conclusions
+    `__
+    `Original Dynet Implementation of ESRNN
+    `__
+    """
+    def __init__(self, n_models=1, n_top=1, max_epochs=15, batch_size=1, batch_size_test=128,
+                 freq_of_test=-1, learning_rate=1e-3, lr_scheduler_step_size=9, lr_decay=0.9,
+                 per_series_lr_multip=1.0, gradient_eps=1e-8, gradient_clipping_threshold=20,
+                 rnn_weight_decay=0, noise_std=0.001, level_variability_penalty=80,
+                 testing_percentile=50, training_percentile=50, ensemble=False, cell_type='LSTM',
+                 state_hsize=40, dilations=[[1, 2], [4, 8]],
+                 add_nl_layer=False, seasonality=[4], input_size=4, output_size=8,
+                 frequency='D', max_periods=20, random_seed=1,
+                 device='cuda', root_dir='./'):
+        super(ESRNNensemble, self).__init__()
+
+        self.n_models = n_models
+        self.n_top = n_top
+        assert n_models >= 2, "Number of models for ensemble should be greater than 1"
+        assert n_top <= n_models, "Number of top models should not exceed the number of models to ensemble"
+        self.big_float = 1e6
+        self.mc = ModelConfig(max_epochs=max_epochs, batch_size=batch_size, batch_size_test=batch_size_test,
+                              freq_of_test=freq_of_test, learning_rate=learning_rate,
+                              lr_scheduler_step_size=lr_scheduler_step_size, lr_decay=lr_decay,
+                              per_series_lr_multip=per_series_lr_multip,
+                              gradient_eps=gradient_eps, gradient_clipping_threshold=gradient_clipping_threshold,
+                              rnn_weight_decay=rnn_weight_decay, noise_std=noise_std,
+                              level_variability_penalty=level_variability_penalty,
+                              testing_percentile=testing_percentile, training_percentile=training_percentile,
+                              ensemble=ensemble, cell_type=cell_type,
+                              state_hsize=state_hsize, dilations=dilations, add_nl_layer=add_nl_layer,
+                              seasonality=seasonality, input_size=input_size, output_size=output_size,
+                              frequency=frequency, max_periods=max_periods, random_seed=random_seed,
+                              device=device, root_dir=root_dir)
+        self._fitted = False
+
+    def fit(self, X_df, y_df, X_test_df=None, y_test_df=None, shuffle=True):
+        """
+        Fit ESRNN ensemble.
+
+        Parameters
+        ----------
+        X_df : pandas dataframe
+            Train dataframe in long format with columns 'unique_id', 'ds'
+            and 'x'.
+            - 'unique_id' an identifier of each independent time series.
+            - 'ds' is a datetime column
+            - 'x' is a single exogenous variable
+        y_df : pandas dataframe
+            Train dataframe in long format with columns 'unique_id', 'ds' and 'y'.
+            - 'unique_id' an identifier of each independent time series.
+            - 'ds' is a datetime column
+            - 'y' is the column with the target values
+        X_test_df: pandas dataframe
+            Optional test dataframe with columns 'unique_id', 'ds' and 'x'.
+            If provided the fit procedure will evaluate the intermediate
+            performance within training epochs.
+        y_test_df: pandas dataframe
+            Optional test dataframe with columns 'unique_id', 'ds', 'y' and
+            a 'y_hat_naive2' benchmark column.
+            If provided the fit procedure will evaluate the intermediate
+            performance within training epochs.
+        shuffle: bool
+            Whether to shuffle the train data batches within each
+            training epoch.
+
+        Returns
+        -------
+        self : returns an instance of self.
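+
+        Examples
+        --------
+        A minimal, illustrative sketch; the small hyperparameters are
+        placeholders and the dataframes follow the long format described
+        above:
+
+        >>> model = ESRNNensemble(n_models=2, n_top=1, max_epochs=1,
+        ...                       seasonality=[7], input_size=7, output_size=14,
+        ...                       frequency='D', device='cpu')
+        >>> model.fit(X_train_df, y_train_df)
+        >>> y_hat_df = model.predict(X_test_df)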
+ """ + + # Transform long dfs to wide numpy + assert type(X_df) == pd.core.frame.DataFrame + assert type(y_df) == pd.core.frame.DataFrame + assert all([(col in X_df) for col in ['unique_id', 'ds', 'x']]) + assert all([(col in y_df) for col in ['unique_id', 'ds', 'y']]) + + # Storing dfs for OWA evaluation, initializing min_owa + self.y_train_df = y_df + self.X_test_df = X_test_df + self.y_test_df = y_test_df + self.min_owa = 4.0 + self.min_epoch = 0 + + # Exogenous variables + unique_categories = X_df['x'].unique() + self.mc.category_to_idx = dict((word, index) for index, word in enumerate(unique_categories)) + self.mc.exogenous_size = len(unique_categories) + + self.unique_ids = X_df['unique_id'].unique() + self.mc.n_series = len(self.unique_ids) + + # Set seeds + self.shuffle = shuffle + torch.manual_seed(self.mc.random_seed) + np.random.seed(self.mc.random_seed) + + # Initial series random assignment to models + self.series_models_map = np.zeros((self.mc.n_series, self.n_models)) + n_initial_models = int(np.ceil(self.n_models/2)) + for i in range(self.mc.n_series): + id_models = np.random.choice(self.n_models, n_initial_models) + self.series_models_map[i,id_models] = 1 + + self.esrnn_ensemble = [] + for _ in range(self.n_models): + esrnn = ESRNN(max_epochs=self.mc.max_epochs, batch_size=self.mc.batch_size, batch_size_test=self.mc.batch_size_test, + freq_of_test=-1, learning_rate=self.mc.learning_rate, + lr_scheduler_step_size=self.mc.lr_scheduler_step_size, lr_decay=self.mc.lr_decay, + per_series_lr_multip=self.mc.per_series_lr_multip, + gradient_eps=self.mc.gradient_eps, gradient_clipping_threshold=self.mc.gradient_clipping_threshold, + rnn_weight_decay=self.mc.rnn_weight_decay, noise_std=self.mc.noise_std, + level_variability_penalty=self.mc.level_variability_penalty, + testing_percentile=self.mc.testing_percentile, + training_percentile=self.mc.training_percentile, ensemble=self.mc.ensemble, + cell_type=self.mc.cell_type, + state_hsize=self.mc.state_hsize, dilations=self.mc.dilations, add_nl_layer=self.mc.add_nl_layer, + seasonality=self.mc.seasonality, input_size=self.mc.input_size, output_size=self.mc.output_size, + frequency=self.mc.frequency, max_periods=self.mc.max_periods, random_seed=self.mc.random_seed, + device=self.mc.device, root_dir=self.mc.root_dir) + + # To instantiate _ESRNN object within ESRNN class we need n_series + esrnn.instantiate_esrnn(self.mc.exogenous_size, self.mc.n_series) + esrnn._fitted = True + self.esrnn_ensemble.append(esrnn) + + self.X, self.y = esrnn.long_to_wide(X_df, y_df) + assert len(self.X)==len(self.y) + assert self.X.shape[1]>=3 + + # Train model + self._fitted = True + self.train() + + def train(self): + """ + Auxiliary function, pytorch train procedure for the ESRNN ensemble + + Parameters: + ------- + self: instance of self. + + Returns + ------- + self : returns an instance of self. 
+ """ + + # Initial performance matrix + self.performance_matrix = np.ones((self.mc.n_series, self.n_models)) * self.big_float + warm_start = False + train_tau = self.mc.training_percentile/100 + criterion = DisaggregatedPinballLoss(train_tau) + + # Train epoch loop + for epoch in range(self.mc.max_epochs): + start = time.time() + + # Solve degenerate models + for model_id in range(self.n_models): + if np.sum(self.series_models_map[:,model_id])==0: + print('Reassigning random series to model ', model_id) + n_sample_series= int(self.mc.n_series/2) + index_series = np.random.choice(self.mc.n_series, n_sample_series, replace=False) + self.series_models_map[index_series, model_id] = 1 + + # Model loop + for model_id, esrnn in enumerate(self.esrnn_ensemble): + # Train model with subset data + dataloader = Iterator(mc = self.mc, X=self.X, y=self.y, + weights=self.series_models_map[:, model_id]) + esrnn.train(dataloader, max_epochs=1, warm_start=warm_start, shuffle=self.shuffle, verbose=False) + + # Compute model performance for each series + dataloader = Iterator(mc=self.mc, X=self.X, y=self.y) + per_series_evaluation = esrnn.per_series_evaluation(dataloader, criterion=criterion) + self.performance_matrix[:, model_id] = per_series_evaluation + + # Reassign series to models + self.series_models_map = np.zeros((self.mc.n_series, self.n_models)) + top_models = np.argpartition(self.performance_matrix, self.n_top)[:, :self.n_top] + for i in range(self.mc.n_series): + self.series_models_map[i, top_models[i,:]] = 1 + + warm_start = True + + print("========= Epoch {} finished =========".format(epoch)) + print("Training time: {}".format(round(time.time()-start, 5))) + self.train_loss = np.einsum('ij,ij->i',self.performance_matrix, self.series_models_map)/self.n_top + self.train_loss = np.mean(self.train_loss) + print("Training loss ({} prc): {:.5f}".format(self.mc.training_percentile, + self.train_loss)) + print('Models num series', np.sum(self.series_models_map, axis=0)) + + if (epoch % self.mc.freq_of_test == 0) and (self.mc.freq_of_test > 0): + if self.y_test_df is not None: + self.evaluate_model_prediction(self.y_train_df, self.X_test_df, + self.y_test_df, epoch=epoch) + print('Train finished! \n') + + def predict(self, X_df): + """ + Predict using the ESRNN ensemble. + + Parameters + ---------- + X_df : pandas dataframe + Dataframe in LONG format with columns 'unique_id', 'ds' + and 'x'. + - 'unique_id' an identifier of each independent time series. + - 'ds' is a datetime column + - 'x' is a single exogenous variable + + Returns + ------- + Y_hat_panel : pandas dataframe + Dataframe in LONG format with columns 'unique_id', 'ds' + and 'x'. + - 'unique_id' an identifier of each independent time series. 
+ - 'ds' datetime columnn that matches the dates in X_df + - 'y_hat' is the column with the predicted target values + """ + + assert type(X_df) == pd.core.frame.DataFrame + assert 'unique_id' in X_df + assert self._fitted, "Model not fitted yet" + + dataloader = Iterator(mc=self.mc, X=self.X, y=self.y) + + output_size = self.mc.output_size + n_unique_id = len(dataloader.sort_key['unique_id']) + + ensemble_y_hat = np.zeros((self.n_models, n_unique_id, output_size)) + + for model_id, esrnn in enumerate(self.esrnn_ensemble): + esrnn.esrnn.eval() + + # Predict ALL series + count = 0 + for j in range(dataloader.n_batches): + batch = dataloader.get_batch() + batch_size = batch.y.shape[0] + + y_hat = esrnn.esrnn.predict(batch) + + y_hat = y_hat.data.cpu().numpy() + + ensemble_y_hat[model_id, count:count+batch_size, :] = y_hat + count += batch_size + + # Weighted average of prediction for n_top best models per series + # (n_models x n_unique_id x output_size) (n_unique_id x n_models) + y_hat = np.einsum('ijk,ji->jk', ensemble_y_hat, self.series_models_map) / self.n_top + y_hat = y_hat.flatten() + + panel_unique_id = pd.Series(dataloader.sort_key['unique_id']).repeat(output_size) + panel_last_ds = pd.Series(dataloader.X[:, 2]).repeat(output_size) + + panel_delta = list(range(1, output_size+1)) * n_unique_id + panel_delta = pd.to_timedelta(panel_delta, unit=self.mc.frequency) + panel_ds = panel_last_ds + panel_delta + + assert len(panel_ds) == len(y_hat) == len(panel_unique_id) + + Y_hat_panel_dict = {'unique_id': panel_unique_id, + 'ds': panel_ds, + 'y_hat': y_hat} + + Y_hat_panel = pd.DataFrame.from_dict(Y_hat_panel_dict) + + if 'ds' in X_df: + Y_hat_panel = X_df.merge(Y_hat_panel, on=['unique_id', 'ds'], how='left') + else: + Y_hat_panel = X_df.merge(Y_hat_panel, on=['unique_id'], how='left') + + return Y_hat_panel + + def evaluate_model_prediction(self, y_train_df, X_test_df, y_test_df, epoch=None): + """ + Evaluate ESRNN model against benchmark in y_test_df + + Parameters + ---------- + y_train_df: pandas dataframe + panel with columns 'unique_id', 'ds', 'y' + X_test_df: pandas dataframe + panel with columns 'unique_id', 'ds', 'x' + y_test_df: pandas dataframe + panel with columns 'unique_id', 'ds', 'y' and a column + y_hat_naive2 identifying benchmark predictions + epoch: int + the number of epoch to check early stopping results + + Returns + ------- + model_owa : float + relative improvement of model with respect to benchmark, measured with + the M4 overall weighted average. + smape: float + relative improvement of model with respect to benchmark, measured with + the symmetric mean absolute percentage error. + mase: float + relative improvement of model with respect to benchmark, measured with + the M4 mean absolute scaled error. 
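+
+        Examples
+        --------
+        A hedged sketch, assuming the ensemble is already fitted and
+        ``y_test_df`` carries the 'y_hat_naive2' benchmark column:
+
+        >>> model_owa, model_mase, model_smape = model.evaluate_model_prediction(
+        ...     y_train_df, X_test_df, y_test_df, epoch=0)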
+ """ + + assert self._fitted, "Model not fitted yet" + + y_panel = y_test_df.filter(['unique_id', 'ds', 'y']) + y_naive2_panel = y_test_df.filter(['unique_id', 'ds', 'y_hat_naive2']) + y_naive2_panel.rename(columns={'y_hat_naive2': 'y_hat'}, inplace=True) + y_hat_panel = self.predict(X_test_df) + y_insample = y_train_df.filter(['unique_id', 'ds', 'y']) + + model_owa, model_mase, model_smape = owa(y_panel, y_hat_panel, + y_naive2_panel, y_insample, + seasonality=self.mc.naive_seasonality) + + if self.min_owa > model_owa: + self.min_owa = model_owa + if epoch is not None: + self.min_epoch = epoch + + print('OWA: {} '.format(np.round(model_owa, 3))) + print('SMAPE: {} '.format(np.round(model_smape, 3))) + print('MASE: {} '.format(np.round(model_mase, 3))) + + return model_owa, model_mase, model_smape diff --git a/morphemic-forecasting-eshybrid/ESRNN/__init__.py b/morphemic-forecasting-eshybrid/ESRNN/__init__.py new file mode 100644 index 00000000..8eca21b4 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/__init__.py @@ -0,0 +1 @@ +from ESRNN.ESRNN import * diff --git a/morphemic-forecasting-eshybrid/ESRNN/m4_data.py b/morphemic-forecasting-eshybrid/ESRNN/m4_data.py new file mode 100644 index 00000000..a90dd1b6 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/m4_data.py @@ -0,0 +1,245 @@ +import os +from six.moves import urllib +import subprocess + +import numpy as np +import pandas as pd + +from ESRNN.utils_evaluation import Naive2 + + +seas_dict = {'Hourly': {'seasonality': 24, 'input_size': 24, + 'output_size': 48, 'freq': 'H'}, + 'Daily': {'seasonality': 7, 'input_size': 7, + 'output_size': 14, 'freq': 'D'}, + 'Weekly': {'seasonality': 52, 'input_size': 52, + 'output_size': 13, 'freq': 'W'}, + 'Monthly': {'seasonality': 12, 'input_size': 12, + 'output_size':18, 'freq': 'M'}, + 'Quarterly': {'seasonality': 4, 'input_size': 4, + 'output_size': 8, 'freq': 'Q'}, + 'Yearly': {'seasonality': 1, 'input_size': 4, + 'output_size': 6, 'freq': 'D'}} + +SOURCE_URL = 'https://raw.githubusercontent.com/Mcompetitions/M4-methods/master/Dataset/' + + +def maybe_download(filename, directory): + """ + Download the data from M4's website, unless it's already here. + + Parameters + ---------- + filename: str + Filename of M4 data with format /Type/Frequency.csv. Example: /Test/Daily-train.csv + directory: str + Custom directory where data will be downloaded. + """ + data_directory = directory + "/m4" + train_directory = data_directory + "/Train/" + test_directory = data_directory + "/Test/" + + if not os.path.exists(data_directory): + os.mkdir(data_directory) + if not os.path.exists(train_directory): + os.mkdir(train_directory) + if not os.path.exists(test_directory): + os.mkdir(test_directory) + + filepath = os.path.join(data_directory, filename) + if not os.path.exists(filepath): + filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename, filepath) + size = os.path.getsize(filepath) + print('Successfully downloaded', filename, size, 'bytes.') + return filepath + +def m4_parser(dataset_name, directory, num_obs=1000000): + """ + Transform M4 data into a panel. + + Parameters + ---------- + dataset_name: str + Frequency of the data. Example: 'Yearly'. + directory: str + Custom directory where data will be saved. + num_obs: int + Number of time series to return. 
+ """ + data_directory = directory + "/m4" + train_directory = data_directory + "/Train/" + test_directory = data_directory + "/Test/" + freq = seas_dict[dataset_name]['freq'] + + m4_info = pd.read_csv(data_directory+'/M4-info.csv', usecols=['M4id','category']) + m4_info = m4_info[m4_info['M4id'].str.startswith(dataset_name[0])].reset_index(drop=True) + + # Train data + train_path='{}{}-train.csv'.format(train_directory, dataset_name) + + train_df = pd.read_csv(train_path, nrows=num_obs) + train_df = train_df.rename(columns={'V1':'unique_id'}) + + train_df = pd.wide_to_long(train_df, stubnames=["V"], i="unique_id", j="ds").reset_index() + train_df = train_df.rename(columns={'V':'y'}) + train_df = train_df.dropna() + train_df['split'] = 'train' + train_df['ds'] = train_df['ds']-1 + # Get len of series per unique_id + len_series = train_df.groupby('unique_id').agg({'ds': 'max'}).reset_index() + len_series.columns = ['unique_id', 'len_serie'] + + # Test data + test_path='{}{}-test.csv'.format(test_directory, dataset_name) + + test_df = pd.read_csv(test_path, nrows=num_obs) + test_df = test_df.rename(columns={'V1':'unique_id'}) + + test_df = pd.wide_to_long(test_df, stubnames=["V"], i="unique_id", j="ds").reset_index() + test_df = test_df.rename(columns={'V':'y'}) + test_df = test_df.dropna() + test_df['split'] = 'test' + test_df = test_df.merge(len_series, on='unique_id') + test_df['ds'] = test_df['ds'] + test_df['len_serie'] - 1 + test_df = test_df[['unique_id','ds','y','split']] + + df = pd.concat((train_df,test_df)) + df = df.sort_values(by=['unique_id', 'ds']).reset_index(drop=True) + + # Create column with dates with freq of dataset + len_series = df.groupby('unique_id').agg({'ds': 'max'}).reset_index() + dates = [] + for i in range(len(len_series)): + len_serie = len_series.iloc[i,1] + ranges = pd.date_range(start='1970/01/01', periods=len_serie, freq=freq) + dates += list(ranges) + df.loc[:,'ds'] = dates + + df = df.merge(m4_info, left_on=['unique_id'], right_on=['M4id']) + df.drop(columns=['M4id'], inplace=True) + df = df.rename(columns={'category': 'x'}) + + X_train_df = df[df['split']=='train'].filter(items=['unique_id', 'ds', 'x']) + y_train_df = df[df['split']=='train'].filter(items=['unique_id', 'ds', 'y']) + X_test_df = df[df['split']=='test'].filter(items=['unique_id', 'ds', 'x']) + y_test_df = df[df['split']=='test'].filter(items=['unique_id', 'ds', 'y']) + + X_train_df = X_train_df.reset_index(drop=True) + y_train_df = y_train_df.reset_index(drop=True) + X_test_df = X_test_df.reset_index(drop=True) + y_test_df = y_test_df.reset_index(drop=True) + + return X_train_df, y_train_df, X_test_df, y_test_df + +def naive2_predictions(dataset_name, directory, num_obs, y_train_df = None, y_test_df = None): + """ + Computes Naive2 predictions. + + Parameters + ---------- + directory: str + Custom directory where data will be saved. + num_obs: int + Number of time series to return. 
+    y_train_df: DataFrame
+        Y train set returned by m4_parser
+    y_test_df: DataFrame
+        Y test set returned by m4_parser
+    """
+    # Read train and test data
+    if (y_train_df is None) or (y_test_df is None):
+        _, y_train_df, _, y_test_df = m4_parser(dataset_name, directory, num_obs)
+
+    config_dict = seas_dict
+    if isinstance(dataset_name, dict):
+        config_dict['manual'] = dataset_name
+        dataset_name = 'manual'
+
+    seasonality = config_dict[dataset_name]['seasonality']
+    input_size = config_dict[dataset_name]['input_size']
+    output_size = config_dict[dataset_name]['output_size']
+    freq = config_dict[dataset_name]['freq']
+
+    print('Preparing {} dataset'.format(dataset_name))
+    print('Preparing Naive2 {} dataset predictions'.format(dataset_name))
+
+    # Naive2
+    y_naive2_df = pd.DataFrame(columns=['unique_id', 'ds', 'y_hat'])
+
+    # Sort the train panel by unique_id for a faster loop
+    y_train_df = y_train_df.sort_values(by=['unique_id', 'ds'])
+    # List of unique ids
+    unique_ids = y_train_df['unique_id'].unique()
+    # Panel of fitted models
+    for unique_id in unique_ids:
+        # Fast filter X and y by id.
+        top_row = int(y_train_df['unique_id'].searchsorted(unique_id, 'left'))
+        bottom_row = int(y_train_df['unique_id'].searchsorted(unique_id, 'right'))
+        y_id = y_train_df[top_row:bottom_row]
+
+        y_naive2 = pd.DataFrame(columns=['unique_id', 'ds', 'y_hat'])
+        y_naive2['ds'] = pd.date_range(start=y_id.ds.max(),
+                                       periods=output_size+1, freq=freq)[1:]
+        y_naive2['unique_id'] = unique_id
+        y_naive2['y_hat'] = Naive2(seasonality).fit(y_id.y.to_numpy()).predict(output_size)
+        y_naive2_df = y_naive2_df.append(y_naive2)
+
+    y_naive2_df = y_test_df.merge(y_naive2_df, on=['unique_id', 'ds'], how='left')
+    y_naive2_df.rename(columns={'y_hat': 'y_hat_naive2'}, inplace=True)
+
+    results_dir = directory + '/results'
+    naive2_file = results_dir + '/{}-naive2predictions_{}.csv'.format(dataset_name, num_obs)
+    y_naive2_df.to_csv(naive2_file, encoding='utf-8', index=None)
+
+    return y_naive2_df
+
+def prepare_m4_data(dataset_name, directory, num_obs):
+    """
+    Pipeline that obtains the M4 time series, transforms them and gets
+    Naive2 predictions.
+
+    Parameters
+    ----------
+    dataset_name: str
+        Frequency of the data. Example: 'Yearly'.
+    directory: str
+        Custom directory where data will be saved.
+    num_obs: int
+        Number of time series to return.
+ py_predictions: bool + whether use python or r predictions + """ + m4info_filename = maybe_download('M4-info.csv', directory) + + dailytrain_filename = maybe_download('Train/Daily-train.csv', directory) + hourlytrain_filename = maybe_download('Train/Hourly-train.csv', directory) + monthlytrain_filename = maybe_download('Train/Monthly-train.csv', directory) + quarterlytrain_filename = maybe_download('Train/Quarterly-train.csv', directory) + weeklytrain_filename = maybe_download('Train/Weekly-train.csv', directory) + yearlytrain_filename = maybe_download('Train/Yearly-train.csv', directory) + + dailytest_filename = maybe_download('Test/Daily-test.csv', directory) + hourlytest_filename = maybe_download('Test/Hourly-test.csv', directory) + monthlytest_filename = maybe_download('Test/Monthly-test.csv', directory) + quarterlytest_filename = maybe_download('Test/Quarterly-test.csv', directory) + weeklytest_filename = maybe_download('Test/Weekly-test.csv', directory) + yearlytest_filename = maybe_download('Test/Yearly-test.csv', directory) + print('\n') + + X_train_df, y_train_df, X_test_df, y_test_df = m4_parser(dataset_name, directory, num_obs) + + results_dir = directory + '/results' + if not os.path.exists(results_dir): + os.mkdir(results_dir) + + naive2_file = results_dir + '/{}-naive2predictions_{}.csv' + naive2_file = naive2_file.format(dataset_name, num_obs) + + if not os.path.exists(naive2_file): + y_naive2_df = naive2_predictions(dataset_name, directory, num_obs, y_train_df, y_test_df) + else: + y_naive2_df = pd.read_csv(naive2_file) + y_naive2_df['ds'] = pd.to_datetime(y_naive2_df['ds']) + + return X_train_df, y_train_df, X_test_df, y_naive2_df diff --git a/morphemic-forecasting-eshybrid/ESRNN/m4_run.py b/morphemic-forecasting-eshybrid/ESRNN/m4_run.py new file mode 100644 index 00000000..ef9028ce --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/m4_run.py @@ -0,0 +1,117 @@ +import os +import argparse +import itertools +import ast +import pickle +import time + +import os +import numpy as np +import pandas as pd + +from ESRNN.m4_data import prepare_m4_data +from ESRNN.utils_evaluation import evaluate_prediction_owa +from ESRNN.utils_configs import get_config + +from ESRNN import ESRNN + +import torch + +def main(args): + config = get_config(args.dataset) + if config['data_parameters']['frequency'] == 'Y': + config['data_parameters']['frequency'] = None + + #Setting needed parameters + os.environ['CUDA_VISIBLE_DEVICES'] = str(args.gpu_id) + + if args.num_obs: + num_obs = args.num_obs + else: + num_obs = 100000 + + if args.use_cpu == 1: + config['device'] = 'cpu' + else: + assert torch.cuda.is_available(), 'No cuda devices detected. You can try using CPU instead.' 
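+
+    # Example invocation, mirroring ESRNN/tests/test_esrnn.py:
+    #   python -m ESRNN.m4_run --dataset Hourly --results_directory ./data \
+    #     --gpu_id 0 --use_cpu 1 --num_obs 100 --test 1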
+ + #Reading data + print('Reading data') + X_train_df, y_train_df, X_test_df, y_test_df = prepare_m4_data(dataset_name=args.dataset, + directory=args.results_directory, + num_obs=num_obs) + + # Instantiate model + model = ESRNN(max_epochs=config['train_parameters']['max_epochs'], + batch_size=config['train_parameters']['batch_size'], + freq_of_test=config['train_parameters']['freq_of_test'], + learning_rate=float(config['train_parameters']['learning_rate']), + lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'], + lr_decay=config['train_parameters']['lr_decay'], + per_series_lr_multip=config['train_parameters']['per_series_lr_multip'], + gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'], + rnn_weight_decay=config['train_parameters']['rnn_weight_decay'], + noise_std=config['train_parameters']['noise_std'], + level_variability_penalty=config['train_parameters']['level_variability_penalty'], + testing_percentile=config['train_parameters']['testing_percentile'], + training_percentile=config['train_parameters']['training_percentile'], + ensemble=config['train_parameters']['ensemble'], + max_periods=config['data_parameters']['max_periods'], + seasonality=config['data_parameters']['seasonality'], + input_size=config['data_parameters']['input_size'], + output_size=config['data_parameters']['output_size'], + frequency=config['data_parameters']['frequency'], + cell_type=config['model_parameters']['cell_type'], + state_hsize=config['model_parameters']['state_hsize'], + dilations=config['model_parameters']['dilations'], + add_nl_layer=config['model_parameters']['add_nl_layer'], + random_seed=config['model_parameters']['random_seed'], + device=config['device']) + + if args.test == 1: + model = ESRNN(max_epochs=1, + batch_size=20, + seasonality=config['data_parameters']['seasonality'], + input_size=config['data_parameters']['input_size'], + output_size=config['data_parameters']['output_size'], + frequency=config['data_parameters']['frequency'], + device=config['device']) + + # Fit model + # If y_test_df is provided the model will evaluate predictions on this set every freq_test epochs + model.fit(X_train_df, y_train_df, X_test_df, y_test_df) + + # Predict on test set + print('\nForecasting') + y_hat_df = model.predict(X_test_df) + + # Evaluate predictions + print(15*'=', ' Final evaluation ', 14*'=') + seasonality = config['data_parameters']['seasonality'] + if not seasonality: + seasonality = 1 + else: + seasonality = seasonality[0] + + final_owa, final_mase, final_smape = evaluate_prediction_owa(y_hat_df, y_train_df, + X_test_df, y_test_df, + naive2_seasonality=seasonality) + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Replicate M4 results for the ESRNN model') + parser.add_argument("--dataset", required=True, type=str, + choices=['Yearly', 'Quarterly', 'Monthly', 'Weekly', 'Hourly', 'Daily'], + help="set of M4 time series to be tested") + parser.add_argument("--results_directory", required=True, type=str, + help="directory where M4 data will be downloaded") + parser.add_argument("--gpu_id", required=False, type=int, + help="an integer that specify which GPU will be used") + parser.add_argument("--use_cpu", required=False, type=int, + help="1 to use CPU instead of GPU (uses GPU by default)") + parser.add_argument("--num_obs", required=False, type=int, + help="number of M4 time series to be tested (uses all data by default)") + parser.add_argument("--test", required=False, type=int, + help="run fast for tests 
(no test by default)") + args = parser.parse_args() + + main(args) diff --git a/morphemic-forecasting-eshybrid/ESRNN/tests/test_esrnn.py b/morphemic-forecasting-eshybrid/ESRNN/tests/test_esrnn.py new file mode 100644 index 00000000..f8c2c3e7 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/tests/test_esrnn.py @@ -0,0 +1,120 @@ +#Testing ESRNN +import runpy +import os + +print('\n') +print(10*'='+'TEST ESRNN'+10*'=') +print('\n') + +def test_esrnn_hourly(): + if not os.path.exists('./data'): + os.mkdir('./data') + + print('\n') + print(10*'='+'HOURLY'+10*'=') + print('\n') + + exec_str = 'python -m ESRNN.m4_run --dataset Hourly ' + exec_str += '--results_directory ./data --gpu_id 0 ' + exec_str += '--use_cpu 1 --num_obs 100 --test 1' + results = os.system(exec_str) + + if results==0: + print('Test completed') + else: + raise Exception('Something went wrong') + +def test_esrnn_weekly(): + if not os.path.exists('./data'): + os.mkdir('./data') + + print('\n') + print(10*'='+'WEEKLY'+10*'=') + print('\n') + + exec_str = 'python -m ESRNN.m4_run --dataset Weekly ' + exec_str += '--results_directory ./data --gpu_id 0 ' + exec_str += '--use_cpu 1 --num_obs 100 --test 1' + results = os.system(exec_str) + + if results==0: + print('Test completed') + else: + raise Exception('Something went wrong') + + +def test_esrnn_daily(): + if not os.path.exists('./data'): + os.mkdir('./data') + + print('\n') + print(10*'='+'DAILY'+10*'=') + print('\n') + + exec_str = 'python -m ESRNN.m4_run --dataset Daily ' + exec_str += '--results_directory ./data --gpu_id 0 ' + exec_str += '--use_cpu 1 --num_obs 100 --test 1' + results = os.system(exec_str) + + if results==0: + print('Test completed') + else: + raise Exception('Something went wrong') + + +def test_esrnn_monthly(): + if not os.path.exists('./data'): + os.mkdir('./data') + + + print('\n') + print(10*'='+'MONTHLY'+10*'=') + print('\n') + + exec_str = 'python -m ESRNN.m4_run --dataset Monthly ' + exec_str += '--results_directory ./data --gpu_id 0 ' + exec_str += '--use_cpu 1 --num_obs 100 --test 1' + results = os.system(exec_str) + + if results==0: + print('Test completed') + else: + raise Exception('Something went wrong') + + +def test_esrnn_quarterly(): + if not os.path.exists('./data'): + os.mkdir('./data') + + print('\n') + print(10*'='+'QUARTERLY'+10*'=') + print('\n') + + exec_str = 'python -m ESRNN.m4_run --dataset Quarterly ' + exec_str += '--results_directory ./data --gpu_id 0 ' + exec_str += '--use_cpu 1 --num_obs 100 --test 1' + results = os.system(exec_str) + + if results==0: + print('Test completed') + else: + raise Exception('Something went wrong') + + +def test_esrnn_yearly(): + if not os.path.exists('./data'): + os.mkdir('./data') + + print('\n') + print(10*'='+'YEARLY'+10*'=') + print('\n') + + exec_str = 'python -m ESRNN.m4_run --dataset Yearly ' + exec_str += '--results_directory ./data --gpu_id 0 ' + exec_str += '--use_cpu 1 --num_obs 100 --test 1' + results = os.system(exec_str) + + if results==0: + print('Test completed') + else: + raise Exception('Something went wrong') diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils/DRNN.py b/morphemic-forecasting-eshybrid/ESRNN/utils/DRNN.py new file mode 100644 index 00000000..0f08f6f6 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/utils/DRNN.py @@ -0,0 +1,288 @@ +# Dilated Recurrent Neural Networks. 
https://papers.nips.cc/paper/6613-dilated-recurrent-neural-networks.pdf +# implementation from https://github.com/zalandoresearch/pytorch-dilated-rnn +# Residual LSTM: Design of a Deep Recurrent Architecture for Distant Speech Recognition. https://arxiv.org/abs/1701.03360 +# A Dual-Stage Attention-Based recurrent neural network for time series prediction. https://arxiv.org/abs/1704.02971 + +import torch +import torch.nn as nn +import torch.autograd as autograd + +#import torch.jit as jit + +use_cuda = torch.cuda.is_available() + + +class LSTMCell(nn.Module): #jit.ScriptModule + def __init__(self, input_size, hidden_size, dropout=0.): + super(LSTMCell, self).__init__() + self.input_size = input_size + self.hidden_size = hidden_size + self.weight_ih = nn.Parameter(torch.randn(4 * hidden_size, input_size)) + self.weight_hh = nn.Parameter(torch.randn(4 * hidden_size, hidden_size)) + self.bias_ih = nn.Parameter(torch.randn(4 * hidden_size)) + self.bias_hh = nn.Parameter(torch.randn(4 * hidden_size)) + self.dropout = dropout + + #@jit.script_method + def forward(self, input, hidden): + # type: (Tensor, Tuple[Tensor, Tensor]) -> Tuple[Tensor, Tuple[Tensor, Tensor]] + hx, cx = hidden[0].squeeze(0), hidden[1].squeeze(0) + gates = (torch.matmul(input, self.weight_ih.t()) + self.bias_ih + + torch.matmul(hx, self.weight_hh.t()) + self.bias_hh) + ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1) + + ingate = torch.sigmoid(ingate) + forgetgate = torch.sigmoid(forgetgate) + cellgate = torch.tanh(cellgate) + outgate = torch.sigmoid(outgate) + + cy = (forgetgate * cx) + (ingate * cellgate) + hy = outgate * torch.tanh(cy) + + return hy, (hy, cy) + + +class ResLSTMCell(nn.Module): + def __init__(self, input_size, hidden_size, dropout=0.): + super(ResLSTMCell, self).__init__() + self.register_buffer('input_size', torch.Tensor([input_size])) + self.register_buffer('hidden_size', torch.Tensor([hidden_size])) + self.weight_ii = nn.Parameter(torch.randn(3 * hidden_size, input_size)) + self.weight_ic = nn.Parameter(torch.randn(3 * hidden_size, hidden_size)) + self.weight_ih = nn.Parameter(torch.randn(3 * hidden_size, hidden_size)) + self.bias_ii = nn.Parameter(torch.randn(3 * hidden_size)) + self.bias_ic = nn.Parameter(torch.randn(3 * hidden_size)) + self.bias_ih = nn.Parameter(torch.randn(3 * hidden_size)) + self.weight_hh = nn.Parameter(torch.randn(1 * hidden_size, hidden_size)) + self.bias_hh = nn.Parameter(torch.randn(1 * hidden_size)) + self.weight_ir = nn.Parameter(torch.randn(hidden_size, input_size)) + self.dropout = dropout + + #@jit.script_method + def forward(self, input, hidden): + # type: (Tensor, Tuple[Tensor, Tensor]) -> Tuple[Tensor, Tuple[Tensor, Tensor]] + hx, cx = hidden[0].squeeze(0), hidden[1].squeeze(0) + + ifo_gates = (torch.matmul(input, self.weight_ii.t()) + self.bias_ii + + torch.matmul(hx, self.weight_ih.t()) + self.bias_ih + + torch.matmul(cx, self.weight_ic.t()) + self.bias_ic) + ingate, forgetgate, outgate = ifo_gates.chunk(3, 1) + + cellgate = torch.matmul(hx, self.weight_hh.t()) + self.bias_hh + + ingate = torch.sigmoid(ingate) + forgetgate = torch.sigmoid(forgetgate) + cellgate = torch.tanh(cellgate) + outgate = torch.sigmoid(outgate) + + cy = (forgetgate * cx) + (ingate * cellgate) + ry = torch.tanh(cy) + + if self.input_size == self.hidden_size: + hy = outgate * (ry + input) + else: + hy = outgate * (ry + torch.matmul(input, self.weight_ir.t())) + return hy, (hy, cy) + + +class ResLSTMLayer(nn.Module): + def __init__(self, input_size, hidden_size, dropout=0.): + 
super(ResLSTMLayer, self).__init__() + self.input_size = input_size + self.hidden_size = hidden_size + self.cell = ResLSTMCell(input_size, hidden_size, dropout=0.) + + #@jit.script_method + def forward(self, input, hidden): + # type: (Tensor, Tuple[Tensor, Tensor]) -> Tuple[Tensor, Tuple[Tensor, Tensor]] + inputs = input.unbind(0) + #outputs = torch.jit.annotate(List[Tensor], []) + outputs = [] + for i in range(len(inputs)): + out, hidden = self.cell(inputs[i], hidden) + outputs += [out] + outputs = torch.stack(outputs) + return outputs, hidden + + +class AttentiveLSTMLayer(nn.Module): + def __init__(self, input_size, hidden_size, dropout=0.0): + super(AttentiveLSTMLayer, self).__init__() + self.input_size = input_size + self.hidden_size = hidden_size + attention_hsize = hidden_size + self.attention_hsize = attention_hsize + + self.cell = LSTMCell(input_size, hidden_size) + self.attn_layer = nn.Sequential(nn.Linear(2 * hidden_size + input_size, attention_hsize), + nn.Tanh(), + nn.Linear(attention_hsize, 1)) + self.softmax = nn.Softmax(dim=0) + self.dropout = dropout + + #@jit.script_method + def forward(self, input, hidden): + inputs = input.unbind(0) + #outputs = torch.jit.annotate(List[Tensor], []) + outputs = [] + + for t in range(len(input)): + # attention on windows + hx, cx = hidden[0].squeeze(0), hidden[1].squeeze(0) + hx_rep = hx.repeat(len(inputs), 1, 1) + cx_rep = cx.repeat(len(inputs), 1, 1) + x = torch.cat((input, hx_rep, cx_rep), dim=-1) + l = self.attn_layer(x) + beta = self.softmax(l) + context = torch.bmm(beta.permute(1, 2, 0), + input.permute(1, 0, 2)).squeeze(1) + out, hidden = self.cell(context, hidden) + outputs += [out] + outputs = torch.stack(outputs) + return outputs, hidden + + +class DRNN(nn.Module): + + def __init__(self, n_input, n_hidden, n_layers, dilations, dropout=0, cell_type='GRU', batch_first=False): + + super(DRNN, self).__init__() + + self.dilations = dilations + self.cell_type = cell_type + self.batch_first = batch_first + + layers = [] + if self.cell_type == "GRU": + cell = nn.GRU + elif self.cell_type == "RNN": + cell = nn.RNN + elif self.cell_type == "LSTM": + cell = nn.LSTM + elif self.cell_type == "ResLSTM": + cell = ResLSTMLayer + elif self.cell_type == "AttentiveLSTM": + cell = AttentiveLSTMLayer + else: + raise NotImplementedError + + for i in range(n_layers): + if i == 0: + c = cell(n_input, n_hidden, dropout=dropout) + else: + c = cell(n_hidden, n_hidden, dropout=dropout) + layers.append(c) + self.cells = nn.Sequential(*layers) + + def forward(self, inputs, hidden=None): + if self.batch_first: + inputs = inputs.transpose(0, 1) + outputs = [] + for i, (cell, dilation) in enumerate(zip(self.cells, self.dilations)): + if hidden is None: + inputs, _ = self.drnn_layer(cell, inputs, dilation) + else: + inputs, hidden[i] = self.drnn_layer(cell, inputs, dilation, hidden[i]) + + outputs.append(inputs[-dilation:]) + + if self.batch_first: + inputs = inputs.transpose(0, 1) + return inputs, outputs + + def drnn_layer(self, cell, inputs, rate, hidden=None): + + n_steps = len(inputs) + batch_size = inputs[0].size(0) + hidden_size = cell.hidden_size + + inputs, dilated_steps = self._pad_inputs(inputs, n_steps, rate) + dilated_inputs = self._prepare_inputs(inputs, rate) + + if hidden is None: + dilated_outputs, hidden = self._apply_cell(dilated_inputs, cell, batch_size, rate, hidden_size) + else: + hidden = self._prepare_inputs(hidden, rate) + dilated_outputs, hidden = self._apply_cell(dilated_inputs, cell, batch_size, rate, hidden_size, + hidden=hidden) + + 
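+        # Illustrative example of the reshaping round trip (hypothetical
+        # values): with rate=2 and a padded sequence [x0..x5] at batch
+        # size B, _prepare_inputs above stacked the even and odd
+        # subsequences side by side,
+        #
+        #   dilated step 0 -> [x0 | x1], step 1 -> [x2 | x3], step 2 -> [x4 | x5]
+        #
+        # so the wrapped cell advances two real time steps per call.
+        # _split_outputs below interleaves the halves back into 6 steps of
+        # batch B, and _unpad_outputs trims the zero padding again.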
splitted_outputs = self._split_outputs(dilated_outputs, rate) + outputs = self._unpad_outputs(splitted_outputs, n_steps) + + return outputs, hidden + + def _apply_cell(self, dilated_inputs, cell, batch_size, rate, hidden_size, hidden=None): + if hidden is None: + if self.cell_type == 'LSTM' or self.cell_type == 'ResLSTM' or self.cell_type == 'AttentiveLSTM': + c, m = self.init_hidden(batch_size * rate, hidden_size) + hidden = (c.unsqueeze(0), m.unsqueeze(0)) + else: + hidden = self.init_hidden(batch_size * rate, hidden_size).unsqueeze(0) + + dilated_outputs, hidden = cell(dilated_inputs, hidden) # compatibility hack + + return dilated_outputs, hidden + + def _unpad_outputs(self, splitted_outputs, n_steps): + return splitted_outputs[:n_steps] + + def _split_outputs(self, dilated_outputs, rate): + batchsize = dilated_outputs.size(1) // rate + + blocks = [dilated_outputs[:, i * batchsize: (i + 1) * batchsize, :] for i in range(rate)] + + interleaved = torch.stack((blocks)).transpose(1, 0).contiguous() + interleaved = interleaved.view(dilated_outputs.size(0) * rate, + batchsize, + dilated_outputs.size(2)) + return interleaved + + def _pad_inputs(self, inputs, n_steps, rate): + iseven = (n_steps % rate) == 0 + + if not iseven: + dilated_steps = n_steps // rate + 1 + + zeros_ = torch.zeros(dilated_steps * rate - inputs.size(0), + inputs.size(1), + inputs.size(2)) + if use_cuda: + zeros_ = zeros_.cuda() + + inputs = torch.cat((inputs, autograd.Variable(zeros_))) + else: + dilated_steps = n_steps // rate + + return inputs, dilated_steps + + def _prepare_inputs(self, inputs, rate): + dilated_inputs = torch.cat([inputs[j::rate, :, :] for j in range(rate)], 1) + return dilated_inputs + + def init_hidden(self, batch_size, hidden_dim): + hidden = autograd.Variable(torch.zeros(batch_size, hidden_dim)) + if use_cuda: + hidden = hidden.cuda() + if self.cell_type == "LSTM" or self.cell_type == 'ResLSTM' or self.cell_type == 'AttentiveLSTM': + memory = autograd.Variable(torch.zeros(batch_size, hidden_dim)) + if use_cuda: + memory = memory.cuda() + return hidden, memory + else: + return hidden + + +if __name__ == '__main__': + n_inp = 4 + n_hidden = 4 + n_layers = 2 + batch_size = 3 + n_windows = 2 + cell_type = 'ResLSTM' + + model = DRNN(n_inp, n_hidden, n_layers=n_layers, cell_type=cell_type, dilations=[1,2]) + + test_x1 = torch.autograd.Variable(torch.randn(n_windows, batch_size, n_inp)) + test_x2 = torch.autograd.Variable(torch.randn(n_windows, batch_size, n_inp)) + + out, hidden = model(test_x1) diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils/ESRNN.py b/morphemic-forecasting-eshybrid/ESRNN/utils/ESRNN.py new file mode 100644 index 00000000..d2f07295 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/utils/ESRNN.py @@ -0,0 +1,289 @@ +import torch +import torch.nn as nn +from ESRNN.utils.DRNN import DRNN +import numpy as np + +#import torch.jit as jit + + +class _ES(nn.Module): + def __init__(self, mc): + super(_ES, self).__init__() + self.mc = mc + self.n_series = self.mc.n_series + self.output_size = self.mc.output_size + assert len(self.mc.seasonality) in [0, 1, 2] + + def gaussian_noise(self, input_data, std=0.2): + size = input_data.size() + noise = torch.autograd.Variable(input_data.data.new(size).normal_(0, std)) + return input_data + noise + + #@jit.script_method + def compute_levels_seasons(self, y, idxs): + pass + + def normalize(self, y, level, seasonalities): + pass + + def predict(self, trend, levels, seasonalities): + pass + + def forward(self, ts_object): + # parse mc + 
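+        # Rolling-window bookkeeping (illustrative numbers): with n_time=10,
+        # input_size=4 and output_size=2, training builds
+        # range(10 - 4 - 2 + 1) = 5 overlapping windows; window i feeds
+        # y[:, i:i+4] to the RNN and is scored against y[:, i+4:i+6].
+        # In eval mode only the most recent windows, range(5, 7), are built,
+        # and predict() later keeps just the last one as the forecast trend.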
input_size = self.mc.input_size + output_size = self.mc.output_size + exogenous_size = self.mc.exogenous_size + noise_std = self.mc.noise_std + seasonality = self.mc.seasonality + batch_size = len(ts_object.idxs) + + # Parse ts_object + y = ts_object.y + idxs = ts_object.idxs + n_series, n_time = y.shape + if self.training: + windows_end = n_time-input_size-output_size+1 + windows_range = range(windows_end) + else: + windows_start = n_time-input_size-output_size+1 + windows_end = n_time-input_size+1 + + windows_range = range(windows_start, windows_end) + n_windows = len(windows_range) + assert n_windows>0 + + # Initialize windows, levels and seasonalities + levels, seasonalities = self.compute_levels_seasons(y, idxs) + windows_y_hat = torch.zeros((n_windows, batch_size, input_size+exogenous_size), + device=self.mc.device) + windows_y = torch.zeros((n_windows, batch_size, output_size), + device=self.mc.device) + + for i, window in enumerate(windows_range): + # Windows yhat + y_hat_start = window + y_hat_end = input_size + window + + # Y_hat deseasonalization and normalization + window_y_hat = self.normalize(y=y[:, y_hat_start:y_hat_end], + level=levels[:, [y_hat_end-1]], + seasonalities=seasonalities, + start=y_hat_start, end=y_hat_end) + + if self.training: + window_y_hat = self.gaussian_noise(window_y_hat, std=noise_std) + + # Concatenate categories + if exogenous_size>0: + window_y_hat = torch.cat((window_y_hat, ts_object.categories), 1) + + windows_y_hat[i, :, :] += window_y_hat + + # Windows y (for loss during train) + if self.training: + y_start = y_hat_end + y_end = y_start+output_size + # Y deseasonalization and normalization + window_y = self.normalize(y=y[:, y_start:y_end], + level=levels[:, [y_start]], + seasonalities=seasonalities, + start=y_start, end=y_end) + windows_y[i, :, :] += window_y + + return windows_y_hat, windows_y, levels, seasonalities + +class _ESM(_ES): + def __init__(self, mc): + super(_ESM, self).__init__(mc) + # Level and Seasonality Smoothing parameters + # 1 level, S seasonalities, S init_seas + embeds_size = 1 + len(self.mc.seasonality) + sum(self.mc.seasonality) + init_embeds = torch.ones((self.n_series, embeds_size)) * 0.5 + self.embeds = nn.Embedding(self.n_series, embeds_size) + self.embeds.weight.data.copy_(init_embeds) + self.register_buffer('seasonality', torch.LongTensor(self.mc.seasonality)) + + #@jit.script_method + def compute_levels_seasons(self, y, idxs): + """ + Computes levels and seasons + """ + # Lookup parameters per serie + #seasonality = self.seasonality + embeds = self.embeds(idxs) + lev_sms = torch.sigmoid(embeds[:, 0]) + + # Initialize seasonalities + seas_prod = torch.ones(len(y[:,0])).to(y.device) + #seasonalities1 = torch.jit.annotate(List[Tensor], []) + #seasonalities2 = torch.jit.annotate(List[Tensor], []) + seasonalities1 = [] + seasonalities2 = [] + seas_sms1 = torch.ones(1).to(y.device) + seas_sms2 = torch.ones(1).to(y.device) + + if len(self.seasonality)>0: + seas_sms1 = torch.sigmoid(embeds[:, 1]) + init_seas1 = torch.exp(embeds[:, 2:(2+self.seasonality[0])]).unbind(1) + assert len(init_seas1) == self.seasonality[0] + + for i in range(len(init_seas1)): + seasonalities1 += [init_seas1[i]] + seasonalities1 += [init_seas1[0]] + seas_prod = seas_prod * init_seas1[0] + + if len(self.seasonality)==2: + seas_sms2 = torch.sigmoid(embeds[:, 2+self.seasonality[0]]) + init_seas2 = torch.exp(embeds[:, 3+self.seasonality[0]:]).unbind(1) + assert len(init_seas2) == self.seasonality[1] + + for i in range(len(init_seas2)): + seasonalities2 
+= [init_seas2[i]] + seasonalities2 += [init_seas2[0]] + seas_prod = seas_prod * init_seas2[0] + + # Initialize levels + #levels = torch.jit.annotate(List[Tensor], []) + levels = [] + levels += [y[:,0]/seas_prod] + + # Recursive seasonalities and levels + ys = y.unbind(1) + n_time = len(ys) + for t in range(1, n_time): + + seas_prod_t = torch.ones(len(y[:,t])).to(y.device) + if len(self.seasonality)>0: + seas_prod_t = seas_prod_t * seasonalities1[t] + if len(self.seasonality)==2: + seas_prod_t = seas_prod_t * seasonalities2[t] + + newlev = lev_sms * (ys[t] / seas_prod_t) + (1-lev_sms) * levels[t-1] + levels += [newlev] + + if len(self.seasonality)==1: + newseason1 = seas_sms1 * (ys[t] / newlev) + (1-seas_sms1) * seasonalities1[t] + seasonalities1 += [newseason1] + + if len(self.seasonality)==2: + newseason1 = seas_sms1 * (ys[t] / (newlev * seasonalities2[t])) + \ + (1-seas_sms1) * seasonalities1[t] + seasonalities1 += [newseason1] + newseason2 = seas_sms2 * (ys[t] / (newlev * seasonalities1[t])) + \ + (1-seas_sms2) * seasonalities2[t] + seasonalities2 += [newseason2] + + levels = torch.stack(levels).transpose(1,0) + + #seasonalities = torch.jit.annotate(List[Tensor], []) + seasonalities = [] + + if len(self.seasonality)>0: + seasonalities += [torch.stack(seasonalities1).transpose(1,0)] + + if len(self.seasonality)==2: + seasonalities += [torch.stack(seasonalities2).transpose(1,0)] + + return levels, seasonalities + + def normalize(self, y, level, seasonalities, start, end): + # Deseasonalization and normalization + y_n = y / level + for s in range(len(self.seasonality)): + y_n /= seasonalities[s][:, start:end] + y_n = torch.log(y_n) + return y_n + + def predict(self, trend, levels, seasonalities): + output_size = self.mc.output_size + seasonality = self.mc.seasonality + n_time = levels.shape[1] + + # Denormalize + trend = torch.exp(trend) + + # Completion of seasonalities if prediction horizon is larger than seasonality + # Naive2 like prediction, to avoid recursive forecasting + for s in range(len(seasonality)): + if output_size > seasonality[s]: + repetitions = int(np.ceil(output_size/seasonality[s]))-1 + last_season = seasonalities[s][:, -seasonality[s]:] + extra_seasonality = last_season.repeat((1, repetitions)) + seasonalities[s] = torch.cat((seasonalities[s], extra_seasonality), 1) + + # Deseasonalization and normalization (inverse) + y_hat = trend * levels[:,[n_time-1]] + for s in range(len(seasonality)): + y_hat *= seasonalities[s][:, n_time:(n_time+output_size)] + + return y_hat + +class _RNN(nn.Module): + def __init__(self, mc): + super(_RNN, self).__init__() + self.mc = mc + self.layers = len(mc.dilations) + + layers = [] + for grp_num in range(len(mc.dilations)): + if grp_num == 0: + input_size = mc.input_size + mc.exogenous_size + else: + input_size = mc.state_hsize + layer = DRNN(input_size, + mc.state_hsize, + n_layers=len(mc.dilations[grp_num]), + dilations=mc.dilations[grp_num], + cell_type=mc.cell_type) + layers.append(layer) + + self.rnn_stack = nn.Sequential(*layers) + + if self.mc.add_nl_layer: + self.MLPW = nn.Linear(mc.state_hsize, mc.state_hsize) + + self.adapterW = nn.Linear(mc.state_hsize, mc.output_size) + + def forward(self, input_data): + for layer_num in range(len(self.rnn_stack)): + residual = input_data + output, _ = self.rnn_stack[layer_num](input_data) + if layer_num > 0: + output += residual + input_data = output + + if self.mc.add_nl_layer: + input_data = self.MLPW(input_data) + input_data = torch.tanh(input_data) + + input_data = 
self.adapterW(input_data) + return input_data + + +class _ESRNN(nn.Module): + def __init__(self, mc): + super(_ESRNN, self).__init__() + self.mc = mc + self.es = _ESM(mc).to(self.mc.device) + self.rnn = _RNN(mc).to(self.mc.device) + + def forward(self, ts_object): + # ES Forward + windows_y_hat, windows_y, levels, seasonalities = self.es(ts_object) + + # RNN Forward + windows_y_hat = self.rnn(windows_y_hat) + + return windows_y, windows_y_hat, levels + + def predict(self, ts_object): + # ES Forward + windows_y_hat, _, levels, seasonalities = self.es(ts_object) + + # RNN Forward + windows_y_hat = self.rnn(windows_y_hat) + trend = windows_y_hat[-1,:,:] # Last observation prediction + + y_hat = self.es.predict(trend, levels, seasonalities) + return y_hat diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils/__init__.py b/morphemic-forecasting-eshybrid/ESRNN/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils/config.py b/morphemic-forecasting-eshybrid/ESRNN/utils/config.py new file mode 100644 index 00000000..72ad9b77 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/utils/config.py @@ -0,0 +1,53 @@ +class ModelConfig(object): + def __init__(self, max_epochs, batch_size, batch_size_test, freq_of_test, + learning_rate, lr_scheduler_step_size, lr_decay, + per_series_lr_multip, gradient_eps, gradient_clipping_threshold, + rnn_weight_decay, + noise_std, + level_variability_penalty, + testing_percentile, training_percentile, ensemble, + cell_type, + state_hsize, dilations, add_nl_layer, seasonality, input_size, output_size, + frequency, max_periods, random_seed, device, root_dir): + + # Train Parameters + self.max_epochs = max_epochs + self.batch_size = batch_size + self.batch_size_test = batch_size_test + self.freq_of_test = freq_of_test + self.learning_rate = learning_rate + self.lr_scheduler_step_size = lr_scheduler_step_size + self.lr_decay = lr_decay + self.per_series_lr_multip = per_series_lr_multip + self.gradient_eps = gradient_eps + self.gradient_clipping_threshold = gradient_clipping_threshold + self.rnn_weight_decay = rnn_weight_decay + self.noise_std = noise_std + self.level_variability_penalty = level_variability_penalty + self.testing_percentile = testing_percentile + self.training_percentile = training_percentile + self.ensemble = ensemble + self.device = device + + # Model Parameters + self.cell_type = cell_type + self.state_hsize = state_hsize + self.dilations = dilations + self.add_nl_layer = add_nl_layer + self.random_seed = random_seed + + # Data Parameters + self.seasonality = seasonality + if len(seasonality)>0: + self.naive_seasonality = seasonality[0] + else: + self.naive_seasonality = 1 + self.input_size = input_size + self.input_size_i = self.input_size + self.output_size = output_size + self.output_size_i = self.output_size + self.frequency = frequency + self.min_series_length = self.input_size_i + self.output_size_i + self.max_series_length = (max_periods * self.input_size) + self.min_series_length + self.max_periods = max_periods + self.root_dir = root_dir \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils/data.py b/morphemic-forecasting-eshybrid/ESRNN/utils/data.py new file mode 100644 index 00000000..4505f107 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/utils/data.py @@ -0,0 +1,150 @@ +import numpy as np +import torch + + +class Batch(): + def __init__(self, mc, y, last_ds, categories, idxs): + # Parse Model config + exogenous_size = 
mc.exogenous_size
+        device = mc.device
+
+        # y: time series values
+        n = len(y)
+        y = np.float32(y)
+        self.idxs = torch.LongTensor(idxs).to(device)
+        # trim each series to the maximum length the model can use
+        if y.shape[1] > mc.max_series_length:
+            y = y[:, -mc.max_series_length:]
+        self.y = torch.tensor(y).float()
+
+        # last_ds: last time for prediction purposes
+        self.last_ds = last_ds
+
+        # categories: exogenous categoric data, one-hot encoded per series
+        if exogenous_size > 0:
+            self.categories = np.zeros((len(idxs), exogenous_size))
+            cols_idx = np.array([mc.category_to_idx[category] for category in categories])
+            rows_idx = np.array(range(len(cols_idx)))
+            self.categories[rows_idx, cols_idx] = 1
+            self.categories = torch.from_numpy(self.categories).float()
+
+        self.y = self.y.to(device)
+        if exogenous_size > 0:
+            self.categories = self.categories.to(device)
+        else:
+            # no exogenous data: no categories tensor was built above
+            self.categories = None
+
+
+class Iterator(object):
+    """ Time Series Iterator.
+
+    Parameters
+    ----------
+    mc: ModelConfig object
+        ModelConfig object with inherited hyperparameters:
+        batch_size, and exogenous_size, from the ESRNN
+        initialization.
+    X: array, shape (n_unique_id, 3)
+        Panel array with unique_id, last date stamp and
+        exogenous variable.
+    y: array, shape (n_unique_id, n_time)
+        Panel array in wide format with unique_id, last
+        date stamp and time series values.
+    Returns
+    ----------
+    self : object
+        Iterator method get_batch() returns a batch of time
+        series objects defined by the Batch class.
+    """
+    def __init__(self, mc, X, y, weights=None):
+        if weights is not None:
+            assert len(weights) == len(X)
+            train_ids = np.where(weights == 1)[0]
+            self.X = X[train_ids, :]
+            self.y = y[train_ids, :]
+        else:
+            self.X = X
+            self.y = y
+        assert len(X) == len(y)
+
+        # Parse Model config
+        self.mc = mc
+        self.batch_size = mc.batch_size
+
+        self.unique_idxs = np.unique(self.X[:, 0])
+        assert len(self.unique_idxs) == len(self.X)
+        self.n_series = len(self.unique_idxs)
+
+        #assert self.batch_size <= self.n_series
+
+        # Initialize batch iterator
+        self.b = 0
+        self.n_batches = int(np.ceil(self.n_series / self.batch_size))
+        shuffle = list(range(self.n_series))
+        self.sort_key = {'unique_id': [self.unique_idxs[i] for i in shuffle],
+                         'sort_key': shuffle}
+
+    def update_batch_size(self, new_batch_size):
+        self.batch_size = new_batch_size
+        assert self.batch_size <= self.n_series
+        self.n_batches = int(np.ceil(self.n_series / self.batch_size))
+
+    def shuffle_dataset(self, random_seed=1):
+        """Return the examples in the dataset in order, or shuffled."""
+        # Random Seed
+        np.random.seed(random_seed)
+        self.random_seed = random_seed
+        shuffle = np.random.choice(self.n_series, self.n_series, replace=False)
+        self.X = self.X[shuffle]
+        self.y = self.y[shuffle]
+
+        old_sort_key = self.sort_key['sort_key']
+        old_unique_idxs = self.sort_key['unique_id']
+        self.sort_key = {'unique_id': [old_unique_idxs[i] for i in shuffle],
+                         'sort_key': [old_sort_key[i] for i in shuffle]}
+
+    def get_trim_batch(self, unique_id):
+        if unique_id is None:
+            # Compute the indexes of the minibatch.
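+            # Example (illustrative): with n_series=10 and batch_size=4,
+            # n_batches = ceil(10/4) = 3 and successive calls cover rows
+            # [0,4), [4,8) and [8,10); the pointer self.b wraps around via
+            # the modulo at the end of this method.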
+ first = (self.b * self.batch_size) + last = min((first + self.batch_size), self.n_series) + else: + # Obtain unique_id index + assert unique_id in self.sort_key['unique_id'], "unique_id, not fitted" + first = self.sort_key['unique_id'].index(unique_id) + last = first+1 + + # Extract values for batch + unique_idxs = self.sort_key['unique_id'][first:last] + batch_idxs = self.sort_key['sort_key'][first:last] + + batch_y = self.y[first:last] + batch_categories = self.X[first:last, 1] + batch_last_ds = self.X[first:last, 2] + + len_series = np.count_nonzero(~np.isnan(batch_y), axis=1) + min_len = min(len_series) + last_numeric = (~np.isnan(batch_y)).cumsum(1).argmax(1)+1 + + # Trimming to match min_len + y_b = np.zeros((batch_y.shape[0], min_len)) + for i in range(batch_y.shape[0]): + y_b[i] = batch_y[i,(last_numeric[i]-min_len):last_numeric[i]] + batch_y = y_b + + assert not np.isnan(batch_y).any(), \ + "clean np.nan's from unique_idxs: {}".format(unique_idxs) + assert batch_y.shape[0]==len(batch_idxs)==len(batch_last_ds)==len(batch_categories) + assert batch_y.shape[1]>=1 + + # Feed to Batch + batch = Batch(mc=self.mc, y=batch_y, last_ds=batch_last_ds, + categories=batch_categories, idxs=batch_idxs) + self.b = (self.b + 1) % self.n_batches + return batch + + def get_batch(self, unique_id=None): + return self.get_trim_batch(unique_id) + + def __len__(self): + return self.n_batches + + def __iter__(self): + pass diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils/losses.py b/morphemic-forecasting-eshybrid/ESRNN/utils/losses.py new file mode 100644 index 00000000..23dc1c05 --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/utils/losses.py @@ -0,0 +1,124 @@ +import torch +import torch.nn as nn + +class PinballLoss(nn.Module): + """ Pinball Loss + Computes the pinball loss between y and y_hat. + + Parameters + ---------- + y: tensor + actual values in torch tensor. + y_hat: tensor (same shape as y) + predicted values in torch tensor. + tau: float, between 0 and 1 + the slope of the pinball loss, in the context of + quantile regression, the value of tau determines the + conditional quantile level. + + Returns + ---------- + pinball_loss: + average accuracy for the predicted quantile + """ + def __init__(self, tau=0.5): + super(PinballLoss, self).__init__() + self.tau = tau + + def forward(self, y, y_hat): + delta_y = torch.sub(y, y_hat) + pinball = torch.max(torch.mul(self.tau, delta_y), torch.mul((self.tau-1), delta_y)) + pinball = pinball.mean() + return pinball + +class LevelVariabilityLoss(nn.Module): + """ Level Variability Loss + Computes the variability penalty for the level. 
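+    In symbols the penalty is
+    level_variability_penalty * mean_t((Δlog l_{t+1} - Δlog l_t)^2),
+    i.e. the mean squared second difference of the log-levels, which
+    discourages abrupt changes in the smoothed level.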
+ + Parameters + ---------- + levels: tensor with shape (batch, n_time) + levels obtained from exponential smoothing component of ESRNN + level_variability_penalty: float + this parameter controls the strength of the penalization + to the wigglines of the level vector, induces smoothness + in the output + + Returns + ---------- + level_var_loss: + wiggliness loss for the level vector + """ + def __init__(self, level_variability_penalty): + super(LevelVariabilityLoss, self).__init__() + self.level_variability_penalty = level_variability_penalty + + def forward(self, levels): + assert levels.shape[1] > 2 + level_prev = torch.log(levels[:, :-1]) + level_next = torch.log(levels[:, 1:]) + log_diff_of_levels = torch.sub(level_prev, level_next) + + log_diff_prev = log_diff_of_levels[:, :-1] + log_diff_next = log_diff_of_levels[:, 1:] + diff = torch.sub(log_diff_prev, log_diff_next) + level_var_loss = diff**2 + level_var_loss = level_var_loss.mean() * self.level_variability_penalty + return level_var_loss + +class StateLoss(nn.Module): + pass + +class SmylLoss(nn.Module): + """Computes the Smyl Loss that combines level variability with + with Pinball loss. + windows_y: tensor of actual values, + shape (n_windows, batch_size, window_size). + windows_y_hat: tensor of predicted values, + shape (n_windows, batch_size, window_size). + levels: levels obtained from exponential smoothing component of ESRNN. + tensor with shape (batch, n_time). + return: smyl_loss. + """ + def __init__(self, tau, level_variability_penalty=0.0): + super(SmylLoss, self).__init__() + self.pinball_loss = PinballLoss(tau) + self.level_variability_loss = LevelVariabilityLoss(level_variability_penalty) + + def forward(self, windows_y, windows_y_hat, levels): + smyl_loss = self.pinball_loss(windows_y, windows_y_hat) + if self.level_variability_loss.level_variability_penalty>0: + log_diff_of_levels = self.level_variability_loss(levels) + smyl_loss += log_diff_of_levels + return smyl_loss + + +class DisaggregatedPinballLoss(nn.Module): + """ Pinball Loss + Computes the pinball loss between y and y_hat. + + Parameters + ---------- + y: tensor + actual values in torch tensor. + y_hat: tensor (same shape as y) + predicted values in torch tensor. + tau: float, between 0 and 1 + the slope of the pinball loss, in the context of + quantile regression, the value of tau determines the + conditional quantile level. 
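+        For a worked example take tau = 0.5, y = 10 and y_hat = 12:
+        delta_y = -2 and max(0.5 * -2, -0.5 * -2) = 1, i.e. half the
+        absolute error. More generally tau > 0.5 penalizes
+        under-prediction more heavily and tau < 0.5 penalizes
+        over-prediction more heavily.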
+
+    Returns
+    ----------
+    pinball_loss:
+        average pinball loss for the predicted quantile
+    """
+    def __init__(self, tau=0.5):
+        super(DisaggregatedPinballLoss, self).__init__()
+        self.tau = tau
+
+    def forward(self, y, y_hat):
+        delta_y = torch.sub(y, y_hat)
+        pinball = torch.max(torch.mul(self.tau, delta_y), torch.mul((self.tau-1), delta_y))
+        pinball = pinball.mean(axis=0).mean(axis=1)
+        return pinball
\ No newline at end of file
diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils_configs.py b/morphemic-forecasting-eshybrid/ESRNN/utils_configs.py
new file mode 100644
index 00000000..5a3bcced
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/ESRNN/utils_configs.py
@@ -0,0 +1,232 @@
+def get_config(dataset_name):
+    """
+    Returns dict config
+
+    Parameters
+    ----------
+    dataset_name: str
+    """
+    allowed_dataset_names = ('Yearly', 'Monthly', 'Weekly', 'Hourly', 'Quarterly', 'Daily')
+    if dataset_name not in allowed_dataset_names:
+        raise ValueError(f'dataset_name must be one of {allowed_dataset_names}')
+
+    if dataset_name == 'Yearly':
+        return YEARLY
+    elif dataset_name == 'Monthly':
+        return MONTHLY
+    elif dataset_name == 'Weekly':
+        return WEEKLY
+    elif dataset_name == 'Hourly':
+        return HOURLY
+    elif dataset_name == 'Quarterly':
+        return QUARTERLY
+    elif dataset_name == 'Daily':
+        return DAILY
+
+YEARLY = {
+    'device': 'cuda',
+    'train_parameters': {
+        'max_epochs': 25,
+        'batch_size': 4,
+        'freq_of_test': 5,
+        'learning_rate': '1e-4',
+        'lr_scheduler_step_size': 10,
+        'lr_decay': 0.1,
+        'per_series_lr_multip': 0.8,
+        'gradient_clipping_threshold': 50,
+        'rnn_weight_decay': 0,
+        'noise_std': 0.001,
+        'level_variability_penalty': 100,
+        'testing_percentile': 50,
+        'training_percentile': 50,
+        'ensemble': False
+    },
+    'data_parameters': {
+        'max_periods': 25,
+        'seasonality': [],
+        'input_size': 4,
+        'output_size': 6,
+        'frequency': 'Y'
+    },
+    'model_parameters': {
+        'cell_type': 'LSTM',
+        'state_hsize': 40,
+        'dilations': [[1], [6]],
+        'add_nl_layer': False,
+        'random_seed': 117982
+    }
+}
+
+MONTHLY = {
+    'device': 'cuda',
+    'train_parameters': {
+        'max_epochs': 15,
+        'batch_size': 64,
+        'freq_of_test': 4,
+        'learning_rate': '7e-4',
+        'lr_scheduler_step_size': 12,
+        'lr_decay': 0.2,
+        'per_series_lr_multip': 0.5,
+        'gradient_clipping_threshold': 20,
+        'rnn_weight_decay': 0,
+        'noise_std': 0.001,
+        'level_variability_penalty': 50,
+        'testing_percentile': 50,
+        'training_percentile': 45,
+        'ensemble': False
+    },
+    'data_parameters': {
+        'max_periods': 36,
+        'seasonality': [12],
+        'input_size': 12,
+        'output_size': 18,
+        'frequency': 'M'
+    },
+    'model_parameters': {
+        'cell_type': 'LSTM',
+        'state_hsize': 50,
+        'dilations': [[1, 3, 6, 12]],
+        'add_nl_layer': False,
+        'random_seed': 1
+    }
+}
+
+
+WEEKLY = {
+    'device': 'cuda',
+    'train_parameters': {
+        'max_epochs': 50,
+        'batch_size': 32,
+        'freq_of_test': 10,
+        'learning_rate': '1e-2',
+        'lr_scheduler_step_size': 10,
+        'lr_decay': 0.5,
+        'per_series_lr_multip': 1.0,
+        'gradient_clipping_threshold': 20,
+        'rnn_weight_decay': 0,
+        'noise_std': 0.001,
+        'level_variability_penalty': 100,
+        'testing_percentile': 50,
+        'training_percentile': 50,
+        'ensemble': True
+    },
+    'data_parameters': {
+        'max_periods': 31,
+        'seasonality': [],
+        'input_size': 10,
+        'output_size': 13,
+        'frequency': 'W'
+    },
+    'model_parameters': {
+        'cell_type': 'ResLSTM',
+        'state_hsize': 40,
+        'dilations': [[1, 52]],
+        'add_nl_layer': False,
+        'random_seed': 2
+    }
+}
+
+HOURLY = {
+    'device': 'cuda',
+    'train_parameters': {
+        'max_epochs': 20,
+        'batch_size': 32,
+        'freq_of_test': 5,
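+        # note: learning rates are stored as strings in these config dicts;
+        # callers cast them, e.g. float(config['train_parameters']['learning_rate'])
+        # in m4_run.py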
+ 'learning_rate': '1e-2', + 'lr_scheduler_step_size': 7, + 'lr_decay': 0.5, + 'per_series_lr_multip': 1.0, + 'gradient_clipping_threshold': 50, + 'rnn_weight_decay': 0, + 'noise_std': 0.001, + 'level_variability_penalty': 30, + 'testing_percentile': 50, + 'training_percentile': 50, + 'ensemble': True + }, + 'data_parameters': { + 'max_periods': 371, + 'seasonality': [24, 168], + 'input_size': 24, + 'output_size': 48, + 'frequency': 'H' + }, + 'model_parameters': { + 'cell_type': 'LSTM', + 'state_hsize': 40, + 'dilations': [[1, 4, 24, 168]], + 'add_nl_layer': False, + 'random_seed': 1 + } +} + +QUARTERLY = { + 'device': 'cuda', + 'train_parameters': { + 'max_epochs': 30, + 'batch_size': 16, + 'freq_of_test': 5, + 'learning_rate': '5e-4', + 'lr_scheduler_step_size': 10, + 'lr_decay': 0.5, + 'per_series_lr_multip': 1.0, + 'gradient_clipping_threshold': 20, + 'rnn_weight_decay': 0, + 'noise_std': 0.001, + 'level_variability_penalty': 100, + 'testing_percentile': 50, + 'training_percentile': 50, + 'ensemble': False + }, + 'data_parameters': { + 'max_periods': 20, + 'seasonality': [4], + 'input_size': 4, + 'output_size': 8, + 'frequency': 'Q' + }, + 'model_parameters': { + 'cell_type': 'LSTM', + 'state_hsize': 40, + 'dilations': [[1, 2, 4, 8]], + 'add_nl_layer': False, + 'random_seed': 3 + } +} + +DAILY = { + 'device': 'cuda', + 'train_parameters': { + 'max_epochs': 20, + 'batch_size': 64, + 'freq_of_test': 2, + 'learning_rate': '1e-2', + 'lr_scheduler_step_size': 4, + 'lr_decay': 0.3333, + 'per_series_lr_multip': 0.5, + 'gradient_clipping_threshold': 50, + 'rnn_weight_decay': 0, + 'noise_std': 0.0001, + 'level_variability_penalty': 100, + 'testing_percentile': 50, + 'training_percentile': 65, + 'ensemble': False + }, + 'data_parameters': { + 'max_periods': 15, + 'seasonality': [7], + 'input_size': 7, + 'output_size': 14, + 'frequency': 'D' + }, + 'model_parameters': { + 'n_models': 5, + 'n_top': 4, + 'cell_type': + 'LSTM', + 'state_hsize': 40, + 'dilations': [[1, 7, 28]], + 'add_nl_layer': True, + 'random_seed': 1 + } +} diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils_evaluation.py b/morphemic-forecasting-eshybrid/ESRNN/utils_evaluation.py new file mode 100644 index 00000000..65a70a8a --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/utils_evaluation.py @@ -0,0 +1,400 @@ +import numpy as np +from numpy.random import seed +seed(1) + +import pandas as pd +from math import sqrt + + +######################## +# UTILITY MODELS +######################## + +def detrend(insample_data): + """ + Calculates a & b parameters of LRL + :param insample_data: + :return: + """ + x = np.arange(len(insample_data)) + a, b = np.polyfit(x, insample_data, 1) + return a, b + +def deseasonalize(original_ts, ppy): + """ + Calculates and returns seasonal indices + :param original_ts: original data + :param ppy: periods per year + :return: + """ + """ + # === get in-sample data + original_ts = original_ts[:-out_of_sample] + """ + if seasonality_test(original_ts, ppy): + # ==== get moving averages + ma_ts = moving_averages(original_ts, ppy) + + # ==== get seasonality indices + le_ts = original_ts * 100 / ma_ts + le_ts = np.hstack((le_ts, np.full((ppy - (len(le_ts) % ppy)), np.nan))) + le_ts = np.reshape(le_ts, (-1, ppy)) + si = np.nanmean(le_ts, 0) + norm = np.sum(si) / (ppy * 100) + si = si / norm + else: + si = np.ones(ppy) + + return si + +def moving_averages(ts_init, window): + """ + Calculates the moving averages for a given TS + :param ts_init: the original time series + :param window: window length + 
:return: moving averages ts + """ + """ + As noted by Professor Isidro Lloret Galiana: + line 82: + if len(ts_init) % 2 == 0: + + should be changed to + if window % 2 == 0: + + This change has a minor (less then 0.05%) impact on the calculations of the seasonal indices + In order for the results to be fully replicable this change is not incorporated into the code below + """ + ts_init = pd.Series(ts_init) + + if len(ts_init) % 2 == 0: + ts_ma = ts_init.rolling(window, center=True).mean() + ts_ma = ts_ma.rolling(2, center=True).mean() + ts_ma = np.roll(ts_ma, -1) + else: + ts_ma = ts_init.rolling(window, center=True).mean() + + return ts_ma + +def seasonality_test(original_ts, ppy): + """ + Seasonality test + :param original_ts: time series + :param ppy: periods per year + :return: boolean value: whether the TS is seasonal + """ + s = acf(original_ts, 1) + for i in range(2, ppy): + s = s + (acf(original_ts, i) ** 2) + + limit = 1.645 * (sqrt((1 + 2 * s) / len(original_ts))) + + return (abs(acf(original_ts, ppy))) > limit + +def acf(data, k): + """ + Autocorrelation function + :param data: time series + :param k: lag + :return: + """ + m = np.mean(data) + s1 = 0 + for i in range(k, len(data)): + s1 = s1 + ((data[i] - m) * (data[i - k] - m)) + + s2 = 0 + for i in range(0, len(data)): + s2 = s2 + ((data[i] - m) ** 2) + + return float(s1 / s2) + +class Naive: + """ + Naive model. + This benchmark model produces a forecast that is equal to + the last observed value for a given time series. + """ + def __init__(self): + pass + + def fit(self, ts_init): + """ + ts_init: the original time series + ts_naive: last observations of time series + """ + self.ts_naive = [ts_init[-1]] + return self + + def predict(self, h): + return np.array(self.ts_naive * h) + +class SeasonalNaive: + """ + Seasonal Naive model. + This benchmark model produces a forecast that is equal to + the last observed value of the same season for a given time + series. + """ + def __init__(self): + pass + + def fit(self, ts_init, seasonality): + """ + ts_init: the original time series + frcy: frequency of the time series + ts_naive: last observations of time series + """ + self.ts_seasonal_naive = ts_init[-seasonality:] + return self + + def predict(self, h): + repetitions = int(np.ceil(h/len(self.ts_seasonal_naive))) + y_hat = np.tile(self.ts_seasonal_naive, reps=repetitions)[:h] + return y_hat + +class Naive2: + """ + Naive2 model. + Popular benchmark model for time series forecasting that automatically adapts + to the potential seasonality of a series based on an autocorrelation test. + If the series is seasonal the model composes the predictions of Naive and SeasonalNaive, + else the model predicts on the simple Naive. + """ + def __init__(self, seasonality): + self.seasonality = seasonality + + def fit(self, ts_init): + seasonality_in = deseasonalize(ts_init, ppy=self.seasonality) + windows = int(np.ceil(len(ts_init) / self.seasonality)) + + self.ts_init = ts_init + self.s_hat = np.tile(seasonality_in, reps=windows)[:len(ts_init)] + self.ts_des = ts_init / self.s_hat + + return self + + def predict(self, h): + s_hat = SeasonalNaive().fit(self.s_hat, + seasonality=self.seasonality).predict(h) + r_hat = Naive().fit(self.ts_des).predict(h) + y_hat = s_hat * r_hat + return y_hat + +######################## +# METRICS +######################## + +def mse(y, y_hat): + """ + Calculates Mean Squared Error. 
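+    Computed as mean((y - y_hat)**2) over the flattened input arrays.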
+ + Parameters + ---------- + y: numpy array + actual test values + y_hat: numpy array + predicted values + + Returns + ------- + mse: float + mean squared error + """ + y = np.reshape(y, (-1,)) + y_hat = np.reshape(y_hat, (-1,)) + mse = np.mean(np.square(y - y_hat)).item() + return mse + +def mape(y, y_hat): + """ + Calculates Mean Absolute Percentage Error. + + Parameters + ---------- + y: numpy array + actual test values + y_hat: numpy array + predicted values + + Returns + ------- + mape: float + mean absolute percentage error + """ + y = np.reshape(y, (-1,)) + y_hat = np.reshape(y_hat, (-1,)) + mape = np.mean(np.abs(y - y_hat) / np.abs(y)) + return mape + +def smape(y, y_hat): + """ + Calculates Symmetric Mean Absolute Percentage Error. + + Parameters + ---------- + y: numpy array + actual test values + y_hat: numpy array + predicted values + + Returns + ------- + smape: float + symmetric mean absolute percentage error + """ + y = np.reshape(y, (-1,)) + y_hat = np.reshape(y_hat, (-1,)) + smape = np.mean(2.0 * np.abs(y - y_hat) / (np.abs(y) + np.abs(y_hat))) + return smape + +def mase(y, y_hat, y_train, seasonality): + """ + Calculates Mean Absolute Scaled Error. + + Parameters + ---------- + y: numpy array + actual test values + y_hat: numpy array + predicted values + y_train: numpy array + actual train values for Naive1 predictions + seasonality: int + main frequency of the time series + Quarterly 4, Daily 7, Monthly 12 + + Returns + ------- + mase: float + mean absolute scaled error + """ + y_hat_naive = [] + for i in range(seasonality, len(y_train)): + y_hat_naive.append(y_train[(i - seasonality)]) + + masep = np.mean(abs(y_train[seasonality:] - y_hat_naive)) + mase = np.mean(abs(y - y_hat)) / masep + return mase + +######################## +# PANEL EVALUATION +######################## + +def evaluate_panel(y_panel, y_hat_panel, metric, + y_insample=None, seasonality=None): + """ + Calculates metric for y_panel and y_hat_panel + y_panel: pandas df + panel with columns unique_id, ds, y + y_naive2_panel: pandas df + panel with columns unique_id, ds, y_hat + y_insample: pandas df + panel with columns unique_id, ds, y (train) + this is used in the MASE + seasonality: int + main frequency of the time series + Quarterly 4, Daily 7, Monthly 12 + return: list of metric evaluations + """ + metric_name = metric.__code__.co_name + + y_panel = y_panel.sort_values(['unique_id', 'ds']) + y_hat_panel = y_hat_panel.sort_values(['unique_id', 'ds']) + if y_insample is not None: + y_insample = y_insample.sort_values(['unique_id', 'ds']) + + assert len(y_panel)==len(y_hat_panel) + assert all(y_panel.unique_id.unique() == y_hat_panel.unique_id.unique()), "not same u_ids" + + evaluation = [] + for u_id in y_panel.unique_id.unique(): + top_row = np.asscalar(y_panel['unique_id'].searchsorted(u_id, 'left')) + bottom_row = np.asscalar(y_panel['unique_id'].searchsorted(u_id, 'right')) + y_id = y_panel[top_row:bottom_row].y.to_numpy() + + top_row = np.asscalar(y_hat_panel['unique_id'].searchsorted(u_id, 'left')) + bottom_row = np.asscalar(y_hat_panel['unique_id'].searchsorted(u_id, 'right')) + y_hat_id = y_hat_panel[top_row:bottom_row].y_hat.to_numpy() + assert len(y_id)==len(y_hat_id) + + if metric_name == 'mase': + assert (y_insample is not None) and (seasonality is not None) + top_row = np.asscalar(y_insample['unique_id'].searchsorted(u_id, 'left')) + bottom_row = np.asscalar(y_insample['unique_id'].searchsorted(u_id, 'right')) + y_insample_id = y_insample[top_row:bottom_row].y.to_numpy() + evaluation_id = 
metric(y_id, y_hat_id, y_insample_id, seasonality) + else: + evaluation_id = metric(y_id, y_hat_id) + evaluation.append(evaluation_id) + return evaluation + +def owa(y_panel, y_hat_panel, y_naive2_panel, y_insample, seasonality): + """ + Calculates MASE, sMAPE for Naive2 and current model + then calculatess Overall Weighted Average. + y_panel: pandas df + panel with columns unique_id, ds, y + y_hat_panel: pandas df + panel with columns unique_id, ds, y_hat + y_naive2_panel: pandas df + panel with columns unique_id, ds, y_hat + y_insample: pandas df + panel with columns unique_id, ds, y (train) + this is used in the MASE + seasonality: int + main frequency of the time series + Quarterly 4, Daily 7, Monthly 12 + return: OWA + """ + total_mase = evaluate_panel(y_panel, y_hat_panel, mase, + y_insample, seasonality) + total_mase_naive2 = evaluate_panel(y_panel, y_naive2_panel, mase, + y_insample, seasonality) + total_smape = evaluate_panel(y_panel, y_hat_panel, smape) + total_smape_naive2 = evaluate_panel(y_panel, y_naive2_panel, smape) + + assert len(total_mase) == len(total_mase_naive2) + assert len(total_smape) == len(total_smape_naive2) + assert len(total_mase) == len(total_smape) + + naive2_mase = np.mean(total_mase_naive2) + naive2_smape = np.mean(total_smape_naive2) * 100 + + model_mase = np.mean(total_mase) + model_smape = np.mean(total_smape) * 100 + + model_owa = ((model_mase/naive2_mase) + (model_smape/naive2_smape))/2 + return model_owa, model_mase, model_smape + +def evaluate_prediction_owa(y_hat_df, y_train_df, X_test_df, y_test_df, + naive2_seasonality): + """ + y_hat_df: pandas df + panel with columns unique_id, ds, y_hat + y_train_df: pandas df + panel with columns unique_id, ds, y + X_test_df: pandas df + panel with columns unique_id, ds, x + y_test_df: pandas df + panel with columns unique_id, ds, y, y_hat_naive2 + naive2_seasonality: int + seasonality for the Naive2 predictions (needed for owa) + model: python class + python class with predict method + """ + y_panel = y_test_df.filter(['unique_id', 'ds', 'y']) + y_naive2_panel = y_test_df.filter(['unique_id', 'ds', 'y_hat_naive2']) + y_naive2_panel.rename(columns={'y_hat_naive2': 'y_hat'}, inplace=True) + y_hat_panel = y_hat_df + y_insample = y_train_df.filter(['unique_id', 'ds', 'y']) + + model_owa, model_mase, model_smape = owa(y_panel, y_hat_panel, + y_naive2_panel, y_insample, + seasonality=naive2_seasonality) + + print(15*'=', ' Model evaluation ', 14*'=') + print('OWA: {} '.format(np.round(model_owa, 3))) + print('SMAPE: {} '.format(np.round(model_smape, 3))) + print('MASE: {} '.format(np.round(model_mase, 3))) + return model_owa, model_mase, model_smape diff --git a/morphemic-forecasting-eshybrid/ESRNN/utils_visualization.py b/morphemic-forecasting-eshybrid/ESRNN/utils_visualization.py new file mode 100644 index 00000000..4cdacb6d --- /dev/null +++ b/morphemic-forecasting-eshybrid/ESRNN/utils_visualization.py @@ -0,0 +1,145 @@ +import math +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +plt.style.use('ggplot') + +import seaborn as sns +from itertools import product +import random + + +def plot_prediction(y, y_hat): + """ + y: pandas df + panel with columns unique_id, ds, y + y_hat: pandas df + panel with columns unique_id, ds, y_hat + """ + pd.plotting.register_matplotlib_converters() + + plt.plot(y.ds, y.y, label = 'y') + plt.plot(y_hat.ds, y_hat.y_hat, label='y_hat') + plt.legend(loc='upper left') + plt.show() + +def plot_grid_prediction(y, y_hat, plot_random=True, unique_ids=None, 
save_file_name = None): + """ + y: pandas df + panel with columns unique_id, ds, y + y_hat: pandas df + panel with columns unique_id, ds, y_hat + plot_random: bool + if unique_ids will be sampled + unique_ids: list + unique_ids to plot + save_file_name: str + file name to save plot + """ + pd.plotting.register_matplotlib_converters() + + fig, axes = plt.subplots(2, 4, figsize = (24,8)) + + if not unique_ids: + unique_ids = y['unique_id'].unique() + + assert len(unique_ids) >= 8, "Must provide at least 8 ts" + + if plot_random: + unique_ids = random.sample(set(unique_ids), k=8) + + for i, (idx, idy) in enumerate(product(range(2), range(4))): + y_uid = y[y.unique_id == unique_ids[i]] + y_uid_hat = y_hat[y_hat.unique_id == unique_ids[i]] + + axes[idx, idy].plot(y_uid.ds, y_uid.y, label = 'y') + axes[idx, idy].plot(y_uid_hat.ds, y_uid_hat.y_hat, label='y_hat') + axes[idx, idy].set_title(unique_ids[i]) + axes[idx, idy].legend(loc='upper left') + + plt.show() + + if save_file_name: + fig.savefig(save_file_name, bbox_inches='tight', pad_inches=0) + + +def plot_distributions(distributions_dict, fig_title=None, xlabel=None): + n_distributions = len(distributions_dict.keys()) + fig, ax = plt.subplots(1, figsize=(7, 5.5)) + plt.subplots_adjust(wspace=0.35) + + n_colors = len(distributions_dict.keys()) + colors = sns.color_palette("hls", n_colors) + + for idx, dist_name in enumerate(distributions_dict.keys()): + train_dist_plot = sns.kdeplot(distributions_dict[dist_name], + bw='silverman', + label=dist_name, + color=colors[idx]) + if xlabel is not None: + ax.set_xlabel(xlabel, fontsize=14) + ax.set_ylabel('Density', fontsize=14) + ax.set_title(fig_title, fontsize=15.5) + ax.grid(True) + ax.legend(loc='center left', bbox_to_anchor=(1, 0.5)) + + fig.tight_layout() + if fig_title is not None: + fig_title = fig_title.replace(' ', '_') + plot_file = "./results/plots/{}_distributions.png".format(fig_title) + plt.savefig(plot_file, bbox_inches = "tight", dpi=300) + plt.show() + +def plot_cat_distributions(df, cat, var): + unique_cats = df[cat].unique() + cat_dict = {} + for c in unique_cats: + cat_dict[c] = df[df[cat]==c][var].values + + plot_distributions(cat_dict, xlabel=var) + +def plot_single_cat_distributions(distributions_dict, ax, fig_title=None, xlabel=None): + n_distributions = len(distributions_dict.keys()) + + n_colors = len(distributions_dict.keys()) + colors = sns.color_palette("hls", n_colors) + + for idx, dist_name in enumerate(distributions_dict.keys()): + train_dist_plot = sns.distplot(distributions_dict[dist_name], + #bw='silverman', + #kde=False, + rug=True, + label=dist_name, + color=colors[idx], + ax=ax) + if xlabel is not None: + ax.set_xlabel(xlabel, fontsize=14) + ax.set_ylabel('Density', fontsize=14) + ax.set_title(fig_title, fontsize=15.5) + ax.grid(True) + ax.legend(loc='center left', bbox_to_anchor=(1, 0.5)) + +def plot_grid_cat_distributions(df, cats, var): + cols = int(np.ceil(len(cats)/2)) + fig, axs = plt.subplots(2, cols, figsize=(4*cols, 5.5)) + plt.subplots_adjust(wspace=0.95) + plt.subplots_adjust(hspace=0.5) + + for idx, cat in enumerate(cats): + unique_cats = df[cat].unique() + cat_dict = {} + for c in unique_cats: + values = df[df[cat]==c][var].values + values = values[~np.isnan(values)] + if len(values)>0: + cat_dict[c] = values + + row = int(np.round((idx/len(cats))+0.001, 0)) + col = idx % cols + plot_single_cat_distributions(cat_dict, axs[row, col], + fig_title=cat, xlabel=var) + + min_owa = math.floor(df.min_owa.min() * 1000) / 1000 + suptitle = var + ': ' + 
str(min_owa) + fig.suptitle(suptitle, fontsize=18) + plt.show() diff --git a/morphemic-forecasting-eshybrid/__init__.py b/morphemic-forecasting-eshybrid/__init__.py new file mode 100644 index 00000000..68c849b8 --- /dev/null +++ b/morphemic-forecasting-eshybrid/__init__.py @@ -0,0 +1,4 @@ + +from . import ESRNN +from . import messaging +from . import forecasting \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/forecasting/__init__.py b/morphemic-forecasting-eshybrid/forecasting/__init__.py new file mode 100644 index 00000000..76c8675e --- /dev/null +++ b/morphemic-forecasting-eshybrid/forecasting/__init__.py @@ -0,0 +1,2 @@ + +from . import eshybrid \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py new file mode 100644 index 00000000..416c3818 --- /dev/null +++ b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py @@ -0,0 +1,133 @@ +import messaging +import morphemic + +import time +import logging +import signal + + +class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler): + + id="eshybrid" + metrics= set() + forecast_config = dict() + scheduler = False + + def __init__(self): + self._run=False + self.connector = messaging.morphemic.Connection('admin', 'admin') + self.model = morphemic.model.Model(self) + + def run(self): + logging.debug("setting up") + self.connector.set_listener(self.id, self) + self.connector.connect() + self.connector.topic("start_forecasting.es_hybrid", self.id) + self.connector.topic("stop_forecasting.es_hybrid", self.id) + self.connector.topic("metrics_to_predict", self.id) + + def topic(self,topic): + self.connector.topic(topic, self.id) + + def unsubscribe(self,topic): + self.connector.unsubscribe(topic, self.id) + + def reconnect(self): + self.connector.disconnect() + self.run() + pass + + def cleanup(self): + logging.debug("Cleaning Up...") + scheduler=False + pass + + def signal_handler(self, signum, frame): + logging.debug("SIGHUP") + self.stop() + + def stop(self): + logging.debug("Stopping...") + self._run = False + self.cleanup() + + def start(self): + logging.debug("Starting ESHybrid") + signal.signal(signal.SIGINT, self.signal_handler) + signal.signal(signal.SIGTERM, self.signal_handler) + signal.signal(signal.SIGHUP, self.signal_handler) + + self.run() + self._run=True + while self._run: + if self.scheduler: + self.scheduler.check(self) + time.sleep(1) + + def on_schedule(self, times): + predictions = self.model.predict(self.metrics,times) + for m in self.metrics: + for p in predictions[m]: + self.connector.send_to_topic( + "intermediate_prediction.eshybrid.%s" % m, + { + "metricValue": p['x'], + "timestamp": p['y'], + "predictionTime": int(time.time()), + } + ) + + def on_train(self): + self.connector.send_to_topic("training_models", + { + "metrics": [x for x in self.metrics], + "forecasting_method": self.id, + "timestamp": int(time.time() * 1000) + }) + + def on_stop_forecasting_es_hybrid(self,res): + logging.debug("[6] Stop Subscribing %s " % res) + + for metric in res[messaging.events.StopForecasting.METRICS]: + if metric in self.metrics: + logging.debug("Un-subscribing from %s " % metric) + self.connector.unsubscribe(metric,self.id) + self.metrics.remove(metric) + + self.forecast_config = res + if not len(self.metrics): + self.scheduler = False + + + def on_start_forecasting_es_hybrid(self,res): + logging.debug("[6] Start Forecasting %s " % res) + + for metric in 
res[messaging.events.StartForecasting.METRICS]: + if metric not in self.metrics: + logging.debug("Subscribing to %s " % metric) + self.topic(metric) + self.metrics.add(metric) + self.model.train() + self.scheduler = morphemic.scheduler.Scheduler( + epoch_start=res[messaging.events.StartForecasting.EPOCH_START], + forward_predictons= res[messaging.events.StartForecasting.NUMBER_OF_FORWARD_PREDICTIONS], + horizon= res[messaging.events.StartForecasting.PREDICTION_HORIZON] + ) + + def on(self,headers,res): + metric = self.get_topic_name(headers) + if self.get_topic_name(headers) in self.metrics: + predictions = self.model.predict() + for p in predictions: + self.connector.send_to_topic( + 'intermediate_prediction.%s.%s' % ( self.id, metric), + p + ) + + def on_error(self, headers, body): + logging.error("Headers %s",headers) + logging.error(" %s",body) + + def on_disconnected(self): + print('disconnected') + self.reconnect() diff --git a/morphemic-forecasting-eshybrid/main.py b/morphemic-forecasting-eshybrid/main.py new file mode 100644 index 00000000..dfa9ab8d --- /dev/null +++ b/morphemic-forecasting-eshybrid/main.py @@ -0,0 +1,15 @@ +import logging + +from forecasting import eshybrid + +logger = logging.getLogger() +logger.setLevel(logging.DEBUG) + +e = eshybrid.ESHybrid() + + +try: + e.start() +except KeyboardInterrupt: + e.stop() + diff --git a/morphemic-forecasting-eshybrid/messaging b/morphemic-forecasting-eshybrid/messaging new file mode 120000 index 00000000..6a99709a --- /dev/null +++ b/morphemic-forecasting-eshybrid/messaging @@ -0,0 +1 @@ +../amq-message-python-library \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/morphemic/__init__.py b/morphemic-forecasting-eshybrid/morphemic/__init__.py new file mode 100644 index 00000000..55b77fbf --- /dev/null +++ b/morphemic-forecasting-eshybrid/morphemic/__init__.py @@ -0,0 +1,6 @@ + + +from . import model +from . import configuration +from . import handler +from . 
import scheduler
diff --git a/morphemic-forecasting-eshybrid/morphemic/configuration.py b/morphemic-forecasting-eshybrid/morphemic/configuration.py
new file mode 100644
index 00000000..851431bf
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/morphemic/configuration.py
@@ -0,0 +1,232 @@
+def get_config(dataset_name):
+    """
+    Returns dict config
+
+    Parameters
+    ----------
+    dataset_name: str
+    """
+    allowed_dataset_names = ('Yearly', 'Monthly', 'Weekly', 'Hourly', 'Quarterly', 'Daily')
+    if dataset_name not in allowed_dataset_names:
+        raise ValueError(f'dataset_name must be one of {allowed_dataset_names}')
+
+    if dataset_name == 'Yearly':
+        return YEARLY
+    elif dataset_name == 'Monthly':
+        return MONTHLY
+    elif dataset_name == 'Weekly':
+        return WEEKLY
+    elif dataset_name == 'Hourly':
+        return HOURLY
+    elif dataset_name == 'Quarterly':
+        return QUARTERLY
+    elif dataset_name == 'Daily':
+        return DAILY
+
+YEARLY = {
+    'device': 'cuda',
+    'train_parameters': {
+        'max_epochs': 25,
+        'batch_size': 4,
+        'freq_of_test': 5,
+        'learning_rate': '1e-4',
+        'lr_scheduler_step_size': 10,
+        'lr_decay': 0.1,
+        'per_series_lr_multip': 0.8,
+        'gradient_clipping_threshold': 50,
+        'rnn_weight_decay': 0,
+        'noise_std': 0.001,
+        'level_variability_penalty': 100,
+        'testing_percentile': 50,
+        'training_percentile': 50,
+        'ensemble': False
+    },
+    'data_parameters': {
+        'max_periods': 25,
+        'seasonality': [],
+        'input_size': 4,
+        'output_size': 6,
+        'frequency': 'Y'
+    },
+    'model_parameters': {
+        'cell_type': 'LSTM',
+        'state_hsize': 40,
+        'dilations': [[1], [6]],
+        'add_nl_layer': False,
+        'random_seed': 117982
+    }
+}
+
+MONTHLY = {
+    'device': 'cuda',
+    'train_parameters': {
+        'max_epochs': 15,
+        'batch_size': 64,
+        'freq_of_test': 4,
+        'learning_rate': '7e-4',
+        'lr_scheduler_step_size': 12,
+        'lr_decay': 0.2,
+        'per_series_lr_multip': 0.5,
+        'gradient_clipping_threshold': 20,
+        'rnn_weight_decay': 0,
+        'noise_std': 0.001,
+        'level_variability_penalty': 50,
+        'testing_percentile': 50,
+        'training_percentile': 45,
+        'ensemble': False
+    },
+    'data_parameters': {
+        'max_periods': 36,
+        'seasonality': [12],
+        'input_size': 12,
+        'output_size': 18,
+        'frequency': 'M'
+    },
+    'model_parameters': {
+        'cell_type': 'LSTM',
+        'state_hsize': 50,
+        'dilations': [[1, 3, 6, 12]],
+        'add_nl_layer': False,
+        'random_seed': 1
+    }
+}
+
+
+WEEKLY = {
+    'device': 'cuda',
+    'train_parameters': {
+        'max_epochs': 50,
+        'batch_size': 32,
+        'freq_of_test': 10,
+        'learning_rate': '1e-2',
+        'lr_scheduler_step_size': 10,
+        'lr_decay': 0.5,
+        'per_series_lr_multip': 1.0,
+        'gradient_clipping_threshold': 20,
+        'rnn_weight_decay': 0,
+        'noise_std': 0.001,
+        'level_variability_penalty': 100,
+        'testing_percentile': 50,
+        'training_percentile': 50,
+        'ensemble': True
+    },
+    'data_parameters': {
+        'max_periods': 31,
+        'seasonality': [],
+        'input_size': 10,
+        'output_size': 13,
+        'frequency': 'W'
+    },
+    'model_parameters': {
+        'cell_type': 'ResLSTM',
+        'state_hsize': 40,
+        'dilations': [[1, 52]],
+        'add_nl_layer': False,
+        'random_seed': 2
+    }
+}
+
+HOURLY = {
+    'device': 'cuda',
+    'train_parameters': {
+        'max_epochs': 20,
+        'batch_size': 32,
+        'freq_of_test': 5,
+        'learning_rate': '1e-2',
+        'lr_scheduler_step_size': 7,
+        'lr_decay': 0.5,
+        'per_series_lr_multip': 1.0,
+        'gradient_clipping_threshold': 50,
+        'rnn_weight_decay': 0,
+        'noise_std': 0.001,
+        'level_variability_penalty': 30,
+        'testing_percentile': 50,
+        'training_percentile': 50,
+        'ensemble': True
+    },
+    'data_parameters': {
+        'max_periods': 371,
+        'seasonality': [24, 168],
+        'input_size': 24,
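+        # input_size/output_size fix the rolling window sliced from each
+        # series: 24 hourly observations in, a 48-step forecast out
+        # (48 matches the M4 Hourly horizon)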
diff --git a/morphemic-forecasting-eshybrid/morphemic/dataset.py b/morphemic-forecasting-eshybrid/morphemic/dataset.py
new file mode 100644
index 00000000..055df50d
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/morphemic/dataset.py
@@ -0,0 +1,33 @@
+
+# NOTE: the morphemic.dataset package (dataset/__init__.py below) shadows
+# this module on import; only one of the two is reachable as morphemic.dataset.
+import requests
+
+
+class Dataset:
+
+    def __init__(self, url, port, database, application, username, password):
+
+        self._url = url
+        self._port = port
+        self._database = database
+        self._application = application
+        self._username = username
+        self._password = password
+
+    def count(self):
+        """
+        curl -XPOST localhost:8086/api/v2/query -sS -H 'Accept:application/csv' -H 'Content-type:application/vnd.flux' -H 'Authorization: Token username:password' -d 'from(bucket:"telegraf")|> range(start:-5m) |> filter(fn:(r) => r._measurement == "cpu")'
+        """
+
+        url = self._url
+        username = self._username
+        password = self._password
+        database = self._database
+        application = self._application
+
+        headers = {'Accept': 'application/csv', 'Content-type': 'application/vnd.flux', 'Authorization': 'Token ' + username + ':' + password}
+        data_post = 'from(bucket:"' + database + '")|> range(start:-5m)|> filter(fn:(r) => r._measurement == "' + application + '")'
+
+        # the Flux endpoint expects the raw query string as the body, not JSON
+        return requests.post(url + '/api/v2/query', data=data_post, headers=headers)
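The /api/v2/query endpoint answers with annotated CSV. A hedged sketch of turning the response count() returns into a row count; the comment='#' handling of the '#'-prefixed annotation lines is an assumption about how the server is configured, and rows_from_response is a hypothetical helper, not part of the module:

import io
import pandas as pd

def rows_from_response(response):
    # Skip InfluxDB's annotation lines (if present) and count the data rows.
    df = pd.read_csv(io.StringIO(response.text), comment='#')
    return len(df)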
diff --git a/morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py b/morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py
new file mode 100644
index 00000000..db2c96f2
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/morphemic/dataset/__init__.py
@@ -0,0 +1,154 @@
+import os, json, time
+from influxdb import InfluxDBClient
+import pandas as pd
+from datetime import datetime
+
+url_path_dataset = None
+
+class Row():
+    def __init__(self, features, metricsname):
+        self.features = features
+        if "time" in self.features:
+            time_str = self.features["time"]
+            _obj = datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%fZ')
+            self.features["time"] = int(_obj.timestamp())
+        if 'application' in metricsname:
+            metricsname.remove('application')
+        for field_name in metricsname:
+            if not field_name in self.features:
+                self.features[field_name] = None
+
+    def getTime(self):
+        if "time" in self.features:
+            return self.features["time"]
+        if "timestamp" in self.features:
+            return self.features["timestamp"]
+        return None
+
+    def makeCsvRow(self):
+        if "application" in self.features:
+            del self.features["application"]
+        result = ''
+        for key, _value in self.features.items():
+            result += "{0},".format(_value)
+        return result[:-1] + "\n"
+
+class Dataset():
+    def __init__(self):
+        self.rows = {}
+        self.size = 0
+    def addRow(self, row):
+        self.rows[row.getTime()] = row
+        self.size += 1
+    def reset(self):
+        self.rows = {}
+        self.size = 0
+        print("Dataset reset")
+    def getSize(self):
+        return self.size
+    def sortRows(self):
+        return sorted(list(self.rows.values()), key=lambda x: x.getTime(), reverse=True)
+    def getRows(self):
+        return list(self.rows.values())
+    def getRow(self, _time, tolerance):
+        for i in range(tolerance):
+            if int(_time + i) in self.rows:
+                return self.rows[int(_time + i)]
+        return None
+    def save(self, metricnames, application_name):
+        if "application" in metricnames:
+            metricnames.remove("application")
+        dataset_content = ''
+        for metric in metricnames:
+            dataset_content += "{0},".format(metric)
+        dataset_content = dataset_content[:-1] + "\n"
+        for row in list(self.rows.values()):
+            dataset_content += row.makeCsvRow()
+        _file = open(url_path_dataset + "{0}.csv".format(application_name), 'w')
+        _file.write(dataset_content)
+        _file.close()
+        return url_path_dataset + "{0}.csv".format(application_name)
+
+class DatasetMaker():
+    def __init__(self, application, start, configs):
+        self.application = application
+        self.start_filter = start
+        self.influxdb = InfluxDBClient(host=configs['hostname'], port=configs['port'], username=configs['username'], password=configs['password'], database=configs['dbname'])
+        self.dataset = Dataset()
+        self.tolerance = 5
+        global url_path_dataset
+        url_path_dataset = configs['path_dataset']
+        if url_path_dataset[-1] != "/":
+            url_path_dataset += "/"
+
+    def getIndex(self, columns, name):
+        return columns.index(name)
+
+    def makeRow(self, columns, values):
+        row = {}
+        index = 0
+        for column in columns:
+            row[column] = values[index]
+            index += 1
+        return row
+
+    def prepareResultSet(self, result_set):
+        result = []
+        columns = result_set["series"][0]["columns"]
+        series_values = result_set["series"][0]["values"]
+        for _values in series_values:
+            row = self.makeRow(columns, _values)
+            result.append(row)
+        return result
+
+    def make(self):
+        try:
+            self.influxdb.ping()
+        except Exception as e:
+            print("Could not establish connection with InfluxDB, please verify connection parameters")
+            print(e)
+            return {"message": "Could not establish connection with InfluxDB, please verify connection parameters"}
+        # query once and reuse the result instead of hitting InfluxDB twice
+        data = self.getData()
+        if data is None:
+            return {"message": "No data found"}
+
+        metricnames, _data = data
+        for _row in _data:
+            row = Row(_row, metricnames)
+            self.dataset.addRow(row)
+
+        print("Rows construction completed")
+        print("{0} rows found".format(self.dataset.getSize()))
+        #self.dataset.sortRows()
+        url = self.dataset.save(metricnames, self.application)
+        features = self.getFeatures(url)
+        if features is None:
+            return {'status': False, 'message': 'An error occurred while building dataset'}
+        return {'status': True, 'url': url, 'application': self.application, 'features': features}
+
+    def getFeatures(self, url):
+        try:
+            df = pd.read_csv(url)
+            return df.columns.to_list()
+        except Exception as e:
+            print("Cannot extract data feature list")
+            print(e)
+            return None
+
+    def extractMeasurement(self, _json):
+        return _json["series"][0]["columns"]
+
+    def getData(self):
+        query = None
+        try:
+            if self.start_filter is not None and self.start_filter != "":
+                query = "SELECT * FROM " + self.application + " WHERE time > now() - " + self.start_filter
+            else:
+                query = "SELECT * FROM " + self.application
+            result_set = self.influxdb.query(query=query)
+            series = self.extractMeasurement(result_set.raw)
+            #self.influxdb.close() #closing connection
+            return [series, self.prepareResultSet(result_set.raw)]
+        except Exception as e:
+            print("Could not collect query data points")
+            print(e)
+            return None
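A usage sketch for DatasetMaker; the config keys are the ones __init__ above consumes, while the host, credentials, and application name are placeholders:

from morphemic.dataset import DatasetMaker

configs = {
    'hostname': 'localhost',
    'port': 8086,
    'username': 'morphemic',
    'password': 'secret',
    'dbname': 'telegraf',
    'path_dataset': '/tmp/datasets',
}
# '1h' becomes the InfluxQL filter "time > now() - 1h"
maker = DatasetMaker(application='demo', start='1h', configs=configs)
result = maker.make()
if result.get('status'):
    print(result['url'], result['features'])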
diff --git a/morphemic-forecasting-eshybrid/morphemic/handler.py b/morphemic-forecasting-eshybrid/morphemic/handler.py
new file mode 100644
index 00000000..4f1ca27a
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/morphemic/handler.py
@@ -0,0 +1,5 @@
+
+
+class ModelHandler:
+    def on_train(self):
+        pass
diff --git a/morphemic-forecasting-eshybrid/morphemic/model.py b/morphemic-forecasting-eshybrid/morphemic/model.py
new file mode 100644
index 00000000..1ce0ffbc
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/morphemic/model.py
@@ -0,0 +1,98 @@
+import logging
+import threading
+import time
+import uuid
+import pandas as pd
+from ESRNN import ESRNN
+from . import configuration
+
+_logger = logging.getLogger(__name__)
+
+class UUIDModel:
+
+    esrnn = None
+
+
+    def __init__(self, id, config) -> None:
+        self.id = id
+        self.esrnn = ESRNN(max_epochs=config['train_parameters']['max_epochs'],
+                           batch_size=config['train_parameters']['batch_size'],
+                           freq_of_test=config['train_parameters']['freq_of_test'],
+                           learning_rate=float(config['train_parameters']['learning_rate']),
+                           lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'],
+                           lr_decay=config['train_parameters']['lr_decay'],
+                           per_series_lr_multip=config['train_parameters']['per_series_lr_multip'],
+                           gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'],
+                           rnn_weight_decay=config['train_parameters']['rnn_weight_decay'],
+                           noise_std=config['train_parameters']['noise_std'],
+                           level_variability_penalty=config['train_parameters']['level_variability_penalty'],
+                           testing_percentile=config['train_parameters']['testing_percentile'],
+                           training_percentile=config['train_parameters']['training_percentile'],
+                           ensemble=config['train_parameters']['ensemble'],
+                           max_periods=config['data_parameters']['max_periods'],
+                           seasonality=config['data_parameters']['seasonality'],
+                           input_size=config['data_parameters']['input_size'],
+                           output_size=config['data_parameters']['output_size'],
+                           frequency=config['data_parameters']['frequency'],
+                           cell_type=config['model_parameters']['cell_type'],
+                           state_hsize=config['model_parameters']['state_hsize'],
+                           dilations=config['model_parameters']['dilations'],
+                           add_nl_layer=config['model_parameters']['add_nl_layer'],
+                           random_seed=config['model_parameters']['random_seed'],
+                           device='cpu')
+
+
+class ModelStatus:
+    IDLE = "IDLE"
+    TRAINING = "TRAINING"
+    PREDICTING = "PREDICTING"
+
+lock = threading.Lock()
+class Model:
+
+    status = ModelStatus.IDLE
+
+    def __init__(self, handler=None) -> None:
+        self._handler = handler
+        # integrated here
+        self._model = None
+
+    def _new_model(self, X_train_df, y_train_df, X_test_df, y_naive_df) -> UUIDModel:
+        _logger.debug("Training new model")
+        # NOTE: 'Custom' is not among the names configuration.get_config()
+        # accepts, so this call raises ValueError until a custom config exists
+        config = configuration.get_config('Custom')
+        model = UUIDModel(uuid.uuid4(), config)
+        model.esrnn.fit(X_train_df, y_train_df, X_test_df, y_naive_df)
+        return model
+
+
+    def _retrain(self):
+        with lock:
+            self.status = ModelStatus.TRAINING
+            time.sleep(10)
+            _logger.debug("Model training finished")
+
+            #dataset maker integration
+            # self._model = self._new_model()
+
+            _logger.debug("set new model")
+            if self._handler:
+                self._handler.on_train()
+            self.status = ModelStatus.IDLE
+
+    def train(self):
+        if self.status == ModelStatus.IDLE:
+            t = threading.Thread(target=self._retrain)
+            t.start()
+        else:
+            _logger.warning("Already training")
+
+    def predict(self, metrics, times):
+        prediction = {}
+        for m in metrics:
+            prediction[m] = []
+            if self._model:
+                prediction[m] = self._model.esrnn.predict(pd.DataFrame(data=times))
+            else:
+                _logger.error("No model trained yet")
+
+        return prediction
\ No newline at end of file
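How the two classes above fit together, as a sketch; the PrintHandler subclass and the metric name are illustrative, and with training still a placeholder the prediction list comes back empty:

from morphemic.handler import ModelHandler
from morphemic.model import Model

class PrintHandler(ModelHandler):
    def on_train(self):
        print("model retrained")

model = Model(handler=PrintHandler())
model.train()                                    # spawns the _retrain thread
predictions = model.predict(["cpu_usage"], [])   # dict of metric -> forecasts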
diff --git a/morphemic-forecasting-eshybrid/morphemic/scheduler.py b/morphemic-forecasting-eshybrid/morphemic/scheduler.py
new file mode 100644
index 00000000..16583272
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/morphemic/scheduler.py
@@ -0,0 +1,61 @@
+
+import datetime
+import time
+import logging
+
+logging.basicConfig(level=logging.DEBUG)
+
+class Scheduler:
+
+    def __init__(self, epoch_start=0, forward_predictions=5, horizon=600):
+        """
+
+        :param epoch_start: The start UNIX UTC timestamp in milliseconds
+        :param forward_predictions: The number of forward predictions requested
+        :param horizon: The time horizon in seconds
+        """
+        self._epoch_start = epoch_start / 1000  # stored in seconds from here on
+        self._forward_predictions = forward_predictions
+        self._horizon = horizon
+        self._next_time = self.compute_next()
+
+        logging.debug(
+            """
+            Epoch: %s
+            Horizon: %s
+            Forward predictions: %s
+            Next: %s
+            Now: %s
+
+            """ % (datetime.datetime.fromtimestamp(self._epoch_start),
+                   horizon,
+                   forward_predictions,
+                   datetime.datetime.fromtimestamp(self._next_time),
+                   datetime.datetime.now(),
+                   )
+        )
+
+    def compute_next(self):
+        step = int((time.time() - self._epoch_start) / self._horizon)
+        return self._epoch_start + ((step + 1) * self._horizon)
+
+    def check(self, handler):
+        t = int(time.time())
+        # logging.debug("Checking t = %s(%s) > next_time %s(%s) " % (datetime.datetime.fromtimestamp(t), t, datetime.datetime.fromtimestamp(self._next_time), self._next_time))
+        times = []
+        if t > self._next_time:
+            for i in range(0, self._forward_predictions):
+                # logging.info(" t%s %s ", i, datetime.datetime.fromtimestamp(self._next_time + (i * self._horizon)))
+                times.append(datetime.datetime.fromtimestamp(self._next_time + (i * self._horizon)))
+
+            self._next_time = self.compute_next()
+
+            if handler:
+                handler.on_schedule(times)
+
+
+class Handler:
+
+    def on_schedule(self, times):
+        pass
\ No newline at end of file
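A worked sketch of the timing math above: compute_next() snaps the current time onto the grid epoch_start + k * horizon, and check() then hands the handler forward_predictions future grid points. The values below are illustrative:

import time
from morphemic.scheduler import Scheduler, Handler

class PrintTimes(Handler):
    def on_schedule(self, times):
        for t in times:
            print(t)

# Epoch 25 minutes (1500 s) in the past, horizon 600 s:
# step = int(1500 / 600) = 2, so next_time = epoch + 3 * 600,
# i.e. 300 s from now; the five scheduled points then fall
# 1800 s, 2400 s, 3000 s, 3600 s and 4200 s after the epoch.
s = Scheduler(epoch_start=int(time.time() * 1000) - 1_500_000,
              forward_predictions=5,
              horizon=600)

# check() only fires once the wall clock passes next_time, so it is polled:
for _ in range(600):
    s.check(PrintTimes())
    time.sleep(1)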
diff --git a/morphemic-forecasting-eshybrid/requirements.txt b/morphemic-forecasting-eshybrid/requirements.txt
new file mode 100644
index 00000000..ced1896f
--- /dev/null
+++ b/morphemic-forecasting-eshybrid/requirements.txt
@@ -0,0 +1,10 @@
+neptune-client
+numpy
+pandas
+torch>=1.3.1
+matplotlib
+seaborn
+psutil
+stomp.py
+influxdb
+python-daemon
\ No newline at end of file
--
GitLab


From 0a02eca0e30162d1b088e9ef0ea7d910d7a65f27 Mon Sep 17 00:00:00 2001
From: Fotis Paraskevopoulos
Date: Wed, 26 May 2021 13:11:33 +0300
Subject: [PATCH 2/2] Ignoring local folder

---
 morphemic-forecasting-eshybrid/.gitignore | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/morphemic-forecasting-eshybrid/.gitignore b/morphemic-forecasting-eshybrid/.gitignore
index 139b3d7b..301d1604 100644
--- a/morphemic-forecasting-eshybrid/.gitignore
+++ b/morphemic-forecasting-eshybrid/.gitignore
@@ -1 +1 @@
-./local
\ No newline at end of file
+/local
\ No newline at end of file
--
GitLab