Commit 7e984be6 authored by Mohamed Khalil Labidi's avatar Mohamed Khalil Labidi
Browse files

Add BYON deployment, persistence and scaling support

parent 15225d02
Pipeline #15034 passed with stage
in 1 minute and 18 seconds
......@@ -8,7 +8,6 @@ import org.activeeon.morphemic.infrastructure.deployment.PAConnectorIaasGateway;
import org.activeeon.morphemic.infrastructure.deployment.PAResourceManagerGateway;
import org.activeeon.morphemic.model.*;
import org.activeeon.morphemic.service.*;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.Validate;
import org.json.JSONArray;
import org.json.JSONObject;
......@@ -581,6 +580,7 @@ public class PAGateway {
Deployment newDeployment = new Deployment();
JSONObject nodeCandidateInfo = node.optJSONObject("nodeCandidateInformation");
newDeployment.setNodeName(node.optString("nodeName"));
newDeployment.setDeploymentType(NodeType.IAAS);
newDeployment.setLocationName(nodeCandidateInfo.optString("locationName"));
newDeployment.setImageProviderId(nodeCandidateInfo.optString("imageProviderId"));
newDeployment.setHardwareProviderId(nodeCandidateInfo.optString("hardwareProviderId"));
......@@ -662,11 +662,13 @@ public class PAGateway {
public List<ByonNode> getByonNodeList(String jobId) {
List<ByonNode> filteredByonNodes = new LinkedList<>();
List<ByonNode> listByonNodes = EntityManagerHelper.createQuery("SELECT byon FROM ByonNode byon", ByonNode.class).getResultList();
if(jobId.equals("0")) {return listByonNodes;}
else{
for (ByonNode byonNode : listByonNodes){
if (byonNode.getJob().getJobId().equals(jobId)){
filteredByonNodes.add(byonNode);}
if (jobId.equals("0")) {
return listByonNodes;
} else {
for (ByonNode byonNode : listByonNodes) {
if (byonNode.getJob().getJobId().equals(jobId)) {
filteredByonNodes.add(byonNode);
}
}
return filteredByonNodes;
}
......@@ -682,12 +684,37 @@ public class PAGateway {
* @return 0 if nodes has been added properly. A greater than 0 value otherwise.
*/
public int addByonNodes(Map<String, String> byonIdPerComponent, String jobId) {
for (Map.Entry<String, String> entry : byonIdPerComponent.entrySet()) {
ByonNode byonNode = EntityManagerHelper.find(ByonNode.class, entry.getKey());
Validate.notNull(byonIdPerComponent, "The received byonIdPerComponent structure is empty. Nothing to be added.");
EntityManagerHelper.begin();
byonIdPerComponent.forEach((byonNodeId, componentId) -> {
ByonNode byonNode = EntityManagerHelper.find(ByonNode.class, byonNodeId);
Task task = EntityManagerHelper.find(Task.class, componentId);
Deployment newDeployment = new Deployment();
newDeployment.setNodeName(byonNode.getName());
newDeployment.setDeploymentType(NodeType.BYON);
newDeployment.setByonNode(byonNode);
List<ByonNode> byonNodeList = new LinkedList<>();
byonNodeList.add(byonNode);
defineByonNodeSource(byonNodeList, byonNode.getName());
}
LOGGER.info("BYON node source defined.");
newDeployment.setTask(task);
newDeployment.setNumber(task.getNextDeploymentID());
EntityManagerHelper.persist(newDeployment);
LOGGER.debug("Deployment created: " + newDeployment.toString());
task.addDeployment(newDeployment);
EntityManagerHelper.persist(task);
});
EntityManagerHelper.commit();
LOGGER.info("BYON nodes added properly.");
return 0;
/*TODO:
* Create node sources per JobID
......@@ -981,15 +1008,26 @@ public class PAGateway {
}
private ScriptTask createInfraTask(Task task, Deployment deployment, String taskNameSuffix, String nodeToken) {
switch (deployment.getDeploymentType()) {
case IAAS:
return createInfraIAASTask(task, deployment, taskNameSuffix, nodeToken);
case BYON:
return createInfraBYONTask(task, deployment, taskNameSuffix, nodeToken);
}
return new ScriptTask();
}
private ScriptTask createInfraIAASTask(Task task, Deployment deployment, String taskNameSuffix, String nodeToken) {
LOGGER.debug("Acquiring node AWS script file: " + getClass().getResource(File.separator + "acquire_node_aws_script.groovy").toString());
ScriptTask deployNodeTask = PAFactory.createGroovyScriptTaskFromFile("acquireAWSNode_" + task.getName() + taskNameSuffix,
"acquire_node_aws_script.groovy");
"acquire_node_aws_script.groovy");
deployNodeTask.setPreScript(PAFactory.createSimpleScriptFromFIle("pre_acquire_node_script.groovy", "groovy"));
Map<String, TaskVariable> variablesMap = new HashMap<>();
variablesMap.put("NS_name", new TaskVariable("NS_name",
deployment.getPaCloud().getNodeSourceNamePrefix() + deployment.getLocationName()));
deployment.getPaCloud().getNodeSourceNamePrefix() + deployment.getLocationName()));
variablesMap.put("nVMs", new TaskVariable("nVMs", "1", "PA:Integer", false));
variablesMap.put("synchronous", new TaskVariable("synchronous", "true", "PA:Boolean", false));
variablesMap.put("timeout", new TaskVariable("timeout", "700", "PA:Long", false));
......@@ -1011,24 +1049,57 @@ public class PAGateway {
LOGGER.debug("Variables to be added to the task: " + variablesMap.toString());
deployNodeTask.setVariables(variablesMap);
addLocalDefaultNSRegexSelectionScript(deployNodeTask);
return deployNodeTask;
}
private ScriptTask createInfraBYONTask(Task task, Deployment deployment, String taskNameSuffix, String nodeToken) {
LOGGER.debug("Acquiring node BYON script file: " + getClass().getResource(File.separator + "acquire_node_BYON_script.groovy").toString());
ScriptTask deployNodeTask = PAFactory.createGroovyScriptTaskFromFile("acquireBYONNode_" + task.getName() + taskNameSuffix,
"acquire_node_BYON_script.groovy");
deployNodeTask.setPreScript(PAFactory.createSimpleScriptFromFIle("pre_acquire_node_script.groovy", "groovy"));
Map<String, TaskVariable> variablesMap = new HashMap<>();
variablesMap.put("NS_name", new TaskVariable("NS_name",
deployment.getPaCloud().getNodeSourceNamePrefix() + deployment.getLocationName()));
//TODO: To check this. Which ip address is used as host in RM?
variablesMap.put("host_name", new TaskVariable("host_name",
deployment.getByonNode().getIpAddresses().get(0).getValue()));
variablesMap.put("token", new TaskVariable("token", nodeToken));
LOGGER.debug("Variables to be added to the task: " + variablesMap.toString());
deployNodeTask.setVariables(variablesMap);
addLocalDefaultNSRegexSelectionScript(deployNodeTask);
return deployNodeTask;
}
private void addLocalDefaultNSRegexSelectionScript(ScriptTask scriptTask) {
try {
String selectionScriptFileName = "check_node_source_regexp.groovy";
String[] nodeSourceNameRegex = {"^local$|^Default$"};
SelectionScript selectionScript = new SelectionScript(Utils.getContentWithFileName(selectionScriptFileName),
"groovy",
nodeSourceNameRegex,
true);
deployNodeTask.setSelectionScript(selectionScript);
"groovy",
nodeSourceNameRegex,
true);
scriptTask.setSelectionScript(selectionScript);
} catch (InvalidScriptException e) {
LOGGER.warn("Selection script could not have been added.");
}
return deployNodeTask;
}
private ScriptTask createEmsDeploymentTask(EmsDeploymentRequest emsDeploymentRequest, String taskNameSuffix, String nodeToken) {
LOGGER.debug("Preparing EMS deployment task");
ScriptTask emsDeploymentTask = PAFactory.createComplexScriptTaskFromFiles("emsDeployment" + taskNameSuffix,"emsdeploy_mainscript.groovy","groovy","emsdeploy_prescript.sh","bash","emsdeploy_postscript.sh","bash");
ScriptTask emsDeploymentTask = PAFactory.createComplexScriptTaskFromFiles("emsDeployment" + taskNameSuffix,
"emsdeploy_mainscript.groovy",
"groovy",
"emsdeploy_prescript.sh",
"bash",
"emsdeploy_postscript.sh",
"bash");
Map<String, TaskVariable> variablesMap = emsDeploymentRequest.getWorkflowMap();
emsDeploymentTask.addGenericInformation("NODE_ACCESS_TOKEN", nodeToken);
emsDeploymentTask.setVariables(variablesMap);
......@@ -1064,7 +1135,7 @@ public class PAGateway {
// Let's retrieve the deployment to clone
if (optTask.get().getDeployments() == null || optTask.get().getDeployments().isEmpty()) {
LOGGER.error(String.format("No previous deployment found in task [%s] ",taskName));
LOGGER.error(String.format("No previous deployment found in task [%s] ", taskName));
return 2;
}
......@@ -1073,6 +1144,10 @@ public class PAGateway {
// Let's clone the deployment/node as needed
Deployment oldDeployment = optTask.get().getDeployments().get(0);
if (NodeType.BYON.equals(oldDeployment.getDeploymentType())) {
LOGGER.error(String.format("The previous deployment is a BYON node [%s] ", oldDeployment));
return 3;
}
nodeNames.stream().map(nodeName -> {
EmsDeploymentRequest newEmsDeploymentReq =
oldDeployment.getEmsDeployment() == null ? null : oldDeployment.getEmsDeployment().clone(nodeName);
......@@ -1085,6 +1160,8 @@ public class PAGateway {
oldDeployment.getTask(),
false,
null,
null,
NodeType.IAAS,
null
);
})
......@@ -1374,10 +1451,24 @@ public class PAGateway {
}
// For supplied node, I retrieve their deployment
List<Deployment> deployments = nodeNames.stream().map(node -> EntityManagerHelper.find(Deployment.class,node)).filter(Objects::nonNull).collect(Collectors.toList());
List<Deployment> deployments = nodeNames.stream().map(node -> EntityManagerHelper.find(Deployment.class,node))
.filter(Objects::nonNull)
.collect(Collectors.toList());
deployments = deployments.stream().filter(deployment -> {
if (NodeType.BYON.equals(deployment.getDeploymentType())) {
LOGGER.warn("Deployment " + deployment.getNodeName() + " is a BYON node and can't be removed in scaling in.");
return false;
}
return true;
}).collect(Collectors.toList());
// For deployed node, I flag their removal
List<String> nodesToBeRemoved = deployments.stream().filter(Deployment::getIsDeployed).map(Deployment::getNodeName).collect(Collectors.toList());
List<String> nodesToBeRemoved = deployments.stream()
.filter(Deployment::getIsDeployed)
.map(Deployment::getNodeName)
.collect(Collectors.toList());
LOGGER.info("Nodes to be removed are : " + nodesToBeRemoved);
// For every node, I remove the deployment entree
deployments.forEach(
......@@ -1584,13 +1675,7 @@ public class PAGateway {
jobToSubmit.getTasks().forEach(task -> {
List<ScriptTask> scriptTasks = buildPATask(task, jobToSubmit);
scriptTasks.forEach(scriptTask -> {
try {
paJob.addTask(scriptTask);
} catch (UserException e) {
LOGGER.error("Task " + task.getName() + " could not be added due to: " + e.toString());
}
});
addAllScriptTasksToPAJob(paJob, task, scriptTasks);
EntityManagerHelper.persist(task);
});
......
......@@ -47,6 +47,13 @@ public class Deployment implements Serializable {
@Column(name = "NUMBER")
private Long number;
@Column(name = "NODE_TYPE")
@Enumerated(EnumType.STRING)
private NodeType deploymentType;
@OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.REFRESH)
private ByonNode byonNode;
@Override
public String toString() {
return "Deployment{" +
......@@ -59,6 +66,7 @@ public class Deployment implements Serializable {
", number='" + number + '\'' +
", paCloud='" + paCloud.getNodeSourceNamePrefix() + '\'' +
", task='" + task.getName() + '\'' +
", byonNode='" + byonNode.toString() + '\'' +
'}';
}
}
......@@ -71,7 +71,7 @@ public class NodeCandidate implements Serializable {
private NodeCandidateTypeEnum nodeCandidateType = null;
@Column(name = "JOB_ID_FOR_BYON")
@JsonProperty("nodeType")
@JsonProperty("jobIdForByon")
private String jobIdForBYON;
@Column(name = "PRICE")
......
......@@ -16,7 +16,7 @@ public class NodeTypeRequirement extends Requirement {
@JsonProperty("nodeType")
private List<NodeType> nodeTypes;
@JsonProperty("nodeType")
@JsonProperty("jobIdForByon")
private String jobIdForBYON;
/**
......
<?xml version="1.0" encoding="UTF-8"?>
<job
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:proactive:jobdescriptor:3.12" xsi:schemaLocation="urn:proactive:jobdescriptor:3.12 http://www.activeeon.com/public_content/schemas/proactive/jobdescriptor/3.12/schedulerjob.xsd" name="Define_NS_BYON.xml" priority="normal" onTaskError="continueJobExecution" maxNumberOfExecution="2" >
<variables>
<variable name="pa_port" value="8080" />
<variable name="NS_name" value="SSH_INFRA" />
<variable name="pa_protocol" value="http" />
<variable name="rm_host_name" value="" model=""/>
<variable name="ssh_username" value="" model=""/>
<variable name="ssh_password" value="" model=""/>
<variable name="ssh_key" value="" />
<variable name="ssh_port" value="22" model=""/>
<variable name="list_of_ips" value="" model=""/>
<variable name="tokens" value="" model=""/>
</variables>
<description>
<![CDATA[ A workflow that executes Groovy in JVM. ]]>
</description>
<taskFlow>
<task name="Create_NodeSource"
fork="true">
<description>
<![CDATA[ The simplest task, ran by a Groovy engine. ]]>
</description>
<scriptExecutable>
<script>
<code language="groovy">
<![CDATA[
// Connecting to the Scheduler
import java.io.File
def retCode = 0;
def tokens= variables.get("tokens")
def nodeSourceName = variables.get("NS_name")
def protocol = variables.get("pa_protocol")
def host = variables.get("rm_host_name")
def port = variables.get("pa_port")
def sshUsername = variables.get("ssh_username")
def sshPassword = variables.get("ssh_password")
def sshKey = variables.get("ssh_key")
def sshPort = variables.get("ssh_port")
def ips= variables.get("list_of_ips")
def javaOptions = "-Dproactive.net.nolocal=true -Dproactive.communication.protocol=pamr -Dproactive.useIPaddress=true -Dproactive.pamr.router.address=" + host
ips = ips.split(",")
def hosts = ""
for (ip in ips) {
hosts = hosts + ip + " 1\n"
}
if (tokens=="") {
tokens="ALL"
}
println "[+] Preparation of NodeSoure " + nodeSourceName
print "(1/4) Connecting to the RM at "+protocol+"://"+host+":"+port+"/ ..."
rmapi.connect()
println " OK!"
//Getting NS configuration settings
def infrastructureType = "org.ow2.proactive.resourcemanager.nodesource.infrastructure.SSHInfrastructureV2"
def infrastructureParameters = ["60000", //Node Time out
"5", //Max deployment failure
"5000", //wait between Deployment
sshPort,//port, //SSH port
sshUsername, //SSH Username
sshPassword, //SSH Password
"/opt/activeeon_enterprise-node-linux-x64-12.1.0-SNAPSHOT/jre/bin/java", //JavaPath on the remote host
"/opt/activeeon_enterprise-node-linux-x64-12.1.0-SNAPSHOT", //ScheduligPath on the remote hosts
"Linux", //targetOs
javaOptions]//Java options
def infrastructureFileParameters = [hosts, //hostsList file content
"", //SSH Private Key
"" //SSH Options
]
def policyType = "org.ow2.proactive.resourcemanager.nodesource.policy.StaticPolicy"
def poliyParameters = [tokens,"ALL"]
def policyFileParameters = [""]
def nodesRecoverable = "true"
print "(2/4) Creating NodeSource ..."
rmapi.defineNodeSource(nodeSourceName,infrastructureType,(String[]) infrastructureParameters.toArray(),(String[]) infrastructureFileParameters.toArray(), policyType, (String[]) poliyParameters.toArray(), (String[]) policyFileParameters.toArray(),nodesRecoverable)
println " ... OK !"
print "(3/4) Deploying the NodeSource ..."
rmapi.deployNodeSource(nodeSourceName)
println " ... OK !"
print "(4/4) Done"
return retCode;
]]>
</code>
</script>
</scriptExecutable>
<metadata>
<positionTop>
508
</positionTop>
<positionLeft>
510.5
</positionLeft>
</metadata>
</task>
</taskFlow>
<metadata>
<visualization>
<![CDATA[ <html>
<head>
<link rel="stylesheet" href="/studio/styles/studio-standalone.css">
<style>
#workflow-designer {
left:0 !important;
top:0 !important;
width:2726px;
height:3116px;
}
</style>
</head>
<body>
<div id="workflow-visualization-view"><div id="workflow-visualization" style="position:relative;top:-503px;left:-505.5px"><div class="task _jsPlumb_endpoint_anchor_ ui-draggable" id="jsPlumb_1_13" style="top: 508px; left: 510.5px; z-index: 24;"><a class="task-name" data-toggle="tooltip" data-placement="right" title="The simplest task, ran by a Groovy engine."><img src="images/Groovy.png" width="20px">&nbsp;<span class="name">Create_NodeSource</span></a></div><div class="_jsPlumb_endpoint source-endpoint dependency-source-endpoint connected _jsPlumb_endpoint_anchor_ ui-draggable ui-droppable" style="position: absolute; height: 20px; width: 20px; left: 563px; top: 538px;"><svg style="position:absolute;left:0px;top:0px" width="20" height="20" pointer-events="all" position="absolute" version="1.1" xmlns="http://www.w3.org/1999/xhtml"><circle cx="10" cy="10" r="10" version="1.1" xmlns="http://www.w3.org/1999/xhtml" fill="#666" stroke="none" style=""></circle></svg></div></div></div>
</body>
</html>
]]>
</visualization>
</metadata>
</job>
\ No newline at end of file
import org.ow2.proactive.resourcemanager.common.event.RMNodeEvent
// Conncting to the Scheduler
println "[+] Preparation of Nodes ... "
print "(1/4) Connecting to the RM ..."
rmapi.connect()
println " ... OK !"
// Getting NS configuration settings
def retCode = 0;
def nodeSourceName = variables.get("NS_name")
def nodeToken = variables.get("token")
def hostName = variables.get("host_name")
// Enforcing ....
print "(2/4) Searching for nodes ..."
def nodeURLs = rmapi.getRMStateFull().getNodesEvents().findAll { nodeEvent ->
return (nodeSourceName == nodeEvent.getNodeSource() && hostName == nodeEvent.getHostName());
}.collect { nodeEvent ->
return (nodeEvent.getNodeUrl());
}
println " ... OK !"
println "Found node URLs : " + nodeURLs.toString()
print "(3/4) Acquiring nodes ..."
nodeURLs.each { nodeURL ->
rmapi.addNodeToken(nodeURL, nodeToken)
}
println " ... OK !"
print "(4/4) Logging out ..."
rmapi.disconnect();
println " ... OK !"
return retCode;
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment