Commit e59bf1f8 authored by Fotis Paraskevopoulos

More logs, and initialization sequence

parent 3a03dfb3
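The diff below replaces the ad-hoc checks on self.metrics and forecasting_version with a single state field shared by the run loop and the message handlers. A minimal sketch of the intended transitions, assuming only the three state names that appear in the diff (pending_metrics, pending_forecast, forecasting); the class and handler bodies here are simplified placeholders, not the real implementation:

# Sketch of the state sequence introduced by this commit; names follow the diff,
# bodies are illustrative only.
import logging
import time

class StateSketch:
    def __init__(self):
        self.state = 'pending_metrics'          # initial value set on the ESHybrid class

    def on_metrics_to_predict(self, metrics):
        if self.state != 'pending_metrics':     # ignore duplicate metric lists
            return
        self.metrics = metrics
        self.state = 'pending_forecast'

    def on_start_forecasting(self):
        if self.state == 'forecasting':         # already forecasting, nothing to do
            return
        self.state = 'forecasting'

    def on_stop_forecasting(self):
        if self.state != 'forecasting':
            return
        self.state = 'pending_forecast'

    def run_step(self, scheduler):
        # Mirrors the run loop: only the forecasting state drives the scheduler.
        if self.state == 'forecasting':
            scheduler.check(self)
        elif self.state == 'pending_metrics':
            logging.info("Waiting for metrics_to_predict")
        elif self.state == 'pending_forecast':
            logging.info("Waiting for start_forecasting")
        time.sleep(1)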
@@ -13,7 +13,9 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
forecast_config = dict()
scheduler = False
application ='default_application'
state = 'pending_metrics'
forecasting_version=False
_last_training_time=False
def __init__(self,config):
self._run=False
@@ -117,16 +119,24 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self._run = self.run()
if self._run:
logging.info("ESHYBRID_STARTED")
while self._run:
if self.scheduler:
self.scheduler.check(self)
else:
if len(self.metrics) <= 0 :
logging.info("Waiting for metrics_to_predict")
else:
logging.info("Waiting for start_forecasting")
time.sleep(1)
if self.state=='forecasting':
self.scheduler.check(self)
continue
if self.state=='pending_metrics':
logging.info("Waiting for metrics_to_predict")
continue
if self.state=='pending_forecast':
logging.info("Waiting for start_forecasting")
continue
self.connector.disconnect()
@@ -156,13 +166,19 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
)
#adding simple method to retrain the model
if time.time() - self._last_training_time > 3000:
if self._last_training_time and time.time() - self._last_training_time > 3000:
self._train_model()
def _train_model(self):
self.dataset.make()
self._last_training_time = time.time()
if not os.path.exists("%s/%s.csv" % (self.data_set_path,self.application)):
logging.error("**** NO DATA FROM DATASET MAKER ****")
return
self.model.train("%s/%s.csv" % (self.data_set_path,self.application),self.metrics)
self._last_training_time = time.time()
def on_train(self,model):
@@ -176,7 +192,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
def on_metrics_to_predict(self,res):
logging.debug("[2] Metrics to predict %s " % res)
if len(self.metrics):
if self.state != 'pending_metrics':
logging.warning("[2] We already have metrics >> %s " % self.metrics)
return
@@ -185,12 +201,13 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self.metrics.remove(metric)
self.metrics = [x['metric'] for x in res]
self.state = 'pending_forecast'
def on_stop_forecasting_eshybrid(self,res):
logging.debug("[6] Stop Subscribing %s " % res)
if not self.forecasting_version:
if self.state !='forecasting':
return
for metric in res[messaging.events.StopForecasting.METRICS]:
@@ -202,12 +219,13 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
self.forecast_config = res
if not len(self.metrics):
self.scheduler = False
self.state='pending_forecast'
def on_start_forecasting_eshybrid(self,res):
logging.debug("[7] Start Forecasting %s " % res)
if self.forecasting_version and self.forecasting_version == res[messaging.events.StartForecasting.VERSION]:
if self.state == 'forecasting':
logging.warning("Already forecasting")
return
@@ -216,6 +234,7 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
logging.error("Start forecasting before metrics to predict ")
return
for metric in res[messaging.events.StartForecasting.METRICS]:
if metric not in self.metrics:
logging.debug("Subscribing to %s " % metric)
@@ -228,6 +247,8 @@ class ESHybrid(morphemic.handler.ModelHandler,messaging.listener.MorphemicListen
horizon= res[messaging.events.StartForecasting.PREDICTION_HORIZON]
)
self.state='forecasting'
def on_error(self, headers, body):
logging.error("Headers %s",headers)
logging.error(" %s",body)
......
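The _train_model changes above also guard the periodic retrain: _last_training_time starts as False and is only stamped once a training run has actually been kicked off, so the 3000-second check cannot fire before the first model exists. A minimal sketch of that guard in isolation, with the interval taken from the diff and the helper name hypothetical:

import time

RETRAIN_INTERVAL_SECONDS = 3000  # interval used in the diff (roughly 50 minutes)

def should_retrain(last_training_time):
    # A falsy timestamp means no model has been trained yet, so there is nothing to refresh.
    if not last_training_time:
        return False
    return time.time() - last_training_time > RETRAIN_INTERVAL_SECONDS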
import time, json, requests
class Dataset:
def __init__(self, url, port,database,application,usernaname,password):
self._url =url
self._port =port
self._database =database
self._application =application
self._usernaname =usernaname
self._password =password
def count(self):
"""
curl -XPOST localhost:8086/api/v2/query -sS -H 'Accept:application/csv' -H 'Content-type:application/vnd.flux' -H 'Authorization: Token username:password' -d 'from(bucket:"telegraf")|> range(start:-5m) |> filter(fn:(r) => r._measurement == "cpu")'
"""
url = self._url
username = self._usernaname
password = self._password
database = self._database
application = self._application
headers = {'Accept': 'application/csv', 'Content-type': 'application/vnd.flux','Authorization': 'Token '+username+':'+password}
data_post = 'from(bucket:"'+database+'")|> range(start:-5m)|> filter(fn:(r) => r._measurement == "'+application+'")'
# Flux is sent as the raw request body (Content-type application/vnd.flux), not JSON-encoded
response = requests.post(url+'/api/v2/query',data=data_post,headers=headers)
return response
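The count method posts a Flux query to the InfluxDB 2.x /api/v2/query endpoint and receives a CSV body back, as in the curl example in the docstring. A hedged sketch of turning that body into a row count; the helper name is hypothetical and the header/annotation handling assumes the standard CSV layout of that endpoint:

def count_csv_rows(response_text):
    # Count data rows in a Flux CSV response: skip blank lines, '#' annotation
    # lines and the header row, which starts with ',result'.
    rows = 0
    for line in response_text.splitlines():
        line = line.strip()
        if not line or line.startswith('#') or line.startswith(',result'):
            continue
        rows += 1
    return rows

For example, count_csv_rows(dataset.count().text) would give the number of points returned for the last five minutes.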
import os, json, time
from influxdb import InfluxDBClient
import pandas as pd
from datetime import datetime
url_path_dataset = None
class Row():
def __init__(self, features,metricsname):
self.features = features
if "time" in self.features:
time_str = self.features["time"]
_obj = None
try:
_obj = datetime.strptime(time_str,'%Y-%m-%dT%H:%M:%S.%fZ')
except:
@@ -20,7 +20,7 @@ class Row():
metricsname.remove('application')
for field_name in metricsname:
if not field_name in self.features:
self.features[field_name] = None
def getTime(self):
@@ -28,7 +28,7 @@ class Row():
return self.features["time"]
if "timestamp" in self.features:
return self.features["timestamp"]
return None
def makeCsvRow(self):
if "application" in self.features:
@@ -43,14 +43,14 @@ class Dataset():
self.rows = {}
self.size = 0
def addRow(self,row):
self.rows[row.getTime()] = row
self.size +=1
def reset(self):
self.rows = {}
self.size = 0
print("Dataset reset")
def getSize(self):
return self.size
def sortRows(self):
return sorted(list(self.rows.values()), key=lambda x: x.getTime(), reverse=True)
def getRows(self):
@@ -59,7 +59,7 @@ class Dataset():
for i in range(tolerance):
if int(_time + i) in self.rows:
return self.rows[int(_time+i)]
return None
def save(self,metricnames,application_name):
if "application" in metricnames:
metricnames.remove("application")
@@ -95,7 +95,7 @@ class DatasetMaker():
for column in columns:
row[column] = values[index]
index +=1
return row
def prepareResultSet(self, result_set):
result = []
@@ -121,7 +121,7 @@ class DatasetMaker():
for _row in _data:
row = Row(_row,metricnames)
self.dataset.addRow(row)
print("Rows construction completed")
print("{0} rows found".format(self.dataset.getSize()))
#self.dataset.sortRows()
@@ -130,26 +130,25 @@ class DatasetMaker():
if features == None:
return {'status': False, 'message': 'An error occured while building dataset'}
return {'status': True,'url': url, 'application': self.application, 'features': features}
def getFeatures(self, url):
try:
df = pd.read_csv(url)
return df.columns.to_list()
except Exception as e:
print("Cannot extract data feature list")
return None
def extractMeasurement(self, _json):
return _json["series"][0]["columns"]
def getData(self):
query = None
try:
if self.start_filter != None and self.start_filter != "":
query = "SELECT * FROM " + self.application +" WHERE time > now() - "+ self.start_filter
else:
query = "SELECT * FROM " + self.application
query = "SELECT * FROM " + self.application
result_set = self.influxdb.query(query=query)
series = self.extractMeasurement(result_set.raw)
#self.influxdb.close() #closing connexion
......
@@ -152,6 +152,7 @@ class Model:
self.status = ModelStatus.IDLE
def train(self, dataset_path, metrics):
_logger.info("Start training for %s in %s " % (metrics,dataset_path,))
if self.status == ModelStatus.IDLE:
t = threading.Thread(target=self._retrain,
@@ -162,6 +163,7 @@ class Model:
def predict(self, metric, times):
_logger.debug("Request prediction for %s @ %s " % (metric,times,))
if not self._model:
_logger.error("No model trained yet")
return
......
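The last two hunks add log lines around the same two guards: train only launches a background thread when the model is idle, and predict bails out when no model has been fitted yet. A compact sketch of that pattern, with ModelStatus and _retrain assumed to mirror the names in the diff and everything else illustrative:

import logging
import threading
from enum import Enum

class ModelStatus(Enum):            # assumed to match the enum used by the real Model class
    IDLE = 0
    TRAINING = 1

class ModelSketch:
    def __init__(self):
        self.status = ModelStatus.IDLE
        self._model = None

    def train(self, dataset_path, metrics):
        logging.info("Start training for %s in %s" % (metrics, dataset_path))
        if self.status != ModelStatus.IDLE:
            return                              # a retrain is already running
        self.status = ModelStatus.TRAINING
        threading.Thread(target=self._retrain, args=(dataset_path, metrics)).start()

    def _retrain(self, dataset_path, metrics):
        try:
            self._model = object()              # placeholder for the real fit step
        finally:
            self.status = ModelStatus.IDLE

    def predict(self, metric, times):
        logging.debug("Request prediction for %s @ %s" % (metric, times))
        if not self._model:
            logging.error("No model trained yet")
            return None
        return []                               # placeholder prediction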