Commit ff755621 authored by Maroun Koussaifi's avatar Maroun Koussaifi Committed by Mohamed Khalil Labidi
Browse files

Improve the connection to the Scheduler and RM

parent 3c52d678
...@@ -79,13 +79,11 @@ public class PAGateway { ...@@ -79,13 +79,11 @@ public class PAGateway {
/** /**
* Disconnect from the ProActive server * Disconnect from the ProActive server
*/ */
public void disconnect() { public void disconnect() throws NotConnectedException {
LOGGER.debug("Disconnecting from RM..."); LOGGER.debug("Disconnecting from RM...");
resourceManagerGateway.disconnect(); resourceManagerGateway.disconnect();
LOGGER.info("Disconnected from RM.");
LOGGER.debug("Disconnecting from Scheduler..."); LOGGER.debug("Disconnecting from Scheduler...");
schedulerGateway.disconnect(); schedulerGateway.disconnect();
LOGGER.info("Disconnected from Scheduler.");
} }
private List<Port> extractListOfPortsToOpen(JSONArray ports) { private List<Port> extractListOfPortsToOpen(JSONArray ports) {
...@@ -168,7 +166,7 @@ public class PAGateway { ...@@ -168,7 +166,7 @@ public class PAGateway {
LinkedList<Task> jobTasks = new LinkedList<>(applicationJob.getTasks()); LinkedList<Task> jobTasks = new LinkedList<>(applicationJob.getTasks());
// Add the mandatory connections between the tasks // Add the mandatory connections between the tasks
for(Task task : jobTasks){ jobTasks.forEach(task -> {
// Write the dot representation of the task // Write the dot representation of the task
dotGraphSyntax.append(task.getName() + ";\n"); dotGraphSyntax.append(task.getName() + ";\n");
...@@ -181,7 +179,7 @@ public class PAGateway { ...@@ -181,7 +179,7 @@ public class PAGateway {
// Check for Mandatory connections // Check for Mandatory connections
// If the list is empty there are no mandatory connections // If the list is empty there are no mandatory connections
for (String parentTask : parentTasks) { parentTasks.forEach(parentTask -> {
// Write the dot syntax of the connection between the two tasks // Write the dot syntax of the connection between the two tasks
dotGraphSyntax.append(parentTask dotGraphSyntax.append(parentTask
...@@ -191,8 +189,9 @@ public class PAGateway { ...@@ -191,8 +189,9 @@ public class PAGateway {
+ ";"); + ";");
dotGraphSyntax.append("\n"); dotGraphSyntax.append("\n");
} });
}
});
// Write the dot file end character // Write the dot file end character
dotGraphSyntax.append("}\n"); dotGraphSyntax.append("}\n");
......
package org.activeeon.morphemic.application.deployment; package org.activeeon.morphemic.application.deployment;
import org.activeeon.morphemic.service.SchedulerConnectionHelper;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.ow2.proactive.authentication.ConnectionInfo;
import org.ow2.proactive.scheduler.common.exception.*; import org.ow2.proactive.scheduler.common.exception.*;
import org.ow2.proactive.scheduler.common.job.Job; import org.ow2.proactive.scheduler.common.job.Job;
import org.ow2.proactive.scheduler.common.job.JobId; import org.ow2.proactive.scheduler.common.job.JobId;
...@@ -18,10 +18,6 @@ import java.util.concurrent.TimeoutException; ...@@ -18,10 +18,6 @@ import java.util.concurrent.TimeoutException;
public class PASchedulerGateway { public class PASchedulerGateway {
private final String paUrl;
private final String SCHEDULER_REST_PATH = "/rest";
private RestSmartProxyImpl restSmartProxy; private RestSmartProxyImpl restSmartProxy;
private static final Logger LOGGER = Logger.getLogger(PASchedulerGateway.class); private static final Logger LOGGER = Logger.getLogger(PASchedulerGateway.class);
...@@ -31,8 +27,7 @@ public class PASchedulerGateway { ...@@ -31,8 +27,7 @@ public class PASchedulerGateway {
* @param paUrl ProActive URL (exp: http://try.activeeon.com:8080/) * @param paUrl ProActive URL (exp: http://try.activeeon.com:8080/)
*/ */
public PASchedulerGateway(String paUrl) { public PASchedulerGateway(String paUrl) {
this.paUrl = paUrl; SchedulerConnectionHelper.init(paUrl);
restSmartProxy = new RestSmartProxyImpl();
} }
/** /**
...@@ -231,29 +226,15 @@ public class PASchedulerGateway { ...@@ -231,29 +226,15 @@ public class PASchedulerGateway {
* @param password The user's password * @param password The user's password
*/ */
public void connect(String username, String password) { public void connect(String username, String password) {
LOGGER.debug("Connecting to Scheduler ..."); // Connect to the Scheduler API
//TODO: TO improve the concatenation of URLs restSmartProxy = SchedulerConnectionHelper.connect(username,password);
ConnectionInfo connectionInfo = new ConnectionInfo(this.paUrl + this.SCHEDULER_REST_PATH,
username,
password,
null,
true);
restSmartProxy.init(connectionInfo);
LOGGER.info("Connected to Scheduler.");
} }
/** /**
* Disconnect from the ProActive server * Disconnect from the ProActive server
*/ */
public void disconnect() { public void disconnect() {
try { SchedulerConnectionHelper.disconnect();
LOGGER.debug("Disconnecting from Scheduler...");
restSmartProxy.disconnect();
LOGGER.info("Disconnected from Scheduler.");
} catch (PermissionException pe) {
LOGGER.warn("WARNING: Not able to disconnect due to: " + pe.toString());
}
} }
......
package org.activeeon.morphemic.infrastructure.deployment; package org.activeeon.morphemic.infrastructure.deployment;
import org.activeeon.morphemic.service.RMConnectionHelper;
import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.conn.ConnectTimeoutException;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.ow2.proactive.resourcemanager.common.NSState; import org.ow2.proactive.resourcemanager.common.NSState;
...@@ -9,7 +10,6 @@ import org.ow2.proactive.resourcemanager.exception.RMException; ...@@ -9,7 +10,6 @@ import org.ow2.proactive.resourcemanager.exception.RMException;
import org.ow2.proactive.resourcemanager.exception.RMNodeException; import org.ow2.proactive.resourcemanager.exception.RMNodeException;
import org.ow2.proactive.scheduler.common.exception.NotConnectedException; import org.ow2.proactive.scheduler.common.exception.NotConnectedException;
import org.ow2.proactive_grid_cloud_portal.common.RMRestInterface; import org.ow2.proactive_grid_cloud_portal.common.RMRestInterface;
import org.ow2.proactive_grid_cloud_portal.rm.client.RMRestClient;
import org.ow2.proactive_grid_cloud_portal.scheduler.exception.PermissionRestException; import org.ow2.proactive_grid_cloud_portal.scheduler.exception.PermissionRestException;
import org.ow2.proactive_grid_cloud_portal.scheduler.exception.RestException; import org.ow2.proactive_grid_cloud_portal.scheduler.exception.RestException;
...@@ -22,14 +22,8 @@ import java.util.concurrent.*; ...@@ -22,14 +22,8 @@ import java.util.concurrent.*;
public class PAResourceManagerGateway { public class PAResourceManagerGateway {
private final String paURL;
private final String RESOURCE_MANAGER_REST_PATH = "/rest";
private RMRestInterface rmRestInterface; private RMRestInterface rmRestInterface;
private String sessionId;
static final int MAX_CONNECTION_RETRIES = 10; static final int MAX_CONNECTION_RETRIES = 10;
static final int INTERVAL = 10000; static final int INTERVAL = 10000;
...@@ -55,11 +49,11 @@ public class PAResourceManagerGateway { ...@@ -55,11 +49,11 @@ public class PAResourceManagerGateway {
try { try {
Thread.sleep(INTERVAL); Thread.sleep(INTERVAL);
deployedNodes = getDeployedNodesInformation(nodeSource); deployedNodes = getDeployedNodesInformation(nodeSource);
if (!deployedNodes.isEmpty()) { if (!deployedNodes.isEmpty()) {
gotResponse = true; gotResponse = true;
} else { } else {
retries++; retries++;
} }
} catch (Exception e) { } catch (Exception e) {
retries++; retries++;
} }
...@@ -87,39 +81,30 @@ public class PAResourceManagerGateway { ...@@ -87,39 +81,30 @@ public class PAResourceManagerGateway {
* Construct a gateway to the ProActive Resource Manager * Construct a gateway to the ProActive Resource Manager
* @param paURL ProActive URL (exp: http://try.activeeon.com:8080/) * @param paURL ProActive URL (exp: http://try.activeeon.com:8080/)
*/ */
public PAResourceManagerGateway(String paURL) { public PAResourceManagerGateway(String paURL){
this.paURL = paURL; // Initialize the gateway from the RMConnectionHelper class
//TODO: TO improve the concatenation of URLs rmRestInterface = RMConnectionHelper.init(paURL);
RMRestClient client = new RMRestClient(this.paURL + this.RESOURCE_MANAGER_REST_PATH, null);
rmRestInterface = client.getRm();
} }
/** /**
* Connect to the ProActive server * Connect to the ProActive server
* @param username The user's username * @param username Username
* @param password The user's password * @param password Password
* @throws LoginException In case the login is not valid * @throws LoginException In case the login is not valid
* @throws KeyException In case the password is not valid * @throws KeyException In case the password is not valid
* @throws RMException In case an error happens in the RM * @throws RMException In case an error happens in the RM
* @throws NotConnectedException In case the session id is invalid
*/ */
public void connect(String username, String password) throws LoginException, KeyException, RMException { public void connect(String username, String password) throws LoginException, KeyException, RMException {
LOGGER.debug("Connecting to RM ..."); RMConnectionHelper.connect(username,password);
sessionId = rmRestInterface.rmConnect(username, password);
LOGGER.info("Connected to RM.");
} }
/** /**
* Disconnect from the ProActive server * Disconnect from the ProActive server
*/ */
public void disconnect() { public void disconnect(){
try { RMConnectionHelper.disconnect();
LOGGER.debug("Disconnecting from RM...");
rmRestInterface.rmDisconnect(sessionId);
LOGGER.info("Disconnected from RM.");
} catch (NotConnectedException nce) {
LOGGER.warn("WARNING: Not able to disconnect due to: " + nce.toString());
}
} }
/** /**
...@@ -132,7 +117,7 @@ public class PAResourceManagerGateway { ...@@ -132,7 +117,7 @@ public class PAResourceManagerGateway {
private List<String> getDeployedNodesInformation(String nodeSource) throws NotConnectedException, PermissionRestException { private List<String> getDeployedNodesInformation(String nodeSource) throws NotConnectedException, PermissionRestException {
List<String> deployedNodes = new ArrayList<>(); List<String> deployedNodes = new ArrayList<>();
LOGGER.debug("Getting full RM state ..."); LOGGER.debug("Getting full RM state ...");
RMStateFull rmState = rmRestInterface.getRMStateFull(sessionId); RMStateFull rmState = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId());
LOGGER.debug("Full monitoring got."); LOGGER.debug("Full monitoring got.");
LOGGER.debug("Searching for deployed nodes information ..."); LOGGER.debug("Searching for deployed nodes information ...");
for (RMNodeEvent rmNodeEvent : rmState.getNodesEvents()) { for (RMNodeEvent rmNodeEvent : rmState.getNodesEvents()) {
...@@ -184,11 +169,11 @@ public class PAResourceManagerGateway { ...@@ -184,11 +169,11 @@ public class PAResourceManagerGateway {
String nodesRecoverable = "true"; String nodesRecoverable = "true";
LOGGER.debug("Creating NodeSource ..."); LOGGER.debug("Creating NodeSource ...");
rmRestInterface.defineNodeSource(sessionId, nodeSourceName,infrastructureType, infrastructureParameters, infrastructureFileParameters, policyType, policyParameters, policyFileParameters, nodesRecoverable); rmRestInterface.defineNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName,infrastructureType, infrastructureParameters, infrastructureFileParameters, policyType, policyParameters, policyFileParameters, nodesRecoverable);
LOGGER.info("NodeSource created."); LOGGER.info("NodeSource created.");
LOGGER.debug("Deploying the NodeSource ..."); LOGGER.debug("Deploying the NodeSource ...");
rmRestInterface.deployNodeSource(sessionId, nodeSourceName); rmRestInterface.deployNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName);
LOGGER.info("NodeSource VMs deployed."); LOGGER.info("NodeSource VMs deployed.");
} }
...@@ -202,7 +187,7 @@ public class PAResourceManagerGateway { ...@@ -202,7 +187,7 @@ public class PAResourceManagerGateway {
*/ */
public List<String> searchNodes(List<String> tags, boolean all) throws NotConnectedException, RestException { public List<String> searchNodes(List<String> tags, boolean all) throws NotConnectedException, RestException {
LOGGER.debug("Search for nodes ..."); LOGGER.debug("Search for nodes ...");
List<String> nodesUrls = new ArrayList<>(rmRestInterface.searchNodes(sessionId, tags, all)); List<String> nodesUrls = new ArrayList<>(rmRestInterface.searchNodes(RMConnectionHelper.getSessionId(), tags, all));
LOGGER.debug("Nodes found: " + nodesUrls); LOGGER.debug("Nodes found: " + nodesUrls);
return nodesUrls; return nodesUrls;
} }
...@@ -217,7 +202,7 @@ public class PAResourceManagerGateway { ...@@ -217,7 +202,7 @@ public class PAResourceManagerGateway {
*/ */
public NSState undeployNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException { public NSState undeployNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException {
LOGGER.debug("Undeploying node source ..."); LOGGER.debug("Undeploying node source ...");
NSState nsState = rmRestInterface.undeployNodeSource(sessionId, nodeSourceName, preempt); NSState nsState = rmRestInterface.undeployNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName, preempt);
LOGGER.info("Node source undeployed!"); LOGGER.info("Node source undeployed!");
return nsState; return nsState;
} }
...@@ -232,7 +217,7 @@ public class PAResourceManagerGateway { ...@@ -232,7 +217,7 @@ public class PAResourceManagerGateway {
*/ */
public Boolean removeNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException { public Boolean removeNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException {
LOGGER.debug("Removing node source ..."); LOGGER.debug("Removing node source ...");
Boolean result = rmRestInterface.removeNodeSource(sessionId, nodeSourceName, preempt); Boolean result = rmRestInterface.removeNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName, preempt);
LOGGER.info("Node source removed!"); LOGGER.info("Node source removed!");
return result; return result;
} }
...@@ -247,7 +232,7 @@ public class PAResourceManagerGateway { ...@@ -247,7 +232,7 @@ public class PAResourceManagerGateway {
*/ */
public Boolean releaseNode(String nodeUrl) throws NotConnectedException, PermissionRestException, RMNodeException { public Boolean releaseNode(String nodeUrl) throws NotConnectedException, PermissionRestException, RMNodeException {
LOGGER.debug("Releasing node ..."); LOGGER.debug("Releasing node ...");
Boolean result = rmRestInterface.releaseNode(sessionId, nodeUrl); Boolean result = rmRestInterface.releaseNode(RMConnectionHelper.getSessionId(), nodeUrl);
LOGGER.info("Node released!"); LOGGER.info("Node released!");
return result; return result;
} }
...@@ -262,7 +247,7 @@ public class PAResourceManagerGateway { ...@@ -262,7 +247,7 @@ public class PAResourceManagerGateway {
*/ */
public Boolean removeNode(String nodeUrl, Boolean preempt) throws NotConnectedException, PermissionRestException { public Boolean removeNode(String nodeUrl, Boolean preempt) throws NotConnectedException, PermissionRestException {
LOGGER.debug("Removing node ..."); LOGGER.debug("Removing node ...");
Boolean result = rmRestInterface.removeNode(sessionId, nodeUrl, preempt); Boolean result = rmRestInterface.removeNode(RMConnectionHelper.getSessionId(), nodeUrl, preempt);
LOGGER.info("Node removed!"); LOGGER.info("Node removed!");
return result; return result;
} }
......
package org.activeeon.morphemic.service;
import org.apache.log4j.Logger;
import org.ow2.proactive.resourcemanager.exception.RMException;
import org.ow2.proactive.scheduler.common.exception.NotConnectedException;
import org.ow2.proactive_grid_cloud_portal.common.RMRestInterface;
import org.ow2.proactive_grid_cloud_portal.rm.client.RMRestClient;
import javax.security.auth.login.LoginException;
import java.security.KeyException;
import java.util.prefs.Preferences;
public class RMConnectionHelper {
private static final Preferences userPreferences = Preferences.userRoot().node("USER_PREFERENCES");
private static String sessionPreferencesId;
private static final Logger LOGGER = Logger.getLogger(RMConnectionHelper.class);
private static final String RESOURCE_MANAGER_REST_PATH = "/rest";
private static String sessionId = "";
private static RMRestInterface rmRestInterface;
/**
*
* Initialize the API to RM
*
* @param paURL PA rest URL
* @return The initialized RM Interface to be used for sending request to the platform
*/
public static RMRestInterface init(String paURL) {
if(paURL.contains("trydev2")){
sessionPreferencesId = "RM_sessionId_trydev2";
}else{
sessionPreferencesId = "RM_sessionId";
}
// Initialize the client
rmRestInterface = new RMRestClient(paURL + RESOURCE_MANAGER_REST_PATH, null).getRm();
// Get the user session ID
sessionId = userPreferences.get(sessionPreferencesId,"");
LOGGER.debug("Gateway to the ProActive Resource Manager is established");
return rmRestInterface;
}
/**
*
* Connect to the RM
*
* @param username Username
* @param password Password
* @return The user session ID
* @throws LoginException In case the login is not valid
* @throws KeyException In case the password is not valid
* @throws RMException In case an error happens in the RM
*/
public static synchronized void connect(String username, String password) throws LoginException, KeyException, RMException {
LOGGER.debug("Connecting to RM ...");
boolean isActive;
try {
isActive = rmRestInterface.isActive(sessionId);
// If the sessionId is equals to "" (empty), an exception will occurs.
// If the sessionId is valid ==> Already connected
// If the sessionId is invalid we create a new session by establishing a new connection to the RM
if(isActive){
LOGGER.info("Already Connected to RM");
}else{
// Connect and create a new session
sessionId = rmRestInterface.rmConnect(username, password);
// Save the session
userPreferences.put(sessionPreferencesId,sessionId);
LOGGER.info("Connected to RM");
}
}catch (Exception NAE){
// Exception is triggered when the sessionId is equal to ""
sessionId = rmRestInterface.rmConnect(username, password);
userPreferences.put(sessionPreferencesId,sessionId);
LOGGER.info("Connected to RM");
}
}
/**
*
* Disconnect from the RM API
*
*/
public static synchronized void disconnect() {
boolean isActive;
try{
sessionId = userPreferences.get(sessionPreferencesId,"");
// Check if the session still active
isActive = rmRestInterface.isActive(sessionId);
if(isActive){
try {
LOGGER.debug("Disconnecting from RM...");
rmRestInterface.rmDisconnect(sessionId);
// Remove the stored session
userPreferences.remove(sessionPreferencesId);
LOGGER.info("Disconnected from RM.");
} catch (NotConnectedException nce) {
LOGGER.warn("WARNING: Not able to disconnect due to: " + nce.toString());
}
}else{
LOGGER.info("Already disconnected from RM");
}
}catch (Exception e){
// Exception will trigger if the sessionId is empty
LOGGER.info("Already disconnected from RM");
}
}
public static String getSessionId() {
return sessionId;
}
}
package org.activeeon.morphemic.service;
import org.apache.log4j.Logger;
import org.ow2.proactive.authentication.ConnectionInfo;
import org.ow2.proactive.scheduler.common.exception.PermissionException;
import org.ow2.proactive_grid_cloud_portal.smartproxy.RestSmartProxyImpl;
public class SchedulerConnectionHelper {
private static final Logger LOGGER = Logger.getLogger(SchedulerConnectionHelper.class);
private static final String SCHEDULER_REST_PATH = "/rest";
private static RestSmartProxyImpl restSmartProxy = new RestSmartProxyImpl();
private static String paURL;
/**
*
* Initialize the gateway URL
*
* @param url URL for the ProActive Rest service
*/
public static void init(String url) {
paURL = url;
}
/**
*
* Connect to the Scheduler gateway
*
* @param username Username
* @param password Password
* @return The initialized Scheduler gateway
*/
public static synchronized RestSmartProxyImpl connect(String username, String password) {
LOGGER.debug("Connecting to Scheduler ...");
//TODO: TO improve the concatenation of URLs
ConnectionInfo connectionInfo = new ConnectionInfo(paURL + SCHEDULER_REST_PATH,
username,
password,
null,
true);
// Check if the proxy is connected
// If not make a new connection
if(!restSmartProxy.isConnected()) {
restSmartProxy.init(connectionInfo);
LOGGER.info("Connected to Scheduler");
}else{
LOGGER.info("Already connected to Scheduler");
}
return restSmartProxy;
}
/**
*
* Disconnect from the Scheduler
*
*/
public static synchronized void disconnect() {
try {
if(restSmartProxy.isConnected()) {
restSmartProxy.disconnect();
LOGGER.info("Disconnected from Scheduler");
}else{
LOGGER.info("Already disconnected from Scheduler");
}
} catch (PermissionException e) {
LOGGER.warn("WARNING: Not able to disconnect due to: " + e.toString());
}
}
}