Unverified Commit 1e1fbc45 authored by Mohamed Khalil LABIDI's avatar Mohamed Khalil LABIDI Committed by GitHub
Browse files

Merge pull request #21 from mklkun/fix-com

parents f03e6b5b f4eb8006
......@@ -83,13 +83,14 @@ public class PAGateway {
schedulerGateway.disconnect();
}
private List<Port> extractListOfPortsToOpen(JSONArray ports) {
private List<Port> extractListOfPortsToOpen(JSONArray ports, JSONObject job) {
List<Port> portsToOpen = new LinkedList<>();
if (ports != null) {
ports.forEach(object -> {
JSONObject portEntry = (JSONObject) object;
if (Objects.equals("PortProvided", portEntry.optString("type"))) {
Port portToOpen = new Port(portEntry.optInt("port"));
portToOpen.setRequestedName(findRequiredPort(job, portEntry.optString("name")));
portsToOpen.add(portToOpen);
}
});
......@@ -97,6 +98,15 @@ public class PAGateway {
return portsToOpen;
}
private String findRequiredPort(JSONObject job, String providedPortName) {
for (Object communicationObject : job.optJSONArray("communications")) {
JSONObject communication = (JSONObject) communicationObject;
if (Objects.equals(providedPortName, communication.optString("portProvided")))
return communication.optString("portRequired");
}
return "NOTREQUESTED_providedPortName";
}
private String findProvidedPort(JSONObject job, String requiredPortName) {
for (Object communicationObject : job.optJSONArray("communications")) {
JSONObject communication = (JSONObject) communicationObject;
......@@ -131,8 +141,9 @@ public class PAGateway {
if (ports != null) {
ports.forEach(portObject -> {
JSONObject portEntry = (JSONObject) portObject;
if (Objects.equals("PortRequired", portEntry.optString("type"))
&& portEntry.optBoolean("isMandatory")) {
// if (Objects.equals("PortRequired", portEntry.optString("type"))
// && portEntry.optBoolean("isMandatory")) {
if (Objects.equals("PortRequired", portEntry.optString("type"))) {
LOGGER.debug("Mandatory required port detected");
String providedPortName = findProvidedPort(job, portEntry.optString("name"));
parentTasks.add(findTaskByProvidedPort(job.optJSONArray("tasks"), providedPortName));
......@@ -259,7 +270,7 @@ public class PAGateway {
throw new IllegalArgumentException("Spark tasks are not handled yet.");
}
List<Port> portsToOpen = extractListOfPortsToOpen(task.optJSONArray("ports"));
List<Port> portsToOpen = extractListOfPortsToOpen(task.optJSONArray("ports"), job);
portsToOpen.forEach(EntityManagerHelper::persist);
newTask.setPortsToOpen(portsToOpen);
newTask.setParentTasks(extractParentTasks(job, task));
......@@ -481,7 +492,7 @@ public class PAGateway {
nodeNames.forEach(nodeName -> {
try {
String nodeUrl = resourceManagerGateway.searchNodes(nodeNames, true).get(0);
resourceManagerGateway.removeNodeSource(nodeUrl, preempt);
resourceManagerGateway.removeNode(nodeUrl, preempt);
LOGGER.info("Node " + nodeName + " with URL: " + nodeUrl + " has been removed successfully.");
} catch (NotConnectedException | RestException e) {
LOGGER.error(e.getStackTrace());
......@@ -558,10 +569,10 @@ public class PAGateway {
return 0;
}
private List<ScriptTask> createAppTasks(Task task, String taskNameSuffix, String taskToken) {
private List<ScriptTask> createAppTasks(Task task, String taskNameSuffix, String taskToken, Job job) {
switch (task.getType()) {
case "commands":
return createCommandsTask(task, taskNameSuffix, taskToken);
return createCommandsTask(task, taskNameSuffix, taskToken, job);
case "docker":
return createDockerTask(task, taskNameSuffix, taskToken);
}
......@@ -583,27 +594,46 @@ public class PAGateway {
return scriptTasks;
}
private List<ScriptTask> createCommandsTask(Task task, String taskNameSuffix, String taskToken) {
private List<ScriptTask> createCommandsTask(Task task, String taskNameSuffix, String taskToken, Job job) {
final String newLine = System.getProperty("line.separator");
final String scriptsSeparation = newLine + newLine + "# Main script" + newLine;
List<ScriptTask> scriptTasks = new LinkedList<>();
ScriptTask scriptTaskStart = null;
ScriptTask scriptTaskInstall = null;
Map<String, TaskVariable> taskVariablesMap = new HashMap<>();
if (!task.getParentTasks().isEmpty()) {
//TODO: Taking into consideration multiple parent tasks with multiple communications
taskVariablesMap.put("requestedPortName", new TaskVariable("requestedPortName",
job.findTask(task.getParentTasks().get(0)).getPortsToOpen().get(0).getRequestedName()));
}
if (!(task.getInstallation().getInstall().isEmpty() &&
task.getInstallation().getPreInstall().isEmpty() &&
task.getInstallation().getPostInstall().isEmpty())) {
if (!task.getInstallation().getInstall().isEmpty()) {
scriptTaskInstall = PAFactory.createBashScriptTask(task.getName() + "_install" + taskNameSuffix,
task.getInstallation().getInstall());
Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation +
task.getInstallation().getInstall());
} else {
scriptTaskInstall = PAFactory.createBashScriptTask(task.getName() + "_install" + taskNameSuffix,
"echo \"Installation script is empty. Nothing to be executed.\"");
}
if (!task.getInstallation().getPreInstall().isEmpty()) {
scriptTaskInstall.setPreScript(PAFactory.createSimpleScript(task.getInstallation().getPreInstall(), "bash"));
scriptTaskInstall.setPreScript(PAFactory.createSimpleScript(
Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation +
task.getInstallation().getPreInstall(),
"bash"));
}
if (!task.getInstallation().getPostInstall().isEmpty()) {
scriptTaskInstall.setPostScript(PAFactory.createSimpleScript(task.getInstallation().getPostInstall(), "bash"));
scriptTaskInstall.setPostScript(PAFactory.createSimpleScript(
Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation +
task.getInstallation().getPostInstall(),
"bash"));
}
if (!task.getParentTasks().isEmpty()) {
scriptTaskInstall.setVariables(taskVariablesMap);
}
scriptTaskInstall.addGenericInformation("NODE_ACCESS_TOKEN", taskToken);
scriptTasks.add(scriptTaskInstall);
......@@ -614,17 +644,24 @@ public class PAGateway {
task.getInstallation().getPostStart().isEmpty())) {
if (!task.getInstallation().getStart().isEmpty()) {
scriptTaskStart = PAFactory.createBashScriptTask(task.getName() + "_start" + taskNameSuffix,
task.getInstallation().getStart());
Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation +
task.getInstallation().getStart());
} else {
scriptTaskStart = PAFactory.createBashScriptTask(task.getName() + "_start" + taskNameSuffix,
"echo \"Installation script is empty. Nothing to be executed.\"");
}
if (!task.getInstallation().getPreStart().isEmpty()) {
scriptTaskStart.setPreScript(PAFactory.createSimpleScript(task.getInstallation().getPreStart(), "bash"));
scriptTaskStart.setPreScript(PAFactory.createSimpleScript(
Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation +
task.getInstallation().getPreStart(),
"bash"));
}
if (!task.getInstallation().getPostStart().isEmpty()) {
scriptTaskStart.setPostScript(PAFactory.createSimpleScript(task.getInstallation().getPostStart(), "bash"));
scriptTaskStart.setPostScript(PAFactory.createSimpleScript(
Utils.getContentWithFileName("export_env_var_script.sh") + scriptsSeparation +
task.getInstallation().getPostStart(),
"bash"));
}
if(scriptTaskInstall != null) {
scriptTaskStart.addDependence(scriptTaskInstall);
......@@ -760,7 +797,7 @@ public class PAGateway {
return 1;
}
// Validating there will still be at least on task in the task
// Validating there will still be at least one deployment in the task
if (optTask.get().getDeployments().size() - nodeNames.size() < 1) {
LOGGER.error("I stop the scale-in: the task will loose its last deployment");
return 2;
......@@ -770,7 +807,7 @@ public class PAGateway {
List<Deployment> deployments = nodeNames.stream().map(node -> EntityManagerHelper.find(Deployment.class,node)).filter(deployment -> (deployment != null)).collect(Collectors.toList());
// For deployed node, I flag their removal
List<String> nodesToBeRemoved = deployments.stream().filter(deployment -> deployment.getIsDeployed()).map(Deployment::getNodeName).collect(Collectors.toList());
List<String> nodesToBeRemoved = deployments.stream().filter(deployment -> deployment.getIsDeployed()).map(Deployment::getNodeName).collect(Collectors.toList());
// For every node, I remove the deployment entree
deployments.forEach(
deployment -> {
......@@ -788,20 +825,22 @@ public class PAGateway {
/**
* Translate a Morphemic task skeleton into a list of ProActive tasks
* @param task A Morphemic task skeleton
* @param job The related job skeleton
* @return A list of ProActive tasks
*/
public List<ScriptTask> buildPATask(Task task) {
public List<ScriptTask> buildPATask(Task task, Job job) {
List<ScriptTask> scriptTasks = new LinkedList<>();
List<String> tasksTokens = new LinkedList<>();
if (task.getDeployments() == null || task.getDeployments().isEmpty()) {
LOGGER.warn("The task " + task.getName() + " does not have a deployment. It will be scheduled on any free node.");
scriptTasks.addAll(createAppTasks(task, "", ""));
scriptTasks.addAll(createAppTasks(task, "", "", job));
task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName());
task.setDeploymentLastSubmittedTaskName(scriptTasks.get(scriptTasks.size()-1).getName());
}
else {
task.getDeployments().forEach(deployment -> {
// Creating infra deployment tasks
String token = task.getTaskId() + tasksTokens.size();
String suffix = "_" + tasksTokens.size();
scriptTasks.add(createInfraTask(task, deployment, suffix, token));
......@@ -810,26 +849,21 @@ public class PAGateway {
LOGGER.debug("Token added: " + token);
tasksTokens.add(token);
deployment.setIsDeployed(true);
});
task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_0")));
tasksTokens.forEach(taskToken -> {
String suffix = "_" + tasksTokens.indexOf(taskToken);
List<ScriptTask> appTasks = createAppTasks(task, suffix, taskToken);
// Creating application deployment tasks
List<ScriptTask> appTasks = createAppTasks(task, suffix, token, job);
task.setDeploymentLastSubmittedTaskName(appTasks.get(appTasks.size()-1).getName().substring(0, appTasks.get(appTasks.size()-1).getName().lastIndexOf(suffix)));
scriptTasks.addAll(0, appTasks);
if (task.getInstallation().getOperatingSystemType().getOperatingSystemFamily().toLowerCase(Locale.ROOT).equals("ubuntu") &&
task.getInstallation().getOperatingSystemType().getOperatingSystemVersion() < 2000) {
LOGGER.info("Adding apt lock handler task since task: " + task.getName() +
" is meant to be executed in: " +
task.getInstallation().getOperatingSystemType().getOperatingSystemFamily() +
" version: " + task.getInstallation().getOperatingSystemType().getOperatingSystemVersion());
scriptTasks.add(0, createLockHandlerTask(task.getName(), suffix, taskToken));
scriptTasks.get(1).addDependence(scriptTasks.get(0));
}
// Creating infra preparation task
appTasks.add(0, createInfraPreparationTask(task, suffix, token, job));
appTasks.get(1).addDependence(appTasks.get(0));
// Add dependency between infra and application deployment tasks
appTasks.get(0).addDependence(scriptTasks.get(scriptTasks.size()-1));
scriptTasks.addAll(appTasks);
});
task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_0")));
}
scriptTasks.forEach(scriptTask -> task.addSubmittedTaskName(scriptTask.getName()));
......@@ -837,11 +871,49 @@ public class PAGateway {
return scriptTasks;
}
private ScriptTask createLockHandlerTask(String taskName, String suffix, String taskToken) {
String lockTaskName = "waitForLock_" + taskName + suffix;
ScriptTask lockHandlerTask = PAFactory.createBashScriptTaskFromFile(lockTaskName, "wait_for_lock_script.sh");
lockHandlerTask.addGenericInformation("NODE_ACCESS_TOKEN", taskToken);
return lockHandlerTask;
private ScriptTask createInfraPreparationTask(Task task, String suffix, String token, Job job) {
ScriptTask prepareInfraTask;
Map<String, TaskVariable> taskVariablesMap = new HashMap<>();
String taskName = "prepareInfra_" + task.getName() + suffix;
if (!task.getPortsToOpen().isEmpty()) {
prepareInfraTask = PAFactory.createBashScriptTaskFromFile(taskName, "prepare_infra_script.sh");
prepareInfraTask.setPostScript(PAFactory.createSimpleScriptFromFIle("post_prepare_infra_script.groovy",
"groovy"));
//TODO: Taking into consideration multiple provided ports
taskVariablesMap.put("providedPortName", new TaskVariable("providedPortName",
task.getPortsToOpen().get(0).getRequestedName()));
taskVariablesMap.put("providedPortValue", new TaskVariable("providedPortValue",
task.getPortsToOpen().get(0).getValue().toString()));
if (!task.getParentTasks().isEmpty()) {
//TODO: Taking into consideration multiple parent tasks with multiple communications
taskVariablesMap.put("requestedPortName", new TaskVariable("requestedPortName",
job.findTask(task.getParentTasks().get(0)).getPortsToOpen().get(0).getRequestedName()));
}
} else if (!task.getParentTasks().isEmpty()) {
prepareInfraTask = PAFactory.createBashScriptTaskFromFile(taskName, "prepare_infra_script.sh");
//TODO: Taking into consideration multiple parent tasks with multiple communications
taskVariablesMap.put("requestedPortName", new TaskVariable("requestedPortName",
job.findTask(task.getParentTasks().get(0)).getPortsToOpen().get(0).getRequestedName()));
} else {
prepareInfraTask = PAFactory.createBashScriptTask(taskName,
"echo \"No ports to open and not parent tasks. Nothing to be prepared in VM.\"");
}
if (task.getInstallation().getOperatingSystemType().getOperatingSystemFamily().toLowerCase(Locale.ROOT).equals("ubuntu") &&
task.getInstallation().getOperatingSystemType().getOperatingSystemVersion() < 2000) {
LOGGER.info("Adding apt lock handler script since task: " + task.getName() +
" is meant to be executed in: " +
task.getInstallation().getOperatingSystemType().getOperatingSystemFamily() +
" version: " + task.getInstallation().getOperatingSystemType().getOperatingSystemVersion());
prepareInfraTask.setPreScript(PAFactory.createSimpleScriptFromFIle("wait_for_lock_script.sh",
"bash"));
}
prepareInfraTask.setVariables(taskVariablesMap);
prepareInfraTask.addGenericInformation("NODE_ACCESS_TOKEN", token);
return prepareInfraTask;
}
private void setAllMandatoryDependencies(TaskFlowJob paJob, Job jobToSubmit) {
......@@ -880,7 +952,7 @@ public class PAGateway {
EntityManagerHelper.begin();
jobToSubmit.getTasks().forEach(task -> {
List<ScriptTask> scriptTasks = buildPATask(task);
List<ScriptTask> scriptTasks = buildPATask(task, jobToSubmit);
scriptTasks.forEach(scriptTask -> {
try {
......
package org.activeeon.morphemic.application.deployment;
import org.activeeon.morphemic.service.TemporaryFilesHelper;
import org.activeeon.morphemic.service.Utils;
import org.apache.log4j.Logger;
import org.ow2.proactive.scheduler.common.job.JobVariable;
import org.ow2.proactive.scheduler.common.task.ScriptTask;
......@@ -10,14 +11,10 @@ import org.ow2.proactive.scripting.SelectionScript;
import org.ow2.proactive.scripting.SimpleScript;
import org.ow2.proactive.scripting.TaskScript;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class PAFactory {
......@@ -52,15 +49,7 @@ public class PAFactory {
*/
public static SimpleScript createSimpleScriptFromFIle(String scriptFileName, String scriptLanguage) {
SimpleScript mySQLSimpleScript = null;
String script;
String newLine = System.getProperty("line.separator");
String scriptFileNameWithSeparator = (scriptFileName.startsWith(File.separator)) ?
scriptFileName : File.separator + scriptFileName;
LOGGER.debug("Creating a simple script from the file : " + scriptFileNameWithSeparator);
try (Stream<String> lines = new BufferedReader(new InputStreamReader(
PAFactory.class.getResourceAsStream(scriptFileNameWithSeparator))).lines()) {
script = lines.collect(Collectors.joining(newLine));
}
String script = Utils.getContentWithFileName(scriptFileName);
mySQLSimpleScript = createSimpleScript(script, scriptLanguage);
LOGGER.debug("Simple script created.");
return mySQLSimpleScript;
......
......@@ -23,6 +23,9 @@ public class Port implements Serializable {
@Column(name = "VALUE")
private Integer value;
@Column(name = "REQUESTED_NAME")
private String requestedName;
public Port(Integer value) {
if ((value == -1) || (value >= 0 && value <= 65535)) {
this.value = value;
......
package org.activeeon.morphemic.service;
import org.activeeon.morphemic.application.deployment.PAFactory;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Utils {
private static final Logger LOGGER = Logger.getLogger(Utils.class);
private Utils() { }
public static String getContentWithFileName(String fileName) {
String script;
String newLine = System.getProperty("line.separator");
String scriptFileNameWithSeparator = (fileName.startsWith(File.separator)) ?
fileName : File.separator + fileName;
LOGGER.debug("Creating a simple script from the file : " + scriptFileNameWithSeparator);
try (Stream<String> lines = new BufferedReader(new InputStreamReader(
PAFactory.class.getResourceAsStream(scriptFileNameWithSeparator))).lines()) {
script = lines.collect(Collectors.joining(newLine));
}
return script;
}
}
# Environment variables preparation
if [ -z "$variables_requestedPortName" ]; then
echo "[Env_var] No requested ports for this task. Nothing to be set."
else
REQUESTED_PORT_NAME="PUBLIC_$variables_requestedPortName"
if [[ ! -z $variables_requestedPortName ]]; then
REQ="variables_$variables_requestedPortName"
REQUESTED_PORT_VALUE=${!REQ}
if [[ -z ${!REQUESTED_PORT_NAME} ]]; then
echo "[Env_var] Variable $REQUESTED_PORT_NAME does not exist. Exporting ..."
export PUBLIC_$REQUESTED_PORT_NAME=$REQUESTED_PORT_VALUE
fi
echo "[Env_var] $REQUESTED_PORT_NAME variable set to $REQUESTED_PORT_VALUE"
fi
fi
def providedPortName = variables.get("providedPortName")
def providedPortValue = variables.get("providedPortValue")
if (providedPortName?.trim()){
def ipAddr = new File(providedPortName+"_ip").text.trim()
def publicProvidedPort = ipAddr + ":" + providedPortValue
variables.put(providedPortName, publicProvidedPort)
println("Provided variable " + providedPortName + "=" + publicProvidedPort)
}
\ No newline at end of file
PROVIDED_PORT_NAME=$variables_providedPortName
if [[ ! -z $PROVIDED_PORT_NAME ]]; then
IP_ADDR=$(dig +short myip.opendns.com @resolver1.opendns.com)
echo Public adress: $IP_ADDR
echo "$IP_ADDR" > $PROVIDED_PORT_NAME"_ip"
fi
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment