Commit 6bbaa3b4 authored by Fotis Paraskevopoulos's avatar Fotis Paraskevopoulos
Browse files

Fixes

parent 8dd95e99
......@@ -6,7 +6,8 @@ import logging
import signal
import time
import socket
import datetime, pytz, time
import math
class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListener, morphemic.scheduler.Handler):
......@@ -142,23 +143,38 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_schedule(self, times):
for m in self.metrics:
times_in_utc = [
int(datetime.datetime.utcfromtimestamp(x).strftime('%s')) for x in times
]
predictions = self.model.predict(
m,
times
times_in_utc
)
if predictions.empty:
logging.debug("No prediction available yet for metric[%s]",m)
continue
logging.debug("Got metric[%s] predictions %s",(m,predictions))
logging.debug("Got metric[%s] predictions",(m))
predictions.head()
for index,row in predictions.iterrows():
t = row['ds']
value= row['y_hat']
if math.isnan(value):
continue
payload = {
"metricValue": row['y_hat'],
"metricValue": value,
"timestamp": int(time.time()),
"probability": 0.98,
"confidence_interval": [float(8),float(15)],
"predictionTime": t.strftime('%s'),
"predictionTime": times[index],
"refersTo":"todo",
"cloud":"todo",
"provider":"todo"
}
logging.debug("Sending prediction for time %s => %s " % (t, payload) )
self.connector.send_to_topic(
......
......@@ -47,7 +47,7 @@ def test(c, metric):
config.read(config_file)
dataset_file = "%s/test_%s_%s.csv" % (config['persistence']['path_dataset'] , config['persistence']['application'],metric)
df = pd.read_csv("/tmp/illustrative/dataset.csv")
df = pd.read_csv(dataset_file)
df.rename(columns={'time':'ems_time'},inplace=True)
df = df[["ems_time",metric]]
df.dropna(inplace=True)
......
......@@ -15,10 +15,10 @@ def get_config(dataset_name):
return {
'device': 'cpu',
'runtime':{
"verbose":True
"verbose":False
},
'train_parameters': {
'max_epochs': 60,
'max_epochs': 30,
'batch_size': 1,
'freq_of_test': 5,
'learning_rate': '1e-2',
......@@ -31,7 +31,7 @@ def get_config(dataset_name):
'level_variability_penalty': 30,
'testing_percentile': 40,
'training_percentile': 60,
'ensemble': True
'ensemble': False
},
'data_parameters': {
'max_periods': 120,
......
......@@ -94,7 +94,7 @@ class DataHandler:
def to_predict(self, metric, times):
m_pd = pd.DataFrame(data=[datetime.utcfromtimestamp(x) for x in times], columns=['ds'])
m_pd = pd.DataFrame(data=[datetime.fromtimestamp(x) for x in times], columns=['ds'])
m_pd.insert(1, 'unique_id', self._application)
m_pd.insert(2, 'x', metric)
......@@ -136,25 +136,23 @@ class Model:
with lock:
self.status = ModelStatus.TRAINNING
while self.status != ModelStatus.TRAINED:
try:
_logger.debug("Starting training model")
_logger.debug("Starting training model")
model = self._new_model()
for m in metrics:
model = self._new_model()
for m in metrics:
try:
self._retrain_for_metric(model, m,path)
_logger.debug("Model training succesful for %s ",m)
self._model = model
_logger.debug("set new model")
self.status = ModelStatus.TRAINED
if self._handler:
self._handler.on_train(self)
except:
_logger.error("Not enough data - not training")
_logger.debug("Model training succesful for %s ",m)
except:
_logger.error("Not enough data for metric %s - not training", (m,))
self._model = model
_logger.debug("set new model")
self.status = ModelStatus.TRAINED
if self._handler:
self._handler.on_train(self)
_logger.info("Waiting for next training loop")
time.sleep(30)
time.sleep(15)
self.status = ModelStatus.IDLE
......@@ -171,12 +169,17 @@ class Model:
def predict(self, metric, times):
_logger.debug("Request prediction for %s @ %s " % (metric,times,))
ret = pd.DataFrame()
if not self._model:
_logger.error("No model trained yet")
return pd.DataFrame()
ret = self._model.model_for_metric(metric).predict(
self._dataHandler.to_predict(metric,times)
)
try:
df = self._model.model_for_metric(metric).predict(
self._dataHandler.to_predict(metric,times)
)
ret = df
except :
_logger.error("Couldn't fit predictions for %s ",(metric))
return ret
......@@ -27,7 +27,7 @@ class Scheduler:
Next: %s
Now: %s
""" % (datetime.datetime.fromtimestamp(self._epoch_start),
""" % (datetime.datetime.utcfromtimestamp(self._epoch_start),
horizon,
forward_predictons,
datetime.datetime.fromtimestamp(self._next_time),
......@@ -52,7 +52,7 @@ class Scheduler:
for i in range(0, self._forward_predictions):
_t = self._next_time + ( i * self._horizon)
logging.info(" step-t%s %s %s ", i, _t, datetime.datetime.fromtimestamp(_t ))
times.append( int( datetime.datetime.timestamp(datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) )) ))
times.append(_t)
self._next_time = self.compute_next()
if handler:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment