Commit b7ae5cb0 authored by Mohamed Khalil Labidi's avatar Mohamed Khalil Labidi
Browse files

Fix and improve components dependencies and communications

parent 9e3caa20
......@@ -83,13 +83,14 @@ public class PAGateway {
schedulerGateway.disconnect();
}
private List<Port> extractListOfPortsToOpen(JSONArray ports) {
private List<Port> extractListOfPortsToOpen(JSONArray ports, JSONObject job) {
List<Port> portsToOpen = new LinkedList<>();
if (ports != null) {
ports.forEach(object -> {
JSONObject portEntry = (JSONObject) object;
if (Objects.equals("PortProvided", portEntry.optString("type"))) {
Port portToOpen = new Port(portEntry.optInt("port"));
portToOpen.setRequestedName(findRequiredPort(job, portEntry.optString("name")));
portsToOpen.add(portToOpen);
}
});
......@@ -97,6 +98,15 @@ public class PAGateway {
return portsToOpen;
}
private String findRequiredPort(JSONObject job, String providedPortName) {
for (Object communicationObject : job.optJSONArray("communications")) {
JSONObject communication = (JSONObject) communicationObject;
if (Objects.equals(providedPortName, communication.optString("portProvided")))
return communication.optString("portRequired");
}
return "NOTREQUESTED_providedPortName";
}
private String findProvidedPort(JSONObject job, String requiredPortName) {
for (Object communicationObject : job.optJSONArray("communications")) {
JSONObject communication = (JSONObject) communicationObject;
......@@ -131,8 +141,9 @@ public class PAGateway {
if (ports != null) {
ports.forEach(portObject -> {
JSONObject portEntry = (JSONObject) portObject;
if (Objects.equals("PortRequired", portEntry.optString("type"))
&& portEntry.optBoolean("isMandatory")) {
// if (Objects.equals("PortRequired", portEntry.optString("type"))
// && portEntry.optBoolean("isMandatory")) {
if (Objects.equals("PortRequired", portEntry.optString("type"))) {
LOGGER.debug("Mandatory required port detected");
String providedPortName = findProvidedPort(job, portEntry.optString("name"));
parentTasks.add(findTaskByProvidedPort(job.optJSONArray("tasks"), providedPortName));
......@@ -259,7 +270,7 @@ public class PAGateway {
throw new IllegalArgumentException("Spark tasks are not handled yet.");
}
List<Port> portsToOpen = extractListOfPortsToOpen(task.optJSONArray("ports"));
List<Port> portsToOpen = extractListOfPortsToOpen(task.optJSONArray("ports"), job);
portsToOpen.forEach(EntityManagerHelper::persist);
newTask.setPortsToOpen(portsToOpen);
newTask.setParentTasks(extractParentTasks(job, task));
......@@ -481,7 +492,7 @@ public class PAGateway {
nodeNames.forEach(nodeName -> {
try {
String nodeUrl = resourceManagerGateway.searchNodes(nodeNames, true).get(0);
resourceManagerGateway.removeNodeSource(nodeUrl, preempt);
resourceManagerGateway.removeNode(nodeUrl, preempt);
LOGGER.info("Node " + nodeName + " with URL: " + nodeUrl + " has been removed successfully.");
} catch (NotConnectedException | RestException e) {
LOGGER.error(e.getStackTrace());
......@@ -760,7 +771,7 @@ public class PAGateway {
return 1;
}
// Validating there will still be at least on task in the task
// Validating there will still be at least one deployment in the task
if (optTask.get().getDeployments().size() - nodeNames.size() < 1) {
LOGGER.error("I stop the scale-in: the task will loose its last deployment");
return 2;
......@@ -770,7 +781,7 @@ public class PAGateway {
List<Deployment> deployments = nodeNames.stream().map(node -> EntityManagerHelper.find(Deployment.class,node)).filter(deployment -> (deployment != null)).collect(Collectors.toList());
// For deployed node, I flag their removal
List<String> nodesToBeRemoved = deployments.stream().filter(deployment -> deployment.getIsDeployed()).map(Deployment::getNodeName).collect(Collectors.toList());
List<String> nodesToBeRemoved = deployments.stream().filter(deployment -> deployment.getIsDeployed()).map(Deployment::getNodeName).collect(Collectors.toList());
// For every node, I remove the deployment entree
deployments.forEach(
deployment -> {
......@@ -788,9 +799,10 @@ public class PAGateway {
/**
* Translate a Morphemic task skeleton into a list of ProActive tasks
* @param task A Morphemic task skeleton
* @param job The related job skeleton
* @return A list of ProActive tasks
*/
public List<ScriptTask> buildPATask(Task task) {
public List<ScriptTask> buildPATask(Task task, Job job) {
List<ScriptTask> scriptTasks = new LinkedList<>();
List<String> tasksTokens = new LinkedList<>();
......@@ -802,6 +814,7 @@ public class PAGateway {
}
else {
task.getDeployments().forEach(deployment -> {
// Creating infra deployment tasks
String token = task.getTaskId() + tasksTokens.size();
String suffix = "_" + tasksTokens.size();
scriptTasks.add(createInfraTask(task, deployment, suffix, token));
......@@ -810,26 +823,21 @@ public class PAGateway {
LOGGER.debug("Token added: " + token);
tasksTokens.add(token);
deployment.setIsDeployed(true);
});
task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_0")));
tasksTokens.forEach(taskToken -> {
String suffix = "_" + tasksTokens.indexOf(taskToken);
List<ScriptTask> appTasks = createAppTasks(task, suffix, taskToken);
// Creating application deployment tasks
List<ScriptTask> appTasks = createAppTasks(task, suffix, token);
task.setDeploymentLastSubmittedTaskName(appTasks.get(appTasks.size()-1).getName().substring(0, appTasks.get(appTasks.size()-1).getName().lastIndexOf(suffix)));
scriptTasks.addAll(0, appTasks);
if (task.getInstallation().getOperatingSystemType().getOperatingSystemFamily().toLowerCase(Locale.ROOT).equals("ubuntu") &&
task.getInstallation().getOperatingSystemType().getOperatingSystemVersion() < 2000) {
LOGGER.info("Adding apt lock handler task since task: " + task.getName() +
" is meant to be executed in: " +
task.getInstallation().getOperatingSystemType().getOperatingSystemFamily() +
" version: " + task.getInstallation().getOperatingSystemType().getOperatingSystemVersion());
scriptTasks.add(0, createLockHandlerTask(task.getName(), suffix, taskToken));
scriptTasks.get(1).addDependence(scriptTasks.get(0));
}
// Creating infra preparation task
appTasks.add(0, createInfraPreparationTask(task, suffix, token, job));
appTasks.get(1).addDependence(appTasks.get(0));
// Add dependency between infra and application deployment tasks
appTasks.get(0).addDependence(scriptTasks.get(scriptTasks.size()-1));
scriptTasks.addAll(appTasks);
});
task.setDeploymentFirstSubmittedTaskName(scriptTasks.get(0).getName().substring(0, scriptTasks.get(0).getName().lastIndexOf("_0")));
}
scriptTasks.forEach(scriptTask -> task.addSubmittedTaskName(scriptTask.getName()));
......@@ -837,11 +845,49 @@ public class PAGateway {
return scriptTasks;
}
private ScriptTask createLockHandlerTask(String taskName, String suffix, String taskToken) {
String lockTaskName = "waitForLock_" + taskName + suffix;
ScriptTask lockHandlerTask = PAFactory.createBashScriptTaskFromFile(lockTaskName, "wait_for_lock_script.sh");
lockHandlerTask.addGenericInformation("NODE_ACCESS_TOKEN", taskToken);
return lockHandlerTask;
private ScriptTask createInfraPreparationTask(Task task, String suffix, String token, Job job) {
ScriptTask prepareInfraTask;
Map<String, TaskVariable> taskVariablesMap = new HashMap<>();
String taskName = "prepareInfra_" + task.getName() + suffix;
if (!task.getPortsToOpen().isEmpty()) {
prepareInfraTask = PAFactory.createBashScriptTaskFromFile(taskName, "prepare_infra_script.sh");
prepareInfraTask.setPostScript(PAFactory.createSimpleScriptFromFIle("post_prepare_infra_script.groovy",
"groovy"));
//TODO: Taking into consideration multiple provided ports
taskVariablesMap.put("providedPortName", new TaskVariable("providedPortName",
task.getPortsToOpen().get(0).getRequestedName()));
taskVariablesMap.put("providedPortValue", new TaskVariable("providedPortValue",
task.getPortsToOpen().get(0).getValue().toString()));
if (!task.getParentTasks().isEmpty()) {
//TODO: Taking into consideration multiple parent tasks with multiple communications
taskVariablesMap.put("requestedPortName", new TaskVariable("requestedPortName",
job.findTask(task.getParentTasks().get(0)).getPortsToOpen().get(0).getRequestedName()));
}
} else if (!task.getParentTasks().isEmpty()) {
prepareInfraTask = PAFactory.createBashScriptTaskFromFile(taskName, "prepare_infra_script.sh");
//TODO: Taking into consideration multiple parent tasks with multiple communications
taskVariablesMap.put("requestedPortName", new TaskVariable("requestedPortName",
job.findTask(task.getParentTasks().get(0)).getPortsToOpen().get(0).getRequestedName()));
} else {
prepareInfraTask = PAFactory.createBashScriptTask(taskName,
"echo \"No ports to open and not parent tasks. Nothing to be prepared in VM.\"");
}
if (task.getInstallation().getOperatingSystemType().getOperatingSystemFamily().toLowerCase(Locale.ROOT).equals("ubuntu") &&
task.getInstallation().getOperatingSystemType().getOperatingSystemVersion() < 2000) {
LOGGER.info("Adding apt lock handler script since task: " + task.getName() +
" is meant to be executed in: " +
task.getInstallation().getOperatingSystemType().getOperatingSystemFamily() +
" version: " + task.getInstallation().getOperatingSystemType().getOperatingSystemVersion());
prepareInfraTask.setPreScript(PAFactory.createSimpleScriptFromFIle("wait_for_lock_script.sh",
"bash"));
}
prepareInfraTask.setVariables(taskVariablesMap);
prepareInfraTask.addGenericInformation("NODE_ACCESS_TOKEN", token);
return prepareInfraTask;
}
private void setAllMandatoryDependencies(TaskFlowJob paJob, Job jobToSubmit) {
......@@ -880,7 +926,7 @@ public class PAGateway {
EntityManagerHelper.begin();
jobToSubmit.getTasks().forEach(task -> {
List<ScriptTask> scriptTasks = buildPATask(task);
List<ScriptTask> scriptTasks = buildPATask(task, jobToSubmit);
scriptTasks.forEach(scriptTask -> {
try {
......
......@@ -23,6 +23,9 @@ public class Port implements Serializable {
@Column(name = "VALUE")
private Integer value;
@Column(name = "REQUESTED_NAME")
private String requestedName;
public Port(Integer value) {
if ((value == -1) || (value >= 0 && value <= 65535)) {
this.value = value;
......
def providedPortName = variables.get("providedPortName")
def providedPortValue = variables.get("providedPortValue")
if (providedPortName?.trim()){
def ipAddr = new File(providedPortName+"_ip").text.trim()
def publicProvidedPort = ipAddr + ":" + providedPortValue
variables.put(providedPortName, publicProvidedPort)
println("Provided variable " + providedPortName + "=" + publicProvidedPort)
}
\ No newline at end of file
REQUESTED_PORT_NAME="PUBLIC_$variables_requestedPortName"
if [[ ! -z $variables_requestedPortName ]]; then
REQ="variables_$variables_requestedPortName"
REQUESTED_PORT_VALUE=${!REQ}
if [[ -z ${!REQUESTED_PORT_NAME} ]]; then
echo "Variable $REQUESTED_PORT_NAME does not exist. Exporting ..."
export PUBLIC_$REQUESTED_PORT_NAME=$REQUESTED_PORT_VALUE
echo "export PUBLIC_$REQUESTED_PORT_NAME=$REQUESTED_PORT_VALUE" >> ~/.bashrc
source ~/.bashrc
fi
echo "$REQUESTED_PORT_NAME variable set to $REQUESTED_PORT_VALUE"
fi
PROVIDED_PORT_NAME=$variables_providedPortName
if [[ ! -z $PROVIDED_PORT_NAME ]]; then
IP_ADDR=$(dig +short myip.opendns.com @resolver1.opendns.com)
echo Public adress: $IP_ADDR
echo "$IP_ADDR" > $PROVIDED_PORT_NAME"_ip"
fi
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment