From 23c8f9ca4e7b17b4c32e2760b4e051b3d1f99055 Mon Sep 17 00:00:00 2001 From: mklkun Date: Wed, 14 Apr 2021 17:54:55 +0200 Subject: [PATCH 1/3] Add scaling in and out endpoints and some improvements --- .../org/activeeon/morphemic/PAGateway.java | 418 ++++++++++++++++-- .../activeeon/morphemic/model/Deployment.java | 8 + .../org/activeeon/morphemic/model/Task.java | 5 +- .../resources/collect_ip_addr_results.groovy | 18 + .../post_prepare_infra_script.groovy | 1 + 5 files changed, 408 insertions(+), 42 deletions(-) create mode 100644 scheduling-abstraction-layer/src/main/resources/collect_ip_addr_results.groovy diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java index 9f2a3dd..3aa2d25 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java @@ -45,6 +45,10 @@ public class PAGateway { public PAConnectorIaasGateway connectorIaasGateway; + final String NEW_LINE = System.getProperty("line.separator"); + + final String SCRIPTS_SEPARATION = NEW_LINE + NEW_LINE + "# Main script" + NEW_LINE; + private static final Logger LOGGER = Logger.getLogger(PAGateway.class); /** @@ -424,7 +428,6 @@ public class PAGateway { cloud.addDeployedRegion(newDeployment.getLocationName(), newDeployment.getLocationName() + "/" + newDeployment.getImageProviderId()); } - LOGGER.info("Node source defined."); LOGGER.info("Trying to retrieve task: " + node.optString("taskName")); @@ -432,6 +435,7 @@ public class PAGateway { newDeployment.setPaCloud(cloud); newDeployment.setTask(task); + newDeployment.setNumber(task.getNextDeploymentID()); EntityManagerHelper.persist(newDeployment); LOGGER.debug("Deployment created: " + newDeployment.toString()); @@ -486,16 +490,24 @@ public class PAGateway { EntityManagerHelper.begin(); cloudIDs.forEach(cloudID -> { PACloud cloud = EntityManagerHelper.find(PACloud.class, cloudID); + LOGGER.info("Removing cloud : " + cloud.toString()); for (Map.Entry entry : cloud.getDeployedRegions().entrySet()) { try { - resourceManagerGateway.removeNodeSource(cloud.getNodeSourceNamePrefix() + entry.getKey(), preempt); + String nodeSourceName = cloud.getNodeSourceNamePrefix() + entry.getKey(); + LOGGER.info("Removing node source " + nodeSourceName + " from the ProActive server."); + resourceManagerGateway.removeNodeSource(nodeSourceName, preempt); } catch (NotConnectedException | PermissionRestException e) { LOGGER.error(e.getStackTrace()); } } - cloud.getDeployments().forEach(deployment -> deployment.getTask().removeDeployment(deployment)); + if (cloud.getDeployments() != null) { + LOGGER.info("Cleaning deployments from related tasks " + cloud.getDeployments().toString()); + cloud.getDeployments().forEach(deployment -> deployment.getTask().removeDeployment(deployment)); + } + LOGGER.info("Cleaning deployments from the cloud entry"); cloud.clearDeployments(); EntityManagerHelper.remove(cloud); + LOGGER.info("Cloud removed."); }); EntityManagerHelper.commit(); } @@ -516,7 +528,7 @@ public class PAGateway { public void removeNodes(List nodeNames, Boolean preempt) { nodeNames.forEach(nodeName -> { try { - String nodeUrl = resourceManagerGateway.searchNodes(nodeNames, true).get(0); + String nodeUrl = resourceManagerGateway.searchNodes(Collections.singletonList(nodeName), true).get(0); resourceManagerGateway.removeNode(nodeUrl, preempt); LOGGER.info("Node " + nodeName + " with URL: " + nodeUrl + " has been removed successfully."); } catch (NotConnectedException | RestException e) { @@ -620,8 +632,6 @@ public class PAGateway { } private List createCommandsTask(Task task, String taskNameSuffix, String taskToken, Job job) { - final String newLine = System.getProperty("line.separator"); - final String scriptsSeparation = newLine + newLine + "# Main script" + newLine; List scriptTasks = new LinkedList<>(); ScriptTask scriptTaskStart = null; ScriptTask scriptTaskInstall = null; @@ -638,7 +648,7 @@ public class PAGateway { task.getInstallation().getPostInstall().isEmpty())) { if (!task.getInstallation().getInstall().isEmpty()) { scriptTaskInstall = PAFactory.createBashScriptTask(task.getName() + "_install" + taskNameSuffix, - Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation + + Utils.getContentWithFileName("export_env_var_script.sh") + SCRIPTS_SEPARATION + task.getInstallation().getInstall()); } else { scriptTaskInstall = PAFactory.createBashScriptTask(task.getName() + "_install" + taskNameSuffix, @@ -647,13 +657,13 @@ public class PAGateway { if (!task.getInstallation().getPreInstall().isEmpty()) { scriptTaskInstall.setPreScript(PAFactory.createSimpleScript( - Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation + + Utils.getContentWithFileName("export_env_var_script.sh") + SCRIPTS_SEPARATION + task.getInstallation().getPreInstall(), "bash")); } if (!task.getInstallation().getPostInstall().isEmpty()) { scriptTaskInstall.setPostScript(PAFactory.createSimpleScript( - Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation + + Utils.getContentWithFileName("export_env_var_script.sh") + SCRIPTS_SEPARATION + task.getInstallation().getPostInstall(), "bash")); } @@ -669,7 +679,7 @@ public class PAGateway { task.getInstallation().getPostStart().isEmpty())) { if (!task.getInstallation().getStart().isEmpty()) { scriptTaskStart = PAFactory.createBashScriptTask(task.getName() + "_start" + taskNameSuffix, - Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation + + Utils.getContentWithFileName("export_env_var_script.sh") + SCRIPTS_SEPARATION + task.getInstallation().getStart()); } else { scriptTaskStart = PAFactory.createBashScriptTask(task.getName() + "_start" + taskNameSuffix, @@ -678,13 +688,13 @@ public class PAGateway { if (!task.getInstallation().getPreStart().isEmpty()) { scriptTaskStart.setPreScript(PAFactory.createSimpleScript( - Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation + + Utils.getContentWithFileName("export_env_var_script.sh") + SCRIPTS_SEPARATION + task.getInstallation().getPreStart(), "bash")); } if (!task.getInstallation().getPostStart().isEmpty()) { scriptTaskStart.setPostScript(PAFactory.createSimpleScript( - Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation + + Utils.getContentWithFileName("export_env_var_script.sh") + SCRIPTS_SEPARATION + task.getInstallation().getPostStart(), "bash")); } @@ -759,7 +769,7 @@ public class PAGateway { return 1; } // Let's find the task: - Optional optTask = Optional.ofNullable(EntityManagerHelper.find(Task.class,optJob.get().findTask(taskName))); + Optional optTask = Optional.ofNullable(optJob.get().findTask(taskName)); if (!optTask.isPresent()) { LOGGER.error(String.format("Task [%s] not found", taskName)); return 1; @@ -772,30 +782,270 @@ public class PAGateway { return 2; } - // Let's clone the deployment/node as needed. + // Saving suffix IDs of new nodes + List newNodesNumbers = new LinkedList<>(); + + // Let's clone the deployment/node as needed Deployment oldDeployment = optDeployment.get(); nodeNames.stream().map(nodeName -> { - Deployment newDeployment = new Deployment(); - newDeployment.setPaCloud(oldDeployment.getPaCloud()); - newDeployment.setNodeName(nodeName); - newDeployment.setLocationName(oldDeployment.getLocationName()); - newDeployment.setIsDeployed(false); - newDeployment.setImageProviderId(oldDeployment.getImageProviderId()); - newDeployment.setHardwareProviderId(oldDeployment.getHardwareProviderId()); - EmsDeploymentRequest newEmsDeploymentReq = oldDeployment.getEmsDeployment().clone(nodeName); - newDeployment.setEmsDeployment(newEmsDeploymentReq); - return newDeployment; - }).forEach( deployment -> { - optTask.get().addDeployment(deployment); - EntityManagerHelper.persist(deployment.getEmsDeployment()); - EntityManagerHelper.persist(deployment); - EntityManagerHelper.persist(optTask.get()); + EmsDeploymentRequest newEmsDeploymentReq = + oldDeployment.getEmsDeployment() == null ? null : oldDeployment.getEmsDeployment().clone(nodeName); + return new Deployment(nodeName, + oldDeployment.getLocationName(), + oldDeployment.getImageProviderId(), + oldDeployment.getHardwareProviderId(), + newEmsDeploymentReq, + oldDeployment.getPaCloud(), + oldDeployment.getTask(), + false, + null, + null + ); + }) + .forEach(deployment -> { + // Persist new deployment data + deployment.setNumber(optTask.get().getNextDeploymentID()); + newNodesNumbers.add(optTask.get().getNextDeploymentID()); + optTask.get().addDeployment(deployment); + if (deployment.getEmsDeployment() != null) { + EntityManagerHelper.persist(deployment.getEmsDeployment()); + } + deployment.getPaCloud().addDeployment(deployment); + EntityManagerHelper.persist(deployment); + EntityManagerHelper.persist(optTask.get()); + EntityManagerHelper.persist(deployment.getPaCloud()); }); EntityManagerHelper.commit(); + + // Let's deploy the VMS + submitScalingOutJob(optJob.get(), taskName, newNodesNumbers); + return 0; } + private void submitScalingOutJob(Job job, String scaledTaskName, List newNodesNumbers) { + EntityManagerHelper.refresh(job); + LOGGER.info("Task: " + scaledTaskName + " of job " + job.toString() + " to be scaled out."); + + TaskFlowJob paJob = new TaskFlowJob(); + paJob.setName(job.getName() + "_" + scaledTaskName + "_ScaleOut"); + LOGGER.info("Job created: " + paJob.toString()); + + EntityManagerHelper.begin(); + + job.getTasks().forEach(task -> { + List scriptTasks = buildScalingOutPATask(task, job, scaledTaskName); + + if (scriptTasks != null && !scriptTasks.isEmpty()) { + scriptTasks.forEach(scriptTask -> { + try { + paJob.addTask(scriptTask); + } catch (UserException e) { + LOGGER.error("Task " + task.getName() + " could not be added due to: " + e.toString()); + } + }); + EntityManagerHelper.persist(task); + } + }); + + setAllScalingOutMandatoryDependencies(paJob, job, scaledTaskName, newNodesNumbers); + + paJob.setProjectName("Morphemic"); + + long submittedJobId = schedulerGateway.submit(paJob).longValue(); + job.setSubmittedJobId(submittedJobId); + + EntityManagerHelper.persist(job); + EntityManagerHelper.commit(); + LOGGER.info("Scaling out of task \'" + scaledTaskName + "\' job, submitted successfully. ID = " + submittedJobId); + } + + private void setAllScalingOutMandatoryDependencies(TaskFlowJob paJob, Job jobToSubmit, String scaledTaskName, List newNodesNumbers) { + jobToSubmit.getTasks().forEach(task -> { + if (task.getParentTasks() != null && !task.getParentTasks().isEmpty()) { + task.getParentTasks().forEach(parentTaskName -> { + paJob.getTasks().forEach(paTask -> { + paJob.getTasks().forEach(paParentTask -> { + if (paTask.getName().contains(task.getName()) && paParentTask.getName().contains(parentTaskName)) { + if (paParentTask.getName().contains(scaledTaskName)) { + if (newNodesNumbers.stream().anyMatch(entry -> paParentTask.getName().endsWith(entry.toString()))) { + if (paTask.getName().contains(task.getDeploymentFirstSubmittedTaskName()) && + paParentTask.getName().contains(jobToSubmit.findTask(parentTaskName).getDeploymentLastSubmittedTaskName())) { + paTask.addDependence(paParentTask); + } + } else { + if (paTask.getName().contains(task.getDeploymentFirstSubmittedTaskName()) && + paParentTask.getName().startsWith("prepareInfra")) { + paTask.addDependence(paParentTask); + } + } + } else if (paTask.getName().contains(scaledTaskName)) { + if (newNodesNumbers.stream().anyMatch(entry -> paTask.getName().endsWith(entry.toString()))) { + if (paTask.getName().contains(task.getDeploymentFirstSubmittedTaskName()) && + paParentTask.getName().contains(jobToSubmit.findTask(parentTaskName).getDeploymentLastSubmittedTaskName())) { + paTask.addDependence(paParentTask); + } + } else { + if (paTask.getName().startsWith("prepareInfra") && + paParentTask.getName().contains(jobToSubmit.findTask(parentTaskName).getDeploymentLastSubmittedTaskName())) { + paTask.addDependence(paParentTask); + } + } + } + } + }); + }); + }); + } + }); + } + + private List buildScalingOutPATask(Task task, Job job, String scaledTaskName) { + List scriptTasks = new LinkedList<>(); + Task scaledTask = job.findTask(scaledTaskName); + + if (scaledTask.getParentTasks().contains(task.getName())) { + // When the scaled task is a child the task to be built + LOGGER.info("Building task " + task.getName() + " as a parent of task " + scaledTaskName); + scriptTasks.addAll(createParentScaledTask(task, job)); + } else if (scaledTaskName.equals(task.getName())) { + // When the scaled task is the task to be built + LOGGER.info("Building task " + task.getName() + " as it is scaled out"); + scriptTasks.addAll(buildScaledPATask(task, job)); + } else if (task.getParentTasks().contains(scaledTaskName)) { + // When the scaled task is a parent of the task to be built + LOGGER.info("Building task " + task.getName() + " as a child of task " + scaledTaskName); + scriptTasks.addAll(createChildScaledTask(task, job)); + } else { + LOGGER.debug("Task " + task.getName() + " is not impacted by the scaling of task " + scaledTaskName); + } + + return scriptTasks; + } + + private List createChildScaledTask(Task task, Job job) { + List scriptTasks = new LinkedList<>(); + task.getDeployments().stream().filter(Deployment::getIsDeployed).forEach(deployment -> { + // Creating infra deployment tasks + String token = task.getTaskId() + deployment.getNumber(); + String suffix = "_" + deployment.getNumber(); + scriptTasks.add(createScalingChildUpdateTask(task, suffix, token, job)); + }); + task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_"))); + task.setDeploymentLastSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_"))); + return scriptTasks; + } + + private ScriptTask createScalingChildUpdateTask(Task task, String suffix, String token, Job job) { + ScriptTask scriptTaskUpdate = null; + + Map taskVariablesMap = new HashMap<>(); + //TODO: Taking into consideration multiple parent tasks with multiple communications + taskVariablesMap.put("requestedPortName", new TaskVariable("requestedPortName", + job.findTask(task.getParentTasks().get(0)).getPortsToOpen().get(0).getRequestedName())); + + if (!task.getInstallation().getUpdateCmd().isEmpty()) { + scriptTaskUpdate = PAFactory.createBashScriptTask(task.getName() + "_update" + suffix, + Utils.getContentWithFileName("export_env_var_script.sh") + SCRIPTS_SEPARATION + + task.getInstallation().getUpdateCmd()); + } else { + scriptTaskUpdate = PAFactory.createBashScriptTask(task.getName() + "_install" + suffix, + "echo \"Installation script is empty. Nothing to be executed.\""); + } + + scriptTaskUpdate.setPreScript(PAFactory.createSimpleScriptFromFIle("collect_ip_addr_results.groovy", + "groovy")); + + scriptTaskUpdate.setVariables(taskVariablesMap); + scriptTaskUpdate.addGenericInformation("NODE_ACCESS_TOKEN", token); + + return scriptTaskUpdate; + } + + private List buildScaledPATask(Task task, Job job) { + List scriptTasks = new LinkedList<>(); + + task.getDeployments().stream().filter(Deployment::getIsDeployed).forEach(deployment -> { + String token = task.getTaskId() + deployment.getNumber(); + String suffix = "_" + deployment.getNumber(); + + // Creating infra preparation task + scriptTasks.add(createInfraPreparationTask(task, suffix, token, job)); + }); + + task.setDeploymentLastSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_"))); + task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_"))); + + task.getDeployments().stream().filter(deployment -> !deployment.getIsDeployed()).forEach(deployment -> { + // Creating infra deployment tasks + String token = task.getTaskId() + deployment.getNumber(); + String suffix = "_" + deployment.getNumber(); + scriptTasks.add(createInfraTask(task, deployment, suffix, token)); + task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(scriptTasks.size()-1).getName().substring(0, scriptTasks.get(scriptTasks.size()-1).getName().lastIndexOf("_"))); + // If the infrastructure comes with the deployment of the EMS, we set it up. + Optional.ofNullable(deployment.getEmsDeployment()).ifPresent(emsDeploymentRequest -> scriptTasks.add(createEmsDeploymentTask(emsDeploymentRequest,suffix,token))); + LOGGER.info("Token added: " + token); + deployment.setIsDeployed(true); + deployment.setNodeAccessToken(token); + + // Creating application deployment tasks + List appTasks = createAppTasks(task, suffix, token, job); + task.setDeploymentLastSubmittedTaskName(appTasks.get(appTasks.size()-1).getName().substring(0, appTasks.get(appTasks.size()-1).getName().lastIndexOf(suffix))); + + // Creating infra preparation task + appTasks.add(0, createInfraPreparationTask(task, suffix, token, job)); + appTasks.get(1).addDependence(appTasks.get(0)); + + // Add dependency between infra and application deployment tasks + appTasks.get(0).addDependence(scriptTasks.get(scriptTasks.size()-1)); + + scriptTasks.addAll(appTasks); + }); + + scriptTasks.forEach(scriptTask -> task.addSubmittedTaskName(scriptTask.getName())); + + return scriptTasks; + } + + private List createParentScaledTask(Task task, Job job) { + List scriptTasks = new LinkedList<>(); + task.getDeployments().stream().filter(Deployment::getIsDeployed).forEach(deployment -> { + // Creating infra deployment tasks + String token = task.getTaskId() + deployment.getNumber(); + String suffix = "_" + deployment.getNumber(); + scriptTasks.add(createScalingParentInfraPreparationTask(task, suffix, token, job)); + }); + task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_"))); + task.setDeploymentLastSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_"))); + return scriptTasks; + } + + private ScriptTask createScalingParentInfraPreparationTask(Task task, String suffix, String token, Job job) { + ScriptTask prepareInfraTask; + Map taskVariablesMap = new HashMap<>(); + String taskName = "parentPrepareInfra_" + task.getName() + suffix; + + if (!task.getPortsToOpen().isEmpty()) { + prepareInfraTask = PAFactory.createGroovyScriptTaskFromFile(taskName, "post_prepare_infra_script.groovy"); + prepareInfraTask.setPreScript(PAFactory.createSimpleScriptFromFIle("prepare_infra_script.sh", + "bash")); + //TODO: Taking into consideration multiple provided ports + taskVariablesMap.put("providedPortName", new TaskVariable("providedPortName", + task.getPortsToOpen().get(0).getRequestedName())); + taskVariablesMap.put("providedPortValue", new TaskVariable("providedPortValue", + task.getPortsToOpen().get(0).getValue().toString())); + } else { + prepareInfraTask = PAFactory.createBashScriptTask(taskName, + "echo \"No ports to open and not parent tasks. Nothing to be prepared in VM.\""); + } + + prepareInfraTask.setVariables(taskVariablesMap); + prepareInfraTask.addGenericInformation("NODE_ACCESS_TOKEN", token); + + return prepareInfraTask; + } + /** * Unregister a set of node as a scale-down operation * @param nodeNames A list of node to be removed @@ -816,7 +1066,7 @@ public class PAGateway { } // Let's find the task: - Optional optTask = Optional.ofNullable(EntityManagerHelper.find(Task.class,optJob.get().findTask(taskName))); + Optional optTask = Optional.ofNullable(optJob.get().findTask(taskName)); if (!optTask.isPresent()) { LOGGER.error(String.format("Task [%s] not found", taskName)); return 1; @@ -829,24 +1079,108 @@ public class PAGateway { } // For supplied node, I retrieve their deployment - List deployments = nodeNames.stream().map(node -> EntityManagerHelper.find(Deployment.class,node)).filter(deployment -> (deployment != null)).collect(Collectors.toList()); + List deployments = nodeNames.stream().map(node -> EntityManagerHelper.find(Deployment.class,node)).filter(Objects::nonNull).collect(Collectors.toList()); // For deployed node, I flag their removal - List nodesToBeRemoved = deployments.stream().filter(deployment -> deployment.getIsDeployed()).map(Deployment::getNodeName).collect(Collectors.toList()); + List nodesToBeRemoved = deployments.stream().filter(Deployment::getIsDeployed).map(Deployment::getNodeName).collect(Collectors.toList()); + LOGGER.info("Nodes to be removed are : " + nodesToBeRemoved); // For every node, I remove the deployment entree deployments.forEach( deployment -> { + deployment.getTask().removeDeployment(deployment); + EntityManagerHelper.persist(deployment.getTask()); + deployment.getPaCloud().removeDeployment(deployment); + EntityManagerHelper.persist(deployment.getPaCloud()); EntityManagerHelper.remove(deployment); - EntityManagerHelper.persist(deployment); } ); // I commit the removal of deployed node - removeNodes(nodesToBeRemoved,false); + removeNodes(nodesToBeRemoved,true); EntityManagerHelper.commit(); + + // Let's deploy the VMS + submitScalingInJob(optJob.get(), taskName); + return 0; } + private void submitScalingInJob(Job job, String scaledTaskName) { + EntityManagerHelper.refresh(job); + LOGGER.info("Task: " + scaledTaskName + " of job " + job.toString() + " to be scaled in."); + + TaskFlowJob paJob = new TaskFlowJob(); + paJob.setName(job.getName() + "_" + scaledTaskName + "_ScaleIn"); + LOGGER.info("Job created: " + paJob.toString()); + + EntityManagerHelper.begin(); + + job.getTasks().forEach(task -> { + List scriptTasks = buildScalingInPATask(task, job, scaledTaskName); + + if (scriptTasks != null && !scriptTasks.isEmpty()) { + scriptTasks.forEach(scriptTask -> { + try { + paJob.addTask(scriptTask); + } catch (UserException e) { + LOGGER.error("Task " + task.getName() + " could not be added due to: " + e.toString()); + } + }); + EntityManagerHelper.persist(task); + } + }); + + setAllScalingInMandatoryDependencies(paJob, job, scaledTaskName); + + paJob.setProjectName("Morphemic"); + + long submittedJobId = schedulerGateway.submit(paJob).longValue(); + job.setSubmittedJobId(submittedJobId); + + EntityManagerHelper.persist(job); + EntityManagerHelper.commit(); + LOGGER.info("Scaling out of task \'" + scaledTaskName + "\' job, submitted successfully. ID = " + submittedJobId); + } + + private void setAllScalingInMandatoryDependencies(TaskFlowJob paJob, Job jobToSubmit, String scaledTaskName) { + Task scaledTask = jobToSubmit.findTask(scaledTaskName); + jobToSubmit.getTasks().forEach(task -> { + if (task.getParentTasks() != null && !task.getParentTasks().isEmpty()) { + task.getParentTasks().forEach(parentTaskName -> { + paJob.getTasks().forEach(paTask -> { + paJob.getTasks().forEach(paParentTask -> { + if (paTask.getName().contains(task.getName()) && paParentTask.getName().contains(parentTaskName)) { + if (paTask.getName().contains(task.getDeploymentFirstSubmittedTaskName()) && + paParentTask.getName().contains(jobToSubmit.findTask(parentTaskName).getDeploymentLastSubmittedTaskName())) { + paTask.addDependence(paParentTask); + } + } + }); + }); + }); + } + }); + } + + private List buildScalingInPATask(Task task, Job job, String scaledTaskName) { + List scriptTasks = new LinkedList<>(); + Task scaledTask = job.findTask(scaledTaskName); + + if (scaledTaskName.equals(task.getName())) { + // When the scaled task is the task to be built + LOGGER.info("Building task " + task.getName() + " as it is scaled out"); + scriptTasks.addAll(buildScaledPATask(task, job)); + } else if (task.getParentTasks().contains(scaledTaskName)) { + // When the scaled task is a parent of the task to be built + LOGGER.info("Building task " + task.getName() + " as a child of task " + scaledTaskName); + scriptTasks.addAll(createChildScaledTask(task, job)); + } else { + LOGGER.debug("Task " + task.getName() + " is not impacted by the scaling of task " + scaledTaskName); + } + + return scriptTasks; + } + /** * Translate a Morphemic task skeleton into a list of ProActive tasks * @param task A Morphemic task skeleton @@ -855,7 +1189,6 @@ public class PAGateway { */ public List buildPATask(Task task, Job job) { List scriptTasks = new LinkedList<>(); - List tasksTokens = new LinkedList<>(); if (task.getDeployments() == null || task.getDeployments().isEmpty()) { LOGGER.warn("The task " + task.getName() + " does not have a deployment. It will be scheduled on any free node."); @@ -864,16 +1197,19 @@ public class PAGateway { task.setDeploymentLastSubmittedTaskName(scriptTasks.get(scriptTasks.size()-1).getName()); } else { - task.getDeployments().forEach(deployment -> { + task.getDeployments().stream().filter(deployment -> !deployment.getIsDeployed()).forEach(deployment -> { // Creating infra deployment tasks - String token = task.getTaskId() + tasksTokens.size(); - String suffix = "_" + tasksTokens.size(); + String token = task.getTaskId() + deployment.getNumber(); + String suffix = "_" + deployment.getNumber(); scriptTasks.add(createInfraTask(task, deployment, suffix, token)); // If the infrastructure comes with the deployment of the EMS, we set it up. Optional.ofNullable(deployment.getEmsDeployment()).ifPresent(emsDeploymentRequest -> scriptTasks.add(createEmsDeploymentTask(emsDeploymentRequest,suffix,token))); - LOGGER.debug("Token added: " + token); - tasksTokens.add(token); + LOGGER.info("Token added: " + token); deployment.setIsDeployed(true); + deployment.setNodeAccessToken(token); + + LOGGER.info("+++ Deployment number: " + deployment.getNumber()); + // Creating application deployment tasks List appTasks = createAppTasks(task, suffix, token, job); @@ -888,7 +1224,7 @@ public class PAGateway { scriptTasks.addAll(appTasks); }); - task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_0"))); + task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_"))); } scriptTasks.forEach(scriptTask -> task.addSubmittedTaskName(scriptTask.getName())); diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Deployment.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Deployment.java index a486521..d19b9f9 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Deployment.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Deployment.java @@ -41,6 +41,12 @@ public class Deployment implements Serializable { @Column(name = "IS_DEPLOYED") private Boolean isDeployed = false; + @Column(name = "NODE_ACCESS_TOKEN") + private String nodeAccessToken; + + @Column(name = "NUMBER") + private Long number; + @Override public String toString() { return "Deployment{" + @@ -49,6 +55,8 @@ public class Deployment implements Serializable { ", imageProviderId='" + imageProviderId + '\'' + ", hardwareProviderId='" + hardwareProviderId + '\'' + ", isDeployed='" + isDeployed.toString() + '\'' + + ", nodeAccessToken='" + nodeAccessToken + '\'' + + ", number='" + number + '\'' + ", paCloud='" + paCloud.getNodeSourceNamePrefix() + '\'' + ", task='" + task.getName() + '\'' + '}'; diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Task.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Task.java index cdbda4e..cf926f0 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Task.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Task.java @@ -10,7 +10,6 @@ import java.util.LinkedList; import java.util.List; -//@AllArgsConstructor @NoArgsConstructor @ToString @Getter @@ -59,11 +58,15 @@ public class Task implements Serializable { @Column(name = "DEPLOYMENT_LAST_SUBMITTED_TASK_NAME") private String deploymentLastSubmittedTaskName; + @Column(name = "NEXT_DEPLOYMENT_ID") + private Long nextDeploymentID = 0L; + public void addDeployment(Deployment deployment) { if (deployments==null){ deployments = new LinkedList<>(); } deployments.add(deployment); + nextDeploymentID++; } public void removeDeployment(Deployment deployment) { diff --git a/scheduling-abstraction-layer/src/main/resources/collect_ip_addr_results.groovy b/scheduling-abstraction-layer/src/main/resources/collect_ip_addr_results.groovy new file mode 100644 index 0000000..7d4e516 --- /dev/null +++ b/scheduling-abstraction-layer/src/main/resources/collect_ip_addr_results.groovy @@ -0,0 +1,18 @@ +def requestedPortName = variables.get("requestedPortName") +def publicRequestedPort +def count = 0 + +variables.each { key, value -> + if (key.contains(requestedPortName)) { + if (count == 0) { + publicRequestedPort = value.toString() + count++ + } else { + publicRequestedPort += "," + value.toString() + count++ + } + } +} + +println "publicRequestedPort: " + publicRequestedPort +variables.put(requestedPortName, publicRequestedPort) \ No newline at end of file diff --git a/scheduling-abstraction-layer/src/main/resources/post_prepare_infra_script.groovy b/scheduling-abstraction-layer/src/main/resources/post_prepare_infra_script.groovy index 71962fa..b590df2 100644 --- a/scheduling-abstraction-layer/src/main/resources/post_prepare_infra_script.groovy +++ b/scheduling-abstraction-layer/src/main/resources/post_prepare_infra_script.groovy @@ -5,5 +5,6 @@ if (providedPortName?.trim()){ def ipAddr = new File(providedPortName+"_ip").text.trim() def publicProvidedPort = ipAddr + ":" + providedPortValue variables.put(providedPortName, publicProvidedPort) + variables.put(providedPortName + variables.get("PA_TASK_ID"), publicProvidedPort) println("Provided variable " + providedPortName + "=" + publicProvidedPort) } \ No newline at end of file -- GitLab From 282f4bd0c3d1c5e6355d033ee2975b48c9dbe042 Mon Sep 17 00:00:00 2001 From: mklkun Date: Thu, 15 Apr 2021 18:29:22 +0200 Subject: [PATCH 2/3] Some code cleaning and refactoring --- .../org/activeeon/morphemic/PAGateway.java | 114 ++++++------------ 1 file changed, 38 insertions(+), 76 deletions(-) diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java index 3aa2d25..ef3b7ae 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java @@ -39,15 +39,15 @@ public class PAGateway { private final String paURL; - public PAResourceManagerGateway resourceManagerGateway; + public final PAResourceManagerGateway resourceManagerGateway; - public PASchedulerGateway schedulerGateway; + public final PASchedulerGateway schedulerGateway; - public PAConnectorIaasGateway connectorIaasGateway; + public final PAConnectorIaasGateway connectorIaasGateway; - final String NEW_LINE = System.getProperty("line.separator"); + private final String NEW_LINE = System.getProperty("line.separator"); - final String SCRIPTS_SEPARATION = NEW_LINE + NEW_LINE + "# Main script" + NEW_LINE; + private final String SCRIPTS_SEPARATION = NEW_LINE + NEW_LINE + "# Main script" + NEW_LINE; private static final Logger LOGGER = Logger.getLogger(PAGateway.class); @@ -768,6 +768,7 @@ public class PAGateway { LOGGER.error(String.format("Job [%s] not found", jobId)); return 1; } + // Let's find the task: Optional optTask = Optional.ofNullable(optJob.get().findTask(taskName)); if (!optTask.isPresent()) { @@ -838,13 +839,7 @@ public class PAGateway { List scriptTasks = buildScalingOutPATask(task, job, scaledTaskName); if (scriptTasks != null && !scriptTasks.isEmpty()) { - scriptTasks.forEach(scriptTask -> { - try { - paJob.addTask(scriptTask); - } catch (UserException e) { - LOGGER.error("Task " + task.getName() + " could not be added due to: " + e.toString()); - } - }); + addAllScriptTasksToPAJob(paJob, task, scriptTasks); EntityManagerHelper.persist(task); } }); @@ -861,6 +856,16 @@ public class PAGateway { LOGGER.info("Scaling out of task \'" + scaledTaskName + "\' job, submitted successfully. ID = " + submittedJobId); } + private void addAllScriptTasksToPAJob(TaskFlowJob paJob, Task task, List scriptTasks) { + scriptTasks.forEach(scriptTask -> { + try { + paJob.addTask(scriptTask); + } catch (UserException e) { + LOGGER.error("Task " + task.getName() + " could not be added due to: " + e.toString()); + } + }); + } + private void setAllScalingOutMandatoryDependencies(TaskFlowJob paJob, Job jobToSubmit, String scaledTaskName, List newNodesNumbers) { jobToSubmit.getTasks().forEach(task -> { if (task.getParentTasks() != null && !task.getParentTasks().isEmpty()) { @@ -909,16 +914,10 @@ public class PAGateway { // When the scaled task is a child the task to be built LOGGER.info("Building task " + task.getName() + " as a parent of task " + scaledTaskName); scriptTasks.addAll(createParentScaledTask(task, job)); - } else if (scaledTaskName.equals(task.getName())) { - // When the scaled task is the task to be built - LOGGER.info("Building task " + task.getName() + " as it is scaled out"); - scriptTasks.addAll(buildScaledPATask(task, job)); - } else if (task.getParentTasks().contains(scaledTaskName)) { - // When the scaled task is a parent of the task to be built - LOGGER.info("Building task " + task.getName() + " as a child of task " + scaledTaskName); - scriptTasks.addAll(createChildScaledTask(task, job)); } else { - LOGGER.debug("Task " + task.getName() + " is not impacted by the scaling of task " + scaledTaskName); + // Using buildScalingInPATask because it handles all the remaining cases + LOGGER.info("Moving to building with buildScalingInPATask() method"); + scriptTasks.addAll(buildScalingInPATask(task, job, scaledTaskName)); } return scriptTasks; @@ -990,17 +989,7 @@ public class PAGateway { deployment.setNodeAccessToken(token); // Creating application deployment tasks - List appTasks = createAppTasks(task, suffix, token, job); - task.setDeploymentLastSubmittedTaskName(appTasks.get(appTasks.size()-1).getName().substring(0, appTasks.get(appTasks.size()-1).getName().lastIndexOf(suffix))); - - // Creating infra preparation task - appTasks.add(0, createInfraPreparationTask(task, suffix, token, job)); - appTasks.get(1).addDependence(appTasks.get(0)); - - // Add dependency between infra and application deployment tasks - appTasks.get(0).addDependence(scriptTasks.get(scriptTasks.size()-1)); - - scriptTasks.addAll(appTasks); + createAndAddAppDeploymentTasks(task, suffix, token, scriptTasks, job); }); scriptTasks.forEach(scriptTask -> task.addSubmittedTaskName(scriptTask.getName())); @@ -1008,6 +997,20 @@ public class PAGateway { return scriptTasks; } + private void createAndAddAppDeploymentTasks(Task task, String suffix, String token, List scriptTasks, Job job) { + List appTasks = createAppTasks(task, suffix, token, job); + task.setDeploymentLastSubmittedTaskName(appTasks.get(appTasks.size()-1).getName().substring(0, appTasks.get(appTasks.size()-1).getName().lastIndexOf(suffix))); + + // Creating infra preparation task + appTasks.add(0, createInfraPreparationTask(task, suffix, token, job)); + appTasks.get(1).addDependence(appTasks.get(0)); + + // Add dependency between infra and application deployment tasks + appTasks.get(0).addDependence(scriptTasks.get(scriptTasks.size()-1)); + + scriptTasks.addAll(appTasks); + } + private List createParentScaledTask(Task task, Job job) { List scriptTasks = new LinkedList<>(); task.getDeployments().stream().filter(Deployment::getIsDeployed).forEach(deployment -> { @@ -1119,18 +1122,12 @@ public class PAGateway { List scriptTasks = buildScalingInPATask(task, job, scaledTaskName); if (scriptTasks != null && !scriptTasks.isEmpty()) { - scriptTasks.forEach(scriptTask -> { - try { - paJob.addTask(scriptTask); - } catch (UserException e) { - LOGGER.error("Task " + task.getName() + " could not be added due to: " + e.toString()); - } - }); + addAllScriptTasksToPAJob(paJob, task, scriptTasks); EntityManagerHelper.persist(task); } }); - setAllScalingInMandatoryDependencies(paJob, job, scaledTaskName); + setAllMandatoryDependencies(paJob, job); paJob.setProjectName("Morphemic"); @@ -1142,29 +1139,8 @@ public class PAGateway { LOGGER.info("Scaling out of task \'" + scaledTaskName + "\' job, submitted successfully. ID = " + submittedJobId); } - private void setAllScalingInMandatoryDependencies(TaskFlowJob paJob, Job jobToSubmit, String scaledTaskName) { - Task scaledTask = jobToSubmit.findTask(scaledTaskName); - jobToSubmit.getTasks().forEach(task -> { - if (task.getParentTasks() != null && !task.getParentTasks().isEmpty()) { - task.getParentTasks().forEach(parentTaskName -> { - paJob.getTasks().forEach(paTask -> { - paJob.getTasks().forEach(paParentTask -> { - if (paTask.getName().contains(task.getName()) && paParentTask.getName().contains(parentTaskName)) { - if (paTask.getName().contains(task.getDeploymentFirstSubmittedTaskName()) && - paParentTask.getName().contains(jobToSubmit.findTask(parentTaskName).getDeploymentLastSubmittedTaskName())) { - paTask.addDependence(paParentTask); - } - } - }); - }); - }); - } - }); - } - private List buildScalingInPATask(Task task, Job job, String scaledTaskName) { List scriptTasks = new LinkedList<>(); - Task scaledTask = job.findTask(scaledTaskName); if (scaledTaskName.equals(task.getName())) { // When the scaled task is the task to be built @@ -1212,17 +1188,7 @@ public class PAGateway { // Creating application deployment tasks - List appTasks = createAppTasks(task, suffix, token, job); - task.setDeploymentLastSubmittedTaskName(appTasks.get(appTasks.size()-1).getName().substring(0, appTasks.get(appTasks.size()-1).getName().lastIndexOf(suffix))); - - // Creating infra preparation task - appTasks.add(0, createInfraPreparationTask(task, suffix, token, job)); - appTasks.get(1).addDependence(appTasks.get(0)); - - // Add dependency between infra and application deployment tasks - appTasks.get(0).addDependence(scriptTasks.get(scriptTasks.size()-1)); - - scriptTasks.addAll(appTasks); + createAndAddAppDeploymentTasks(task, suffix, token, scriptTasks, job); }); task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_"))); } @@ -1416,10 +1382,6 @@ public class PAGateway { String.valueOf(submittedJob.getSubmittedJobId()), submittedTaskName)); }); - - TaskResult taskResult = schedulerGateway.getTaskResult(String.valueOf(submittedJob.getSubmittedJobId()), - createdTask.getSubmittedTaskNames() - .get(createdTask.getSubmittedTaskNames().size() - 1)); LOGGER.info("Results of task: " + taskName + " fetched successfully: " + taskResultsMap.toString()); return taskResultsMap; } -- GitLab From 221da3fa238981be53e0281fc0a4de5b8f4dc9f1 Mon Sep 17 00:00:00 2001 From: mklkun Date: Fri, 16 Apr 2021 17:37:40 +0200 Subject: [PATCH 3/3] Fix ports to be open jackson mapping issue --- .../src/main/java/org/activeeon/morphemic/PAGateway.java | 4 ++-- .../src/main/java/org/activeeon/morphemic/model/Job.java | 4 ++-- .../src/main/java/org/activeeon/morphemic/model/Port.java | 1 + .../org/activeeon/morphemic/service/NodeCandidateUtils.java | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java index ef3b7ae..91f8ef1 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/PAGateway.java @@ -1,5 +1,6 @@ package org.activeeon.morphemic; +import com.fasterxml.jackson.databind.ObjectMapper; import org.activeeon.morphemic.application.deployment.PAFactory; import org.activeeon.morphemic.application.deployment.PASchedulerGateway; import org.activeeon.morphemic.infrastructure.deployment.PAConnectorIaasGateway; @@ -8,7 +9,6 @@ import org.activeeon.morphemic.model.*; import org.activeeon.morphemic.service.*; import org.apache.commons.lang3.Validate; import org.apache.log4j.Logger; -import org.codehaus.jackson.map.ObjectMapper; import org.json.JSONArray; import org.json.JSONObject; import org.ow2.proactive.resourcemanager.exception.RMException; @@ -728,7 +728,7 @@ public class PAGateway { nodeConfigJson += "\"}"; } else { try { - nodeConfigJson += "\", \"portToOpens\": " + mapper.writeValueAsString(task.getPortsToOpen()) + "}"; + nodeConfigJson += "\", \"portsToOpen\": " + mapper.writeValueAsString(task.getPortsToOpen()) + "}"; } catch (IOException e) { LOGGER.error(e.getStackTrace()); } diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Job.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Job.java index 7970f50..172ccae 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Job.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Job.java @@ -1,8 +1,8 @@ package org.activeeon.morphemic.model; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; import lombok.*; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.ObjectWriter; import javax.persistence.*; import java.io.IOException; diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Port.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Port.java index 17a3af0..5def6db 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Port.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/model/Port.java @@ -23,6 +23,7 @@ public class Port implements Serializable { @Column(name = "VALUE") private Integer value; + @JsonIgnore @Column(name = "REQUESTED_NAME") private String requestedName; diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/NodeCandidateUtils.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/NodeCandidateUtils.java index b0a47ca..3f943da 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/NodeCandidateUtils.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/NodeCandidateUtils.java @@ -1,10 +1,10 @@ package org.activeeon.morphemic.service; +import com.fasterxml.jackson.databind.ObjectMapper; import org.activeeon.morphemic.infrastructure.deployment.PAConnectorIaasGateway; import org.activeeon.morphemic.model.*; import org.apache.commons.lang3.StringUtils; import org.apache.log4j.Logger; -import org.codehaus.jackson.map.ObjectMapper; import org.json.JSONArray; import org.json.JSONObject; -- GitLab