Commit fc199d5c authored by Jean-Didier Totow's avatar Jean-Didier Totow
Browse files

exposing ems time as metric

parent c8e6edf1
Pipeline #16631 passed with stage
in 1 minute and 40 seconds
......@@ -23,7 +23,7 @@ ml_model_path = os.environ.get("ML_MODEL_PATH","./models_trained")
prediction_tolerance = os.environ.get("PREDICTION_TOLERANCE","85")
forecasting_method_name = os.environ.get("FORECASTING_METHOD_NAME","cnn")
#/////////////////////////////////////////////////////////////////////////////////
steps = int(os.environ.get("BACKWARD_STEPS","8"))
steps = int(os.environ.get("BACKWARD_STEPS","64"))
#/////////////////////////////////////////////////////////////////////////////////
influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","147.102.17.76") #persistent_storage_hostname
influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086"))
......@@ -34,8 +34,9 @@ influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic")
start_forecasting_queue = os.environ.get("START_FORECASTING","/topic/start_forecasting.cnn")
metric_to_predict_queue = os.environ.get("METRIC_TO_PREDICT","/topic/metrics_to_predict")
#//////////////////////////////////////////////////////////////////////////////////
_time_column_name = 'time'
_column_to_remove = ['ems_time',"level"]
_time_column_name = os.environ.get("TIME_COLUMN","time")
_column_to_remove_str = os.environ.get("COLUMNS_TO_REMOVE","ems_time,level,name,application")
_column_to_remove = _column_to_remove_str.split(",")
_new_epoch = False
#logging
......@@ -171,10 +172,13 @@ class Forecaster(Thread):
logging.info("Forecaster started for target metric {0} ".format(self.target))
while True:
if int(time.time()) < self.epoch_start:
diff = self.epoch_start - int(time.time())
print("Prediction starts in {0} sec".format(diff))
time.sleep(1)
continue
if int(time.time()) > self.epoch_start + self.tolerance_epoch_start:
n, sleeping_time = self.computeSleepingTime()
print("{0} sec sleeping time before publishing".format(sleeping_time))
time.sleep(sleeping_time)
self.epoch_start += self.prediction_horizon * n
if self.stop:
......@@ -183,6 +187,7 @@ class Forecaster(Thread):
break
self.features = self.manager.getFeatureInput(self.application)
if len(self.features) == 0:
print("Cannot proceed, number of feature is 0")
time.sleep(self.prediction_horizon)
self.epoch_start += self.prediction_horizon
continue
......@@ -242,7 +247,8 @@ class ForecastingManager():
def loadDataset(self, url_dataset):
try:
return pd.read_csv(url_dataset, low_memory=False, on_bad_lines='skip')
#return pd.read_csv(url_dataset, low_memory=False, on_bad_lines='skip')
return pd.read_csv(url_dataset, low_memory=False)
except Exception as e:
print("Could not load the dataset")
print(e)
......@@ -258,6 +264,10 @@ class ForecastingManager():
return [data.to_dict('records')[len(data)-1]] #returning the last entry
print("Could not update datasets")
return []
#test_url = "/home/jean-didier/Projects/morphemic-preprocessor/forecaster-cnn/datasets/historicValues.csv"
#data = self.loadDataset(test_url)
#return [data.to_dict('records')[len(data)-1]] #returning the last entry
def getModelFromMetric(self, metric):
for key, model in self.applications.items():
......@@ -285,14 +295,19 @@ class ForecastingManager():
self.trainModel(model)
def simulateForcasting(self):
    """Inject a synthetic "start forecasting" request for local testing.

    Builds a single-metric (AvgResponseTime) payload whose timestamp and
    epoch_start lie 20 seconds in the future, serializes it to JSON and
    hands it to self.startForecasting — the same entry point that normally
    consumes messages from the start_forecasting queue.
    """
    # Old multi-metric payload with fixed timestamps, kept for reference:
    #data = {"metrics":["avgResponseTime","memory"],"timestamp":1623242615043,"epoch_start":1623242815041,"number_of_forward_predictions":8,"prediction_horizon":30}
    data = {"metrics":["AvgResponseTime"],"timestamp":int(time.time())+20,"epoch_start":int(time.time())+20,"number_of_forward_predictions":8,"prediction_horizon":30}
    self.startForecasting(json.dumps(data))
def simulateMetricToPredict(self):
    """Inject a synthetic "metrics to predict" message for local testing.

    Builds the single-metric (AvgResponseTime) subscription payload,
    serializes it to JSON and hands it to self.metricToPredict — the same
    entry point that normally consumes the metrics_to_predict topic.
    """
    data = [
        {"refersTo": "default_application", "level":1, "metric": "AvgResponseTime", "publish_rate":3000}
    ]
    # Old two-metric payload, kept for reference:
    #data = [
    #    {"refersTo": "default_application", "level":1, "metric": "avgResponseTime", "publish_rate":3000},
    #    {"refersTo": "default_application", "level":1, "metric": "memory", "publish_rate":3000}
    #]
    self.metricToPredict(json.dumps(data))
def metricToPredict(self, data):
......@@ -366,14 +381,17 @@ class ForecastingManager():
print("Cannot create dataset, not data available")
logging.info("Cannot create dataset, not data available")
return None
#test_url = "/home/jean-didier/Projects/morphemic-preprocessor/forecaster-cnn/datasets/historicValues.csv"
model.setDatasetUrl(response['url'])
#model.setDatasetUrl(test_url)
data = self.loadDataset(response['url'])
#data = self.loadDataset(test_url)
if len(data) <= steps:
print("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data)))
logging.info("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data)))
return False
#model.setDatasetUrl("/home/jean-didier/Projects/morphemic/Morphemic_TimeSeries/datasets/ds.csv")
model.setDatasetCreationTime(time.time())
#model.setDatasetUrl(test_url)
#model.setDatasetCreationTime(time.time())
#start training ml (application, url, metrics)
metric = model.getMetric()
trainer = Train(application, metric, _time_column_name, _column_to_remove, model.getDatasetUrl(), model.getNumberOfForwardPredictions(), steps, model.getPredictionHorizon())
......@@ -433,7 +451,7 @@ class ForecastingManager():
#self.simulateMetricToPredict()
#time.sleep(10)
#self.simulateForcasting()
#time.sleep(100)
#time.sleep(2)
#self.simulateForcasting()
while True:
for key, model in self.applications.items():
......
......@@ -33,7 +33,7 @@ class Predictor():
def loadDataset(self, url):
try:
return missing_values_imputer(pd.read_csv(url, low_memory=False, on_bad_lines='skip'))
return missing_values_imputer(pd.read_csv(url, low_memory=False))
except Exception as e:
print("Could not load the dataset")
print(e)
......@@ -76,10 +76,12 @@ class Predictor():
for col in self.column_to_remove:
if col in important_feature:
important_feature.remove(col)
data = important_data(data, important_feature)
#print(important_feature)
#data, scaler = Min_max_scal(data)
data, scaler = Min_max_scal(data)
#print(data)
data = data.values
#data = data.values
new_sample = data[-self.steps:]
#new_sample = np.array(self.feature_dict)
new_sample = new_sample.reshape((1, self.steps, len(important_feature)))
......@@ -90,7 +92,7 @@ class Predictor():
predictor = keras.models.load_model(path)
#[lower_bound, y_predict, upper_bound] = prediction_interval(predictor, x_train, y_train, new_sample, alpha=0.05)
y_predict = predictor.predict(new_sample, verbose=0)
y_predict = Min_max_scal_inverse(scaler, y_predict)
y_predict = y_predict[0].astype('float')
returned_data = []
for v in y_predict:
......@@ -247,7 +249,7 @@ class Train():
def loadDataset(self):
try:
return missing_values_imputer(pd.read_csv(self.url_dataset, low_memory=False, on_bad_lines='skip'))
return missing_values_imputer(pd.read_csv(self.url_dataset, low_memory=False))
except Exception as e:
print("Could not load the dataset")
print(e)
......@@ -294,6 +296,8 @@ class Train():
print("Target metric = {0}".format(target))
print("Sampling rate : {0} Seconde".format(self.prediction_horizon))
data = self.loadDataset()
if len(str(data[self.time_column_name][0])) == 19:
data[self.time_column_name] = data[self.time_column_name]/1000000000
#data['memory'] = data['memory']/1000000
if len(data) == 0:
return {"status": False, "message": "An error occured while loading the dataset", "data": None}
......@@ -319,7 +323,10 @@ class Train():
_start = time.time()
data = data.round(decimals=2)
data = missing_data_handling(data, rolling_mean=True)
percent_missing(data)
missing_data = percent_missing(data)
print("---Missing resume---")
print(missing_data)
print("---End resume---")
data = datetime_conversion(data, self.time_column_name)
data = important_data(data, self.features)
......@@ -328,6 +335,7 @@ class Train():
if len(data) * 0.33 < self.steps:
return {"status": False, "message": "No enough data after sampling", "data": None} #this will be removed
data, scaler = Min_max_scal(data)
data = data[~np.isnan(data).any(axis=1)]
#X_train, y_train, X_test,y_test = split_sequences(data, n_steps=steps)
#X_train, y_train, X_test,y_test = split_sequences_univariate(data, n_steps=self.number_of_foreward_forecating)
X_train, y_train, X_test,y_test = split_sequences_multi_steps(data, n_steps_in=self.steps, n_steps_out=self.number_of_foreward_forecating)
......
......@@ -27,7 +27,7 @@ def percent_missing(data):
missing_value_df = missing_value_df.drop(columns=['index'])
missing_value_df.sort_values('percent_missing', inplace=True, ascending=False)
print(missing_value_df) #TODO dont know for sure if it has to return something or just to print
#return missing_value_df #if needed we can use return here
return missing_value_df #if needed we can use return here
......@@ -141,8 +141,6 @@ def missing_data_handling(data ,drop_all_nan = False, fill_with_mean = False,
return data
def datetime_conversion(data, column_name):
data[column_name] = pd.to_datetime(data[column_name], unit='s')
data = data.set_index(column_name)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment