diff --git a/.DS_Store b/.DS_Store index a653d74f7eb32c941dbcc397d7d9f883d465fa6c..5e890ad35c6b5f743e49bb779a9c327701905640 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/deployment/.DS_Store b/deployment/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..c5cb3da5ea5c18db7d687b32a84e131dca9bf084 Binary files /dev/null and b/deployment/.DS_Store differ diff --git a/forecaster-cnn/app.py b/forecaster-cnn/app.py index 28baa8fd4c06057e749a8d32c5e69a15b67fc4f6..dfa6a225e2944811c7771ec9ad956953170b0fb3 100644 --- a/forecaster-cnn/app.py +++ b/forecaster-cnn/app.py @@ -227,6 +227,10 @@ class Forecaster(Thread): self.publisher.setParameters(message, "intermediate_prediction.cnn.{0}".format(self.target)) self.publisher.send() print(message, self.target) + # + message = {"prediction": v} + self.publisher.setParameters(message, "{0}Predictions".format(self.target)) + self.publisher.send() else: message = {"prediction": v} self.publisher.setParameters(message, "{0}Predictions".format(self.target)) diff --git a/morphemic-performance-model/.DS_Store b/morphemic-performance-model/.DS_Store index e7375be658f2c18d0721bfcad08979be2dd4b3d7..2b01458a9ca1db09e98d4513e2b2d0908274a9c7 100644 Binary files a/morphemic-performance-model/.DS_Store and b/morphemic-performance-model/.DS_Store differ diff --git a/morphemic-performance-model/ml_code/.DS_Store b/morphemic-performance-model/ml_code/.DS_Store index 487298ce764505b146979b3686f3b1e6c2607ff4..2b8e773b752f0d52d96f8f2b472704fa8dbba0d6 100644 Binary files a/morphemic-performance-model/ml_code/.DS_Store and b/morphemic-performance-model/ml_code/.DS_Store differ diff --git a/morphemic-performance-model/ml_code/src/activemq_logic.py b/morphemic-performance-model/ml_code/src/activemq_logic.py index 8a8c8349cdf085744d15df3544932381d272c3fe..6f4d1b8e6139881179bcc37e1b9a041e79bdf32a 100644 --- a/morphemic-performance-model/ml_code/src/activemq_logic.py +++ b/morphemic-performance-model/ml_code/src/activemq_logic.py @@ -1,10 +1,8 @@ -from email.mime import application -import os, time, subprocess, json -from shutil import ExecError +import os, time, json, math from threading import Thread from datetime import datetime from activemq.MorphemicConnection import Connection -import stomp +from typing import List, Dict #from initial_ML_module import Predictor from morphemic.dataset import DatasetMaker from ml_module import MultiComponentsManager @@ -132,6 +130,9 @@ class MessagingManager(Thread): def multiComponentsTrainHandler(self, data): #{"application": app_name, models:[{"target": target, "components_config": [{"name": comp_name, "variant": variant, "hardware": hardware}]}]} + print("==============Train request===============") + print(data) + print("===========================================") _json = None try: _json = json.loads(data) @@ -145,6 +146,9 @@ class MessagingManager(Thread): return None app_name = _json["application"] + self.manager.setFormulas(_json["formulas"]) + self.manager.setUtilityMetricName(_json["utility_metric_name"]) + configs = {'hostname': influxdb_hostname, 'port': influxdb_port, 'username': influxdb_username, @@ -200,8 +204,15 @@ class MessagingManager(Thread): _file.close() self.multiComponentsTrainHandler(data) + def completeVariables(self, variables: Dict[str,str]): + _variables = [] + for component in self.current_deployment_config: + variables.update({"component": component["component"]}) + _variables.append(variables) + return _variables + def multiComponentsPredictHandler(self, data): - print("-------------------Data------------------") + print("-------------------Prediction requests------------------") print(data) print("-----------------------------------------") _json = None @@ -225,6 +236,8 @@ class MessagingManager(Thread): return None sender_id = _json["sender_id"] variables = _json["variables"] + if type(variables) == type({}): + variables = self.completeVariables(variables) variables = self.completeMissingVariables(variables) if not variables: @@ -268,14 +281,24 @@ class MessagingManager(Thread): if not self.current_deployment_config: if not self.loadDeploymentConfigFile(): return variables + _final = [] for component in variables: + _component = {} + _component.update(component) if not "component" in component: return None if not "_variant" in component: - component["_variant"] = self.getComponentConfig(component["component"],"variant") + _component["_variant"] = self.getComponentConfig(component["component"],"variant") if not "_hardware" in component: - component["_hardware"] = self.getComponentConfig(component["component"],"hardware") - return variables + _component["_hardware"] = self.getComponentConfig(component["component"],"hardware") + if not "_cores" in component: + _component["_cores"] = self.getComponentConfig(component["component"],"cores") + if not "_memory" in component: + _component["_memory"] = self.getComponentConfig(component["component"],"memory") + if not "_instances" in component: + _component["_instances"] = self.getComponentConfig(component["component"],"instances") + _final.append(_component) + return _final def run(self): print("Messaging manager started ...") diff --git a/morphemic-performance-model/ml_code/src/ml_module.py b/morphemic-performance-model/ml_code/src/ml_module.py index 8ecb9075b3d6a62269366398cf53b1de84834c17..e6e702ad65618e536c18e675e7189021c1a56b73 100644 --- a/morphemic-performance-model/ml_code/src/ml_module.py +++ b/morphemic-performance-model/ml_code/src/ml_module.py @@ -1,4 +1,4 @@ -import os, time, pickle, hashlib +import os, time, pickle, hashlib, math import pandas as pd import numpy as np from sklearn.impute import SimpleImputer @@ -8,7 +8,6 @@ from tensorflow import keras from tensorflow.keras.layers import LSTM, Dropout, Flatten, Dense, Conv1D from tensorflow.python.keras.layers import MaxPooling1D from sklearn.model_selection import train_test_split -from math import sqrt from sklearn.metrics import mean_squared_error, r2_score from morphemic.dataset import DatasetMaker @@ -29,6 +28,8 @@ columns_to_remove = columns_to_remove.split(",") _VARIANTS_HARDWARE = ["VM","DOCKER","SERVERLESS","HPC","EDGE","CPU","GPU","FPGA","NPU","TPU"] +functions = ['acos', 'acosh', 'asin', 'asinh', 'atan', 'atan2', 'atanh', 'ceil', 'comb', 'copysign', 'cos', 'cosh', 'degrees', 'dist', 'e', 'erf', 'erfc', 'exp', 'expm1', 'fabs', 'factorial', 'floor', 'fmod', 'frexp', 'fsum', 'gamma', 'gcd', 'hypot', 'inf', 'isclose', 'isfinite', 'isinf', 'isnan', 'isqrt', 'lcm', 'ldexp', 'lgamma', 'log', 'log10', 'log1p', 'log2', 'modf', 'nan', 'nextafter', 'perm', 'pi', 'pow', 'prod', 'radians', 'remainder', 'sin', 'sinh', 'sqrt', 'tan', 'tanh', 'tau', 'trunc', 'ulp'] + def get_index(name): if name in _VARIANTS_HARDWARE: return _VARIANTS_HARDWARE.index(name) @@ -49,7 +50,7 @@ def CNN_model(n_features, X, y): def prediction_and_score_for_CNN(model, x_input,test_y): #x_input = x_input.reshape((1, n_steps, n_features)) yhat = model.predict(x_input, verbose=2) - rmse = sqrt(mean_squared_error(test_y, yhat)) + rmse = math.sqrt(mean_squared_error(test_y, yhat)) r2score = r2_score(test_y, yhat) return rmse, r2score @@ -255,6 +256,8 @@ class MultiComponentsManager(): self.models = {} self.ml_loaded_models = {} self.ready_to_train = False + self.formulas = None + self.utility_metric_name = None self.loadModels() def addModel(self, components_config, target, url_file): @@ -271,8 +274,32 @@ class MultiComponentsManager(): self.saveModels() self.checkTrainStatus() + def setFormulas(self, formulas): + self.formulas = formulas + + def getFormulas(self): + return self.formulas + + def setUtilityMetricName(self, name): + print("utility metric name = ", name) + self.utility_metric_name = name + + def getUtilityMetricName(self) -> str: + return self.utility_metric_name + + def replaceFunction(self, _string: str): + for function in functions: + _exp = "{0}(".format(function) + if _exp in _string: + _string = _string.replace(_exp,"math.{0}(".format(function)) + return _string + def addPredictData(self, sender_id, variables, target): #variables : [{"component": name, "variant": var, "hardware": hardware, "_memory": mem, "_cores": core, "_instances": instances}] + print("target is ", target) + if target == "utility": + target = self.getUtilityMetricName() + print("target is ", target) components_config = [] for component in variables: mandatory_field = ["component", "_variant", "_hardware"] @@ -302,7 +329,7 @@ class MultiComponentsManager(): features_list = [] for component in variables: for field in list(component.keys()): - if not field in static_fields: + if not field in static_fields and not field in features_list: features_list.append(field) model = MultiComponentsModel() @@ -311,8 +338,31 @@ class MultiComponentsManager(): model.makeIdentifier() id = model.getIdentifier() if not id in self.models: - print("Model with id = {0} not found".format(id)) - return {"status": False, "message": "Model not found", "sender_id": sender_id} + print("Model with id = {0} not found, the formula will be used".format(id)) + if target in self.formulas: + _dict = {} + _dict.update(features_dict) + _dict.update({"math":math}) + for _ in range(len(list(self.formulas.keys()))): + for t, formula in self.formulas.items(): + if t == target: + continue + try: + formula = self.replaceFunction(formula) + result = eval(formula, _dict) + _dict[t] = result + except: + pass + print('Formula for = {0} found'.format(target)) + print('Formula = ', self.formulas[target]) + formula = self.replaceFunction(self.formulas[target]) + try: + result = eval(formula, _dict) + return {"status": True, "message": "", "result": {target: result}} + except Exception as e: + return {"status": False, "message": str(e), "result": None} + else: + return {"status": False, "message": "Model not found", "sender_id": sender_id} model = self.models[id] if not model.getLastTraining(): train_result = self.train(model) @@ -344,7 +394,7 @@ class MultiComponentsManager(): else: ml_model = self.ml_loaded_models[id] prediction = float(ml_model.predict(new_sample.values)[0][0]) - return {"status": True, "message": "", "results":{"prediction": prediction},"sender_id": sender_id} + return {"status": True, "message": "", "results":{target: prediction},"sender_id": sender_id} def readyToTrain(self): self.ready_to_train = True @@ -389,6 +439,7 @@ class MultiComponentsManager(): target = model.getTarget() if not target in list(data.columns): return {"status": False, "message": 'Target field not found in the dataset'} + data = model.filterRows(data) if data.empty: return {"status": False, "message": "An error occured, dataframe is empty after filtering"} diff --git a/morphemic-persistent-storage/.DS_Store b/morphemic-persistent-storage/.DS_Store index 0617f43e32b18a641eb10087291c440261bba41c..38432110c6b646bcd47d9632a5aa149660a1d553 100644 Binary files a/morphemic-persistent-storage/.DS_Store and b/morphemic-persistent-storage/.DS_Store differ diff --git a/morphemic-persistent-storage/database/.DS_Store b/morphemic-persistent-storage/database/.DS_Store index ea777f464545b9ae8971c1d1f15939bcf5d77700..c187e3dba7e2a4756977f73f78f6a8a44449bf76 100644 Binary files a/morphemic-persistent-storage/database/.DS_Store and b/morphemic-persistent-storage/database/.DS_Store differ diff --git a/morphemic-persistent-storage/database/inputapi/.DS_Store b/morphemic-persistent-storage/database/inputapi/.DS_Store index 34656b34614ea8fa495da593ce87e09ba998c208..e7640f4f37dfeeb9be3adfe5c341761935651dfb 100644 Binary files a/morphemic-persistent-storage/database/inputapi/.DS_Store and b/morphemic-persistent-storage/database/inputapi/.DS_Store differ diff --git a/morphemic-persistent-storage/database/inputapi/src/app.py b/morphemic-persistent-storage/database/inputapi/src/app.py index db36fe4cd50fce666c36f443d4616e6524f30d77..73e7ff56b92a48a210f400ef0ad94c148c9bbcb0 100644 --- a/morphemic-persistent-storage/database/inputapi/src/app.py +++ b/morphemic-persistent-storage/database/inputapi/src/app.py @@ -100,11 +100,12 @@ class ApplicationConfig(): def __init__(self): self.applications = {} self.read_try = False + self.loadConfig() def get(self, application_name, component_name): - key = "{0}_{1}".format(application_name,component_name) - if key in self.applications: - element_config = self.applications[key] + #key = "{0}_{1}".format(application_name,component_name) + if component_name in self.applications: + element_config = self.applications[component_name] return element_config.toJSON() return {} @@ -116,7 +117,22 @@ class ApplicationConfig(): def add(self, application_name, component_name,variant, hardware, memory, cores, instances): element_config = ElementConfig(application_name, component_name,variant, hardware, memory, cores, instances) - self.applications[element_config.getKey()] = element_config + self.applications[element_config.getComponentName()] = element_config + #self.saveConfigs() + + def loadConfig(self): + _path = "./configs.pickle" + if os.path.exists(_path): + _file = open(_path,"rb") + self.applications = pickle.load(_file) + _file.close() + + def saveConfigs(self): + filename = "./configd.pickle" + _file = open(filename,"wb") + pickle.dump(self.applications, _file) + _file.close() + class Publisher(Thread): def __init__(self): @@ -498,6 +514,7 @@ class Ingestor(Thread): def run(self): self.connect() + self.application_config.loadConfig() while True: size = len(self.list_content) for _couple in self.list_content: @@ -546,6 +563,7 @@ class Ingestor(Thread): def insert(self, _couple): # {'timestamp': time.time(),'name':metric,'labels':{'hostname':'localhost','application':'application-1'},'value': measurement} # {“metricName”: “name”, “metricValue”: value, “timestamp”: time, “application”: “applicationName”, “level”: 1…} + self.application_config.loadConfig() fields = None topic = None try: @@ -559,11 +577,12 @@ class Ingestor(Thread): if not metric_name_field_timestamp in fields: return None # self.tolerance_manager.addTime(fields["application"], fields["timestamp"]) - application = None - if not metric_name_field_application in fields: - application = "default_application" - else: - application = fields[metric_name_field_application] + application = "default_application" + #application = None + #if not metric_name_field_application in fields: + # application = "default_application" + #else: + # application = fields[metric_name_field_application] timestamp = int(fields[metric_name_field_timestamp]/1000) #time in sec #for an unknown error, i'll reduce the timestamp to 5 minutes for debugging purpose #timestamp -= 60*5 @@ -752,7 +771,7 @@ class InputApi: self.saveSubscriptions(result) self.sendMetricToPredictToPolmorphicSolver(_json) self.startConsuming() - self.startApplicationStateCollector() + #self.startApplicationStateCollector() except Exception as e: print(e) @@ -788,12 +807,16 @@ class InputApi: if _json["request"] == "current_config": #{"request": "set_state", "application_name": "name": "components":[{"name":"component_name", "variant": "variant", "hardware":"CPU", "memory": mem, "cores": cores, "instances": n_instances}]} - application_name = "default_application" #_json["application"] + if not "application" in _json: + application_name = "default_application" #_json["application"] + else: + application_name = _json["application"] for component in _json["data"]: self.application_config.add(application_name, component["component"],component["variant"], component["hardware"], component["memory"], component["cores"], component["instances"]) print("===========Configuration for {0}===================".format(component["component"])) print(component) print("=============================================================================") + self.application_config.saveConfigs() if _json["request"] == "make_dataset": if not "application" in _json: print("Application name missing") diff --git a/polymorphic_solver/.DS_Store b/polymorphic_solver/.DS_Store index 4318aa0f7094474d880697f459b542f343d8d671..9c26108df807a8afecb8e6c5c6b4dafcf816f89a 100644 Binary files a/polymorphic_solver/.DS_Store and b/polymorphic_solver/.DS_Store differ diff --git a/polymorphic_solver/src/app.py b/polymorphic_solver/src/app.py index e1362b94b43e490de095606500fc844910526a3f..644416d307edb92634937e5497038dd4f416964e 100644 --- a/polymorphic_solver/src/app.py +++ b/polymorphic_solver/src/app.py @@ -43,11 +43,11 @@ collect_tolerance = float(os.environ.get("COLLECT_TOLERANCE","1.0")) proactive_scheduler_url = os.environ.get("PROACTIVE_URL","http://localhost:7879/node_candidates") polymorphic_solver_topic = os.environ.get("POLYMORPHIC_TOPIC","/topic/polymorphic_solver") -utility_generator_feedback_topic = os.environ.get("UTILITY_GENERATOR_FEEDBACK_TOPIC","/topic/solver_ug_reply") +utility_generator_feedback_topic = os.environ.get("UTILITY_GENERATOR_FEEDBACK_TOPIC","/topic/performance_module_predict_feedback") ps_management_topic = os.environ.get("PS_MANAGEMENT_TOPIC", "persistent_storage") performance_module_topic = os.environ.get("PERFORMANCE_MODULE_TRAIN_TOPIC","performance_module_train") performance_module_config_topic = os.environ.get("PERFORMANCE_MODULE_CONFIG","performance_module_config") -analysis_frequency = int(os.environ.get("ANALYSIS_FREQUENCY","240")) #Freezing time after a new redeployment +analysis_frequency = int(os.environ.get("ANALYSIS_FREQUENCY","600")) #Freezing time after a new redeployment logname = "/var/log/polymorphic_solver.log" logging.basicConfig(filename=logname,filemode='a',format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',datefmt='%H:%M:%S',level=logging.DEBUG) @@ -249,7 +249,8 @@ class PolymorphicSolver(): if self.setApplicationData(_json): self.virtual_application.setApplicationName(_json['application']) self.virtual_application.setApplicationId(_json['application_id']) - + self.virtual_application.setFormulas(_json["formulas"]) + self.virtual_application.setUtilityMetricName(_json["utility_metric_name"]) self.virtual_application.setNumberOfComponents(self.application_data['number_of_components']) self.sendApplicationIdToPersistentStorage(data['application_id']) self.sendTrainRequestToPerformanceModule() @@ -320,6 +321,8 @@ class PolymorphicSolver(): data = {} application_name = self.virtual_application.getApplicationName() data["application"] = application_name + data["formulas"] = self.virtual_application.getFormulas() + data["utility_metric_name"] = self.virtual_application.getUtilityMetricName() component_combinations = {} for component_name, component in self.virtual_application.items(): @@ -339,6 +342,8 @@ class PolymorphicSolver(): for metric in self.metric_to_predict_message: res = {"target": metric['metric'], "components_config": model_to_dict(model)} models.append(res) + #res = {"target": data["utility_metric_name"], "components_config": model_to_dict(model)} + #models.append(res) data["models"] = models self.publisher.setParameters(data, performance_module_topic) @@ -401,7 +406,6 @@ class PolymorphicSolver(): return None if data['request'] == "prediction_ready": - print("Application ready message received") self.application_ready = True if self.first_deployment_completed: if not self.rl_engine_started: @@ -436,6 +440,9 @@ class PolymorphicSolver(): def ugFeedBack(self, data, metadata): + print('-------Performance estimation------------') + print(data) + print('-----------------------------------------') _data = None try: _data = json.loads(data) @@ -443,6 +450,10 @@ class PolymorphicSolver(): print("Could not parse utility generator response") print(e) if _data: + if not "sender_id" in _data: + print("An error occured") + print(_data) + return None sender_id = _data["sender_id"] consumer_id = sender_id[:sender_id.index("-")] if consumer_id in self.agent_consumers: @@ -496,10 +507,7 @@ class PolymorphicSolver(): return False def setApplicationData(self, data): - print("==============Data received==================") - print(data) app_name = data['application'] #debug - print("=============================================") self.application_data = data if "components" in data: for component in data["components"]: @@ -643,12 +651,13 @@ class PolymorphicSolver(): def stopRLEngine(self): self.program.wait() + self.rl_engine_started = False def restartRLEngine(self): local_resources = lp_utils.to_device( program_nodes=self.program.groups.keys(), nodes_on_gpu=[] ) - + self.rl_engine_started = False lp.launch( self.program, launch_type="test_mt", diff --git a/polymorphic_solver/src/morphemic.py b/polymorphic_solver/src/morphemic.py index 4d558a3693b734519e42fc822b6f2723a2610205..ec18df744cae2fe1a61ceeb94fffd2270c3b5a43 100644 --- a/polymorphic_solver/src/morphemic.py +++ b/polymorphic_solver/src/morphemic.py @@ -361,19 +361,15 @@ class CamelTransformer(): filename = "camel_{0}.xmi".format(int(time.time())) data = {"filename": filename, "resource_name": self.applicationId} response = requests.post(url=camel_converter_url+"/export_model", data=json.dumps(data), headers={"Content-Type":"application/json"}) - try: - print("Camel converter response", response.text) - if json.loads(response.text)["status"]: - print("Initial camel loaded successfully") - self.camel = Camel(self.applicationId) - print(default_camel_folder+filename) - self.camel.loadCamelFromXmi(default_camel_folder+filename) - return self.camel.getComponentsJSON() - else: - print("An error occured while loading the camel model, camel converter response") - print(response.text) - except Exception as e: - print(e) + print("Camel converter response", response.text) + if json.loads(response.text)["status"]: + print("Initial camel loaded successfully") + self.camel = Camel(self.applicationId) + self.camel.loadCamelFromXmi(default_camel_folder+filename) + return self.camel.getComponentsJSON() + else: + print("An error occured while loading the camel model, camel converter response") + print(response.text) return None def getGroupId(self, current_groups, component_name): diff --git a/polymorphic_solver/src/xmi_camel_utilities.py b/polymorphic_solver/src/xmi_camel_utilities.py index e1248f0ad2b728ae7078ea95535dae817576f0bc..90b1e744a844b080fc095275643921bbfa2aa355 100644 --- a/polymorphic_solver/src/xmi_camel_utilities.py +++ b/polymorphic_solver/src/xmi_camel_utilities.py @@ -324,6 +324,8 @@ class Camel(): self.initial_camel_xmi = None self.camel_original_filename = None self.camel_file_headers = None + self.formulas = None + self.utility_metric_name = None self.tree = None def setNumberOfComponent(self, _number): @@ -698,6 +700,27 @@ class Camel(): def getMetric(self, index): metrics = self.getElements("metrics") return metrics[index].get("name") + + def isUtilityTemplate(self, index_template, index_metric_model): + metric_model = self.getElements("metricModels")[index_metric_model] + template = list(metric_model.iter("templates"))[index_template] + return template.attrib["name"] == "UtilityTemplate" + + def getFormulas(self): + result = {} + utility_function_name = None + for metric in self.getElements("metrics"): + if "formula" in metric.attrib: + if "metricTemplate" in metric.attrib: + content_metric_template = metric.attrib["metricTemplate"] + first_index = content_metric_template.index(".") + index_metric_model = int(content_metric_template[first_index+1:content_metric_template.rindex("/")]) + last_index = content_metric_template.rindex(".") + template_id = int(content_metric_template[last_index+1:]) + if self.isUtilityTemplate(template_id, index_metric_model): + utility_function_name = metric.attrib["name"] + result[metric.attrib["name"]] = metric.attrib["formula"] + return result, utility_function_name def getComponentIndexFromObjectContext(self, index): object_context = self.getElements("objectContexts")[index] @@ -766,6 +789,7 @@ class Camel(): self.application_name = app_element.attrib['name'] #get components deployment component_index = 0 + self.formulas, self.utility_metric_name = self.getFormulas() for comp in self.getElements("softwareComponents"): variants, hws = [], [] component_name = comp.attrib['name'] @@ -782,6 +806,7 @@ class Camel(): variants, hws = self.getVariantsAndHws(comp) reqs_list = self.getElements("requirements") metrics = self.getMetrics(component_index) + min_instances, max_instances = None, None if index_horizontal_req: min_instances, max_instances = self.getMinMaxInstances(reqs_list,index_horizontal_req) @@ -814,7 +839,7 @@ class Camel(): return self.components def getComponentsJSON(self): - result = {"application": self.application_name, "application_id": self.applicationId, "components": [], "number_of_components": self.number_of_component} + result = {"application": self.application_name, "application_id": self.applicationId, "components": [], "number_of_components": self.number_of_component, "formulas": self.formulas, "utility_metric_name": self.utility_metric_name} for _, comp in self.components.items(): result["components"].append(comp.toJSON()) return result