From 89a1d15c6c6869d6410d4ee45ff01abb4732101e Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Wed, 28 Sep 2022 12:15:19 +0300 Subject: [PATCH 1/2] MOR-245 --- morphemic-forecasting-eshybrid/__init__.py | 3 +- .../forecasting/eshybrid.py | 6 +- ...ut.tfevents.1662549060.fotis.local.29134.0 | Bin 0 -> 40 bytes .../lightning_logs/version_0/hparams.yaml | 1 + .../morphemic/__init__.py | 3 +- .../morphemic/configuration.py | 14 +- .../morphemic/data.py | 55 ---- .../morphemic/model/__init__.py | 3 + .../morphemic/model/data.py | 148 +++++++++++ .../morphemic/model/preprocess_dataset.py | 247 ++++++++++++++++++ .../morphemic/model_manager.py | 144 +++++++--- .../morphemic/uuid_model.py | 46 ---- .../test_frequency.py | 19 ++ morphemic-forecasting-eshybrid/test_future.py | 93 ------- .../test_illustrative.py | 111 -------- morphemic-forecasting-eshybrid/test_model.py | 86 ++++++ .../test_processor.py | 51 ++++ 17 files changed, 674 insertions(+), 356 deletions(-) create mode 100644 morphemic-forecasting-eshybrid/lightning_logs/version_0/events.out.tfevents.1662549060.fotis.local.29134.0 create mode 100644 morphemic-forecasting-eshybrid/lightning_logs/version_0/hparams.yaml delete mode 100644 morphemic-forecasting-eshybrid/morphemic/data.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/model/__init__.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/model/data.py create mode 100644 morphemic-forecasting-eshybrid/morphemic/model/preprocess_dataset.py delete mode 100644 morphemic-forecasting-eshybrid/morphemic/uuid_model.py create mode 100644 morphemic-forecasting-eshybrid/test_frequency.py delete mode 100644 morphemic-forecasting-eshybrid/test_future.py delete mode 100644 morphemic-forecasting-eshybrid/test_illustrative.py create mode 100644 morphemic-forecasting-eshybrid/test_model.py create mode 100644 morphemic-forecasting-eshybrid/test_processor.py diff --git a/morphemic-forecasting-eshybrid/__init__.py b/morphemic-forecasting-eshybrid/__init__.py index 68c849b8..87fec3cd 100644 --- a/morphemic-forecasting-eshybrid/__init__.py +++ b/morphemic-forecasting-eshybrid/__init__.py @@ -1,4 +1,5 @@ from . import ESRNN from . import messaging -from . import forecasting \ No newline at end of file +from . import forecasting +from . import neuralforecast \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py index bced6f15..2ee9ecc4 100644 --- a/morphemic-forecasting-eshybrid/forecasting/eshybrid.py +++ b/morphemic-forecasting-eshybrid/forecasting/eshybrid.py @@ -37,7 +37,7 @@ class ESHybrid(morphemic.model_manager.ModelHandler, messaging.listener.Morphemi timeout=6000, keepalive=True ) - self.model = morphemic.model_manager.ModelManager(config['persistence']['application'],self) + self.model = morphemic.model_manager.ModelManager(self) self.application = config['persistence']['application'] self.data_set_path =config['persistence']['path_dataset'] os.makedirs(self.data_set_path, exist_ok=True) @@ -199,10 +199,6 @@ class ESHybrid(morphemic.model_manager.ModelHandler, messaging.listener.Morphemi _logger.warning("Model currently training") return - # ~/melodic-tests/forecastingModule/morphemic/influxHandlingScripts$ - # ./downloadMetricFromInflux.sh [metric] morphemic - # ./downloadMetricFromInflux.sh default_application morphemic - # downloaded/morphemic/default_application.csv dataset_results = self.dataset.make() if not dataset_results.get('status',False): _logger.error("**** NO DATA FROM DATASET MAKER ****") diff --git a/morphemic-forecasting-eshybrid/lightning_logs/version_0/events.out.tfevents.1662549060.fotis.local.29134.0 b/morphemic-forecasting-eshybrid/lightning_logs/version_0/events.out.tfevents.1662549060.fotis.local.29134.0 new file mode 100644 index 0000000000000000000000000000000000000000..734200a85f7209ce1c8f179639d24feb3b224fb6 GIT binary patch literal 40 rcmb1OfPlsI-b$PaM=b^AkKJ&T;!P?_%*@ksElbTSu`)W~6Y~lH!fFgZ literal 0 HcmV?d00001 diff --git a/morphemic-forecasting-eshybrid/lightning_logs/version_0/hparams.yaml b/morphemic-forecasting-eshybrid/lightning_logs/version_0/hparams.yaml new file mode 100644 index 00000000..0967ef42 --- /dev/null +++ b/morphemic-forecasting-eshybrid/lightning_logs/version_0/hparams.yaml @@ -0,0 +1 @@ +{} diff --git a/morphemic-forecasting-eshybrid/morphemic/__init__.py b/morphemic-forecasting-eshybrid/morphemic/__init__.py index 8e2925af..e454b796 100644 --- a/morphemic-forecasting-eshybrid/morphemic/__init__.py +++ b/morphemic-forecasting-eshybrid/morphemic/__init__.py @@ -1,7 +1,6 @@ -from . import uuid_model +from . import model from . import model_manager from . import configuration -from . import data from . import scheduler diff --git a/morphemic-forecasting-eshybrid/morphemic/configuration.py b/morphemic-forecasting-eshybrid/morphemic/configuration.py index 05b9815c..f4fc3f6e 100644 --- a/morphemic-forecasting-eshybrid/morphemic/configuration.py +++ b/morphemic-forecasting-eshybrid/morphemic/configuration.py @@ -18,9 +18,9 @@ def get_config(dataset_name): "verbose":False }, 'train_parameters': { - 'max_epochs': 30, + 'max_epochs': 60, 'batch_size': 1, - 'freq_of_test': 5, + 'freq_of_test': 10, 'learning_rate': '1e-2', 'lr_scheduler_step_size': 7, 'lr_decay': 0.5, @@ -29,22 +29,22 @@ def get_config(dataset_name): 'rnn_weight_decay': 0, 'noise_std': 0.01, 'level_variability_penalty': 30, - 'testing_percentile': 40, - 'training_percentile': 60, + 'testing_percentile': 30, + 'training_percentile': 70, 'ensemble': True }, 'data_parameters': { 'max_periods': 120, - 'seasonality': [ 30 ], + 'seasonality': [ 1, 60 ], 'input_size': 1, 'output_size': 3600, - 'frequency': 'S' + 'frequency': '30S' }, 'model_parameters': { 'cell_type': 'LSTM', 'state_hsize': 40, 'dilations': [[1, 6]], - 'add_nl_layer': False, + 'add_nl_layer': True, 'random_seed': 1 } } diff --git a/morphemic-forecasting-eshybrid/morphemic/data.py b/morphemic-forecasting-eshybrid/morphemic/data.py deleted file mode 100644 index 5e80ca4f..00000000 --- a/morphemic-forecasting-eshybrid/morphemic/data.py +++ /dev/null @@ -1,55 +0,0 @@ -import time -import pandas as pd -import numpy as np -import logging -from datetime import datetime - -_logger = logging.getLogger(__name__) -_logger.setLevel(level=logging.DEBUG) - - -class Satinizer: - - def __init__(self, application): - self._application = application - - def to_train(self, metric, path): - - df = pd.read_csv(path) - df = df[['ems_time', metric]] - df = df.replace('None', np.nan) - df.dropna(inplace=True) - df.reset_index(drop=True, inplace=True) - df.rename(columns={'ems_time': 'ds', metric: 'y'}, inplace=True) - - t=time.time() - first_ds = df[0:].ds.values[0] - last_ds = df[-1:].ds.values[0] - output_size = int(t-last_ds) - - - df['ds'] = pd.to_datetime(df['ds'], unit='s') - df['y'] = df['y'].astype(float) - - df = df.set_index('ds').resample('1S').asfreq() - df = df.shift(periods=output_size,freq='1S') - df = df.interpolate(method='linear') - df.reset_index(level=0, inplace=True) - - train_y = df.copy() - train_y.insert(0, column='unique_id', value=self._application) - - train_x = train_y[['unique_id', 'ds']] - train_x['x'] = metric - - _logger.info("Data to train for metric %s [%s - %s] required output %s" % (metric, first_ds,last_ds,output_size)) - - return train_x, train_y, first_ds, last_ds - - def to_predict(self, metric, times): - - m_pd = pd.DataFrame(data=[datetime.fromtimestamp(x) for x in times], columns=['ds']) - m_pd.insert(1, 'unique_id', self._application) - m_pd.insert(2, 'x', metric) - - return m_pd diff --git a/morphemic-forecasting-eshybrid/morphemic/model/__init__.py b/morphemic-forecasting-eshybrid/morphemic/model/__init__.py new file mode 100644 index 00000000..3ad8b309 --- /dev/null +++ b/morphemic-forecasting-eshybrid/morphemic/model/__init__.py @@ -0,0 +1,3 @@ + + +from . import data diff --git a/morphemic-forecasting-eshybrid/morphemic/model/data.py b/morphemic-forecasting-eshybrid/morphemic/model/data.py new file mode 100644 index 00000000..fa59d649 --- /dev/null +++ b/morphemic-forecasting-eshybrid/morphemic/model/data.py @@ -0,0 +1,148 @@ +import time +import pandas as pd +import numpy as np +import logging +import math +from datetime import datetime + +from . import preprocess_dataset + +_logger = logging.getLogger(__name__) +_logger.setLevel(level=logging.DEBUG) + + +seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800} + + +class DataProcessor: + + last_ds = None + train_y = pd.DataFrame() + train_x = pd.DataFrame() + horizon = 8 + + def __init__(self, metric, horizon, path, frequency='30s'): + + self.horizon=horizon + self.shift_coefficient = 0 + self._ready=False + self._metric=metric + self._path=path + self._frequency=frequency + self._frequency_in_seconds= self._convert_to_seconds(frequency) + self._frequency_in_milliseconds= self._frequency_in_seconds*1000 + + def _convert_to_seconds(self, s): + return int(s[:-1]) * seconds_per_unit[s[-1]] + + + def to_train(self, metric, path): + + df = pd.read_csv(path) + df = df[['ems_time', metric]] + df = df.replace('None', np.nan) + df.dropna(inplace=True) + df.reset_index(drop=True, inplace=True) + df.rename(columns={'ems_time': 'ds', metric: 'y'}, inplace=True) + + t=time.time() + first_ds = df[0:].ds.values[0] + last_ds = df[-1:].ds.values[0] + output_size = int(t-last_ds) + + df['ds'] = pd.to_datetime(df['ds'], unit='s') + df['y'] = df['y'].astype(float) + + df = df.set_index('ds').resample('1S').asfreq() + # df = df.shift(periods=output_size,freq='1S') + df = df.interpolate(method='linear') + df.reset_index(level=0, inplace=True) + + train_y = df.copy() + train_y.insert(0, column='unique_id', value='0') + + train_x = train_y[['unique_id', 'ds']] + train_x['x'] = metric + + _logger.info("Data to train for metric %s [%s - %s] shifted by %s [%s - %s ] " % (metric, first_ds, last_ds, output_size, df[0:].ds.values[0], df[-1:].ds.values[0])) + + return train_x, train_y, first_ds, last_ds + + + def _sanitize(self,dataframe): + dataframe.rename(columns={'series':'unique_id','ems_time': 'ds', self._metric: 'y'}, inplace=True) + dataframe['ds'] = pd.to_datetime(dataframe['ds'], unit='ms') + dataframe['y'] = dataframe['y'].astype(float) + dataframe['unique_id'] = dataframe['unique_id'].astype(str) + return dataframe + + def _original(self, df): + original_y = df[['series','ems_time',self._metric]] + return self._sanitize(original_y) + + def _pre_process_series(self, series, shift_periods, dataset_frame): + + train_y = dataset_frame[dataset_frame['series'] == series] + + train_y = self._sanitize(train_y) + train_y = train_y.set_index('ds').asfreq(self._frequency) + train_y = train_y.shift(periods=shift_periods,freq=self._frequency) + + train_y = train_y.dropna() + train_y.reset_index(level=0, inplace=True) + + return train_y.copy() + + def pre_process(self): + + dataset = pd.read_csv(self._path) + ts_dataset = preprocess_dataset.Dataset( + dataset, + target_column=self._metric, + prediction_length=self.horizon, + publish_rate=self._frequency_in_milliseconds, + ) + + if len(ts_dataset.series_idxs) <=0: + logging.warning("Not enough data to train model") + return + + df = ts_dataset.dataset[['series','ems_time', self._metric]] + + + self.original_y=self._original(df) + t = int(time.time()) + self.last_ds = df[-1:].ems_time.values[0] / 1000 + shift_periods = math.ceil((t - self.last_ds) / self._frequency_in_seconds ) + + train_y=pd.DataFrame() + for series_idx in ts_dataset.series_idxs: + train_y = pd.concat([train_y,self._pre_process_series(series_idx,shift_periods,df)]) + + + min = train_y['y'].min() + if min < 0: + self.shift_coefficient = abs(min) + + train_y['y'] = train_y['y']+abs(min) + train_x = train_y[['unique_id', 'ds']] + train_x['x'] = self._metric + + self.train_y=train_y + self.train_x=train_x + self._ready = True + + def is_ready(self): + return self._ready + + def shift(self,data_frame, key='y_hat'): + data_frame[key] = data_frame[key] - self.shift_coefficient + return data_frame + + def to_predict(self, metric, times, series): + + m_pd = pd.DataFrame(data=[datetime.fromtimestamp(x) for x in times], columns=['ds']) + m_pd.insert(1, 'unique_id', series or self._application) + m_pd.insert(2, 'x', metric) + + return m_pd diff --git a/morphemic-forecasting-eshybrid/morphemic/model/preprocess_dataset.py b/morphemic-forecasting-eshybrid/morphemic/model/preprocess_dataset.py new file mode 100644 index 00000000..e0e5c198 --- /dev/null +++ b/morphemic-forecasting-eshybrid/morphemic/model/preprocess_dataset.py @@ -0,0 +1,247 @@ +import pandas as pd +from pytorch_forecasting import TimeSeriesDataSet +from pytorch_forecasting.data import NaNLabelEncoder +import numpy as np +import logging +import time + +pd.options.mode.chained_assignment = None + +"""Script for preparing time series dataset from pythorch-forecasting package +TODO: add checking whether data consists of multiple series, handle nans values""" + + +class Dataset(object): + def __init__( + self, + dataset, + target_column="value", + time_column="ems_time", + tv_unknown_reals=[], + known_reals=[], + tv_unknown_cat=[], + static_reals=[], + classification=0, + context_length=40, + prediction_length=5, + publish_rate=10000, + ): + + self.max_missing_values = ( + 20 # max consecutive missing values allowed per series + ) + self.series_idxs=np.array([]) + self.target_column = target_column + self.time_column = time_column + self.tv_unknown_cat = tv_unknown_cat + self.known_reals = known_reals + self.tv_unknown_reals = tv_unknown_reals + self.static_reals = static_reals + self.classification = classification + self.context_length = context_length + self.prediction_length = prediction_length + self.publish_rate = publish_rate + self.dataset = dataset + self.dropped_recent_series = True # default set to be true + if self.dataset.shape[0] > 0: + self.check_gap() + self.n = dataset.shape[0] + if self.dataset.shape[0] > 0: + self.ts_dataset = self.create_time_series_dataset() + + def cut_nan_start(self, dataset): + dataset.index = range(dataset.shape[0]) + first_not_nan_index = dataset[self.target_column].first_valid_index() + if first_not_nan_index == first_not_nan_index: # check is if it;s not np.nan + if first_not_nan_index is not None: + if first_not_nan_index > -1: + return dataset[dataset.index > first_not_nan_index] + else: + return dataset.dropna() + + def fill_na(self, dataset): + dataset = dataset.replace(np.inf, np.nan) + dataset = dataset.ffill(axis="rows") + return dataset + + def convert_formats(self, dataset): + if not self.classification: + dataset[self.target_column] = dataset[self.target_column].astype(float) + else: + dataset[self.target_column] = dataset[self.target_column].astype(int) + + for name in self.tv_unknown_cat: + dataset[name] = dataset[name].astype(str) + return dataset + + def convert_time_to_ms(self): + if self.dataset.shape[0] > 0: + digit_len = len(str(int(self.dataset[self.time_column].values[0]))) + if digit_len >= 13: + self.dataset[self.time_column] = self.dataset[self.time_column].apply( + lambda x: int(str(int(x))[:13]) + ) + else: + self.dataset[self.time_column] = self.dataset[self.time_column].apply( + lambda x: int(int(str(int(x))[:digit_len]) * 10 ** (13 - digit_len)) + ) + self.dataset[self.time_column] = self.dataset[self.time_column].apply( + lambda x: int(x // 1e4 * 1e4) + ) + + def add_obligatory_columns(self, dataset): + n = dataset.shape[0] + dataset["time_idx"] = range(n) # TODO check time gaps + return dataset + + def get_time_difference_current(self): + if self.dataset.shape[0] > 0: + last_timestamp_database = self.dataset[self.time_column].values[-1] + current_time = int(time.time()) + logging.info( + f"Time difference between last timestamp and current time: {current_time - last_timestamp_database}" + ) + + return current_time - last_timestamp_database + + def check_gap(self): + if (self.dataset.shape[0] > 0) and (self.target_column in self.dataset.columns): + self.dataset = self.dataset.groupby(by=[self.time_column]).min() + self.dataset[self.time_column] = self.dataset.index + self.dataset.index = range(self.dataset.shape[0]) + self.convert_time_to_ms() + self.dataset[self.target_column] = pd.to_numeric( + self.dataset[self.target_column], errors="coerce" + ).fillna(np.nan) + self.dataset = self.dataset.replace(np.inf, np.nan) + self.dataset = self.dataset.dropna(subset=[self.target_column]) + if self.dataset.shape[0] > 0: + max_gap = self.dataset[self.time_column].diff().abs().max() + logging.info( + f"Metric: {self.target_column} Max time gap in series {max_gap}" + ) + print(f" Metric: {self.target_column} Max time gap in series {max_gap}") + series_freq = ( + (self.dataset[self.time_column]) + .diff() + .fillna(0) + .value_counts() + .index.values[0] + ) + + logging.info( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency" + ) + if series_freq != self.publish_rate: + logging.info( + f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!" + ) + print( + f"Metric: {self.target_column} Detected series with {series_freq} frequency, but the frequency should be: {self.publish_rate}!" + ) + + # check series length + series = np.split( + self.dataset, + *np.where( + self.dataset[self.time_column] + .diff() + .abs() + .fillna(0) + .astype(int) + >= np.abs(self.max_missing_values * self.publish_rate) + ), + ) + logging.info(f"Metric: {self.target_column} {len(series)} series found") + print(f"{len(series)} series found") + preprocessed_series = [] + for i, s in enumerate(series): + s = self.fill_na(s) + s = self.cut_nan_start(s) + s = self.add_obligatory_columns(s) + s["split"] = "train" + s = self.convert_formats(s) + logging.info( + f"Metric: {self.target_column} Found series {i} of length: {s.shape[0]}, required data rows: {self.prediction_length * 2 + self.context_length}" + ) + if s.shape[0] > self.prediction_length + self.context_length: + s["series"] = i + preprocessed_series.append(s) + if i == len(series) - 1: + logging.info( + f"Metric: {self.target_column} Fresh data rows: {s.shape[0]}, required fresh data rows: {self.prediction_length * 2 + self.context_length}" + ) + + logging.info( + f"Metric: {self.target_column} {len(preprocessed_series)} long enough series found" + ) + print(f"{len(preprocessed_series)} long enough series found") + # logging.info(f"") + + if preprocessed_series: + self.dataset = pd.concat(preprocessed_series) + self.series_idxs=self.dataset.series.unique() + if self.dataset["series"].max() != len(series) - 1: + self.dropped_recent_series = True + else: + self.dropped_recent_series = False + else: + self.dataset = pd.DataFrame() + self.dropped_recent_series = True + self.dataset.index = range(self.dataset.shape[0]) + else: + self.dataset = pd.DataFrame() + self.dropped_recent_series = True + logging.info(f"metric: {self.target_column} no data found") + if self.dataset.shape[0] > 0: + self.get_time_difference_current() + + def inherited_dataset(self, split1, split2): + df1 = ( + self.dataset[lambda x: x.split == split1] + .groupby("series", as_index=False) + .apply(lambda x: x.iloc[-self.context_length :]) + ) # previous split fragment + df2 = self.dataset[lambda x: x.split == split2] # split part + inh_dataset = pd.concat([df1, df2]) + inh_dataset = inh_dataset.sort_values(by=["series", "time_idx"]) + inh_dataset = TimeSeriesDataSet.from_dataset( + self.ts_dataset, inh_dataset, min_prediction_idx=0, stop_randomization=True + ) + return inh_dataset + + def create_time_series_dataset(self): + if not self.classification: + self.time_varying_unknown_reals = [ + self.target_column + ] + self.tv_unknown_reals + self.time_varying_unknown_categoricals = self.tv_unknown_cat + else: + self.time_varying_unknown_reals = self.tv_unknown_reals + self.time_varying_unknown_categoricals = [ + self.target_column + ] + self.tv_unknown_cat + + ts_dataset = TimeSeriesDataSet( + self.dataset[lambda x: x.split == "train"], + time_idx="time_idx", + target=self.target_column, + categorical_encoders={"series": NaNLabelEncoder().fit(self.dataset.series)}, + group_ids=["series"], + time_varying_unknown_reals=[self.target_column], + min_encoder_length=self.context_length, + max_encoder_length=self.context_length, + max_prediction_length=self.prediction_length, + min_prediction_length=self.prediction_length, + add_relative_time_idx=False, + # allow_missings=False, + ) + return ts_dataset + + def get_from_dataset(self, dataset): + return TimeSeriesDataSet.from_dataset( + self.ts_dataset, dataset, min_prediction_idx=0, stop_randomization=True + ) diff --git a/morphemic-forecasting-eshybrid/morphemic/model_manager.py b/morphemic-forecasting-eshybrid/morphemic/model_manager.py index 4519301a..bae4a0d3 100644 --- a/morphemic-forecasting-eshybrid/morphemic/model_manager.py +++ b/morphemic-forecasting-eshybrid/morphemic/model_manager.py @@ -1,3 +1,4 @@ +from ESRNN import ESRNN import logging import threading import time @@ -5,8 +6,7 @@ import uuid import pandas as pd from . import configuration -from . import data -from .uuid_model import UUIDModel +from . import model _logger = logging.getLogger(__name__) _logger.setLevel(level=logging.DEBUG) @@ -22,20 +22,107 @@ class ModelStatus(enumerate): TRAINED = "TRAINED" PREDICTING = "PREDICTING" +lock = threading.Lock() + +class MetricModel: + + config = {} + model_by_metric = {} + + def __init__(self, id, metric, output_size, path) -> None: + self.id = id + + config = configuration.get_config('Morphemic').copy() + config['data_parameters'].update({ + 'output_size': output_size + }) + self._metric=metric + self.ready=False + self.config = config + self._data_processor= model.data.DataProcessor( + metric, + output_size, + path + ) + self._model = ESRNN(max_epochs=self.config['train_parameters']['max_epochs'], + batch_size=self.config['train_parameters']['batch_size'], + freq_of_test=self.config['train_parameters']['freq_of_test'], + learning_rate=float(config['train_parameters']['learning_rate']), + lr_scheduler_step_size=self.config['train_parameters'][ + 'lr_scheduler_step_size'], + lr_decay=self.config['train_parameters']['lr_decay'], + per_series_lr_multip=self.config['train_parameters'][ + 'per_series_lr_multip'], + gradient_clipping_threshold=self.config['train_parameters'][ + 'gradient_clipping_threshold'], + rnn_weight_decay=self.config['train_parameters']['rnn_weight_decay'], + noise_std=self.config['train_parameters']['noise_std'], + level_variability_penalty=self.config['train_parameters'][ + 'level_variability_penalty'], + testing_percentile=self.config['train_parameters']['testing_percentile'], + training_percentile=self.config['train_parameters']['training_percentile'], + ensemble=self.config['train_parameters']['ensemble'], + max_periods=self.config['data_parameters']['max_periods'], + seasonality=self.config['data_parameters']['seasonality'], + input_size=self.config['data_parameters']['input_size'], + output_size=self.config['data_parameters']['output_size'], + frequency=self.config['data_parameters']['frequency'], + cell_type=self.config['model_parameters']['cell_type'], + state_hsize=self.config['model_parameters']['state_hsize'], + dilations=self.config['model_parameters']['dilations'], + add_nl_layer=self.config['model_parameters']['add_nl_layer'], + random_seed=self.config['model_parameters']['random_seed'], + device='cpu') + + + def train(self): + if self.ready: + _logger.error("[%s] Already trained model for %s " % (self.id, self._metric) ) + self.ready=True + self._data_processor.pre_process() + + if not self._data_processor.is_ready(): + return False + + try: + _logger.info("[%s] Retraining for %s rows = outputsize %s " % ( + self.id, + self._metric, + self._data_processor.train_y.shape[0] + )) + self._model.fit( + self._data_processor.train_x, + self._data_processor.train_y, + ) + self.ready=True + except ValueError as e: + _logger.error("[%s] Couldn't prepare data for metric %s\n%s" % (self.id, self._metric,e)) + + return self.ready + + def predict(self,times): + + if not self.ready: + _logger.error("[%s] Can't predict with unready model " % self.id) + return + test_values = pd.DataFrame(data=times, columns=['ds']) + test_values = test_values.set_index('ds') + test_values['unique_id'] = self._data_processor.train_y['unique_id'].max() + test_values['x'] = self._metric + test_values.reset_index(level=0, inplace=True) + predictions = self._model.predict(test_values) + return self._data_processor.shift(predictions) -lock = threading.Lock() class ModelManager: status = ModelStatus.IDLE model_by_metric = {} - def __init__(self, application, handler=ModelHandler()) -> None: + def __init__(self, handler=ModelHandler()) -> None: self._handler = handler - # integrated here - self._model = None - self._dataHandler = data.Satinizer(application) + def is_idle(self): return self.status == ModelStatus.IDLE @@ -51,36 +138,30 @@ class ModelManager: else: logging.warning("Already Training") - def _new_model(self, **kwargs) -> UUIDModel: - _logger.debug("Training new model") - config = configuration.get_config('Morphemic') - - if kwargs: - for k in kwargs: - if k in config: - config.update(kwargs[k]) - model = UUIDModel(uuid.uuid4(), config) - return model def _retrain(self, metrics, output_size, path): with lock: self.status = ModelStatus.TRAINNING + while self.status != ModelStatus.TRAINED: _logger.debug("Starting training model") - model = self._new_model(data_paremeters = { - 'output_size': output_size - }) for m in metrics: try: - self._retrain_for_metric(model, m,path) - _logger.debug("Model training successful for %s ",m) + self.model_by_metric[m] = MetricModel( + uuid.uuid4(), + m, + output_size, + path) + ret = self.model_by_metric[m].train() + if ret: + _logger.debug("Model training successful for %s ",m) + except ValueError as e: _logger.error("Not enough data for metric %s - not training \n\n%s", (m,e)) - self._model = model _logger.debug("Updating trained model") self.status = ModelStatus.TRAINED if self._handler: @@ -90,27 +171,18 @@ class ModelManager: self.status = ModelStatus.IDLE - def _retrain_for_metric(self,model,metric,path): - try: - args = self._dataHandler.to_train(metric, path) - _logger.info("Retraining for %s - %s rows = outputsize %s " % (metric, args[0].shape[0], args[3])) - model.model_for_metric(metric).fit(args[0], args[1], verbose=True) - except ValueError as e: - _logger.error("Couldn't prepare data for metric %s\n%s" % (metric,e)) def predict(self, metric, times): _logger.debug("Request prediction for %s @ %s " % (metric,times,)) ret = pd.DataFrame() - if not self._model: - _logger.warning("No model trained yet") + if not self.model_by_metric[metric] \ + or not self.model_by_metric[metric].ready: + _logger.warning("No model available for metric [%s] " % metric) return pd.DataFrame() try: - df = self._model.model_for_metric(metric).predict( - self._dataHandler.to_predict(metric,times) - ) - ret = df + return self.model_by_metric[metric].predict(times) except AssertionError as e : _logger.error("Model not fitted yet \n\n%s\n\n ",e) except ValueError as e: diff --git a/morphemic-forecasting-eshybrid/morphemic/uuid_model.py b/morphemic-forecasting-eshybrid/morphemic/uuid_model.py deleted file mode 100644 index e5de47ce..00000000 --- a/morphemic-forecasting-eshybrid/morphemic/uuid_model.py +++ /dev/null @@ -1,46 +0,0 @@ -from ESRNN import ESRNN - -class UUIDModel: - - config = {} - model_by_metric = {} - - def __init__(self, id, config) -> None: - self.id = id - self.config = config - self.model_by_metric = {} - - def model_for_metric(self, metric): - config = self.config - if not metric in self.model_by_metric: - self.model_by_metric[metric] = ESRNN(max_epochs=config['train_parameters']['max_epochs'], - batch_size=config['train_parameters']['batch_size'], - freq_of_test=config['train_parameters']['freq_of_test'], - learning_rate=float(config['train_parameters']['learning_rate']), - lr_scheduler_step_size=config['train_parameters'][ - 'lr_scheduler_step_size'], - lr_decay=config['train_parameters']['lr_decay'], - per_series_lr_multip=config['train_parameters'][ - 'per_series_lr_multip'], - gradient_clipping_threshold=config['train_parameters'][ - 'gradient_clipping_threshold'], - rnn_weight_decay=config['train_parameters']['rnn_weight_decay'], - noise_std=config['train_parameters']['noise_std'], - level_variability_penalty=config['train_parameters'][ - 'level_variability_penalty'], - testing_percentile=config['train_parameters']['testing_percentile'], - training_percentile=config['train_parameters']['training_percentile'], - ensemble=config['train_parameters']['ensemble'], - max_periods=config['data_parameters']['max_periods'], - seasonality=config['data_parameters']['seasonality'], - input_size=config['data_parameters']['input_size'], - output_size=config['data_parameters']['output_size'], - frequency=config['data_parameters']['frequency'], - cell_type=config['model_parameters']['cell_type'], - state_hsize=config['model_parameters']['state_hsize'], - dilations=config['model_parameters']['dilations'], - add_nl_layer=config['model_parameters']['add_nl_layer'], - random_seed=config['model_parameters']['random_seed'], - device='cpu') - return self.model_by_metric[metric] - diff --git a/morphemic-forecasting-eshybrid/test_frequency.py b/morphemic-forecasting-eshybrid/test_frequency.py new file mode 100644 index 00000000..ef906f5a --- /dev/null +++ b/morphemic-forecasting-eshybrid/test_frequency.py @@ -0,0 +1,19 @@ +import datetime +import time +import pandas as pd + +t = time.time() + +ds = [ int(t) + i*30 for i in range(8)] + + +print(ds) + +pd_test = pd.DataFrame(data=ds, columns=['ds']) +pd_test['ds'] = pd.to_datetime(pd_test['ds'],unit='s') +pd_test = pd_test.set_index('ds').asfreq('30S') +pd_test['unique_id'] = '1' +pd_test['x'] = 'Hello' +pd_test = pd_test.shift(periods=8,freq='30S') + +print(pd_test) \ No newline at end of file diff --git a/morphemic-forecasting-eshybrid/test_future.py b/morphemic-forecasting-eshybrid/test_future.py deleted file mode 100644 index 155ba461..00000000 --- a/morphemic-forecasting-eshybrid/test_future.py +++ /dev/null @@ -1,93 +0,0 @@ -import logging -import configparser -import os -import argparse -import morphemic -import time -import datetime -import pandas as pd -import numpy as np -import plotly.express as px -import uuid - -from sklearn.metrics import mean_absolute_error,mean_squared_error,mean_absolute_percentage_error - - -from forecasting import eshybrid - -metrics = [ - "EstimatedRemainingTimeContext", - "SimulationLeftNumber", - "SimulationElapsedTime", - "NotFinishedOnTime", - "NotFinished", - "WillFinishTooSoonContext", - "NotFinishedOnTimeContext", - "MinimumCores", - "ETPercentile", - "RemainingSimulationTimeMetric", - "TotalCores" -] - - -metrics = [ - "MinimumCoresContext" -] - - -def smape(act,forc): - return 100/len(act) * np.sum(2 * np.abs(forc - act) / (np.abs(act) + np.abs(forc))) - - - -def test(c, metric): - - parser = argparse.ArgumentParser(description='Run eshybrid forecaster') - parser.add_argument('--config', help='Config file to run, default sync.cfg') - args = parser.parse_args() - - config_file = "%s/%s" % (os.getcwd(), args.config or "sync.cfg") - print("Config file %s ", config_file) - config = configparser.RawConfigParser() - config.read(config_file) - dataset_file = "/Users/fotisp/Development/python/morphemic/local/out/dataset-maker/default_application.csv" - print("Training %s " % dataset_file) - - data_handler = morphemic.data.Satinizer(config['persistence']['application']) - args = data_handler.to_train(metric,dataset_file) - config = morphemic.configuration.get_config(c) - - config['data_parameters'].update({ - 'output_size': 240 - }) - - model = morphemic.uuid_model.UUIDModel(uuid.uuid4(), config) - - t = time.time() - pd_test_times = data_handler.to_predict('MinimumCoresContext', - [ - int(datetime.datetime.utcfromtimestamp(t+(i * 30)).strftime('%s')) for i in range(1,8) - ]) - model.model_for_metric(metric) \ - .fit(args[0],args[1], verbose=True) - predictions=model.model_for_metric(metric).predict(pd_test_times) - - fig = px.line(args[1], x='ds', y='y', title="Interpolated metric %s" % metric) - fig.add_scatter(x=predictions['ds'], y=predictions['y_hat'], mode='lines', name="Predictions") - fig.show() - - - -def main(): - start = time.time() - for config in ['Morphemic']: - for m in metrics: - test(config,m) - - print("Testing took %s " % (time.time() - start) ) - - -if __name__ == '__main__': - main() - - diff --git a/morphemic-forecasting-eshybrid/test_illustrative.py b/morphemic-forecasting-eshybrid/test_illustrative.py deleted file mode 100644 index 6e287fd8..00000000 --- a/morphemic-forecasting-eshybrid/test_illustrative.py +++ /dev/null @@ -1,111 +0,0 @@ -import logging -import configparser -import os -import argparse -import morphemic -import time -import datetime -import pandas as pd -import numpy as np -import plotly.express as px -import uuid - -from sklearn.metrics import mean_absolute_error,mean_squared_error,mean_absolute_percentage_error - - -from forecasting import eshybrid - -metrics = [ -"EstimatedRemainingTimeContext", -"SimulationLeftNumber", -"SimulationElapsedTime", -"NotFinishedOnTime", -"NotFinished", -"WillFinishTooSoonContext", -"NotFinishedOnTimeContext", -"MinimumCores", -"ETPercentile", -"RemainingSimulationTimeMetric", -"TotalCores" -] - - -def smape(act,forc): - return 100/len(act) * np.sum(2 * np.abs(forc - act) / (np.abs(act) + np.abs(forc))) - - - -def test(c, metric): - - parser = argparse.ArgumentParser(description='Run eshybrid forecaster') - parser.add_argument('--config', help='Config file to run, default sync.cfg') - args = parser.parse_args() - - config_file = "%s/%s" % (os.getcwd(), args.config or "sync.cfg") - print("Config file %s ", config_file) - config = configparser.RawConfigParser() - config.read(config_file) - dataset_file = "%s/test_%s_%s.csv" % (config['persistence']['path_dataset'] , config['persistence']['application'],metric) - - df = pd.read_csv(dataset_file) - df.rename(columns={'time':'ems_time'},inplace=True) - df = df[["ems_time",metric]] - df.dropna(inplace=True) - df.reset_index(level=0, inplace=True) - - rows= df.shape[0] - pd_train_y = df.iloc[0: int(rows * 0.90)] - pd_train_y.reset_index(level=0, inplace=True) - pd_train_y.to_csv(dataset_file) - - pd_test_y= df.iloc[pd_train_y.shape[0]: rows] - pd_test_times = pd_test_y['ems_time'] - first_seconds = pd_test_y.iloc[0]['ems_time'] - last_seconds = pd_test_y.iloc[-1]['ems_time'] - - data_handler = morphemic.mode.DataHandler(config['persistence']['application']) - - config = morphemic.configuration.get_config(c) - config['data_parameters'].update({ - 'output_size':int(last_seconds-first_seconds+1)+30 - }) - - model = morphemic.model.UUIDModel(uuid.uuid4(), config) - - args = data_handler.to_train(metric,dataset_file) - print("Training %s " % dataset_file) - model.model_for_metric(metric).fit(args[0],args[1],verbose=False) - predictions=model.model_for_metric(metric).predict(data_handler.to_predict(metric,pd_test_times.to_numpy())) - print("%s -> MAE = %s" % (metric, mean_absolute_error(pd_test_y[metric].to_numpy(), predictions['y_hat'].to_numpy()))) - print("%s -> MSE = %s" % (metric, mean_squared_error(pd_test_y[metric].to_numpy(), predictions['y_hat'].to_numpy()))) - print("%s -> MAPE = %s" % (metric, mean_absolute_percentage_error(pd_test_y[metric].to_numpy(), predictions['y_hat'].to_numpy()))) - print("%s -> SMAPE = %s" % (metric, smape(pd_test_y[metric].to_numpy(), predictions['y_hat'].to_numpy()))) - - args = data_handler.to_train(metric,dataset_file) - plot_test_y = pd_test_y.copy() - plot_test_y['ems_time'] = pd.to_datetime(plot_test_y['ems_time'],unit='s') - - plot_train_y = pd_train_y.copy() - plot_train_y['ems_time'] = pd.to_datetime(pd_train_y['ems_time'],unit='s') - - fig = px.line(plot_train_y, x='ems_time', y=metric, title="Plotting test for %s with %s " % (metric,c)) - fig.add_scatter(x=args[1]['ds'], y=args[1]['y'], mode='lines', name="Interpolated") - fig.add_scatter(x=plot_test_y['ems_time'], y=plot_test_y[metric], mode='lines', name="Test") - fig.add_scatter(x=predictions['ds'], y=predictions['y_hat'], mode='lines', name="Predictions") - fig.show() - - - -def main(): - start = time.time() - for config in ['Morphemic']: - for m in metrics: - test(config,m) - - print("Testing took %s " % (time.time() - start) ) - - -if __name__ == '__main__': - main() - - diff --git a/morphemic-forecasting-eshybrid/test_model.py b/morphemic-forecasting-eshybrid/test_model.py new file mode 100644 index 00000000..ad1e7dfe --- /dev/null +++ b/morphemic-forecasting-eshybrid/test_model.py @@ -0,0 +1,86 @@ +import logging +import math + +import configparser +import os +import argparse +import morphemic +import time +import datetime +import pandas as pd +import numpy as np +import plotly.express as px +import uuid + +from morphemic.model_manager import MetricModel + +logging.basicConfig(level=logging.DEBUG) + +metrics = [ + "MinimumCoresContext" +] + + +def test(c, metric): + + + dataset_file = "/Users/fotisp/Development/python/morphemic/local/out/dataset-maker/default_application.csv" + print("Training %s " % dataset_file) + + horizon = 120 + frequency='30s' + data_processor= morphemic.model.data.DataProcessor( + metric, + horizon, + dataset_file + ) + data_processor.pre_process() + + if not data_processor.is_ready(): + return False + + + model= MetricModel( + "test", + metric, + horizon, + dataset_file + ) + test_values = data_processor.train_y[-8:].ds.values + pd_test = pd.DataFrame(data=test_values, columns=['ds']) + pd_test = pd_test.set_index('ds').asfreq(frequency) + pd_test['unique_id'] = data_processor.train_y['unique_id'].max() + pd_test['x'] = metric + pd_test = pd_test.shift(periods=horizon,freq=frequency) + pd_test.reset_index(level=0, inplace=True) + + model._model.fit( + data_processor.train_x, + data_processor.train_y, + shuffle=False, + verbose=True) + + predictions=model._model.predict(pd_test) + predictions = data_processor.shift(predictions) + + fig = px.line(data_processor.original_y, x='ds',y='y',title="Original Data") + for i in data_processor.train_y.unique_id.unique(): + fig.add_scatter(x=data_processor.train_y[data_processor.train_y['unique_id'] == i ]['ds'], y=data_processor.train_y[data_processor.train_y['unique_id'] ==i ]['y'], mode='lines', name="series "+i) + + print("Go predictions %s ",predictions) + fig.add_scatter(x=predictions['ds'], y=predictions['y_hat'], mode='lines', name="Predictions") + fig.show() + +def main(): + start = time.time() + for config in ['Morphemic']: + for m in metrics: + test(config,m) + + print("Testing took %s " % (time.time() - start) ) + + +if __name__ == '__main__': + main() + + diff --git a/morphemic-forecasting-eshybrid/test_processor.py b/morphemic-forecasting-eshybrid/test_processor.py new file mode 100644 index 00000000..7b27fdd8 --- /dev/null +++ b/morphemic-forecasting-eshybrid/test_processor.py @@ -0,0 +1,51 @@ +import logging + +import numpy as np + +from datetime import datetime,timedelta +from morphemic.model_manager import ModelHandler +from morphemic.model_manager import ModelManager + +logging.basicConfig(level=logging.DEBUG) + + +metrics = [ + "MinimumCoresContext" +] + + +def test(metric): + + + dataset_file = "/Users/fotisp/Development/python/morphemic/local/out/dataset-maker/default_application.csv" + + print("Training %s " % dataset_file) + + + class MyHandler(ModelHandler): + + def on_train(self,model_manager): + + times = model_manager.model_by_metric[metric]._data_processor.train_y[-8:].ds.values + + # move values forward by the window + times = [x + np.timedelta64(len(times)*30,'s') for x in times ] + + predictions = model_manager.predict(metric,times) + + print("Go predictions %s ",predictions) + + + model_manager= ModelManager(MyHandler()) + model_manager.train(dataset_file,[metric],) + + +def main(): + for m in metrics: + test(m) + + +if __name__ == '__main__': + main() + + -- GitLab From 81efd73a08d664bc665c89ec6b30f3ac95e25ab1 Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Fri, 30 Sep 2022 15:56:07 +0300 Subject: [PATCH 2/2] Fixing invalid requirement --- morphemic-forecasting-eshybrid/Dockerfile | 1 + morphemic-forecasting-eshybrid/requirements.txt | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/morphemic-forecasting-eshybrid/Dockerfile b/morphemic-forecasting-eshybrid/Dockerfile index b7327fa0..1be7f4aa 100644 --- a/morphemic-forecasting-eshybrid/Dockerfile +++ b/morphemic-forecasting-eshybrid/Dockerfile @@ -27,6 +27,7 @@ RUN cd /var/lib/morphemic/ \ && rm -rf /var/lib/morphemic COPY docker-entrypoint.sh /app +RUN chmod +x /app/docker-entrypoint.sh WORKDIR /app diff --git a/morphemic-forecasting-eshybrid/requirements.txt b/morphemic-forecasting-eshybrid/requirements.txt index b12aab7d..23c9ed8d 100644 --- a/morphemic-forecasting-eshybrid/requirements.txt +++ b/morphemic-forecasting-eshybrid/requirements.txt @@ -7,4 +7,5 @@ seaborn psutil influxdb python-daemon -configparser \ No newline at end of file +configparser +pytorch-forecasting==0.8.4 \ No newline at end of file -- GitLab