Commit 8fa45e2b authored by Marta Różańska's avatar Marta Różańska
Browse files

Merge branch 'iccs-eshybrid-2.0' into 'morphemic-rc2.0'

Adding start_forecasting version check, and fixing predictionTime

See merge request !266
parents 9e6376fb 0f620f61
Pipeline #20259 passed with stages
in 29 minutes and 33 seconds
......@@ -408,6 +408,7 @@ class StartForecasting(enumerate):
_match="start_forecasting."
VERSION = "version"
METRICS = "metrics"
'''metrics for which a certain method should start producing predictions'''
TIMESTAMP = "timestamp"
......
......@@ -13,6 +13,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
forecast_config = dict()
scheduler = False
application ='default_application'
forecasting_version=False
def __init__(self,config):
self._run=False
......@@ -130,7 +131,6 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_schedule(self, times):
for m in self.metrics:
predictions = self.model.predict(
m,
......@@ -144,10 +144,10 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
t = row['ds']
payload = {
"metricValue": row['y_hat'],
"timestamp": int(t.strftime('%s')),
"timestamp": int(time.time()),
"probability": 0.98,
"confidence_interval": [float(8),float(15)],
"predictionTime": int(time.time()),
"predictionTime": t,
}
logging.debug("Sending prediction for time %s => %s " % (t, payload) )
self.connector.send_to_topic(
......@@ -175,7 +175,11 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_metrics_to_predict(self,res):
logging.debug("[2] Metrics to predics %s " % res)
logging.debug("[2] Metrics to predict %s " % res)
if len(self.metrics):
logging.warning("[2] We already have metrics >> %s " % self.metrics)
return
for metric in self.metrics:
logging.debug("Un-subscribing from %s " % metric)
self.metrics.remove(metric)
......@@ -186,6 +190,9 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_stop_forecasting_eshybrid(self,res):
logging.debug("[6] Stop Subscribing %s " % res)
if not self.forecasting_version:
return
for metric in res[messaging.events.StopForecasting.METRICS]:
if metric in self.metrics:
logging.debug("Un-subscribing from %s " % metric)
......@@ -200,6 +207,11 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_start_forecasting_eshybrid(self,res):
logging.debug("[7] Start Forecasting %s " % res)
if self.forecasting_version and self.forecasting_version == res[messaging.events.StartForecasting.VERSION]:
logging.warning("Already forecasting")
return
self.forecasting_version = res[messaging.events.StartForecasting.VERSION]
if not self.metrics:
logging.error("Start forecasting before metrics to predict ")
return
......
......@@ -50,7 +50,8 @@ class Scheduler:
times = []
if t > self._next_time:
for i in range(0, self._forward_predictions):
logging.info(" t%s %s ", i, datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) ) )
_t = self._next_time + ( i * self._horizon)
logging.info(" step-t%s %s %s ", i, _t, datetime.datetime.fromtimestamp(_t ))
times.append( int( datetime.datetime.timestamp(datetime.datetime.fromtimestamp(self._next_time + ( i * self._horizon) )) ))
self._next_time = self.compute_next()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment