From bb0aae35466f04b4ab8864d628c43df8945f5b58 Mon Sep 17 00:00:00 2001 From: Jean-Didier Date: Tue, 16 May 2023 10:54:59 +0200 Subject: [PATCH] improvement wp1 --- forecaster-cnn/app.py | 32 +++++++++---- forecaster-cnn/main.py | 44 +++++++++++------- polymorphic_solver/src/.DS_Store | Bin 10244 -> 10244 bytes polymorphic_solver/src/xmi_camel_utilities.py | 4 +- profiler/src/models/.DS_Store | Bin 6148 -> 6148 bytes profiler/src/models/nn.model_C/.DS_Store | Bin 0 -> 6148 bytes 6 files changed, 52 insertions(+), 28 deletions(-) create mode 100644 profiler/src/models/nn.model_C/.DS_Store diff --git a/forecaster-cnn/app.py b/forecaster-cnn/app.py index a35d0b1b..28baa8fd 100644 --- a/forecaster-cnn/app.py +++ b/forecaster-cnn/app.py @@ -1,11 +1,13 @@ import os, json, time, stomp, pickle, logging import pandas as pd +import numpy as np from os import path from datetime import datetime from threading import Thread from morphemic.dataset import DatasetMaker from main import Train, MorphemicModel, Predictor, retrain_interval import random +from pre_processing.preprocessing import resample, datetime_conversion from amq_client.MorphemicConnection import Connection from amq_client.MorphemicListener import MorphemicListener @@ -34,6 +36,7 @@ influxdb_dbname = os.environ.get("INFLUXDB_DBNAME","morphemic") influxdb_org = os.environ.get("INFLUXDB_ORG","morphemic") start_forecasting_queue = os.environ.get("START_FORECASTING","/topic/start_forecasting.cnn") metric_to_predict_queue = os.environ.get("METRIC_TO_PREDICT","/topic/metrics_to_predict") +univariate_mode = os.environ.get("UNIVARIATE_MODE","activated") #////////////////////////////////////////////////////////////////////////////////// _time_column_name = os.environ.get("TIME_COLUMN","ems_time") _column_to_remove_str = os.environ.get("COLUMNS_TO_REMOVE","time,level") @@ -181,7 +184,7 @@ class Forecaster(Thread): def run(self): print("Forecaster started for target metric {0} ".format(self.target)) logging.info("Forecaster started for target metric {0} ".format(self.target)) - _file = open(datasets_path+"/predictions_{0}.csv".format(self.target),"w") + #_file = open(datasets_path+"/predictions_{0}.csv".format(self.target),"w") while True: if int(time.time()) < self.epoch_start: diff = self.epoch_start - int(time.time()) @@ -197,7 +200,7 @@ class Forecaster(Thread): print("Forecaster stops after having receiving new epoch start") logging.info("Forecaster stops after having receiving new epoch start") break - self.features = self.manager.getFeatureInput(self.application) + self.features = self.manager.getFeatureInput(self.application, self.prediction_horizon) if len(self.features) == 0: print("Cannot proceed, number of feature is 0") time.sleep(self.prediction_horizon) @@ -215,6 +218,9 @@ class Forecaster(Thread): continue index = 1 for v, prob,interval in response: + if np.isnan(v): + index +=1 + continue prediction_time = self.epoch_start + index * self.prediction_horizon if simul_env == "desactivated": message = {"metricValue": v, "level": 1, "timestamp": int(time.time()), "probability": prob,"confidence_interval": interval, "predictionTime": prediction_time, "refersTo": self.application, "cloud": "aws", "provider": "provider"} @@ -225,9 +231,7 @@ class Forecaster(Thread): message = {"prediction": v} self.publisher.setParameters(message, "{0}Predictions".format(self.target)) self.publisher.send() - _file.write("{0},{1},{2}\n".format(prediction_time, v, index)) - - + #_file.write("{0},{1},{2}\n".format(prediction_time, v, index)) index +=1 time.sleep(self.prediction_horizon) self.epoch_start += self.prediction_horizon @@ -237,7 +241,7 @@ class Forecaster(Thread): self.publisher.send() self.psolver_notified = True - _file.close() + #_file.close() print("Forecaster for target : {0} stopped".format(self.target)) logging.info("Forecaster for target : {0} stopped".format(self.target)) @@ -292,16 +296,24 @@ class ForecastingManager(): print(e) return pd.DataFrame() - def getFeatureInput(self, application): + def getFeatureInput(self, application, prediction_horizon): #update dataset and return the last entry response = self.prepareDataset(application) if 'url' in response: print("Dataset updated ...") logging.info("Dataset updated ...") data = self.loadDataset(response['url']) + data.replace('None', np.nan, inplace=True) + data = datetime_conversion(data, _time_column_name) + data = data.astype(float) + data = data.interpolate(method='linear').ffill().bfill() + sampling_rate = '{0}S'.format(prediction_horizon) + data = resample(data, sampling_rate) + data = data.interpolate(method='linear').ffill().bfill() + data.dropna() return data.tail(self.steps) #returning the last entry print("Could not update datasets") - return [] + return pd.DataFrame() #test_url = "/home/jean-didier/Projects/morphemic-preprocessor/forecaster-cnn/datasets/historicValues.csv" #data = self.loadDataset(test_url) #return [data.to_dict('records')[len(data)-1]] #returning the last entry @@ -420,8 +432,8 @@ class ForecastingManager(): data = self.loadDataset(response['url']) #data = self.loadDataset(test_url) if len(data) <= steps: - print("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data))) - logging.info("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data))) + #print("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data))) + #logging.info("Not enough data to train the model, the CNN requires at least {0} samples, the dataset has {1} rows".format(steps, len(data))) return False #model.setDatasetUrl(test_url) #model.setDatasetCreationTime(time.time()) diff --git a/forecaster-cnn/main.py b/forecaster-cnn/main.py index 9d20717f..d4309347 100755 --- a/forecaster-cnn/main.py +++ b/forecaster-cnn/main.py @@ -18,7 +18,8 @@ from tensorflow import keras #/////////////////////////////////////////////////////////////////////////////// ml_model_path = os.environ.get("ML_MODEL_PATH","./models_trained") ml_model = os.environ.get("ML_MODEL_PATH","./models") -retrain_interval = int(os.environ.get("RETRAIN_INTERVAL","600")) #in seconds +retrain_interval = int(os.environ.get("RETRAIN_INTERVAL","900")) #in seconds +univariate_mode = os.environ.get("UNIVARIATE_MODE","activated") #/////////////////////////////////////////////////////////////////////////////// class Predictor(): @@ -74,14 +75,22 @@ class Predictor(): #data = pd.DataFrame(self.feature_dict, ignore_index=True) important_feature = list(self.dataframe.columns.values) #important_feature.remove(self.target) - important_feature.remove(self.time_column) - for col in self.column_to_remove: - if col in important_feature: - important_feature.remove(col) + #important_feature.remove(self.time_column) + if univariate_mode == "deactivated": + for col in self.column_to_remove: + if col in important_feature: + if col in important_feature: + important_feature.remove(col) + else: + important_feature = [self.target] self.dataframe.replace('None', np.nan, inplace=True) self.dataframe = self.dataframe.astype(float) self.dataframe = self.dataframe.interpolate(method='linear') + + print("Input Sample to be prdicted ==============") + print(self.dataframe) + print("=======================================") #data = resample(data, sampling_rate) #data.dropna(inplace=True) #data = missing_values_imputer(data) @@ -331,11 +340,14 @@ class Train(): return {"status": False, "message": "Metric field ({0}) not found in dataset".format(self.metric), "data": None} #self.features.remove(target) - self.features.append(target+"_target") - self.features.remove(self.time_column_name) - for col in self.column_to_remove: - if col in self.features: - self.features.remove(col) + if univariate_mode == "deactivated": + self.features.append(target+"_target") + self.features.remove(self.time_column_name) + for col in self.column_to_remove: + if col in self.features: + self.features.remove(col) + else: + self.features = [target, target+"_target"] ########### _start = time.time() data = data.round(decimals=2) @@ -344,22 +356,22 @@ class Train(): data = datetime_conversion(data, self.time_column_name) sampling_rate = '{0}S'.format(self.prediction_horizon) data = data.astype(float) - data = data.interpolate(method='linear') - data = resample(data, sampling_rate) - data.dropna(inplace=True) + data = data.interpolate(method='linear').ffill().bfill() + data = resample(data, sampling_rate) #resampling can intriduce NaN values + data = data.interpolate(method='linear').ffill().bfill() #data = missing_values_imputer(data) #data = missing_data_handling(data, drop_all_nan=True, fill_w_forward=True, rolling_median=True) #data = missing_values_imputer(data) #data = missing_data_handling(data, drop_all_nan=True, fill_w_forward=True, rolling_median=True) data = important_data(data, self.features) + data.dropna() print("important data", data) - data.apply(lambda s: pd.to_numeric(s, errors='coerce').notnull().all()) - print("After resampling",data) + #data.apply(lambda s: pd.to_numeric(s, errors='coerce').notnull().all()) + #print("After resampling",data) if len(data) * 0.2 < self.steps: return {"status": False, "message": "No enough data after sampling", "data": None} #this will be removed #try: - target_column = data[target+"_target"] #data, scaler = Min_max_scal(data) data = data.values #data[target+"_target"] = target_column diff --git a/polymorphic_solver/src/.DS_Store b/polymorphic_solver/src/.DS_Store index 1f310839625d44cd3a9a7fb73bf64abf412d658b..043f686201500142b168e21145e39fbc45c42f9f 100644 GIT binary patch delta 63 zcmZn(XbG6$&nUk!U^hRb{A3;h-N}muL^rnxbTUph5H{QVUZjY5^Ib_Twv7#OESuRC Tez8p6C8{-f5-22YCZ>S8F+Y><^DUU0H>@P|3)*{aN*|>s= zI%gYNh#csa+R&{A00|KHxfX(g^dRh}Nw_4{V3H1Pk|^Aec>f&@?tm$_zmpyK)wSyS z6yC3OP2DnE%nxJUF)7IP5uYmxcCF<7+#-R->#U&e&$1@(i9Go+}jPau#vQ zvqpg-+TpHFaWa}T5za7Dgu=fSMO9{n%%j+9d2H2l2|FaJT!9(YN8*pi;%Cm~eVJge Px;Yo5^Pj0;BP)tOv+K3% diff --git a/polymorphic_solver/src/xmi_camel_utilities.py b/polymorphic_solver/src/xmi_camel_utilities.py index 3fb4dc79..40be4236 100644 --- a/polymorphic_solver/src/xmi_camel_utilities.py +++ b/polymorphic_solver/src/xmi_camel_utilities.py @@ -1025,7 +1025,7 @@ class CamelAttribute(): self.name = "minRam{0}".format(self.component_name) if self.annotation == "totalMemoryHasMax": self.name = "maxRam{0}".format(self.component_name) - if self.annotation == "TotalMemory" or self.annotation == "hasNumberofCores_2": + if self.annotation == "TotalMemory" or self.annotation == "hasNumberofCores_2" or self.annotation == "hasNumberofCores" or self.annotation == "hasFPGANumber": self.name = self.component_name def getName(self): @@ -1040,7 +1040,7 @@ class CamelAttribute(): element = ET.Element("attributes") element.attrib["name"] = self.name element.attrib["annotations"] = self.annotation - if self.annotation == "TotalMemory" or self.annotation == "hasNumberofCores_2": + if self.annotation == "TotalMemory" or self.annotation == "hasNumberofCores_2" or self.annotation == "hasNumberofCores" or self.annotation == "hasFPGANumber": element.attrib["minInclusive"] = "true" element.attrib["maxInclusive"] = "true" if type(self.value) == type([]): diff --git a/profiler/src/models/.DS_Store b/profiler/src/models/.DS_Store index 916b5bd41cbcf92b3cae4408e9c523a58137a9f3..46a387198c185d358b83bf9e32a7a5610634f871 100644 GIT binary patch literal 6148 zcmeHKL2uJA6n<_Cb5|xR0TLXLB5_VnYKzG1s7Y6$s#5Oo zUpViFjQ_F|yk~n-l4ep*2!UVq{Mml*v+GZZ9RmOxOv64v7XUaqVQU?W8lyP*k~K^Z z@t7JzLI`7sU&iuqDcg2f1*`&pn*#LiauE32f)gmbzd)Q1-zQ#NJlXom@XO2wt*bcSt*e=k!XZo` zf&{Y4DCdLG(PUsrU zHL9%x3v~rRY+<)DtkWk0aU6rL!CWIo(3qwoYAQ2V45sPW@0dK-V6IWqftkw(Gcz-D zLt$!m9N$stz+8>Cv4N&(i|_x1<4By+YdEsoAw u2mLcT8Rg{~6$J}(9IFZ)#dpz_Va%ZlqH8eMh&^cbLqN%33#-5%Rp2-2lEa$- delta 76 zcmZoMXfc=|#>CJzu~2NHo+2aT!~knX#>qS^#+z@j=`(It;9zIj*dWBTnVo~51E_4X eAjfy+$^0U^oQw<%3K!CWIlpcDmtM<^M)UKn}s_{l~LJmC<)0YJ(NV8`T;#EK=;);U<3st8m4BfqJ39K;Hk?%CP*PDq6u7Ry zk}J0O{D1xb_y2X0ltckh;87`{nkVr|7gO?Q>%!#ttkv+(a5j#s9bQq8u}3jt`6%9i aTSL9(3^4LoJ46jkE&^5tNkoA=Rp1ZuylM^r literal 0 HcmV?d00001 -- GitLab