diff --git a/forecaster-cnn/app.py b/forecaster-cnn/app.py index a35d0b1bf16b90cc6ee9841b95a5ac0b4ea87d98..28baa8fd4c06057e749a8d32c5e69a15b67fc4f6 100644 --- a/forecaster-cnn/app.py +++ b/forecaster-cnn/app.py @@ -1,11 +1,13 @@ import os, json, time, stomp, pickle, logging import pandas as pd +import numpy as np from os import path from datetime import datetime from threading import Thread from morphemic.dataset import DatasetMaker from main import Train, MorphemicModel, Predictor, retrain_interval import random +from pre_processing.preprocessing import resample, datetime_conversion from amq_client.MorphemicConnection import Connection from amq_client.MorphemicListener import MorphemicListener @@ -34,6 +36,7 @@ influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic") influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic") start_forecasting_queue = os.environ.get("START_FORECASTING","/topic/start_forecasting.cnn") metric_to_predict_queue = os.environ.get("METRIC_TO_PREDICT","/topic/metrics_to_predict") +univariate_mode = os.environ.get("UNIVARIATE_MODE","activated") #////////////////////////////////////////////////////////////////////////////////// _time_column_name = os.environ.get("TIME_COLUMN","ems_time") _column_to_remove_str = os.environ.get("COLUMNS_TO_REMOVE","time,level") @@ -181,7 +184,7 @@ class Forecaster(Thread): def run(self): print("Forecaster started for target metric {0} ".format(self.target)) logging.info("Forecaster started for target metric {0} ".format(self.target)) - _file = open(datasets_path+"/predictions_{0}.csv".format(self.target),"w") + #_file = open(datasets_path+"/predictions_{0}.csv".format(self.target),"w") while True: if int(time.time()) < self.epoch_start: diff = self.epoch_start - int(time.time()) @@ -197,7 +200,7 @@ class Forecaster(Thread): print("Forecaster stops after having receiving new epoch start") logging.info("Forecaster stops after having receiving new epoch start") break - self.features = self.manager.getFeatureInput(self.application) + self.features = self.manager.getFeatureInput(self.application, self.prediction_horizon) if len(self.features) == 0: print("Cannot proceed, number of feature is 0") time.sleep(self.prediction_horizon) @@ -215,6 +218,9 @@ class Forecaster(Thread): continue index = 1 for v, prob,interval in response: + if np.isnan(v): + index +=1 + continue prediction_time = self.epoch_start + index * self.prediction_horizon if simul_env == "desactivated": message = {"metricValue": v, "level": 1, "timestamp": int(time.time()), "probability": prob,"confidence_interval": interval, "predictionTime": prediction_time, "refersTo": self.application, "cloud": "aws", "provider": "provider"} @@ -225,9 +231,7 @@ class Forecaster(Thread): message = {"prediction": v} self.publisher.setParameters(message, "{0}Predictions".format(self.target)) self.publisher.send() - _file.write("{0},{1},{2}\n".format(prediction_time, v, index)) - - + #_file.write("{0},{1},{2}\n".format(prediction_time, v, index)) index +=1 time.sleep(self.prediction_horizon) self.epoch_start += self.prediction_horizon @@ -237,7 +241,7 @@ class Forecaster(Thread): self.publisher.send() self.psolver_notified = True - _file.close() + #_file.close() print("Forecaster for target : {0} stopped".format(self.target)) logging.info("Forecaster for target : {0} stopped".format(self.target)) @@ -292,16 +296,24 @@ class ForecastingManager(): print(e) return pd.DataFrame() - def getFeatureInput(self, application): + def getFeatureInput(self, application, prediction_horizon): #update dataset and return the last entry response = self.prepareDataset(application) if 'url' in response: print("Dataset updated ...") logging.info("Dataset updated ...") data = self.loadDataset(response['url']) + data.replace('None', np.nan, inplace=True) + data = datetime_conversion(data, _time_column_name) + data = data.astype(float) + data = data.interpolate(method='linear').ffill().bfill() + sampling_rate = '{0}S'.format(prediction_horizon) + data = resample(data, sampling_rate) + data = data.interpolate(method='linear').ffill().bfill() + data.dropna() return data.tail(self.steps) #returning the last entry print("Could not update datasets") - return [] + return pd.DataFrame() #test_url = "/home/jean-didier/Projects/morphemic-preprocessor/forecaster-cnn/datasets/historicValues.csv" #data = self.loadDataset(test_url) #return [data.to_dict('records')[len(data)-1]] #returning the last entry @@ -420,8 +432,8 @@ class ForecastingManager(): data = self.loadDataset(response['url']) #data = self.loadDataset(test_url) if len(data) <= steps: - print("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data))) - logging.info("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data))) + #print("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data))) + #logging.info("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data))) return False #model.setDatasetUrl(test_url) #model.setDatasetCreationTime(time.time()) diff --git a/forecaster-cnn/main.py b/forecaster-cnn/main.py index 9d20717f603956a196a0d5aba38af1a1f3658c18..d43093477a434dc27c34c9625b2da5764a6fa398 100755 --- a/forecaster-cnn/main.py +++ b/forecaster-cnn/main.py @@ -18,7 +18,8 @@ from tensorflow import keras #/////////////////////////////////////////////////////////////////////////////// ml_model_path = os.environ.get("ML_MODEL_PATH","./models_trained") ml_model = os.environ.get("ML_MODEL_PATH","./models") -retrain_interval = int(os.environ.get("RETRAIN_INTERVAL","600")) #in seconds +retrain_interval = int(os.environ.get("RETRAIN_INTERVAL","900")) #in seconds +univariate_mode = os.environ.get("UNIVARIATE_MODE","activated") #/////////////////////////////////////////////////////////////////////////////// class Predictor(): @@ -74,14 +75,22 @@ class Predictor(): #data = pd.DataFrame(self.feature_dict, ignore_index=True) important_feature = list(self.dataframe.columns.values) #important_feature.remove(self.target) - important_feature.remove(self.time_column) - for col in self.column_to_remove: - if col in important_feature: - important_feature.remove(col) + #important_feature.remove(self.time_column) + if univariate_mode == "deactivated": + for col in self.column_to_remove: + if col in important_feature: + if col in important_feature: + important_feature.remove(col) + else: + important_feature = [self.target] self.dataframe.replace('None', np.nan, inplace=True) self.dataframe = self.dataframe.astype(float) self.dataframe = self.dataframe.interpolate(method='linear') + + print("Input Sample to be prdicted ==============") + print(self.dataframe) + print("=======================================") #data = resample(data, sampling_rate) #data.dropna(inplace=True) #data = missing_values_imputer(data) @@ -331,11 +340,14 @@ class Train(): return {"status": False, "message": "Metric field ({0}) not found in dataset".format(self.metric), "data": None} #self.features.remove(target) - self.features.append(target+"_target") - self.features.remove(self.time_column_name) - for col in self.column_to_remove: - if col in self.features: - self.features.remove(col) + if univariate_mode == "deactivated": + self.features.append(target+"_target") + self.features.remove(self.time_column_name) + for col in self.column_to_remove: + if col in self.features: + self.features.remove(col) + else: + self.features = [target, target+"_target"] ########### _start = time.time() data = data.round(decimals=2) @@ -344,22 +356,22 @@ class Train(): data = datetime_conversion(data, self.time_column_name) sampling_rate = '{0}S'.format(self.prediction_horizon) data = data.astype(float) - data = data.interpolate(method='linear') - data = resample(data, sampling_rate) - data.dropna(inplace=True) + data = data.interpolate(method='linear').ffill().bfill() + data = resample(data, sampling_rate) #resampling can intriduce NaN values + data = data.interpolate(method='linear').ffill().bfill() #data = missing_values_imputer(data) #data = missing_data_handling(data, drop_all_nan=True, fill_w_forward=True, rolling_median=True) #data = missing_values_imputer(data) #data = missing_data_handling(data, drop_all_nan=True, fill_w_forward=True, rolling_median=True) data = important_data(data, self.features) + data.dropna() print("important data", data) - data.apply(lambda s: pd.to_numeric(s, errors='coerce').notnull().all()) - print("After resampling",data) + #data.apply(lambda s: pd.to_numeric(s, errors='coerce').notnull().all()) + #print("After resampling",data) if len(data) * 0.2 < self.steps: return {"status": False, "message": "No enough data after sampling", "data": None} #this will be removed #try: - target_column = data[target+"_target"] #data, scaler = Min_max_scal(data) data = data.values #data[target+"_target"] = target_column diff --git a/polymorphic_solver/src/.DS_Store b/polymorphic_solver/src/.DS_Store index 1f310839625d44cd3a9a7fb73bf64abf412d658b..043f686201500142b168e21145e39fbc45c42f9f 100644 Binary files a/polymorphic_solver/src/.DS_Store and b/polymorphic_solver/src/.DS_Store differ diff --git a/polymorphic_solver/src/xmi_camel_utilities.py b/polymorphic_solver/src/xmi_camel_utilities.py index 3fb4dc796f92e6921787372900cf8e6b5673993f..40be4236b840264b63638dfd98f9f3552f0ad052 100644 --- a/polymorphic_solver/src/xmi_camel_utilities.py +++ b/polymorphic_solver/src/xmi_camel_utilities.py @@ -1025,7 +1025,7 @@ class CamelAttribute(): self.name = "minRam{0}".format(self.component_name) if self.annotation == "totalMemoryHasMax": self.name = "maxRam{0}".format(self.component_name) - if self.annotation == "TotalMemory" or self.annotation == "hasNumberofCores_2": + if self.annotation == "TotalMemory" or self.annotation == "hasNumberofCores_2" or self.annotation == "hasNumberofCores" or self.annotation == "hasFPGANumber": self.name = self.component_name def getName(self): @@ -1040,7 +1040,7 @@ class CamelAttribute(): element = ET.Element("attributes") element.attrib["name"] = self.name element.attrib["annotations"] = self.annotation - if self.annotation == "TotalMemory" or self.annotation == "hasNumberofCores_2": + if self.annotation == "TotalMemory" or self.annotation == "hasNumberofCores_2" or self.annotation == "hasNumberofCores" or self.annotation == "hasFPGANumber": element.attrib["minInclusive"] = "true" element.attrib["maxInclusive"] = "true" if type(self.value) == type([]): diff --git a/profiler/src/models/.DS_Store b/profiler/src/models/.DS_Store index 916b5bd41cbcf92b3cae4408e9c523a58137a9f3..46a387198c185d358b83bf9e32a7a5610634f871 100644 Binary files a/profiler/src/models/.DS_Store and b/profiler/src/models/.DS_Store differ diff --git a/profiler/src/models/nn.model_C/.DS_Store b/profiler/src/models/nn.model_C/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..04c54f8a2cec6f2d19891230f45b5cbb3acb7233 Binary files /dev/null and b/profiler/src/models/nn.model_C/.DS_Store differ