Commit 4db3d6d4 authored by Jean-Didier Totow's avatar Jean-Didier Totow

univariate fixes

parent f54ef6d7
Pipeline #16652 passed with stage
in 1 minute and 9 seconds
import os, json, time, stomp, pickle, logging
import pandas as pd
from os import path
from datetime import datetime
from threading import Thread
@@ -22,7 +23,7 @@ ml_model_path = os.environ.get("ML_MODEL_PATH","./models_trained")
prediction_tolerance = os.environ.get("PREDICTION_TOLERANCE","85")
forecasting_method_name = os.environ.get("FORECASTING_METHOD_NAME","cnn")
#/////////////////////////////////////////////////////////////////////////////////
steps = 128
steps = int(os.environ.get("BACKWARD_STEPS","64"))
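# number of past observations the model looks back over (lookback window), overridable via BACKWARD_STEPS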
#/////////////////////////////////////////////////////////////////////////////////
influxdb_hostname = os.environ.get("INFLUXDB_HOSTNAME","147.102.17.76") #persistent_storage_hostname
influxdb_port = int(os.environ.get("INFLUXDB_PORT","8086"))
@@ -33,7 +34,9 @@ influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic")
start_forecasting_queue = os.environ.get("START_FORECASTING","/topic/start_forecasting.cnn")
metric_to_predict_queue = os.environ.get("METRIC_TO_PREDICT","/topic/metrics_to_predict")
#//////////////////////////////////////////////////////////////////////////////////
_time_column_name = 'time'
_time_column_name = os.environ.get("TIME_COLUMN","time")
_column_to_remove_str = os.environ.get("COLUMNS_TO_REMOVE","ems_time,level,name,application")
_column_to_remove = _column_to_remove_str.split(",")
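# e.g. "ems_time,level,name,application" -> ['ems_time', 'level', 'name', 'application']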
_new_epoch = False
#logging
@@ -149,6 +152,7 @@ class Forecaster(Thread):
self.publisher = publisher
self.target = target
self.application = application
self.tolerance_epoch_start = 1
self.features_dict = {}
self.stop = False
super(Forecaster,self).__init__()
@@ -159,23 +163,35 @@
def setStop(self):
self.stop = True
def computeSleepingTime(self):
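# returns (horizons elapsed since epoch_start, seconds past the last horizon boundary)
# e.g. epoch_start=100, prediction_horizon=30, now=175 -> (2, 15)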
now = int(time.time())
return (now - self.epoch_start) // self.prediction_horizon, (now - self.epoch_start) % self.prediction_horizon
def run(self):
print("Forecaster started for target metric {0} ".format(self.target))
logging.info("Forecaster started for target metric {0} ".format(self.target))
while True:
if int(time.time()) < self.epoch_start:
diff = self.epoch_start - int(time.time())
print("Prediction starts in {0} sec".format(diff))
time.sleep(1)
continue
if int(time.time()) > self.epoch_start + self.tolerance_epoch_start:
n, sleeping_time = self.computeSleepingTime()
print("{0} sec sleeping time before publishing".format(sleeping_time))
time.sleep(sleeping_time)
self.epoch_start += self.prediction_horizon * n
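# realign epoch_start with the current horizon window so subsequent predictions stay on schedule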
if self.stop:
print("Forecaster stops after having receiving new epoch start")
logging.info("Forecaster stops after having receiving new epoch start")
break
self.features = self.manager.getFeatureInput(self.application)
if len(self.features) == 0:
print("Cannot proceed, number of feature is 0")
time.sleep(self.prediction_horizon)
self.epoch_start += self.prediction_horizon
continue
predictor = Predictor(self.application, self.target, steps, self.features)
predictor = Predictor(self.application,_time_column_name, _column_to_remove, self.target, steps, self.features)
response = predictor.predict()
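# each element of response is a (value, probability, interval) tuple, one per forward prediction (see the unpacking below)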
index = 1
for v, prob,interval in response:
@@ -229,8 +245,29 @@ class ForecastingManager():
else:
return None
def loadDataset(self, url_dataset):
try:
#return pd.read_csv(url_dataset, low_memory=False, on_bad_lines='skip')
return pd.read_csv(url_dataset, low_memory=False)
except Exception as e:
print("Could not load the dataset")
print(e)
return pd.DataFrame()
def getFeatureInput(self, application):
return [{'time':1602538628,'served_request':2110,'request_rate':426,'avgResponseTime':673.574009325832,'performance':0.626508734240462,'cpu_usage':31.6,'memory':71798784}]
#update dataset and return the last entry
response = self.prepareDataset(application)
if 'url' in response:
print("Dataset updated ...")
logging.info("Dataset updated ...")
data = self.loadDataset(response['url'])
return [data.to_dict('records')[len(data)-1]] #returning the last entry
print("Could not update datasets")
return []
#test_url = "/home/jean-didier/Projects/morphemic-preprocessor/forecaster-cnn/datasets/historicValues.csv"
#data = self.loadDataset(test_url)
#return [data.to_dict('records')[len(data)-1]] #returning the last entry
def getModelFromMetric(self, metric):
for key, model in self.applications.items():
@@ -241,11 +278,6 @@
def startForecasting(self, data):
print("Start forecasting methods")
logging.info("Start forecasting methods")
_json = None
metrics = None
epoch_start = None
number_of_forward_forecasting = None
prediction_horizon = None
_json = json.loads(data)
metrics = _json['metrics']
@@ -261,18 +293,21 @@
model.setPredictionHorizon(prediction_horizon)
model.setEpochStart(epoch_start)
self.trainModel(model)
def simulateForcasting(self):
data = {"metrics":["avgResponseTime","memory"],"timestamp":1623242615043,"epoch_start":1623242815041,"number_of_forward_predictions":8,"prediction_horizon":30}
#data = {"metrics":["avgResponseTime","memory"],"timestamp":1623242615043,"epoch_start":1623242815041,"number_of_forward_predictions":8,"prediction_horizon":30}
data = {"metrics":["AvgResponseTime"],"timestamp":int(time.time())+20,"epoch_start":int(time.time())+20,"number_of_forward_predictions":8,"prediction_horizon":30}
self.startForecasting(json.dumps(data))
def simulateMetricToPredict(self):
data = [
{"refersTo": "default_application", "level":1, "metric": "AvgResponseTime", "publish_rate":3000}
]
"""
data = [
{"refersTo": "default_application", "level":1, "metric": "avgResponseTime", "publish_rate":3000},
{"refersTo": "default_application", "level":1, "metric": "memory", "publish_rate":3000}
]
]"""
self.metricToPredict(json.dumps(data))
def metricToPredict(self, data):
@@ -343,15 +378,23 @@ class ForecastingManager():
application = model.getApplication()
response = self.prepareDataset(application)
if not 'url' in response:
print("Cannot create dataset")
logging.info("Cannot create dataset")
print("Cannot create dataset, not data available")
logging.info("Cannot create dataset, not data available")
return None
#test_url = "/home/jean-didier/Projects/morphemic-preprocessor/forecaster-cnn/datasets/historicValues.csv"
model.setDatasetUrl(response['url'])
#model.setDatasetUrl("/home/jean-didier/Projects/morphemic/Morphemic_TimeSeries/datasets/ds.csv")
model.setDatasetCreationTime(time.time())
#model.setDatasetUrl(test_url)
data = self.loadDataset(response['url'])
#data = self.loadDataset(test_url)
if len(data) <= steps:
print("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data)))
logging.info("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data)))
return False
#model.setDatasetUrl(test_url)
#model.setDatasetCreationTime(time.time())
#start training ml (application, url, metrics)
metric = model.getMetric()
trainer = Train(application, metric, _time_column_name, model.getDatasetUrl(), model.getNumberOfForwardPredictions(), steps, model.getPredictionHorizon())
trainer = Train(application, metric, _time_column_name, _column_to_remove, model.getDatasetUrl(), model.getNumberOfForwardPredictions(), steps, model.getPredictionHorizon())
model.setMLModelStatus('started')
result = trainer.prepareTraining()
if len(result) > 0:
@@ -367,16 +410,19 @@ class ForecastingManager():
worker = Forecaster(self, model.getPredictionHorizon(), model.getEpochStart(), self.publisher, model.getMetric(), model.getApplication())
worker.start()
self.workers.append(worker)
else:
model.setMLModelStatus('NotExist')
def publishTrainingCompleted(self, model):
data = model.getTrainingData()
for tr in data:
del tr['x_train']
del tr['y_train']
message = {"metrics": ["cpu_usage"], "forecasting_method":"cnn","timestamp": int(time.time())}
#print(data)
self.publisher.setParameters(message, "training_models")
self.publisher.send()
message = {"metrics": [tr['target']], "forecasting_method":"cnn","timestamp": int(time.time())}
print("Training information -> ", message)
logging.info(message)
self.publisher.setParameters(message, "training_models")
self.publisher.send()
def predict(self,application,model, target, features):
predictor = Predictor(application, target, steps, features)
@@ -405,7 +451,7 @@ class ForecastingManager():
#self.simulateMetricToPredict()
#time.sleep(10)
#self.simulateForcasting()
#time.sleep(100)
#time.sleep(2)
#self.simulateForcasting()
while True:
for key, model in self.applications.items():
......
from pre_processing.preprocessing import load_data, percent_missing, datetime_conversion
from pre_processing.preprocessing import important_data, resample, resample_median, missing_data_handling, resample_quantile
from pre_processing.Data_transformation import reshape_data_single_lag, series_to_supervised, \
prediction_and_score_for_CNN
prediction_and_score_for_CNN, missing_values_imputer
from models.ML_models import LSTM_model, CNN_model, CNN_model_multi_steps
from plots.plots import plot_train_test_loss
from pre_processing.Data_transformation import predictions_and_scores, Min_max_scal, Min_max_scal_inverse
from pre_processing.Data_transformation import split_sequences, split_sequences_multi_steps, prediction_interval
from pre_processing.Data_transformation import split_sequences, split_sequences_multi_steps, prediction_interval, split_sequences_univariate
#import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from numpy import hstack
import os, time, pickle, json, psutil
from os import path
from tensorflow import keras
@@ -19,65 +20,20 @@ ml_model_path = os.environ.get("ML_MODEL_PATH","./models_trained")
ml_model = os.environ.get("ML_MODEL_PATH","./models")
#///////////////////////////////////////////////////////////////////////////////
#metrics = ['performance','request_rate', 'cpu_usage', 'memory','served_request']
"""
metrics = ['cpu_usage', 'memory', 'request_rate',]
data = load_data()
data = data.round(decimals=2)
data = missing_data_handling(data, rolling_mean=True)
percent_missing(data)
data = datetime_conversion(data, 'time')
print(data)
data = important_data(data, metrics)
print(data)
data = resample(data)
print(data)
data = Min_max_scal(data)
print(data)
#data = series_to_supervised(data, 24, 1)
#print(data)
X_train, y_train, X_test,y_test = split_sequences(data, n_steps=3)
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)
# summarize the data
#for i in range(len(X_train)):
# print(X_train[i], y_train[i])
train_X, train_y, test_X, test_y, val_X, val_y = reshape_data_single_lag(data, 0.6, 0.2, 0.2 )
model = LSTM_model(train_X, train_y, test_X, test_y)
model.summary()
plot_train_test_loss(model)
predictions_and_scores(model, test_X, test_y)
model = CNN_model(n_steps=3, n_features=2, X=X_train, y=y_train, val_x=X_test, val_y=y_test)
plot_train_test_loss(model)
prediction_and_score_for_CNN(n_steps = 3,n_features=2, x_input=X_test, model=model,test_y=y_test)
model.summary()
"""
class Predictor():
def __init__(self, application, target, steps, features):
def __init__(self, application, _time_column, _column_to_remove, target, steps, features):
self.application = application
self.target = target
self.steps = steps
self.time_column = _time_column
self.column_to_remove = _column_to_remove
self.feature_dict = features
self.applications_model = None
self.loadModel()
def loadDataset(self, url):
try:
return pd.read_csv(url, low_memory=False, error_bad_lines=False)
return missing_values_imputer(pd.read_csv(url, low_memory=False))
except Exception as e:
print("Could not load the dataset")
print(e)
@@ -103,11 +59,11 @@ class Predictor():
#data preparation
data = self.loadDataset(model_metadata["dataset_url"])
#if data.empty:
# return {'status': False, 'message': 'dataset empty', 'data': None}
for _dict in self.feature_dict:
data = data.append(_dict, ignore_index=True)
#return {'status': False, 'message': 'dataset empty', 'data': None}
#for _dict in self.feature_dict:
# data = data.append(_dict, ignore_index=True)
#data['memory'] = data['memory']/1000000
data = data.drop(columns=[self.target, 'time'])
data = data.drop(columns=[self.time_column])
#data = data.round(decimals=2)
#data = missing_data_handling(data, rolling_mean=True)
#percent_missing(data)
@@ -115,12 +71,17 @@
#important_features.remove(self.target)
#data = important_data(data, important_features)
important_feature = list(self.feature_dict[0].keys())
important_feature.remove(self.target)
important_feature.remove('time')
#important_feature.remove(self.target)
important_feature.remove(self.time_column)
for col in self.column_to_remove:
if col in important_feature:
important_feature.remove(col)
data = important_data(data, important_feature)
#print(important_feature)
#data, scaler = Min_max_scal(data)
data, scaler = Min_max_scal(data)
#print(data)
data = data.values
#data = data.values
new_sample = data[-self.steps:]
#new_sample = np.array(self.feature_dict)
new_sample = new_sample.reshape((1, self.steps, len(important_feature)))
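# Keras expects input shaped (batch, timesteps, features): one sample covering the last 'steps' rows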
@@ -131,7 +92,7 @@
predictor = keras.models.load_model(path)
#[lower_bound, y_predict, upper_bound] = prediction_interval(predictor, x_train, y_train, new_sample, alpha=0.05)
y_predict = predictor.predict(new_sample, verbose=0)
y_predict = Min_max_scal_inverse(scaler, y_predict)
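# map the scaled predictions back to the original value range using the scaler fitted above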
y_predict = y_predict[0].astype('float')
returned_data = []
for v in y_predict:
@@ -161,7 +122,7 @@ class MorphemicModel():
self.features = None
self.steps = None
self.epoch_start = None
self.number_of_forward_predictions = None
self.number_of_forward_predictions = 8
def getLowestPredictionProbability(self):
return self.lowest_prediction_probability
@@ -261,11 +222,12 @@ class Model():
self.dataset_characteristics = properties
class Train():
def __init__(self, application, metric, _time_column_name, url_dataset, number_of_forward_forecasting, steps, prediction_horizon):
def __init__(self, application, metric, _time_column_name, _column_to_remove, url_dataset, number_of_forward_forecasting, steps, prediction_horizon):
self.application = application
self.metric = metric
self.features = None
self.time_column_name = _time_column_name
self.column_to_remove = _column_to_remove
self.applications_model = {}
self.url_dataset = url_dataset
self.number_of_foreward_forecating = number_of_forward_forecasting
@@ -287,7 +249,7 @@ class Train():
def loadDataset(self):
try:
return pd.read_csv(self.url_dataset, low_memory=False, error_bad_lines=False)
return missing_values_imputer(pd.read_csv(self.url_dataset, low_memory=False))
except Exception as e:
print("Could not load the dataset")
print(e)
@@ -315,6 +277,7 @@ class Train():
model.setUrlDataset(self.url_dataset)
key = None
if not response["status"]:
print(response['message'])
model.setStatus("Failed")
else:
key = self.makeKey(self.application, self.metric)
@@ -331,8 +294,10 @@
def train(self, target):
print("Target metric = {0}".format(target))
print("Sampling rate : {0}".format(self.prediction_horizon))
print("Sampling rate : {0} Seconde".format(self.prediction_horizon))
data = self.loadDataset()
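# a 19-digit epoch value is presumably nanoseconds (InfluxDB's default timestamp precision); convert to seconds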
if len(str(data[self.time_column_name][0])) == 19:
data[self.time_column_name] = data[self.time_column_name]/1000000000
#data['memory'] = data['memory']/1000000
if len(data) == 0:
return {"status": False, "message": "An error occured while loading the dataset", "data": None}
@@ -341,28 +306,38 @@
if not target in self.features:
return {"status": False, "message": "target not in features list", "data": None}
data[target+"_target"] = data[target] #we duplicate the targeted column
if not self.time_column_name in self.features:
return {"status": False, "message": "time field ({0}) not found in dataset".format(self.time_column_name), "data": None}
if not self.metric in self.features:
return {"status": False, "message": "Metric field ({0}) not found in dataset".format(metric), "data": None}
return {"status": False, "message": "Metric field ({0}) not found in dataset".format(self.metric), "data": None}
self.features.remove(target)
self.features.append(target)
#self.features.remove(target)
self.features.append(target+"_target")
self.features.remove(self.time_column_name)
for col in self.column_to_remove:
if col in self.features:
self.features.remove(col)
###########
_start = time.time()
data = data.round(decimals=2)
data = missing_data_handling(data, rolling_mean=True)
percent_missing(data)
missing_data = percent_missing(data)
print("---Missing resume---")
print(missing_data)
print("---End resume---")
data = datetime_conversion(data, self.time_column_name)
data = important_data(data, self.features)
sampling_rate = '{0}S'.format(self.prediction_horizon)
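# pandas offset alias: a prediction horizon of 30 yields '30S', i.e. 30-second resampling buckets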
data = resample_quantile(data, sampling_rate)
print(data)
data = resample(data, sampling_rate)
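# 0.33 mirrors the test_size used by train_test_split in the split helpers: the test split must still cover 'steps' rows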
if len(data) * 0.33 < self.steps:
return {"status": False, "message": "No enough data after sampling", "data": None} #this will be removed
data, scaler = Min_max_scal(data)
data = data[~np.isnan(data).any(axis=1)]
#X_train, y_train, X_test,y_test = split_sequences(data, n_steps=steps)
#X_train, y_train, X_test,y_test = split_sequences_univariate(data, n_steps=self.number_of_foreward_forecating)
X_train, y_train, X_test,y_test = split_sequences_multi_steps(data, n_steps_in=self.steps, n_steps_out=self.number_of_foreward_forecating)
model = CNN_model_multi_steps(n_steps=self.steps, n_features=len(self.features)-1, X=X_train, y=y_train, val_x=X_test, val_y=y_test, n_steps_out=self.number_of_foreward_forecating)
prediction_and_score_for_CNN(n_steps = self.steps,n_features=len(self.features)-1, x_input=X_test, model=model,test_y=y_test)
......
@@ -5,6 +5,7 @@ import numpy as np
# convert series to supervised learning
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.impute import SimpleImputer
from numpy import concatenate
from math import sqrt
@@ -165,6 +166,7 @@ def split_sequences(sequences, n_steps):
return array(X_train), array(y_train), array(X_test), array(y_test)
#multivariate, multisteps
#another change here
def split_sequences_multi_steps(sequences, n_steps_in, n_steps_out):
X, y = list(), list()
for i in range(len(sequences)):
@@ -181,6 +183,21 @@ def split_sequences_multi_steps(sequences, n_steps_in, n_steps_out):
X_train, X_test, y_train, y_test = train_test_split(np.array(X), np.array(y), test_size=0.33, random_state=42)
return array(X_train), array(y_train), array(X_test), array(y_test)
def split_sequences_univariate(sequences, n_steps):
X, y = list(), list()
for i in range(len(sequences)):
# find the end of this pattern
end_ix = i + n_steps
# check if we are beyond the dataset
if end_ix > len(sequences):
break
# gather input and output parts of the pattern
seq_x, seq_y = sequences[i:end_ix, :-1], sequences[end_ix-1, -1]
X.append(seq_x)
y.append(seq_y)
X_train, X_test, y_train, y_test = train_test_split(np.array(X), np.array(y), test_size=0.33, random_state=42)
return array(X_train), array(y_train), array(X_test), array(y_test)
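# Hedged usage sketch (not part of this commit), assuming 'data' is the scaled array with the target in the last column:
# X_train, y_train, X_test, y_test = split_sequences_univariate(data, n_steps=steps)
# X_train.shape -> (n_samples, steps, n_columns-1); y_train.shape -> (n_samples,)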
def prediction_and_score_for_CNN(n_steps,n_features, x_input, model,test_y):
#x_input = x_input.reshape((1, n_steps, n_features))
yhat = model.predict(x_input, verbose=2)
@@ -230,4 +247,21 @@ def prediction_interval(model, X_train, y_train, x0, alpha: float = 0.05):
qs = [100 * alpha / 2, 100 * (1 - alpha / 2)]
percentiles = np.percentile(C, q = qs)
return percentiles[0], model.predict(x0), percentiles[1]
\ No newline at end of file
return percentiles[0], model.predict(x0), percentiles[1]
def missing_values_imputer(data):
# data = self.load_data()
# TODO ---> make a configuration column
# TODO ---> use the missing values imputation based on configuration
if data.isna().any().any():
names = data.columns
imp = SimpleImputer(missing_values=np.nan, strategy='mean')
imp.fit(data.values)
data = imp.transform(data.values)
data = pd.DataFrame(data, columns=names)
return data
\ No newline at end of file
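# Hedged usage sketch: NaN cells are replaced column-wise by the mean
# df = pd.DataFrame({'a': [1.0, np.nan, 3.0], 'b': [4.0, 5.0, np.nan]})
# missing_values_imputer(df) # 'a' NaN -> 2.0, 'b' NaN -> 4.5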
@@ -27,7 +27,7 @@ def percent_missing(data):
missing_value_df = missing_value_df.drop(columns=['index'])
missing_value_df.sort_values('percent_missing', inplace=True, ascending=False)
print(missing_value_df) #TODO don't know for sure if it has to return something or just to print
#return missing_value_df #if needed we can use return here
return missing_value_df #if needed we can use return here
@@ -141,8 +141,6 @@ missing_data_handling(data ,drop_all_nan = False, fill_with_mean = False,
return data
def datetime_conversion(data, column_name):
data[column_name] = pd.to_datetime(data[column_name], unit='s')
data = data.set_index(column_name)
@@ -156,7 +154,7 @@ important_data(data, list_of_important_features):
#resampling at a given rate, e.g. '10S' (every 10 seconds)
def resample(data, rate='30S'):
def resample(data, rate):
resampled_data= data.resample(rate).mean() #TODO maybe the dot in data_. will cause problems
return resampled_data
......