Commit d48c3008 authored by Fotis Paraskevopoulos's avatar Fotis Paraskevopoulos

Adding sync options

parent d96be487
/local
\ No newline at end of file
/local
sync.cfg
sync.cfg.local
sync.cfg.production
\ No newline at end of file
@@ -9,27 +9,33 @@ import datetime
class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
id="eshybrid"
metrics= set()
forecast_config = dict()
scheduler = False
application ='default_application'
def __init__(self):
def __init__(self,config):
self._run=False
self.connector = messaging.morphemic.Connection('aaa', '111',host="147.102.17.76",port=61610)
self.id = (config['listener'] or {'id':'eshybrid'} )['id']
self.connector = messaging.morphemic.Connection(
config['messaging']['username'],
config['messaging']['password'],
host=config['messaging']['host'],
port=config['messaging']['port']
)
self.model = morphemic.model.Model(self)
os.makedirs("/tmp/dataset-maker", exist_ok=True)
influx = {'hostname': '147.102.17.76',
'port': 8086,
'username': 'morphemic',
'password': 'password',
'dbname': 'morphemic',
'path_dataset': '/tmp/dataset-maker'
self.application = config['persistence']['application']
os.makedirs(config['persistence']['path_dataset'], exist_ok=True)
influx = {'hostname': config['persistence']['host'],
'port': config['persistence']['port'],
'username': config['persistence']['username'],
'password': config['persistence']['password'],
'dbname': config['persistence']['dbname'],
'path_dataset': config['persistence']['path_dataset']
}
self.dataset = morphemic.dataset.DatasetMaker(
application='default_application',
application=self.application,
start=None,
configs=influx
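For orientation, a minimal sketch of the nested configuration the reworked constructor reads; the section and key names are taken from the accesses above, and every value is a placeholder:

config = {
    'listener': {'id': 'eshybrid'},
    'messaging': {'username': 'user', 'password': 'secret',
                  'host': 'broker.example.org', 'port': 61610},
    'persistence': {'application': 'default_application',
                    'host': 'influx.example.org', 'port': 8086,
                    'username': 'morphemic', 'password': 'secret',
                    'dbname': 'morphemic',
                    'path_dataset': '/tmp/dataset-maker'},
}
forecaster = ESHybrid(config)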
@@ -81,28 +87,30 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
time.sleep(1)
def on_schedule(self, times):
for m in self.metrics:
for t in times:
predictions = self.model.predict(m['metric'], t)
# predictions = self.model.predict(self.application, m, times)
# if not predictions:
# continue
for t in times:
logging.debug("Sending prediction for time %s(%s) " % (datetime.datetime.fromtimestamp(t), t))
for p in predictions[m['metric']]:
self.connector.send_to_topic(
"intermediate_prediction.eshybrid.%s" % m['metric'],
{
"metricValue": p['x'],
"timestamp": int(time.time()),
"probability": 0.98,
"confidence_interval": [float(8),float(15)],
"predictionTime": p['y'],
}
)
self.connector.send_to_topic(
"intermediate_prediction.eshybrid.%s" % m,
{
"metricValue": 12.43,
"timestamp": int(time.time()),
"probability": 0.98,
"confidence_interval": [float(8),float(15)],
"predictionTime":t,
}
)
# for p in predictions[m['metric']]:
def on_train(self):
self.connector.send_to_topic("training_models",
{
"metrics": [x['metric'] for x in self.metrics],
"metrics": self.metrics,
"forecasting_method": self.id,
"timestamp": int(time.time() * 1000)
})
@@ -119,20 +127,10 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
logging.debug("[6] Metrics to predics %s " % res)
for metric in self.metrics:
logging.debug("Un-subscribing from %s " % metric)
self.connector.unsubscribe(metric,self.id)
self.connector.unsubscribe(metric.metric,self.id)
self.metrics.remove(metric)
self.metrics = res
# for metric in res[messaging.events.StopForecasting.METRICS]:
# if metric in self.metrics:
# logging.debug("Un-subscribing from %s " % metric)
# self.connector.unsubscribe(metric,self.id)
# self.metrics.remove(metric)
#
# self.forecast_config = res
# if not len(self.metrics):
# self.scheduler = False
self.metrics = [x['metric'] for x in res]
def on_stop_forecasting_eshybrid(self,res):
......
import logging
import configparser
import os
from forecasting import eshybrid
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
e = eshybrid.ESHybrid()
config_file = "%s/%s" % (os.getcwd(), "sync.cfg")
print("Config file %s ", config_file)
config = configparser.RawConfigParser()
config.read(config_file)
config_dict = dict(config)
e = eshybrid.ESHybrid(config)
try:
......
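One caveat with the new entry point: configparser returns SectionProxy objects with string values, so dict(config) alone does not yield the nested dict of native types the handler expects. A minimal conversion sketch, assuming the sync.cfg layout shown at the end of this diff:

import configparser

config = configparser.RawConfigParser()
config.read("sync.cfg")
# Materialise each section as a plain dict, then cast the numeric options.
config_dict = {section: dict(config.items(section)) for section in config.sections()}
config_dict['messaging']['port'] = config.getint('messaging', 'port')
config_dict['persistence']['port'] = config.getint('persistence', 'port')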
@@ -12,35 +12,42 @@ _logger = logging.getLogger(__name__)
class UUIDModel:
esrnn = False
config = {}
def __init__(self, id, config) -> None:
self.id = id
self.esrnn = ESRNN(max_epochs=config['train_parameters']['max_epochs'],
batch_size=config['train_parameters']['batch_size'],
freq_of_test=config['train_parameters']['freq_of_test'],
learning_rate=float(config['train_parameters']['learning_rate']),
lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'],
lr_decay=config['train_parameters']['lr_decay'],
per_series_lr_multip=config['train_parameters']['per_series_lr_multip'],
gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'],
rnn_weight_decay=config['train_parameters']['rnn_weight_decay'],
noise_std=config['train_parameters']['noise_std'],
level_variability_penalty=config['train_parameters']['level_variability_penalty'],
testing_percentile=config['train_parameters']['testing_percentile'],
training_percentile=config['train_parameters']['training_percentile'],
ensemble=config['train_parameters']['ensemble'],
max_periods=config['data_parameters']['max_periods'],
seasonality=config['data_parameters']['seasonality'],
input_size=config['data_parameters']['input_size'],
output_size=config['data_parameters']['output_size'],
frequency=config['data_parameters']['frequency'],
cell_type=config['model_parameters']['cell_type'],
state_hsize=config['model_parameters']['state_hsize'],
dilations=config['model_parameters']['dilations'],
add_nl_layer=config['model_parameters']['add_nl_layer'],
random_seed=config['model_parameters']['random_seed'],
device='cpu')
self.config = config
self.model_by_metric={}
def model_for_metric(self,metric):
config = self.config
if not metric in self.model_by_metric:
self.model_by_metric[metric] = ESRNN(max_epochs=config['train_parameters']['max_epochs'],
batch_size=config['train_parameters']['batch_size'],
freq_of_test=config['train_parameters']['freq_of_test'],
learning_rate=float(config['train_parameters']['learning_rate']),
lr_scheduler_step_size=config['train_parameters']['lr_scheduler_step_size'],
lr_decay=config['train_parameters']['lr_decay'],
per_series_lr_multip=config['train_parameters']['per_series_lr_multip'],
gradient_clipping_threshold=config['train_parameters']['gradient_clipping_threshold'],
rnn_weight_decay=config['train_parameters']['rnn_weight_decay'],
noise_std=config['train_parameters']['noise_std'],
level_variability_penalty=config['train_parameters']['level_variability_penalty'],
testing_percentile=config['train_parameters']['testing_percentile'],
training_percentile=config['train_parameters']['training_percentile'],
ensemble=config['train_parameters']['ensemble'],
max_periods=config['data_parameters']['max_periods'],
seasonality=config['data_parameters']['seasonality'],
input_size=config['data_parameters']['input_size'],
output_size=config['data_parameters']['output_size'],
frequency=config['data_parameters']['frequency'],
cell_type=config['model_parameters']['cell_type'],
state_hsize=config['model_parameters']['state_hsize'],
dilations=config['model_parameters']['dilations'],
add_nl_layer=config['model_parameters']['add_nl_layer'],
random_seed=config['model_parameters']['random_seed'],
device='cpu')
return self.model_by_metric[metric]
class ModelStatus(enumerate):
IDLE="IDLE"
@@ -58,17 +65,38 @@ class Model:
# integrated here
self._model = False
def _new_model(self, X_train_df, y_train_df, X_test_df, y_naive_df) -> UUIDModel:
def _new_model(self) -> UUIDModel:
_logger.debug("Training new model")
config = configuration.get_config('Custom')
config = configuration.get_config('Hourly')
model = UUIDModel(uuid.uuid4(),config)
model.esrnn.fit(X_train_df, y_train_df, X_test_df, y_naive_df)
return model
def _transform(self,data,metrics):
return pd.Dataframe(),pd.Dataframe(),pd.Dataframe(),pd.Dataframe()
def _transform(self,m,data):
def beautify(p):
p = p.sort_values('time')
p = p.rename(columns = {'application':'unique_id','time': 'ds'}, inplace = False)
return p
m_data = [ d for d in data[1] if d[m] ]
m_pd = pd.DataFrame(data=m_data)
m_pd = m_pd[['application','time',m]]
m_pd = m_pd.rename(columns = {m:'y'}, inplace = False)
rows = m_pd.shape[0]
m_pd_train_y= m_pd.iloc[0: int(rows * 0.70)]
m_pd_test_y= m_pd.iloc[m_pd_train_y.shape[0]-1: rows]
m_pd_train_x = m_pd_train_y[['application','time']]
m_pd_train_x['x'] = m
m_pd_test_x= m_pd_test_y[['application','time']]
m_pd_test_x['x'] = m
return beautify(m_pd_train_x),beautify(m_pd_train_y), beautify(m_pd_test_x) , None #beautify(m_pd_test_y)
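To make the resulting frame shape concrete, a small standalone illustration of the y-frame that beautify yields for one metric; the metric name and sample rows are invented:

import pandas as pd

rows = [
    {'application': 'default_application', 'time': 1633000060, 'cpu_usage': 0.47},
    {'application': 'default_application', 'time': 1633000000, 'cpu_usage': 0.42},
]
m_pd = pd.DataFrame(rows)[['application', 'time', 'cpu_usage']]
m_pd = m_pd.rename(columns={'cpu_usage': 'y'})
m_pd = m_pd.sort_values('time').rename(columns={'application': 'unique_id', 'time': 'ds'})
print(list(m_pd.columns))  # ['unique_id', 'ds', 'y'] -- the layout handed to ESRNN.fit in _retrain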
def _retrain(self, metrics, data):
with lock:
@@ -77,9 +105,10 @@ class Model:
_logger.debug("Starting training model")
model = self._new_model(
self._transform( metrics, data)
)
model = self._new_model()
for m in metrics:
args = self._transform( m, data)
model.model_for_metric(m).fit(*args)
_logger.debug("Model training finished")
@@ -93,25 +122,22 @@ class Model:
def train(self,metrics=[], data=[]):
if self.status == ModelStatus.IDLE:
t = threading.Thread(target=self._retrain, args=(data,metrics))
t = threading.Thread(target=self._retrain, args=(metrics, data))
t.start()
else:
logging.warning("Already Training")
def predict(self, m, y):
prediction = {}
prediction[m] = [
{
"x": 12.34,
"y": y,
}
]
# for m in metrics:
# prediction[m] = []
# if self._model:
# prediction[m] = self._model.esrnn.predict(pd.DataFrame(data=times))
# else:
# _logger.error("No model trainged yest")
return prediction
\ No newline at end of file
def predict(self, application, metric, times):
if not self._model:
_logger.error("No model trained yet")
return
m_pd = pd.DataFrame(data=times, columns=['ds'])
m_pd.insert(0,'unique_id',application)
m_pd.insert(1,'x',metric)
ret= self._model.model_for_metric(metric).predict(m_pd)
return ret
\ No newline at end of file
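A brief usage sketch of the new predict signature; the application and metric names are illustrative, and it assumes a Model instance that has already trained a model for that metric:

import time

# model: a morphemic.model.Model instance with a trained model for 'cpu_usage'
horizon = [int(time.time()) + 60 * k for k in range(1, 4)]  # three one-minute steps ahead
forecast = model.predict('default_application', 'cpu_usage', horizon)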
@@ -7,4 +7,5 @@ seaborn
psutil
stomp.py
influxdb
python-daemon
\ No newline at end of file
python-daemon
configparser
\ No newline at end of file
[persistence]
host=
port=
username=
password=
dbname=
path_dataset=
application=
[messaging]
host=
port=
username=
password=
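For completeness, an illustrative sync.cfg with placeholder values; hosts and credentials are examples only, and the [listener] section supplies the forecaster id read in the constructor:

[persistence]
host=influx.example.org
port=8086
username=morphemic
password=changeme
dbname=morphemic
path_dataset=/tmp/dataset-maker
application=default_application

[messaging]
host=broker.example.org
port=61610
username=forecaster
password=changeme

[listener]
id=eshybrid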