diff --git a/.DS_Store b/.DS_Store index 5e890ad35c6b5f743e49bb779a9c327701905640..d9a6cbc11a17f0eb1e1375ae43498d8f0257f4ed 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/morphemic-performance-model/.DS_Store b/morphemic-performance-model/.DS_Store index 2b01458a9ca1db09e98d4513e2b2d0908274a9c7..d3c95c9769b3f032f19fa37eb3c16fb04970eaba 100644 Binary files a/morphemic-performance-model/.DS_Store and b/morphemic-performance-model/.DS_Store differ diff --git a/morphemic-performance-model/ml_code/.DS_Store b/morphemic-performance-model/ml_code/.DS_Store index 2b8e773b752f0d52d96f8f2b472704fa8dbba0d6..8592794b6782b58772f44cfc42e243829e087404 100644 Binary files a/morphemic-performance-model/ml_code/.DS_Store and b/morphemic-performance-model/ml_code/.DS_Store differ diff --git a/morphemic-performance-model/ml_code/src/activemq_logic.py b/morphemic-performance-model/ml_code/src/activemq_logic.py index c38343bbe091bc825efff338fac6d0556da96de1..8c7ad7e0ed83b3fb6cb805efb4ce6b9cbd361da3 100644 --- a/morphemic-performance-model/ml_code/src/activemq_logic.py +++ b/morphemic-performance-model/ml_code/src/activemq_logic.py @@ -278,28 +278,37 @@ class MessagingManager(Thread): if not self.manager.modelExists(configs, target): print("Model for = ", target, " was not found, it will be created and trained") self.manager.addModel(configs, target, None) - result = {"status": False, "result": None, "message": "model not ready, no formula available", "sender_id": sender_id} - self.publisher.setParameters(result, performance_module_predict_feedback_topic) - self.publisher.send() + #result = {"status": False, "result": None, "message": "model not ready, no formula available", "sender_id": sender_id} + #self.publisher.setParameters(result, performance_module_predict_feedback_topic) + #self.publisher.send() + result = self.manager.addPredictData(sender_id, variables, target) + if result["status"]: + results.update(result["result"]) else: - result = self.manager.addPredictData(sender_id, variables, target) - if result["status"]: - results.update(result["result"]) - else: - print(result) - error = result - can_proceed = False + print(result) + error = result + can_proceed = False if can_proceed: - results["math"] = math - formula = self.manager.replaceFunction(utility_formula_original) - utility = eval(formula, results) - response = {"status": True, "result": {"utility": utility}, "sender_id": sender_id} - self.publisher.setParameters(response, performance_module_predict_feedback_topic) + data = None + target = self.manager.getUtilityMetricName() + if target: + result = self.manager.predictFromFormula(target, results) + if result: + data = {"status": True, "result": {"utility": result}, "message": None, "sender_id": sender_id} + else: + data = {"status": False, "result": None, "message": "An error occured", "sender_id": sender_id} + else: + data = {"status": False, "result": None, "message": "No utility formula specified", "sender_id": sender_id} + self.publisher.setParameters(data, performance_module_predict_feedback_topic) self.publisher.send() else: self.publisher.setParameters(error, performance_module_predict_feedback_topic) self.publisher.send() else: + configs = self.extractConfig(variables) + if not self.manager.modelExists(configs, target): + print("Model for = ", target, " was not found, it will be created and trained") + self.manager.addModel(configs, target, None) result = self.manager.addPredictData(sender_id, variables, target) if result["status"]: print("***************Prediction*********************") diff --git a/morphemic-performance-model/ml_code/src/ml_module.py b/morphemic-performance-model/ml_code/src/ml_module.py index 1a29be02f9e7120a0de60ef5c61eb187997d88c4..6fbaede5b6f93e148f72c375838b6634bfdbc71a 100644 --- a/morphemic-performance-model/ml_code/src/ml_module.py +++ b/morphemic-performance-model/ml_code/src/ml_module.py @@ -78,7 +78,14 @@ def load_data(url_file, features=None): # Load the file data = pd.read_csv(url_file) if features: - data = data[features] + can_proceed = True + for f in features: + if not f in list(data.columns): + can_proceed = False + print("field <<", f, " >> is missing in the dataset") + if can_proceed: + data = data[features] + return pd.DataFrame() if data.iloc[0].values[1] == 'Unnamed 0': data = data.drop(columns=['Unnamed 0']) if type(data.iloc[0].values[1]) == str: @@ -303,15 +310,40 @@ class MultiComponentsManager(): _string = _string.replace(_exp,"math.{0}(".format(function)) return _string - def mean(self, value): - return value + def mean(self, number): + return number + def add(self, number): + return number + def predictFromFormula(self, target, variables): + if target in self.formulas: + _dict = {} + _dict.update(variables) + _dict.update({"math":math, "mean": self.mean, "add": self.add}) + for _ in range(len(list(self.formulas.keys()))): + for t, formula in self.formulas.items(): + if t == target: + continue + try: + formula = self.replaceFunction(formula) + result = eval(formula, _dict) + if not t in _dict: + _dict[t] = result + except Exception as e: + print(e) + print('Formula for = {0} found {1}'.format(target, self.formulas[target])) + formula = self.replaceFunction(self.formulas[target]) + try: + return eval(formula, _dict) + except Exception as e: + print(e) + return None + def addPredictData(self, sender_id, variables, target): #variables : [{"component": name, "variant": var, "hardware": hardware, "_memory": mem, "_cores": core, "_instances": instances}] #print("target is ", target) #if target == "utility": # target = self.getUtilityMetricName() variables = copy.deepcopy(variables) - print("target is ", target) components_config = [] for component in variables: mandatory_field = ["component", "_variant", "_hardware"] @@ -349,37 +381,16 @@ class MultiComponentsManager(): model.setTarget(target) model.makeIdentifier() id = model.getIdentifier() - if not id in self.models: - print("Model with id = {0} not found, the formula will be used".format(id)) - if target in self.formulas: - _dict = {} - _dict.update(features_dict) - _dict.update({"math":math, "mean": self.mean}) - for _ in range(len(list(self.formulas.keys()))): - for t, formula in self.formulas.items(): - if t == target: - continue - try: - formula = self.replaceFunction(formula) - result = eval(formula, _dict) - _dict[t] = result - except: - pass - print('Formula for = {0} found {1}'.format(target, self.formulas[target])) - formula = self.replaceFunction(self.formulas[target]) - try: - result = eval(formula, _dict) - return {"status": True, "message": "", "result": {target: result}} - except Exception as e: - return {"status": False, "message": str(e), "result": None} - else: - return {"status": False, "message": "Model not found","result": None, "sender_id": sender_id} model = self.models[id] if not model.getLastTraining(): + print("Model for target find but not trained, ", target, ", training will start") train_result = self.train(model) if not train_result["status"]: - return {"status": False, "message": train_result["message"], "sender_id": sender_id} + #print("Probably, the requested configuration") + return {"status": True, "result": {target: 0}, "message": None, "sender_id": sender_id} + model_feature_list = model.getFeaturesList() + missing = [] for feature in model_feature_list: if 'Unnamed' in feature: @@ -390,10 +401,15 @@ class MultiComponentsManager(): if len(missing) > 0: return {"status": False, "message": "The following feature fields are missing = {0}".format(missing),"sender_id": sender_id} df = load_data(model.getUrlFile(),features_list) + if df.empty: + return {"status": True, "result": {target: 0}, "message": None, "sender_id": sender_id} #concatenate feature_dict to the data + #work around + features_dict = {field: value for field, value in features_dict.items() if field in model_feature_list} features_dict[target] = 1 #df2 = pd.DataFrame(feature_dict,index=0) #df = pd.concat([df2,df1]) + df.loc[0] = list(features_dict.values()) pipeline = Pipeline([prepare_data,missing_values_imputer,normalization]) data = pipeline.execute(df, target) @@ -409,12 +425,8 @@ class MultiComponentsManager(): def readyToTrain(self): self.ready_to_train = True - def checkTrainStatus(self): - if not self.ready_to_train: - print("Not ready to train, retry in 180 seconds ...") - time.sleep(180) - self.checkTrainStatus() + def checkTrainStatus(self): for id, model in self.models.items(): if model.canBeTrained(): print("Model {0} will be trained".format(id)) @@ -453,7 +465,7 @@ class MultiComponentsManager(): data = model.filterRows(data) if data.empty: - return {"status": False, "message": "An error occured, dataframe is empty after filtering"} + return {"status": True, "message": "An error occured, dataframe is empty after filtering"} print("*************Dataset after filtering*************") print(data) print("*************************************************") @@ -469,10 +481,13 @@ class MultiComponentsManager(): for field in columns_to_remove: if field in features_list: features_list.remove(field) - + # + features_list = [feature for feature in features_list if feature[0] == "_"] + # data = data.dropna() Y = data[target].values - features_list.remove(target) + if target in features_list: + features_list.remove(target) model.setFeaturesList(features_list) X = data[features_list].values X = X.reshape((X.shape[0], X.shape[1], 1)) @@ -486,7 +501,6 @@ class MultiComponentsManager(): ml_model.save(_path) model.setModelUrl(_path) return {"status": True, "message": "Train finished",} - def loadModels(self): _path = app_model_folder+"/models.pickle" diff --git a/morphemic-persistent-storage/database/inputapi/src/app.py b/morphemic-persistent-storage/database/inputapi/src/app.py index 73e7ff56b92a48a210f400ef0ad94c148c9bbcb0..47f50bd4e04f5094e3b575054604e1480998e648 100644 --- a/morphemic-persistent-storage/database/inputapi/src/app.py +++ b/morphemic-persistent-storage/database/inputapi/src/app.py @@ -753,6 +753,8 @@ class InputApi: #{"refersTo": "default", "level":3, "metric": "avgResponseTime", "publish_rate":3000, "component": "WS"} result = [] for group in _json: + if group["metric"] in self.list_metrics: + continue self.list_metrics.append(group["metric"]) component_name = "default" if "component" in group: @@ -769,7 +771,8 @@ class InputApi: } result.append(conn) self.saveSubscriptions(result) - self.sendMetricToPredictToPolmorphicSolver(_json) + if topic !="local": + self.sendMetricToPredictToPolmorphicSolver(_json) self.startConsuming() #self.startApplicationStateCollector() @@ -798,6 +801,8 @@ class InputApi: ) if _json["request"] == "set_application_id": self.application_id = _json["application_id"] + if _json["metrics"]: + self.metricToPredict(json.dumps(_json["metrics"]), "local") print("Camel resource name added") if _json["request"] == "unsubscribe": if not "name" in _json: diff --git a/polymorphic_solver/src/app.py b/polymorphic_solver/src/app.py index 644416d307edb92634937e5497038dd4f416964e..d1f8b6f825f4794e2589571a85882b02e5828446 100644 --- a/polymorphic_solver/src/app.py +++ b/polymorphic_solver/src/app.py @@ -61,6 +61,7 @@ class ApplicationComponent(): self.environment = None self.status = False self.reqs_indexes = None + self.byon_parameters = {} self.index = None self.image = None self.metrics = [] @@ -83,6 +84,7 @@ class ApplicationComponent(): max_instances = self.constraints.getResourceValue("INSTANCE","<") min_instances = self.constraints.getResourceValue("INSTANCE",">") + byon_type, byon_name = None, None if not min_mem: min_mem = MIN_MEM if not max_mem: @@ -95,7 +97,11 @@ class ApplicationComponent(): max_instances = MAX_INSTANCES if not min_instances: min_instances = 1 - return min_mem, max_mem, min_cpu, max_cpu, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc,max_hpc, min_instances, max_instances, self.hardware_list, self.variants, self.metrics + if "type" in self.byon_parameters: + byon_type = self.byon_parameters["type"] + if "name" in self.byon_parameters: + byon_name = self.byon_parameters["name"] + return min_mem, max_mem, min_cpu, max_cpu, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc,max_hpc, min_instances, max_instances, self.hardware_list, self.variants, self.metrics, byon_type, byon_name def setIndex(self, index): self.index = index @@ -103,6 +109,12 @@ class ApplicationComponent(): def setReqsIndexes(self, reqs_indexes): self.reqs_indexes = reqs_indexes + def setByonParameters(self, byon_parameters): + self.byon_parameters = byon_parameters + + def getByonParameters(self): + return self.byon_parameters + def getReqsIndexes(self): return self.reqs_indexes @@ -203,6 +215,7 @@ class PolymorphicSolver(): self.virtual_application = VirtualApplication(self.publisher, self) self.metric_to_predict_message = [] self.max_optimization_time = 20 + self.metrics_rl_engine = [] self.last_analysis = time.time() self.agent_consumers = {} self.rl_engine_started = False @@ -252,7 +265,8 @@ class PolymorphicSolver(): self.virtual_application.setFormulas(_json["formulas"]) self.virtual_application.setUtilityMetricName(_json["utility_metric_name"]) self.virtual_application.setNumberOfComponents(self.application_data['number_of_components']) - self.sendApplicationIdToPersistentStorage(data['application_id']) + data_to_send_to_ps = {"application_id": data["application_id"], "metrics": self.metric_to_predict_message} + self.sendApplicationIdToPersistentStorage(data_to_send_to_ps) self.sendTrainRequestToPerformanceModule() self.updateVirtualApplication() self.createComponents() @@ -261,7 +275,7 @@ class PolymorphicSolver(): self.first_deployment_completed = True print('First deployment finished') if self.metric_to_predict_message == []: - print("The application does not any metric the process will stop") + print("The application does not have any metric the process will stop") os._exit(0) """ if self.metric_to_predict_message != []: @@ -302,8 +316,8 @@ class PolymorphicSolver(): os._exit(1) time.sleep(10) """ - def sendApplicationIdToPersistentStorage(self, app_id): - data = {"request": "set_application_id", "application_id": app_id} + def sendApplicationIdToPersistentStorage(self, data): + data = {"request": "set_application_id", "application_id": data["application_id"], "metrics": data["metrics"]} self.publisher.setParameters(data, ps_management_topic) self.publisher.send() @@ -355,7 +369,8 @@ class PolymorphicSolver(): metric = conf["metric"] component = conf["component"] if component in metrics_data: - metrics_data[component].append(metric) + if not metric in metrics_data[component]: + metrics_data[component].append(metric) else: metrics_data[component] = [metric] # @@ -373,6 +388,7 @@ class PolymorphicSolver(): application = {"object": self.virtual_application} components_data = {} for name, comp in self.virtual_application.items(): + comp.setMetrics(self.metrics_rl_engine) components_data[name] = comp.prepareResource() application["data"] = components_data return Wrapper(Environment(application, self.max_optimization_time, evaluation)) @@ -389,13 +405,15 @@ class PolymorphicSolver(): return None if not 'request' in data: print("Metrics to predict message received ...") - if self.metric_to_predict_message == []: - self.metric_to_predict_message = data - print('Storing metrics to predict') - _file = open(config_directory+"/metrics_to_predict.json","w") - _file.write(json.dumps(data)) - _file.close() - print('Metrics to predict stored') + for conf in data: + self.metric_to_predict_message.append(conf) + if not conf["metric"] in self.metrics_rl_engine: + self.metrics_rl_engine.append(conf["metric"]) + print('Storing metrics to predict') + _file = open(config_directory+"/metrics_to_predict.json","w") + _file.write(json.dumps(data)) + _file.close() + print('Metrics to predict stored') return None if data["request"] == "metrics": @@ -527,6 +545,8 @@ class PolymorphicSolver(): all_requirements = component["requirements"][0] if "image" in all_requirements: comp.setImage(all_requirements["image"]) + if "byon_parameters" in component: + comp.setByonParameters(component["byon_parameters"]) if "hw" in component: comp.setHWS(component["hw"]) for hw in component["hw"]: @@ -591,10 +611,10 @@ class PolymorphicSolver(): def createComponents(self): for name, comp in self.virtual_application.items(): index = comp.getIndex() - min_mem, max_mem, min_cpu, max_cpu,min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, min_instances, max_instances, hardware_list, variants, metrics = comp.prepareResource() + min_mem, max_mem, min_cpu, max_cpu,min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, min_instances, max_instances, hardware_list, variants, metrics, byon_type, byon_name = comp.prepareResource() #self.env.createStates(name, min_mem, max_mem, min_cpu, max_cpu, min_gpu, max_gpu, min_fpga, max_fpga, max_instances, hardware_list, variants, comp.getMetrics()) image = comp.getImage() - self.archetype_manager.createArchetypes(name, index, variants, hardware_list,image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc) + self.archetype_manager.createArchetypes(name, index, variants, hardware_list,image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, byon_type, byon_name) self.archetype_manager.setNumberOfComponents(self.virtual_application.getNumberOfComponents()) def prepareMultiRLEngine(self): diff --git a/polymorphic_solver/src/ml_models/env.py b/polymorphic_solver/src/ml_models/env.py index 74ac7cf81dc4579c6463e7463a718f8794d983f3..addb9a0757c620c3925bb4307c0a7436bce07aee 100644 --- a/polymorphic_solver/src/ml_models/env.py +++ b/polymorphic_solver/src/ml_models/env.py @@ -187,10 +187,12 @@ class VirtualRealApplicationInteractor(): print("Cannot decode json content") print(e) result = _json["result"] + _value = None if result: - _value = None for _, v in result.items(): _value = v + else: + _value = 0 self.ug_collector.setUtility(_json['sender_id'], _value) def setComponentState(self, state): @@ -362,7 +364,7 @@ class Environment(gym.Env): self.agents = [] print("Environment initialized by pid =",os.getpid()," and thread id =",threading.get_ident()) for name, data in self.application['data'].items(): - min_mem, max_mem, min_cpu, max_cpu, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, min_instances, max_instances, hws, variants, metrics = data + min_mem, max_mem, min_cpu, max_cpu, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, min_instances, max_instances, hws, variants, metrics, _, _ = data print("Creating component = {0} ...".format(name)) print("Variant supported : ", variants) print("Extra hardware type supported : ", hws) diff --git a/polymorphic_solver/src/morphemic.py b/polymorphic_solver/src/morphemic.py index ec18df744cae2fe1a61ceeb94fffd2270c3b5a43..7d55a878ef5edce4b8ea7c34e080e9705772e6d6 100644 --- a/polymorphic_solver/src/morphemic.py +++ b/polymorphic_solver/src/morphemic.py @@ -26,7 +26,7 @@ INDEX_CORES = 1 INDEX_MEMORY = 0 class MorphemicArchetype(): - def __init__(self, component_name, index, variant, hw, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc): + def __init__(self, component_name, index, variant, hw, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, byon_type, byon_name): self.variant = variant self.hw = hw self.component_name = component_name @@ -40,6 +40,8 @@ class MorphemicArchetype(): self.uri_notification = None self.min_cpu = min_cpu self.min_gpu = min_gpu + self.byon_type = byon_type + self.byon_name = byon_name self.min_fpga = min_fpga self.max_cpu = max_cpu self.max_gpu = max_gpu @@ -66,6 +68,8 @@ class MorphemicArchetype(): return self.image def getAvgPerformance(self): return self.avg_performance + def getByonParameters(self): + return {"type": self.byon_type, "name": self.byon_name} def updateAvgPerformance(self): self.avg_performance = self.sum_performance / len(self.collections) def getUriNotification(self): @@ -178,12 +182,12 @@ class MorphemicArchetypeManager(): for archetype in self.archetypes: archetype.clean() - def createArchetypes(self, component_name, index, variants, hws, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc): + def createArchetypes(self, component_name, index, variants, hws, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, byon_type, byon_name): for variant in variants: for hw in hws: if hw == "FPGA" and variant == "SERVERLESS": continue - archetype = MorphemicArchetype(component_name,index, variant, hw, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc) + archetype = MorphemicArchetype(component_name,index, variant, hw, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, byon_type, byon_name) self.archetypes.append(archetype) def getArchetypeByComponentName(self, name, variant, hw): @@ -394,6 +398,8 @@ class CamelTransformer(): comp_data['min_cores'] = archetype.lowest[1] comp_data["image"] = archetype.getImage() comp_data['max_mem'] = archetype.highest[0] + comp_data["byon_type"] = archetype.getByonParameters()["type"] + comp_data["byon_name"] = archetype.getByonParameters()["name"] comp_data['max_cores'] = archetype.highest[1] comp_data['min_instances'] = archetype.lowest[2] comp_data['max_instances'] = archetype.highest[2]