diff --git a/grouping/PPO_cg.py b/grouping/PPO_cg.py index cdd42d35d756bf099f7fe8b9f91af09fcdb25431..121749ff8c9dbd0073573d4d7142953147aeac86 100644 --- a/grouping/PPO_cg.py +++ b/grouping/PPO_cg.py @@ -37,894 +37,859 @@ uri_notification = None request_type = None full_filepath = None -components_indexes_map = {} - -def get_component_name_from_solutions(solutions): - global components_indexes_map - result = [] - for solution in solutions: - solution_with_name = [] - for component_index in solution: - solution_with_name.append(components_indexes_map[component_index-1]) - result.append(solution_with_name) - return result - -def groupings(file_): - global filename_ - filename_ = file_ - - visitedList = [[]] - - - #env variables that are used inside the program - activemq_hostname = os.environ.get("ACTIVEMQ_HOST", "localhost") - activemq_port = int(os.environ.get("ACTIVEMQ_PORT", "61613")) - activemq_username = os.environ.get("ACTIVEMQ_USERNAME", "morphemic") - activemq_password = os.environ.get("ACTIVEMQ_PASSWORD", "morphemic") - amq_topic_to_send = os.environ.get("AMQ_TOPIC_REQUEST_CG", "/topic/solver_ug_request") - amq_topic_utility_receive = os.environ.get("AMQ_TOPIC_CG_REPLY", "/topic/solver_ug_reply") - - def create_input_utility(sol_1): - - resources_values=[] - resources_components=[('CPU', 'GPU', 'FPGA', 'RAM')] - total_resources=[] - list_to_tuple=[] - r_f_list=[] - cpu, ram = [],[] - cut_=[] - CPU, GPU, FPGA, RAM = 0,0,0,0 - for element in range(len(app.data['Application_components'])): - for e in app.data['Application_components'][element]['Resources']: - for k,v in e.items(): - if k == 'CPU': - CPU= int(list(v[1].values())[0]) - elif k == 'GPU': - GPU=int(list(v[1].values())[0]) - elif k == 'FPGA': - FPGA=int(list(v[1].values())[0]) - elif k == 'RAM': - RAM=int(list(v[1].values())[0]) - - total_resources.append((CPU, GPU, FPGA, RAM)) - CPU, GPU, FPGA, RAM = 0,0,0,0 +def set_component_names_from_solutions(filename_, solutions): - for element in total_resources: - list_to_tuple.append(list(element)) - + with open(filename_, 'r+') as f: + data = json.load(f) - for element in sol_1: - for idx,e in enumerate(element): - cpu.append(list_to_tuple[e-1][0]) - ram.append(list_to_tuple[e-1][3]) - + no_of_components = data['Number_of_components'] + component_index_map={} + name_of_components=[] + for element in range(no_of_components): + name_of_components.append(data['Application_components'][element]['name']) + i=0 - for i in range(len(element)): + for i in range(no_of_components): + component_index_map.update({i+1:name_of_components[i]}) - r_f_list.append({'_cores':max(cpu)}) - r_f_list.append({'_mem': max(ram)}) - r_f_list.append({'_instances': int(list(list(app.data['Application_components'][i]['Horizontal scaling requirements'][0].values())[0][1].values())[0])}) - r_f_list.append({'_variant': "[]"}) - r_f_list.append({'_hardware':'CPU'}) - - cut_.append(r_f_list) - r_f_list=[] - cpu, ram = [], [] - + for element in solutions: + for idx,e in enumerate(element): + element[idx] = component_index_map[e] + return solutions + +def groupings(file_): + time.sleep(1200) - return total_resources, {'target': 'utility', 'sender_id': get_random_ug_id(), 'combination': sol_1, 'variables': cut_[0]} - - def hardware_solutions(solutions): - keep_resources_gpu=[] - keep_resources_fpga=[] - for element in solutions: - total_resources= create_input_utility(element)[0] - for e in element: - if len(e) ==1: - if total_resources[e[0]-1][1] !=0: - keep_resources_gpu.append(element) - for e in element: - if 
len(e) ==1: - if total_resources[e[0]-1][2] !=0: - keep_resources_fpga.append(element) - return keep_resources_gpu, keep_resources_fpga - + while True: + global filename_ + filename_ = file_ - def sorted_list(list_solutions): - list_to_be_changed=[] - solutions=[] - final_solutions=[] - for idx,element in enumerate(list_solutions): - for e in element: - e.sort() - list_to_be_changed.append(e) - solutions.append(list_to_be_changed) - list_to_be_changed=[] - - for idx, element in enumerate(solutions): - x = sorted(element, key=lambda x: x[0]) - final_solutions.append(x) + visitedList = [[]] - return final_solutions - def create_map_actions(solutions): - solutions_string_type=[] - solution_number=[] - for idx,element in enumerate(solutions): + #env variables that are used inside the program + activemq_hostname = os.environ.get("ACTIVEMQ_HOST", "localhost") + activemq_port = int(os.environ.get("ACTIVEMQ_PORT", "61613")) + activemq_username = os.environ.get("ACTIVEMQ_USERNAME", "morphemic") + activemq_password = os.environ.get("ACTIVEMQ_PASSWORD", "morphemic") + amq_topic_to_send = os.environ.get("AMQ_TOPIC_REQUEST_CG", "/topic/solver_ug_request") + amq_topic_utility_receive = os.environ.get("AMQ_TOPIC_CG_REPLY", "/topic/solver_ug_reply") - solution_number.append(idx) - solutions_string_type.append(str(element)) + def create_input_utility(sol_1): - return solution_number, solutions_string_type + resources_values=[] + resources_components=[('CPU', 'GPU', 'FPGA', 'RAM')] + total_resources=[] + list_to_tuple=[] + r_f_list=[] + cpu, ram = [],[] + cut_=[] + CPU, GPU, FPGA, RAM = 0,0,0,0 + for element in range(len(app.data['Application_components'])): + for e in app.data['Application_components'][element]['Resources']: + for k,v in e.items(): + if k == 'CPU': + CPU= int(list(v[1].values())[0]) + elif k == 'GPU': + GPU=int(list(v[1].values())[0]) + elif k == 'FPGA': + FPGA=int(list(v[1].values())[0]) + elif k == 'RAM': + RAM=int(list(v[1].values())[0]) + + total_resources.append((CPU, GPU, FPGA, RAM)) + CPU, GPU, FPGA, RAM = 0,0,0,0 - def create_str_groupings(additional_resources): - for idx, element in enumerate(additional_resources): - additional_resources[idx] = str(element) - - return additional_resources - - def remove_duplicates(x): - return list(dict.fromkeys(x)) - - - def MAP_ACTIONS_RESOURCES(solutions): - initial_actions = [] - code_actions=[] - cpu_combinations = [] - total_combinations= [] - for idx, element in enumerate(solutions): - total_resources = create_input_utility(element)[0] - combination_input_utility = create_input_utility(element)[1] - cpu_combinations.append((idx, combination_input_utility)) - gpu_solutions, fpga_solutions = hardware_solutions(solutions) - solutions = sorted_list(solutions) - gpu_solutions = sorted_list(gpu_solutions) - gpu_solutions = create_str_groupings(gpu_solutions) - gpu_solutions = remove_duplicates(gpu_solutions) - action_numbr_list, groupings_list = create_map_actions(solutions) - number_of_actions, original_groupings = create_map_actions(solutions) - for element in original_groupings: - if gpu_solutions!=None: - for e in gpu_solutions: - if e == element: - idx = original_groupings.index(element) - - new_action = create_input_utility(ast.literal_eval(original_groupings[idx]))[1] - combination = new_action['combination'] - - - for element in combination: - if len(element) ==1: - index_ = element[0]-1 - if total_resources[index_][1]!=0: - - gpu_value = total_resources[index_][1] - cores_ = new_action['variables'][element[0]-1][0] - hardware_ = 
new_action['variables'][element[0]-1][4] - cores_['_cores'] = gpu_value - hardware_['_hardware'] = "GPU" - total_combinations.append(new_action) - new_action=None - - if fpga_solutions!=None: - for e in fpga_solutions: - if e == element: - idx = original_groupings.index(element) - - new_action = create_input_utility(ast.literal_eval(original_groupings[idx])) - - - for element in combination: - if len(element) ==1: - index_ = element[0]-1 - if total_resources[index_][2]!=0: - fpga_value = total_resources[index_][1] - cores_ = new_action['variables'][element[0]-1][0] - hardware_ = new_action['variables'][element[0]-1][4] - cores_['_cores'] = fpga_value - hardware_['_hardware'] = "FPGA" - total_combinations.append(new_action) - - for solution in solutions: - initial_actions.append(create_input_utility(solution)[1]) - - total_actions = initial_actions+total_combinations - - for i in range(len(total_actions)): - code_actions.append(i) - - return total_actions, code_actions - - - def check_uniquenss(List): - result = all(element == 1 for element in List) - if (result): - res_ = True - else: - res_ = False - - return res_ - - def filter_dfs(visited_list): - new_x = [] - added_x = [] - combinations_all_len=[] - differences_=[] - indexes=[] - final_workflow_rule = [] - rules=[] - ff_rules=[] - - for element in visited_list: - if len(element)>1: - for e_ in element: - added_x.append(e_+1) - new_x.append(added_x) - added_x=[] - else: - pass + for element in total_resources: + list_to_tuple.append(list(element)) + - for j in new_x: - len_ = len(j) - for i in range(2,len_): - lst_ = list(combinations(j, i)) - for element in lst_: - combinations_all_len.append(list(element)) + for element in sol_1: + for idx,e in enumerate(element): + cpu.append(list_to_tuple[e-1][0]) + ram.append(list_to_tuple[e-1][3]) + + for i in range(len(element)): - for element in combinations_all_len: - differences_.append(list(diff(element))) - - for idx,e in enumerate(differences_): - if len(e)==1 and e[0]==1: - indexes.append(idx) - elif len(e)>1 and check_uniquenss(e) == True: - indexes.append(idx) - for element in indexes: - final_workflow_rule.append(combinations_all_len[element]) - for element in new_x: - rules.append(str(element)) - for element in final_workflow_rule: - rules.append(str(element)) - mylist = list( dict.fromkeys(rules)) - for element in mylist: - ff_rules.append(ast.literal_eval(element)) - - return ff_rules + r_f_list.append({'_cores':max(cpu)}) + r_f_list.append({'_mem': max(ram)}) + r_f_list.append({'_instances': int(list(list(app.data['Application_components'][i]['Horizontal scaling requirements'][0].values())[0][1].values())[0])}) + r_f_list.append({'_variant': "[]"}) + r_f_list.append({'_hardware':'CPU'}) + + cut_.append(r_f_list) + r_f_list=[] + cpu, ram = [], [] + + return total_resources, {'target': 'utility', 'sender_id': get_random_ug_id(), 'combination': sol_1, 'variables': cut_} + + def hardware_solutions(solutions): + keep_resources_gpu=[] + keep_resources_fpga=[] + for element in solutions: + total_resources= create_input_utility(element)[0] + for e in element: + if len(e) ==1: + if total_resources[e[0]-1][1] !=0: + keep_resources_gpu.append(element) + for e in element: + if len(e) ==1: + if total_resources[e[0]-1][2] !=0: + keep_resources_fpga.append(element) + return keep_resources_gpu, keep_resources_fpga + - class Listener_(stomp.ConnectionListener): + def sorted_list(list_solutions): + list_to_be_changed=[] + solutions=[] + final_solutions=[] + for idx,element in 
enumerate(list_solutions): + for e in element: + e.sort() + list_to_be_changed.append(e) + solutions.append(list_to_be_changed) + list_to_be_changed=[] + + for idx, element in enumerate(solutions): + x = sorted(element, key=lambda x: x[0]) + final_solutions.append(x) - def __init__(self): - self.message_list=[] + return final_solutions - def on_error(self, headers, message): - print('received an error "%s"' % message) - def on_message(self, headers, message): - self.message_list.append(message) + def create_map_actions(solutions): + solutions_string_type=[] + solution_number=[] + for idx,element in enumerate(solutions): - def amq_send_receive(data_): - connected = False - while not connected: - try: - hosts = [(activemq_hostname, activemq_port)] - conn = stomp.Connection(host_and_ports=hosts) - listener_ = Listener_() - conn.set_listener('', listener_) - conn.connect(activemq_username, activemq_password, wait=True) - connected = True - except Exception as e: - print("Connection failed, process will retry in 5 seconds") - time.sleep(5) - # Register a subscriber with ActiveMQ. This tells ActiveMQ to send - # all messages received on the topic 'topic-1' to this listener - try: - conn.subscribe(destination=amq_topic_to_send, ack='auto') - time.sleep(1) - conn.send(body=json.dumps(data_), destination=amq_topic_to_send) - time.sleep(1) - conn.disconnect() - conn.connect(activemq_username, activemq_password, wait=True) - conn.subscribe(destination=amq_topic_utility_receive, ack='auto') - time.sleep(5) - while True: - for element in listener_.message_list: - e_ = json.loads(element) - if e_['sender_id'] == data_['sender_id']: - result = e_["utility"] - conn.disconnect() - break - else: - continue - except: - print("Could not connect to the ACTIVEMQ Server") - result = None - - return result - - #function to assign a unique id to each request in utility generator - def get_random_ug_id(): - # choose from all lowercase letter - characters = string.ascii_letters + string.digits - password = ''.join(random.choice(characters) for i in range(8)) - return password - - #function to retrive the data from the .json file produced by the WF Analyzer - def export_data(): - ''' - This function is used to acquire the data fromcd Pro - the json file - Returns: the data/ the content - ''' - #final_path = os.path.join(os.getcwd(), file_) - with open(filename_, 'r+') as f: - data = json.load(f) - return data - - - #function to create all the possible paths from the application's workflow - visitedList = [[]] - def depthFirst(graph, currentVertex, visited): - visited.append(currentVertex) - for vertex in graph[currentVertex]: - if vertex not in visited: - depthFirst(graph, vertex, visited.copy()) - visitedList.append(visited) - return visitedList - - - #function to split the relative paths to smaller ones - def split_list(alist, wanted_parts=1): - length = len(alist) - return [ alist[i*length // wanted_parts: (i+1)*length // wanted_parts] - for i in range(wanted_parts) ] - - #function for supporting to filter combinations based on the combinations derived from the application's workflow - def intersection(lst1, lst2): - return list(set(lst1) & set(lst2)) - - #function to create all the possible combinations in lists based the nummber of elements in a list - def combination_per_len(components,r): - combinations_ = list(combinations(components, r)) - for idx, element in enumerate(combinations_): - combinations_[idx] = list(element) - return combinations_ - - - - ############################################## - ''' THE 
ENTIRE FUNCTIONALITY FOR COMMUNICATION BETWEEN CG AND UTILITY GENERATOR - - reads the json file from the WF Analyzer and exploits it - - create the app workflow graph and creates the filter for combinations - - usage of dlx algorithms to find the correct combinations - - creates the resources of each combinations and produces the input for th UG''' - - class Application_model: - def __init__(self): - self.data = export_data() - self.application_components = self.data['Number_of_components'] # i.e. 2 if the App has two Components - self.app_list = [i+1 for i in range(self.application_components)] # [1,2,...,n] - self.app_worfklow = self.data['Application_graph'] #to retrieve the application's flow - self.dict_cpus = {} - self.dict_ram = {} - self.dict_hardware={} - self.dict_variant={} - self.dict_instances={} - self.resources={} - self.final_resources=[] #[{'Component': 1, 'CORES': 1, 'RAM': 8100}, {'Component': 2, 'CORES': 1, 'RAM': 1024}] - self.solutions= [] #[[[1], [2]], [[1, 2]]] all the combinations - self.resource_combination = None - self.str_actions = None #['[[1], [2]]', '[[1, 2]]'] - self.workflow_rules=None - self.no_of_actions = None - - def transform_workflow(self): - new_dict,dict_ = {}, {} - component_name_list,list_with_keys = [], [] - for idx,item in enumerate(self.data['Application_graph']): - new_dict[list(item.keys())[0]] = list(item.values())[0] - - for idx in range(self.data['Number_of_components']): - component_name_list.append(self.data['Application_components'][idx]['name']) - original_values = list(new_dict.values()) - for element in original_values: - for id_, each_element in enumerate(element): - for idx_, e in enumerate(component_name_list): - if each_element == e: - element[id_] = idx_ - for i in range(0,len(original_values)): - list_with_keys.append(str(i)) - for idx,element in enumerate(component_name_list): - dict_.update({idx: original_values[idx]}) - - return dict_ + solution_number.append(idx) + solutions_string_type.append(str(element)) - def set_actions(self): - self.get_workflow() - self.create_combinations() - self.get_combinations() - self.no_of_actions = len(self.solutions) + return solution_number, solutions_string_type - def get_default_conf(self): - return [[i+1] for i in range(self.application_components)] + def create_str_groupings(additional_resources): + for idx, element in enumerate(additional_resources): + additional_resources[idx] = str(element) + + return additional_resources + + def remove_duplicates(x): + return list(dict.fromkeys(x)) + + + def MAP_ACTIONS_RESOURCES(solutions): + initial_actions = [] + code_actions=[] + cpu_combinations = [] + total_combinations= [] + for idx, element in enumerate(solutions): + total_resources = create_input_utility(element)[0] + combination_input_utility = create_input_utility(element)[1] + cpu_combinations.append((idx, combination_input_utility)) + gpu_solutions, fpga_solutions = hardware_solutions(solutions) + solutions = sorted_list(solutions) + gpu_solutions = sorted_list(gpu_solutions) + gpu_solutions = create_str_groupings(gpu_solutions) + gpu_solutions = remove_duplicates(gpu_solutions) + action_numbr_list, groupings_list = create_map_actions(solutions) + number_of_actions, original_groupings = create_map_actions(solutions) + for element in original_groupings: + if gpu_solutions!=None: + for e in gpu_solutions: + if e == element: + idx = original_groupings.index(element) + + new_action = create_input_utility(ast.literal_eval(original_groupings[idx]))[1] + combination = 
new_action['combination'] + + + for element in combination: + if len(element) ==1: + index_ = element[0]-1 + if total_resources[index_][1]!=0: + + gpu_value = total_resources[index_][1] + cores_ = new_action['variables'][element[0]-1][0] + hardware_ = new_action['variables'][element[0]-1][4] + cores_['_cores'] = gpu_value + hardware_['_hardware'] = "GPU" + total_combinations.append(new_action) + new_action=None + + if fpga_solutions!=None: + for e in fpga_solutions: + if e == element: + idx = original_groupings.index(element) + + new_action = create_input_utility(ast.literal_eval(original_groupings[idx])) + + + for element in combination: + if len(element) ==1: + index_ = element[0]-1 + if total_resources[index_][2]!=0: + fpga_value = total_resources[index_][1] + cores_ = new_action['variables'][element[0]-1][0] + hardware_ = new_action['variables'][element[0]-1][4] + cores_['_cores'] = fpga_value + hardware_['_hardware'] = "FPGA" + total_combinations.append(new_action) + + for solution in solutions: + initial_actions.append(create_input_utility(solution)[1]) + + total_actions = initial_actions+total_combinations - def reset_resources(self): - self.dict_cpus={} - self.dict_ram={} - self.dict_hardware={} - self.dict_variant={} + for i in range(len(total_actions)): + code_actions.append(i) - - def get_workflow(self): - graph = {'graph': self.transform_workflow()} - graph = {int(k):[int(i) for i in v] for k,v in graph['graph'].items()} - list_=[] - visitedList = depthFirst(graph, 0, []) - self.workflow_rules = filter_dfs(visited_list=visitedList) - time.sleep(5) - - def combination_list(self): - final_list=[] - self.get_workflow() - final_list.append(self.app_list) - for element in self.app_list: - final_list.append([element]) - - for r in range(2, len(self.app_list)): - combinations_ = combination_per_len(self.app_list, r) + return total_actions, code_actions - for element in combinations_: - for rule in self.workflow_rules: - if intersection(element, rule) == rule and len(element) == len(rule): - final_list.append(element) - return final_list + def check_uniquenss(List): + result = all(element == 1 for element in List) + if (result): + res_ = True + else: + res_ = False + return res_ - def create_combinations(self): - return self.final_resources - - def get_combinations(self): - - id = [i+1 for i in range(self.application_components)] - combinations = self.combination_list() - X = id - Y = makeY_(final_list(X, combinations)) - X_set = prepareX(X, Y) - self.solutions = find_solutions(X_set, Y) - return self.solutions - - def filter_node_candidates(self): - pass + def filter_dfs(visited_list): + new_x = [] + added_x = [] + combinations_all_len=[] + differences_=[] + indexes=[] + final_workflow_rule = [] + rules=[] + ff_rules=[] - def create_single_resources(self, i, resource): - mid_cores_list,mid_mem_list,f_list,list_with_min=[],[],[],[] - for element in self.solutions[i]: + for element in visited_list: if len(element)>1: - for e in element: - idx= e-1 - mid_cores_list.append(self.final_resources[idx][resource]) - f_list.append(mid_cores_list) - mid_cores_list=[] + for e_ in element: + added_x.append(e_+1) + new_x.append(added_x) + added_x=[] else: - idx=element[0]-1 + pass - mid_cores_list.append(self.final_resources[idx][resource]) - f_list.append(mid_cores_list) - mid_cores_list=[] - for element in f_list: + for j in new_x: + len_ = len(j) + for i in range(2,len_): + lst_ = list(combinations(j, i)) + for element in lst_: + combinations_all_len.append(list(element)) - min_=0 - if 
len(element)>1: - min_=min(element) - for i in range(len(element)): - list_with_min.append(min_) - - else: - list_with_min.append(element[0]) - - return list_with_min - - - ''' RL CUSTOM ENVIRONMENT - - ACTIONS: {"0": [[[1,2]], "1": [[1], [2]]]} (i.e. in an app with 2 components - - OBSERVATION SPACE: 1D, TAKES THE UTILITY VALUES FOR EACH COMBINATION - - INITIAL ACTION :0 - ''' - - class Application_Env(Env): - def __init__(self): - self.application=Application_model() - self.action_space = Discrete(actions) # number of possible actions - self.observation_space = Box(low = np.array([11]), high = np.array([200]), dtype=np.int64) - self.utility_state = 0 - #self.utility_interactor = UtilityInteractor() - self.action_masked = [i for i in range(len(self.application.get_combinations()))] - self.original_actions = [i for i in self.application.get_combinations()] - self.initial_action = self.application.get_default_conf() - self.masked_initial_action = [len(i) for i in self.original_actions].index(max([len(i) for i in self.original_actions])) - self.list_of_inputs= [] - self.metrics_to_predict = None - - def prepare_inputs(self): - self.application.get_workflow() - self.application.create_combinations() - self.application.get_combinations() - combin_ = MAP_ACTIONS_RESOURCES(self.application.solutions)[0] - self.list_of_actions=MAP_ACTIONS_RESOURCES(self.application.solutions)[1] - self.list_of_inputs = combin_ - - - def get_performance(self, action): - data = self.list_of_inputs[action] - #return data,random.uniform(0.1, 0.9) - print("Data sent -> ",data) - return self.utility_interactor.getPerformance(data) - #return random.uniform(0.1, 0.9) - """ - while True: - utility_value = amq_topic_utility_receive(data, action) - if utility_value != None: - break - else: - continue - return utility_value""" - - def step(self, action): + for element in combinations_all_len: + differences_.append(list(diff(element))) + + for idx,e in enumerate(differences_): + if len(e)==1 and e[0]==1: + indexes.append(idx) + elif len(e)>1 and check_uniquenss(e) == True: + indexes.append(idx) + for element in indexes: + final_workflow_rule.append(combinations_all_len[element]) + for element in new_x: + rules.append(str(element)) + for element in final_workflow_rule: + rules.append(str(element)) + mylist = list( dict.fromkeys(rules)) + for element in mylist: + ff_rules.append(ast.literal_eval(element)) + + return ff_rules - print('ACTION:{}'.format(self.list_of_actions[action])) - input_ = self.get_performance(action) - print(input_) - ''' - JD only to check if it is working - in this part the ulitity should work instead of a random or the - simulated application + + #function to assign a unique id to each request in utility generator + def get_random_ug_id(): + # choose from all lowercase letter + characters = string.ascii_letters + string.digits + password = ''.join(random.choice(characters) for i in range(8)) + return password + + #function to retrive the data from the .json file produced by the WF Analyzer + def export_data(): ''' - #input_, utility = self.get_performance(action) - #util = UtilitySimulator(input_) - #util.generateLoad(lower_bound=11, upper_bound=200) - #load_ = util.load - #util.computePerformance() - #application_performance = util.getPerfomance() - application_performance = random.randint(1,300) - scale_app_performance = (application_performance-0.05)/(300-0.05) - - if scale_app_performance<0: - scale_app_performance = 0 - else: - pass - print('performance based on utility: 
{}'.format(scale_app_performance))
-            if scale_app_performance>=0.7:
-                reward=1
-                done=True
-            else:
-                reward = -1
-                done=True
-            info={}
-            self.state = random.uniform(0.1, 0.9)
-            '''
+        This function is used to acquire the data from
+        the json file
+        Returns: the data/ the content
+        '''
-            if self.state>200:
-                self.state=200
-            elif self.state<11:
-                self.state=11
-            else:
-                pass
-            '''
-            return self.state, reward, done, info, self.state
-
-        def reset(self):
-            self.state= 0
-            self.initial_action = self.action_space.sample()
-            self.initial_mask_action= [len(i) for i in self.original_actions].index(max([len(i) for i in self.original_actions]))
-            return self.state
+        #final_path = os.path.join(os.getcwd(), file_)
+        with open(filename_, 'r+') as f:
+            data = json.load(f)
+        return data
+
+
+    #function to create all the possible paths from the application's workflow
+    visitedList = [[]]
+    def depthFirst(graph, currentVertex, visited):
+        visited.append(currentVertex)
+        for vertex in graph[currentVertex]:
+            if vertex not in visited:
+                depthFirst(graph, vertex, visited.copy())
+        visitedList.append(visited)
+        return visitedList
+
+
+    #function to split the relative paths to smaller ones
+    def split_list(alist, wanted_parts=1):
+        length = len(alist)
+        return [ alist[i*length // wanted_parts: (i+1)*length // wanted_parts]
+                 for i in range(wanted_parts) ]
+
+    #function for supporting to filter combinations based on the combinations derived from the application's workflow
+    def intersection(lst1, lst2):
+        return list(set(lst1) & set(lst2))
+
+    #function to create all the possible combinations in lists based on the number of elements in a list
+    def combination_per_len(components,r):
+        combinations_ = list(combinations(components, r))
+        for idx, element in enumerate(combinations_):
+            combinations_[idx] = list(element)
+        return combinations_
+
+
+
+    ##############################################
+    ''' THE ENTIRE FUNCTIONALITY FOR COMMUNICATION BETWEEN CG AND UTILITY GENERATOR
+    - reads the json file from the WF Analyzer and exploits it
+    - creates the app workflow graph and the filter for combinations
+    - uses the DLX algorithm to find the correct combinations
+    - creates the resources of each combination and produces the input for the UG'''
+
+    class Application_model:
+        def __init__(self):
+            self.data = export_data()
+            self.application_components = self.data['Number_of_components'] # i.e. 
2 if the App has two Components + self.app_list = [i+1 for i in range(self.application_components)] # [1,2,...,n] + self.app_worfklow = self.data['Application_graph'] #to retrieve the application's flow + self.dict_cpus = {} + self.dict_ram = {} + self.dict_hardware={} + self.dict_variant={} + self.dict_instances={} + self.resources={} + self.final_resources=[] #[{'Component': 1, 'CORES': 1, 'RAM': 8100}, {'Component': 2, 'CORES': 1, 'RAM': 1024}] + self.solutions= [] #[[[1], [2]], [[1, 2]]] all the combinations + self.resource_combination = None + self.str_actions = None #['[[1], [2]]', '[[1, 2]]'] + self.workflow_rules=None + self.no_of_actions = None + + def transform_workflow(self): + new_dict,dict_ = {}, {} + component_name_list,list_with_keys = [], [] + for idx,item in enumerate(self.data['Application_graph']): + new_dict[list(item.keys())[0]] = list(item.values())[0] + + for idx in range(self.data['Number_of_components']): + component_name_list.append(self.data['Application_components'][idx]['name']) + original_values = list(new_dict.values()) + for element in original_values: + for id_, each_element in enumerate(element): + for idx_, e in enumerate(component_name_list): + if each_element == e: + element[id_] = idx_ + for i in range(0,len(original_values)): + list_with_keys.append(str(i)) + for idx,element in enumerate(component_name_list): + dict_.update({idx: original_values[idx]}) + + return dict_ + def set_actions(self): + self.get_workflow() + self.create_combinations() + self.get_combinations() + self.no_of_actions = len(self.solutions) - def discounted_cumulative_sums(x, discount): - # Discounted cumulative sums of vectors for computing rewards-to-go and advantage estimates - return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1] + def get_default_conf(self): + return [[i+1] for i in range(self.application_components)] + def reset_resources(self): + self.dict_cpus={} + self.dict_ram={} + self.dict_hardware={} + self.dict_variant={} - class Buffer: - # Buffer for storing trajectories - def __init__(self, observation_dimensions, size, gamma=0.99, lam=0.95): - # Buffer initialization - self.observation_buffer = np.zeros( - (size, observation_dimensions), dtype=np.float32 - ) - self.action_buffer = np.zeros(size, dtype=np.int32) - self.advantage_buffer = np.zeros(size, dtype=np.float32) - self.reward_buffer = np.zeros(size, dtype=np.float32) - self.return_buffer = np.zeros(size, dtype=np.float32) - self.value_buffer = np.zeros(size, dtype=np.float32) - self.logprobability_buffer = np.zeros(size, dtype=np.float32) - self.gamma, self.lam = gamma, lam - self.pointer, self.trajectory_start_index = 0, 0 - - def store(self, observation, action, reward, value, logprobability): - # Append one step of agent-environment interaction - self.observation_buffer[self.pointer] = observation - self.action_buffer[self.pointer] = action - self.reward_buffer[self.pointer] = reward - self.value_buffer[self.pointer] = value - self.logprobability_buffer[self.pointer] = logprobability - self.pointer += 1 - - def finish_trajectory(self, last_value=0): - # Finish the trajectory by computing advantage estimates and rewards-to-go - path_slice = slice(self.trajectory_start_index, self.pointer) - rewards = np.append(self.reward_buffer[path_slice], last_value) - values = np.append(self.value_buffer[path_slice], last_value) - - deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1] - - self.advantage_buffer[path_slice] = discounted_cumulative_sums( - deltas, self.gamma * self.lam 
- ) - self.return_buffer[path_slice] = discounted_cumulative_sums( - rewards, self.gamma - )[:-1] - - self.trajectory_start_index = self.pointer - - def get(self): - # Get all data of the buffer and normalize the advantages - self.pointer, self.trajectory_start_index = 0, 0 - advantage_mean, advantage_std = ( - np.mean(self.advantage_buffer), - np.std(self.advantage_buffer), - ) - self.advantage_buffer = (self.advantage_buffer - advantage_mean) / advantage_std - return ( - self.observation_buffer, - self.action_buffer, - self.advantage_buffer, - self.return_buffer, - self.logprobability_buffer, - ) + + def get_workflow(self): + graph = {'graph': self.transform_workflow()} + graph = {int(k):[int(i) for i in v] for k,v in graph['graph'].items()} + list_=[] + visitedList = depthFirst(graph, 0, []) + self.workflow_rules = filter_dfs(visited_list=visitedList) + time.sleep(5) + def combination_list(self): + final_list=[] + self.get_workflow() + final_list.append(self.app_list) + for element in self.app_list: + final_list.append([element]) + + for r in range(2, len(self.app_list)): + combinations_ = combination_per_len(self.app_list, r) - def mlp(x, sizes, activation=tf.tanh, output_activation=None): - # Build a feedforward neural network - for size in sizes[:-1]: - x = layers.Dense(units=size, activation=activation)(x) - return layers.Dense(units=sizes[-1], activation=output_activation)(x) + for element in combinations_: + for rule in self.workflow_rules: + if intersection(element, rule) == rule and len(element) == len(rule): + final_list.append(element) + return final_list - def logprobabilities(logits, a): - # Compute the log-probabilities of taking actions a by using the logits (i.e. the output of the actor) - logprobabilities_all = tf.nn.log_softmax(logits) - logprobability = tf.reduce_sum( - tf.one_hot(a, num_actions) * logprobabilities_all, axis=1 - ) - return logprobability + def create_combinations(self): + return self.final_resources + + def get_combinations(self): + + id = [i+1 for i in range(self.application_components)] + combinations = self.combination_list() + X = id + Y = makeY_(final_list(X, combinations)) + X_set = prepareX(X, Y) + self.solutions = find_solutions(X_set, Y) + return self.solutions + + def filter_node_candidates(self): + pass - # Sample action from actor - @tf.function - def sample_action(observation): - logits = actor(observation) - action = tf.squeeze(tf.random.categorical(logits, 1), axis=1) - return logits, action + def create_single_resources(self, i, resource): + mid_cores_list,mid_mem_list,f_list,list_with_min=[],[],[],[] + for element in self.solutions[i]: + if len(element)>1: + for e in element: + idx= e-1 + mid_cores_list.append(self.final_resources[idx][resource]) + f_list.append(mid_cores_list) + mid_cores_list=[] + else: + idx=element[0]-1 + mid_cores_list.append(self.final_resources[idx][resource]) + f_list.append(mid_cores_list) + mid_cores_list=[] + for element in f_list: + + min_=0 + if len(element)>1: + min_=min(element) + for i in range(len(element)): + list_with_min.append(min_) + + else: + list_with_min.append(element[0]) - # Train the policy by maxizing the PPO-Clip objective - @tf.function - def train_policy( - observation_buffer, action_buffer, logprobability_buffer, advantage_buffer - ): + return list_with_min - with tf.GradientTape() as tape: # Record operations for automatic differentiation. 
- ratio = tf.exp( - logprobabilities(actor(observation_buffer), action_buffer) - - logprobability_buffer - ) - min_advantage = tf.where( - advantage_buffer > 0, - (1 + clip_ratio) * advantage_buffer, - (1 - clip_ratio) * advantage_buffer, - ) - policy_loss = -tf.reduce_mean( - tf.minimum(ratio * advantage_buffer, min_advantage) - ) - policy_grads = tape.gradient(policy_loss, actor.trainable_variables) - policy_optimizer.apply_gradients(zip(policy_grads, actor.trainable_variables)) + ''' RL CUSTOM ENVIRONMENT + - ACTIONS: {"0": [[[1,2]], "1": [[1], [2]]]} (i.e. in an app with 2 components + - OBSERVATION SPACE: 1D, TAKES THE UTILITY VALUES FOR EACH COMBINATION + - INITIAL ACTION :0 + ''' - kl = tf.reduce_mean( - logprobability_buffer - - logprobabilities(actor(observation_buffer), action_buffer) - ) - kl = tf.reduce_sum(kl) - return kl - - - # Train the value function by regression on mean-squared error - @tf.function - def train_value_function(observation_buffer, return_buffer): - with tf.GradientTape() as tape: # Record operations for automatic differentiation. - value_loss = tf.reduce_mean((return_buffer - critic(observation_buffer)) ** 2) - value_grads = tape.gradient(value_loss, critic.trainable_variables) - value_optimizer.apply_gradients(zip(value_grads, critic.trainable_variables)) - - - # Hyperparameters of the PPO algorithm - - - - - # Initialize the environment and get the dimensionality of the - # observation space and the number of possible actions - #filename_ = 'wf.json' - combinations_ = [] - filename_ = file_ - app= Application_model() - app.set_actions() - x = app.solutions - utilities, code_actions = MAP_ACTIONS_RESOURCES(x) - global actions - actions = len(code_actions) - print('Initializing actions......') - - steps_per_epoch = 30*actions - epoch = 20*actions - gamma = 0.99 - clip_ratio = 0.2 - policy_learning_rate = 3e-4 - value_function_learning_rate = 1e-3 - train_policy_iterations = 80 - train_value_iterations = 80 - lam = 0.97 - target_kl = 0.01 - hidden_sizes = (64, 64) - - # True if you want to render the environment - render = False - time.sleep(2) - print('No of possible grouping combinations: {} including all hardware options'.format(actions)) - time.sleep(1) - for utility in utilities: - combinations_.append(utility['combination']) - print('Combinations are: {}'.format(combinations_)) - env = Application_Env() - env.prepare_inputs() - - observation_dimensions = env.observation_space.shape[0] - num_actions = env.action_space.n - list_of_scores=[] - list_of_groupings=[] - - - # Initialize the buffer - buffer = Buffer(observation_dimensions, steps_per_epoch) - - # Initialize the actor and the critic as keras models - observation_input = keras.Input(shape=(observation_dimensions,), dtype=tf.float32) - logits = mlp(observation_input, list(hidden_sizes) + [num_actions], tf.tanh, None) - actor = keras.Model(inputs=observation_input, outputs=logits) - value = tf.squeeze( - mlp(observation_input, list(hidden_sizes) + [1], tf.tanh, None), axis=1 - ) - critic = keras.Model(inputs=observation_input, outputs=value) - - # Initialize the policy and the value function optimizers - policy_optimizer = tf.keras.optimizers.Adam(learning_rate=policy_learning_rate) - value_optimizer = tf.keras.optimizers.Adam(learning_rate=value_function_learning_rate) - - # Initialize the observation, episode return and episode length - observation, episode_return, episode_length,observation = env.reset(), 0, 0, env.reset() - - - list_sum_legth=[] - # Iterate over the number of epochs - for e in 
range(0,epoch):
-        #print('EPOCH: {}'.format(epoch))
-        #print('epoch {}'.format(epoch))
-        # Initialize the sum of the returns, lengths and number of episodes for each epoch
-        sum_return = 0
-        sum_length = 0
-        num_episodes = 0
-        list_episodes=[]
-
+    class Application_Env(Env):
+        def __init__(self):
+            self.application=Application_model()
+            self.action_space = Discrete(actions) # number of possible actions
+            self.observation_space = Box(low = np.array([11]), high = np.array([200]), dtype=np.int64)
+            self.utility_state = 0
+            self.utility_interactor = UtilityInteractor()
+            self.action_masked = [i for i in range(len(self.application.get_combinations()))]
+            self.original_actions = [i for i in self.application.get_combinations()]
+            self.initial_action = self.application.get_default_conf()
+            self.masked_initial_action = [len(i) for i in self.original_actions].index(max([len(i) for i in self.original_actions]))
+            self.list_of_inputs= []
+            self.metrics_to_predict = None
+
+        def prepare_inputs(self):
+            self.application.get_workflow()
+            self.application.create_combinations()
+            self.application.get_combinations()
+            combin_ = MAP_ACTIONS_RESOURCES(self.application.solutions)[0]
+            self.list_of_actions=MAP_ACTIONS_RESOURCES(self.application.solutions)[1]
+            self.list_of_inputs = combin_
+
+
+        def get_performance(self, action):
+            data = self.list_of_inputs[action]
+            data['sender_id'] = get_random_ug_id()
+            print("Data sent -> ",data)
+            return self.utility_interactor.getPerformance(data)
+            #return random.uniform(0.1, 0.9)
+
-        # Iterate over the steps of each epoch
-        for t in range(steps_per_epoch):
-            #print('step per epoch {}/{}'.format(t, steps_per_epoch))
-            #print(t)
-            if render:
-                env.render()
+        def step(self, action):
-
-            # Get the logits, action, and take one step in the environment
-            observation = np.array([observation])
-            observation=observation.reshape(1, -1)
-            logits, action = sample_action(observation)
-            observation_new, reward, done, _,observation_new = env.step(action[0].numpy())
-            episode_return += reward
-            episode_length += 1
-
-            # Get the value and log-probability of the action
-            value_t = critic(observation)
-            logprobability_t = logprobabilities(logits, action)
-
-            # Store obs, act, rew, v_t, logp_pi_t
-            buffer.store(observation, action, reward, value_t, logprobability_t)
-
+            print('ACTION:{}'.format(self.list_of_actions[action]))
+            data_, utility_ = self.get_performance(action)
+            print('performance based on utility: {}'.format(utility_))
-            # Update the observation
-            observation = observation_new
-
-            # Finish trajectory if reached to a terminal state
-            terminal = done
-            if terminal or (t == steps_per_epoch - 1):
-                list_of_scores.append((combinations_[action.numpy()[0]],observation,action.numpy()[0]))
-                last_value = 0 if done else critic(np.array([[observation]]))
-                buffer.finish_trajectory(last_value)
-                sum_return += episode_return
-                sum_length += episode_length
-                num_episodes += 1
-                observation, episode_return, episode_length = env.reset(), 0, 0
-
-        # Get values from the buffer
-        (
-            observation_buffer,
-            action_buffer,
-            advantage_buffer,
-            return_buffer,
-            logprobability_buffer,
-        ) = buffer.get()
-
-        # Update the policy and implement early stopping using KL divergence
-        for _ in range(train_policy_iterations):
-            kl = train_policy(
-                observation_buffer, action_buffer, logprobability_buffer, advantage_buffer
+            # an invalid or failed utility request also ends the episode
+            if utility_ is None or utility_<0:
+                reward = -100
+                done=True
+            elif utility_>=0.7:
+                reward=100
+                done=True
+            elif utility_>=0.5:
+                reward=1
+                done=False
+            else:
+                reward = -1
+                done=True
+            info={}
+ self.state = utility_ + + return self.state, reward, done, info, self.state + + def reset(self): + self.state= 0 + self.initial_action = self.action_space.sample() + self.initial_mask_action= [len(i) for i in self.original_actions].index(max([len(i) for i in self.original_actions])) + return self.state + + + def discounted_cumulative_sums(x, discount): + # Discounted cumulative sums of vectors for computing rewards-to-go and advantage estimates + return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1] + + + class Buffer: + # Buffer for storing trajectories + def __init__(self, observation_dimensions, size, gamma=0.99, lam=0.95): + # Buffer initialization + self.observation_buffer = np.zeros( + (size, observation_dimensions), dtype=np.float32 + ) + self.action_buffer = np.zeros(size, dtype=np.int32) + self.advantage_buffer = np.zeros(size, dtype=np.float32) + self.reward_buffer = np.zeros(size, dtype=np.float32) + self.return_buffer = np.zeros(size, dtype=np.float32) + self.value_buffer = np.zeros(size, dtype=np.float32) + self.logprobability_buffer = np.zeros(size, dtype=np.float32) + self.gamma, self.lam = gamma, lam + self.pointer, self.trajectory_start_index = 0, 0 + + def store(self, observation, action, reward, value, logprobability): + # Append one step of agent-environment interaction + self.observation_buffer[self.pointer] = observation + self.action_buffer[self.pointer] = action + self.reward_buffer[self.pointer] = reward + self.value_buffer[self.pointer] = value + self.logprobability_buffer[self.pointer] = logprobability + self.pointer += 1 + + def finish_trajectory(self, last_value=0): + # Finish the trajectory by computing advantage estimates and rewards-to-go + path_slice = slice(self.trajectory_start_index, self.pointer) + rewards = np.append(self.reward_buffer[path_slice], last_value) + values = np.append(self.value_buffer[path_slice], last_value) + + deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1] + + self.advantage_buffer[path_slice] = discounted_cumulative_sums( + deltas, self.gamma * self.lam + ) + self.return_buffer[path_slice] = discounted_cumulative_sums( + rewards, self.gamma + )[:-1] + + self.trajectory_start_index = self.pointer + + def get(self): + # Get all data of the buffer and normalize the advantages + self.pointer, self.trajectory_start_index = 0, 0 + advantage_mean, advantage_std = ( + np.mean(self.advantage_buffer), + np.std(self.advantage_buffer), + ) + self.advantage_buffer = (self.advantage_buffer - advantage_mean) / advantage_std + return ( + self.observation_buffer, + self.action_buffer, + self.advantage_buffer, + self.return_buffer, + self.logprobability_buffer, + ) + + + def mlp(x, sizes, activation=tf.tanh, output_activation=None): + # Build a feedforward neural network + for size in sizes[:-1]: + x = layers.Dense(units=size, activation=activation)(x) + return layers.Dense(units=sizes[-1], activation=output_activation)(x) + + + def logprobabilities(logits, a): + # Compute the log-probabilities of taking actions a by using the logits (i.e. 
the output of the actor) + logprobabilities_all = tf.nn.log_softmax(logits) + logprobability = tf.reduce_sum( + tf.one_hot(a, num_actions) * logprobabilities_all, axis=1 ) - if kl > 1.5 * target_kl: - # Early Stopping - break + return logprobability + + + # Sample action from actor + @tf.function + def sample_action(observation): + logits = actor(observation) + action = tf.squeeze(tf.random.categorical(logits, 1), axis=1) + return logits, action + + + # Train the policy by maxizing the PPO-Clip objective + @tf.function + def train_policy( + observation_buffer, action_buffer, logprobability_buffer, advantage_buffer + ): + + with tf.GradientTape() as tape: # Record operations for automatic differentiation. + ratio = tf.exp( + logprobabilities(actor(observation_buffer), action_buffer) + - logprobability_buffer + ) + min_advantage = tf.where( + advantage_buffer > 0, + (1 + clip_ratio) * advantage_buffer, + (1 - clip_ratio) * advantage_buffer, + ) + + policy_loss = -tf.reduce_mean( + tf.minimum(ratio * advantage_buffer, min_advantage) + ) + policy_grads = tape.gradient(policy_loss, actor.trainable_variables) + policy_optimizer.apply_gradients(zip(policy_grads, actor.trainable_variables)) + + kl = tf.reduce_mean( + logprobability_buffer + - logprobabilities(actor(observation_buffer), action_buffer) + ) + kl = tf.reduce_sum(kl) + return kl + + + # Train the value function by regression on mean-squared error + @tf.function + def train_value_function(observation_buffer, return_buffer): + with tf.GradientTape() as tape: # Record operations for automatic differentiation. + value_loss = tf.reduce_mean((return_buffer - critic(observation_buffer)) ** 2) + value_grads = tape.gradient(value_loss, critic.trainable_variables) + value_optimizer.apply_gradients(zip(value_grads, critic.trainable_variables)) + + + # Hyperparameters of the PPO algorithm + + + + + # Initialize the environment and get the dimensionality of the + # observation space and the number of possible actions + #filename_ = 'wf.json' + combinations_ = [] + filename_ = file_ + app= Application_model() + app.set_actions() + x = app.solutions + utilities, code_actions = MAP_ACTIONS_RESOURCES(x) + time.sleep(5) + global actions + actions = len(code_actions) + print('Initializing actions......') + + steps_per_epoch = 30*actions + epoch = 20*actions + gamma = 0.99 + clip_ratio = 0.2 + policy_learning_rate = 3e-4 + value_function_learning_rate = 1e-3 + train_policy_iterations = 80 + train_value_iterations = 80 + lam = 0.97 + target_kl = 0.01 + hidden_sizes = (64, 64) + + # True if you want to render the environment + render = False + print('No of possible grouping combinations: {} including all hardware options'.format(actions)) + time.sleep(1) + for utility in utilities: + combinations_.append(utility['combination']) + print('Combinations are: {}'.format(combinations_)) + env = Application_Env() + env.prepare_inputs() + variables=[] + for id, element in enumerate(env.list_of_inputs): + for idx,e in enumerate(element['variables']): + variables.append({'component': idx+1}) + for dict_ in e: + variables.append(dict_) + env.list_of_inputs[id]['variables'] = variables + + variables=[] + + observation_dimensions = env.observation_space.shape[0] + num_actions = env.action_space.n + list_of_scores=[] + list_of_groupings=[] - # Update the value function - for _ in range(train_value_iterations): - train_value_function(observation_buffer, return_buffer) - # Print mean return and length for each epoch + # Initialize the buffer + buffer = 
Buffer(observation_dimensions, steps_per_epoch) - print( - f" Epoch: {e + 1}. Mean Return: {sum_return / num_episodes}. Mean Length: {sum_length / num_episodes}" + # Initialize the actor and the critic as keras models + observation_input = keras.Input(shape=(observation_dimensions,), dtype=tf.float32) + logits = mlp(observation_input, list(hidden_sizes) + [num_actions], tf.tanh, None) + actor = keras.Model(inputs=observation_input, outputs=logits) + value = tf.squeeze( + mlp(observation_input, list(hidden_sizes) + [1], tf.tanh, None), axis=1 ) - list_sum_legth.append(sum_length / num_episodes) - list_of_groupings.append(list_of_scores) - list_of_scores=[] + critic = keras.Model(inputs=observation_input, outputs=value) + + # Initialize the policy and the value function optimizers + policy_optimizer = tf.keras.optimizers.Adam(learning_rate=policy_learning_rate) + value_optimizer = tf.keras.optimizers.Adam(learning_rate=value_function_learning_rate) + + # Initialize the observation, episode return and episode length + observation, episode_return, episode_length,observation = env.reset(), 0, 0, env.reset() + + + list_sum_legth=[] + # Iterate over the number of epochs + for e in range(0,epoch): + #print('EPOCH: {}'.format(epoch)) + #print('epoch {}'.format(epoch)) + # Initialize the sum of the returns, lengths and number of episodes for each epoch + sum_return = 0 + sum_length = 0 + num_episodes = 0 + list_episodes=[] + - global full_filepath, app_id, uri_notification, request_type - #print(list_sum_legth) - actions_=[] + # Iterate over the steps of each epoch + for t in range(steps_per_epoch): + #print('step per epoch {}/{}'.format(t, steps_per_epoch)) + #print(t) + if render: + env.render() - performance_m = [] - for element in list_of_groupings: - actions_.append(element[0][0]) + + # Get the logits, action, and take one step in the environment + observation = np.array([observation]) + observation=observation.reshape(1, -1) + logits, action = sample_action(observation) + observation_new, reward, done, _,observation_new = env.step(action[0].numpy()) + episode_return += reward + episode_length += 1 + + # Get the value and log-probability of the action + value_t = critic(observation) + logprobability_t = logprobabilities(logits, action) + + # Store obs, act, rew, v_t, logp_pi_t + buffer.store(observation, action, reward, value_t, logprobability_t) + + + # Update the observation + observation = observation_new + + # Finish trajectory if reached to a terminal state + terminal = done + if terminal or (t == steps_per_epoch - 1): + list_of_scores.append((combinations_[action.numpy()[0]],observation,action.numpy()[0])) + last_value = 0 if done else critic(np.array([[observation]])) + buffer.finish_trajectory(last_value) + sum_return += episode_return + sum_length += episode_length + num_episodes += 1 + observation, episode_return, episode_length = env.reset(), 0, 0 + + # Get values from the buffer + ( + observation_buffer, + action_buffer, + advantage_buffer, + return_buffer, + logprobability_buffer, + ) = buffer.get() + + # Update the policy and implement early stopping using KL divergence + for _ in range(train_policy_iterations): + kl = train_policy( + observation_buffer, action_buffer, logprobability_buffer, advantage_buffer + ) + if kl > 1.5 * target_kl: + # Early Stopping + break - performance_m.append(element[0][1]) + # Update the value function + for _ in range(train_value_iterations): + train_value_function(observation_buffer, return_buffer) + # Print mean return and length for each epoch - 
max_ = max(performance_m) - print("max: {}".format(max_)) - idx_ = performance_m.index(max_) - grouping = actions_[idx_] - grouping_utility = max_ - print('proposed grouping: {}, calculated utility: {}'.format(grouping, grouping_utility)) - time.sleep(4) - data_to_send_to_ps = {"utility": grouping_utility, "application_id": app_id,"uri_notification": uri_notification, "request_type":request_type, "result": grouping, "json_path": full_filepath} - print(data_to_send_to_ps) + print( + f" Epoch: {e + 1}. Mean Return: {sum_return / num_episodes}. Mean Length: {sum_length / num_episodes}" + ) + list_sum_legth.append(sum_length / num_episodes) + list_of_groupings.append(list_of_scores) + list_of_scores=[] + + global full_filepath, app_id, uri_notification, request_type + #print(list_sum_legth) + actions_=[] + + performance_m = [] + for element in list_of_groupings: + actions_.append(element[0][0]) + + performance_m.append(element[0][1]) + + + max_ = max(performance_m) + print("max: {}".format(max_)) + idx_ = performance_m.index(max_) + grouping = actions_[idx_] + grouping_utility = max_ + print('proposed grouping: {}, calculated utility: {}'.format(set_component_names_from_solutions(filename_,grouping), grouping_utility)) + time.sleep(2) + data_to_send_to_ps = {"utility": grouping_utility, "application_id": app_id,"uri_notification": uri_notification, "request_type":request_type, "result": grouping, "json_path": full_filepath} + print(data_to_send_to_ps) + headers = {"Content-Type":"application/json"} + try: + response = requests.post(url=url_polymorphic_solver,data=json.dumps(data_to_send_to_ps), headers=headers) + print(response) + except Exception as e: + print("Could not reach the polymorphic solver") + + time.sleep(1200) + + +def initial_groupings(full_filepath,app_id,uri_notification,request_type): + initial_combination = [] + initial_utility=0 + with open(full_filepath, 'r+') as f: + data = json.load(f) + Number_components = data['Number_of_components'] + for element in range(1, Number_components+1): + initial_combination.append([element]) + print("Initial grouping {0}".format(set_component_names_from_solutions(full_filepath,initial_combination))) + time.sleep(5) + data_to_send_to_ps = {"utility": initial_utility, "application_id": app_id,"uri_notification": uri_notification, "request_type":request_type, "result": initial_combination, "json_path": full_filepath} + print('data to send to ps: {0}'.format(data_to_send_to_ps)) headers = {"Content-Type":"application/json"} try: response = requests.post(url=url_polymorphic_solver,data=json.dumps(data_to_send_to_ps), headers=headers) print(response) except Exception as e: print("Could not reach the polymorphic solver") - - + app = FastAPI() class RequestWA(BaseModel): @@ -945,31 +910,12 @@ async def grouping(request:RequestWA): full_filepath = config_dir +"/"+ request.filepath request_type = request.request_type uri_notification = request.uri_notification + init_ = Thread(target=initial_groupings, args=(full_filepath,app_id,uri_notification,request_type)) + init_.start() t = Thread(target=groupings, args=(full_filepath,)) t.start() - #solutions_ = groupings(file_=full_filepath) - """ - data_to_send_to_ps = {"utility": solutions_[1], "application_id": app_id,"uri_notification": uri_notification, "request_type":request_type, "result": solutions_[0], "json_path": full_filepath} - headers = {"Content-Type":"application/json"} - try: - response = requests.post(url=url_polymorphic_solver,data=json.dumps(data_to_send_to_ps), headers=headers) - 
print(response) - except Exception as e: - print("Could not reach the polymorphic solver") """ + return {"status": True} if __name__ == "__main__": - uvicorn.run(app,host="0.0.0.0", port=7474) - -''' -actions_=[] -performances=[] - - -print(list_of_groupings) -for element in list_of_groupings: - actions_.append(element[0]) - performances.append(element[1]) - -print(max(performances)) -''' \ No newline at end of file + uvicorn.run(app,host="0.0.0.0", port=7474) \ No newline at end of file diff --git a/polymorphic_solver/src/api.py b/polymorphic_solver/src/api.py index 788b30166191aa34c2bd0e46508aee8037bc67b5..43c0490bdc57e7965aa266936c8033a87009b489 100644 --- a/polymorphic_solver/src/api.py +++ b/polymorphic_solver/src/api.py @@ -26,7 +26,7 @@ camel_converter_url = os.environ.get("CAME_CONVERTER_URL","http://localhost:7676 defaul_initial_camel_name = "initial_camel_model" class ApplicationDataRequest(BaseModel): - utility: str + utility: float json_path: str application_id: str uri_notification: str diff --git a/polymorphic_solver/src/app.py b/polymorphic_solver/src/app.py index 340f88a4a77b9fb23571de41bc8a53ffba4a5695..bec607301091040bc2980ecfc0dec15a84f7fad7 100644 --- a/polymorphic_solver/src/app.py +++ b/polymorphic_solver/src/app.py @@ -203,6 +203,8 @@ class PolymorphicSolver(): self.max_optimization_time = 20 self.last_analysis = time.time() self.agent_consumers = {} + self.rl_engine_started = False + self.first_deployment_completed = False self.program = None def newGrouping(self, groups): @@ -238,7 +240,10 @@ class PolymorphicSolver(): self.sendTrainRequestToPerformanceModule() self.updateVirtualApplication() self.createComponents() - self.prepareMultiRLEngine() + if self.archetype_manager.initialAnalysis(): + #self.prepareMultiRLEngine() + self.first_deployment_completed = True + print('First deployment finished') else: print("Metrics to predict not ready, process will enter a loop waiting for the metrics to predict message") timeout = 180 # timeout metrics to predict message @@ -248,7 +253,10 @@ class PolymorphicSolver(): self.sendTrainRequestToPerformanceModule() self.updateVirtualApplication() self.createComponents() - self.prepareMultiRLEngine() + if self.archetype_manager.initialAnalysis(): + #self.prepareMultiRLEngine() + self.first_deployment_completed = True + print('First deployment finished') break else: if time.time() - _start >= timeout: @@ -305,7 +313,6 @@ class PolymorphicSolver(): data["models"] = models self.publisher.setParameters(data, performance_module_topic) self.publisher.send() - print("******************************") def updateVirtualApplication(self): metrics_data = {} @@ -359,6 +366,12 @@ class PolymorphicSolver(): if data['request'] == 'state': return None + if data['request'] == "application_ready": + if self.first_deployment_completed: + if not self.rl_engine_started: + self.prepareMultiRLEngine() + self.rl_engine_started = True + if data["request"] == "stop_rl_engine": print("Stop RL engine message received") self.stopRLEngine() @@ -542,15 +555,16 @@ class PolymorphicSolver(): def createComponents(self): for name, comp in self.virtual_application.items(): index = comp.getIndex() - #min_mem, max_mem, min_cpu, max_cpu,min_gpu, max_gpu, min_fpga, max_fpga, max_instances, hardware_list, variants, metrics = comp.prepareResource() + min_mem, max_mem, min_cpu, max_cpu,min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc, min_instances, max_instances, hardware_list, variants, metrics = comp.prepareResource() #self.env.createStates(name, min_mem, max_mem, 
diff --git a/polymorphic_solver/src/morphemic.py b/polymorphic_solver/src/morphemic.py
index 2aa7baef63b08872b4cc4c0e4eec7b30d192feea..d5143d57d17a02d88340ffe2c382ec627a256463 100644
--- a/polymorphic_solver/src/morphemic.py
+++ b/polymorphic_solver/src/morphemic.py
@@ -21,7 +21,7 @@ INDEX_CORES = 1
 INDEX_MEMORY = 0
 
 class MorphemicArchetype():
-    def __init__(self, component_name, index, variant, hw, image):
+    def __init__(self, component_name, index, variant, hw, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc):
         self.variant = variant
         self.hw = hw
         self.component_name = component_name
@@ -33,6 +33,18 @@ class MorphemicArchetype():
         self.avg_performance = 0
         self.sum_performance = 0
         self.uri_notification = None
+        self.min_cpu = min_cpu
+        self.min_gpu = min_gpu
+        self.min_fpga = min_fpga
+        self.max_cpu = max_cpu
+        self.max_gpu = max_gpu
+        self.min_hpc = min_hpc
+        self.max_hpc = max_hpc
+        self.max_fpga = max_fpga
+        self.max_instances = max_instances
+        self.min_instances = min_instances
+        self.min_mem = min_mem
+        self.max_mem = max_mem
         self.lowest = [None, None, None, None]
         self.highest = [None, None, None, None]
         print('Archetype for component index = {0} Variant = {1}, HW= {2} for component {3} created'.format(self.component_index, self.variant, self.hw, self.component_name))
@@ -59,14 +71,27 @@ class MorphemicArchetype():
         return self.key
     def getComponentName(self):
         return self.component_name
-
+    def getMinRequirements(self):
+        return self.min_mem, self.min_cpu, self.min_gpu, self.min_fpga
+    def getMaxRequirements(self):
+        return self.max_mem, self.max_cpu, self.max_gpu, self.max_fpga
     def clean(self):
         del self.collections[:]
         self.lowest = [None, None, None, None]
         self.highest = [None, None, None, None]
         self.avg_performance = 0
        self.sum_performance = 0
-
+    def setLowestHighestFromRequirements(self):
+        min_cores, max_cores = None, None
+        if self.hw == "CPU":
+            min_cores, max_cores = self.min_cpu, self.max_cpu
+        elif self.hw == "GPU":
+            min_cores, max_cores = self.min_gpu, self.max_gpu
+        else:
+            min_cores, max_cores = self.min_fpga, self.max_fpga
+
+        self.lowest = [self.min_mem, min_cores, self.min_instances, 0]
+        self.highest = [self.max_mem, max_cores, self.max_instances, 0]
     def getVariant(self):
         return self.variant
    def getIndexState(self):
@@ -147,12 +172,12 @@ class MorphemicArchetypeManager():
         for archetype in self.archetypes:
             archetype.clean()
 
-    def createArchetypes(self, component_name, index, variants, hws, image):
+    def createArchetypes(self, component_name, index, variants, hws, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc):
         for variant in variants:
             for hw in hws:
                 if hw == "FPGA" and variant == "SERVERLESS": continue
-                archetype = MorphemicArchetype(component_name,index, variant, hw, image)
+                archetype = MorphemicArchetype(component_name,index, variant, hw, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc)
                 self.archetypes.append(archetype)
 
     def getArchetypeByComponentName(self, name, variant, hw):
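The setLowestHighestFromRequirements method added above picks the cores bound by hardware type and seeds the lowest/highest vectors from the component's declared requirements. A standalone sketch of the same selection logic; bounds_from_requirements is a hypothetical helper, and the [memory, cores, instances, 0] layout with a zero in the last slot simply follows the diff.

# Sketch: derive per-archetype lower/upper bound vectors from requirements.
def bounds_from_requirements(hw, reqs):
    # reqs: dict with min_/max_ entries for mem, cpu, gpu, fpga and instances.
    # FPGA is the fallback branch, as in the diff.
    min_key, max_key = {"CPU": ("min_cpu", "max_cpu"),
                        "GPU": ("min_gpu", "max_gpu")}.get(hw, ("min_fpga", "max_fpga"))
    lowest = [reqs["min_mem"], reqs[min_key], reqs["min_instances"], 0]
    highest = [reqs["max_mem"], reqs[max_key], reqs["max_instances"], 0]
    return lowest, highest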
"FPGA" and variant == "SERVERLESS": continue - archetype = MorphemicArchetype(component_name,index, variant, hw, image) + archetype = MorphemicArchetype(component_name,index, variant, hw, image, min_mem, max_mem, min_cpu, max_cpu, min_instances, max_instances, min_gpu, max_gpu, min_fpga, max_fpga, min_hpc, max_hpc) self.archetypes.append(archetype) def getArchetypeByComponentName(self, name, variant, hw): @@ -207,6 +232,27 @@ class MorphemicArchetypeManager(): result += archetype.getAvgPerformance() return result + def initialAnalysis(self): + all_keys = [] + for archetype in self.archetypes: + all_keys.append(archetype.getKey()) + + archetypes = None + for group in combinations(all_keys, self.number_of_components): + if self.isGroupIsCompleted(group): + archetypes = self.getArchetypesByGroup(group) + for archetype in archetypes: + archetype.setLowestHighestFromRequirements() + + filename = self.camel_transformer.archetypesToCamels(archetypes, self.camel_version, self.current_grouping) + if filename: + self.cleanAllArchetypes() + self.camel_version +=1 + if self.importNewCamel(filename): + self.notifyMorphemic(filename) + return True + return False + def analyse(self): print("Analysis started ...") all_keys = [] @@ -260,7 +306,9 @@ class MorphemicArchetypeManager(): } try: response = requests.post(mule_hostname+self.uri_notification, data=json.dumps(data), headers={"Content-Type": "application/json"}) + print('*************Mule**************') print(response.text) + print("*******************************") except Exception as e: print(e) diff --git a/workflow_analyzer/target/SpringRestExample-1.0.0-BUILD-SNAPSHOT.war b/workflow_analyzer/target/SpringRestExample-1.0.0-BUILD-SNAPSHOT.war index 390212c74567343f2fb490bf7fc3b91bd006619d..df7896dc600d88495474dbbc74834a6faf9e6a9c 100644 Binary files a/workflow_analyzer/target/SpringRestExample-1.0.0-BUILD-SNAPSHOT.war and b/workflow_analyzer/target/SpringRestExample-1.0.0-BUILD-SNAPSHOT.war differ