Commit ad15f053 authored by Mohamed Khalil Labidi's avatar Mohamed Khalil Labidi
Browse files

Add refresh connexion mechanism in RM and Scheduler gateways

parent 00e6dbec
......@@ -13,6 +13,7 @@ import org.ow2.proactive_grid_cloud_portal.smartproxy.RestSmartProxyImpl;
import javax.security.auth.login.LoginException;
import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
......@@ -36,6 +37,7 @@ public class PASchedulerGateway {
* @return JobId
*/
public JobId submit(Job job) {
reconnect();
JobId jobId = null;
LOGGER.debug("Submitting job: " + job.toString());
try {
......@@ -58,6 +60,7 @@ public class PASchedulerGateway {
* @return JobId
*/
public JobId submit(File xmlFile) {
reconnect();
JobId jobId = null;
LOGGER.debug("Submitting job: " + xmlFile.toString());
try {
......@@ -81,6 +84,7 @@ public class PASchedulerGateway {
* @return JobId
*/
public JobId submit(File xmlFile, Map<String, String> variables) {
reconnect();
JobId jobId = null;
LOGGER.debug("Submitting job: " + xmlFile.toString());
LOGGER.debug(" with variables: " + variables.toString());
......@@ -104,6 +108,7 @@ public class PASchedulerGateway {
* @return The job state
*/
public JobState getJobState(String jobId) {
reconnect();
JobState jobState = null;
try {
if (!restSmartProxy.isConnected()) {
......@@ -135,6 +140,7 @@ public class PASchedulerGateway {
* @return The job result
*/
public JobResult waitForJob(String jobId, long timeout) {
reconnect();
JobResult jobResult = null;
try {
jobResult = restSmartProxy.waitForJob(jobId, timeout);
......@@ -156,6 +162,7 @@ public class PASchedulerGateway {
* @return The jobs results map
*/
public Map<Long, Map<String, Serializable>> getJobResultMaps(List<String> jobsId) {
reconnect();
Map<Long, Map<String, Serializable>> jobResults = null;
try {
jobResults = restSmartProxy.getJobResultMaps(jobsId);
......@@ -171,6 +178,7 @@ public class PASchedulerGateway {
* @return The jobs results map
*/
public boolean killJob(String jobId) {
reconnect();
boolean result = false;
try {
result = restSmartProxy.killJob(jobId);
......@@ -192,6 +200,7 @@ public class PASchedulerGateway {
* @return The task result
*/
public TaskResult waitForTask(String jobId, String taskName, long timeout) {
reconnect();
TaskResult taskResult = null;
try {
taskResult = restSmartProxy.waitForTask(jobId, taskName, timeout);
......@@ -216,6 +225,7 @@ public class PASchedulerGateway {
* @return The task result
*/
public TaskResult getTaskResult(String jobId, String taskName) {
reconnect();
TaskResult taskResult = null;
try {
taskResult = restSmartProxy.getTaskResult(jobId, taskName);
......@@ -248,6 +258,17 @@ public class PASchedulerGateway {
restSmartProxy = SchedulerConnectionHelper.disconnect();
}
private void reconnect() {
if (!restSmartProxy.isConnected()) {
try {
restSmartProxy.reconnect();
LOGGER.info("Connexion to ProActive Scheduler refreshed.");
} catch (SchedulerException | LoginException e) {
LOGGER.error("ERROR: Not able to reconnect to Scheduler due to: " + Arrays.toString(e.getStackTrace()));
}
}
}
// For testing purpose
public RestSmartProxyImpl getRestSmartProxy() {
return restSmartProxy;
......
......@@ -92,6 +92,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public List<RMNodeEvent> getListOfNodesEvents() throws NotConnectedException, PermissionRestException {
reconnect();
RMStateFull rmStateFull = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId());
List<RMNodeEvent> rmNodeEvents = rmStateFull.getNodesEvents();
return rmNodeEvents;
......@@ -124,6 +125,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
private List<String> getDeployedNodesInformation(String nodeSource) throws NotConnectedException, PermissionRestException {
reconnect();
List<String> deployedNodes = new ArrayList<>();
LOGGER.debug("Getting full RM state ...");
RMStateFull rmState = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId());
......@@ -151,6 +153,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public void deploySimpleAWSNodeSource(String awsUsername, String awsKey, String rmHostname, String nodeSourceName, Integer numberVMs) throws NotConnectedException, PermissionRestException {
reconnect();
// Getting NS configuration settings
String infrastructureType = "org.ow2.proactive.resourcemanager.nodesource.infrastructure.AWSEC2Infrastructure";
String[] infrastructureParameters = {awsUsername, //username
......@@ -197,6 +200,7 @@ public class PAResourceManagerGateway {
* @throws RestException In case a Rest exception is thrown
*/
public List<String> searchNodes(List<String> tags, boolean all) throws NotConnectedException, RestException {
reconnect();
LOGGER.debug("Search for nodes with tags " + tags + " ...");
List<String> nodesUrls = new ArrayList<>(rmRestInterface.searchNodes(RMConnectionHelper.getSessionId(), tags, all));
LOGGER.debug("Nodes found: " + nodesUrls);
......@@ -212,6 +216,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public NSState undeployNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException {
reconnect();
LOGGER.debug("Undeploying node source ...");
NSState nsState = rmRestInterface.undeployNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName, preempt);
LOGGER.info("Node source undeployed!");
......@@ -227,6 +232,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public Boolean removeNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException {
reconnect();
LOGGER.debug("Removing node source ...");
Boolean result = rmRestInterface.removeNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName, preempt);
LOGGER.info("Node source removed!");
......@@ -242,6 +248,7 @@ public class PAResourceManagerGateway {
* @throws RMNodeException In case the RM throws a Node exception
*/
public Boolean releaseNode(String nodeUrl) throws NotConnectedException, PermissionRestException, RMNodeException {
reconnect();
LOGGER.debug("Releasing node ...");
Boolean result = rmRestInterface.releaseNode(RMConnectionHelper.getSessionId(), nodeUrl);
LOGGER.info("Node released!");
......@@ -257,12 +264,19 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public Boolean removeNode(String nodeUrl, Boolean preempt) throws NotConnectedException, PermissionRestException {
reconnect();
LOGGER.debug("Removing node \'" + nodeUrl + "\' ...");
Boolean result = rmRestInterface.removeNode(RMConnectionHelper.getSessionId(), nodeUrl, preempt);
LOGGER.info("Node removed!");
return result;
}
private void reconnect() throws NotConnectedException {
if (!RMConnectionHelper.isActive()) {
throw new Error("ProActive Resource Manager is not reachable.");
}
}
// For testing purpose only
public RMRestInterface getRmRestInterface() {
......
......@@ -61,13 +61,11 @@ public class RMConnectionHelper {
*/
public static synchronized void connect(String username, String password) throws LoginException, KeyException, RMException {
LOGGER.debug("Connecting to RM ...");
boolean isActive;
try {
isActive = rmRestInterface.isActive(sessionId);
// If the sessionId is equals to "" (empty), an exception will occurs.
// If the sessionId is valid ==> Already connected
// If the sessionId is invalid we create a new session by establishing a new connection to the RM
if(isActive){
if(isActive()){
LOGGER.info("Already Connected to RM");
}else{
// Connect and create a new session
......@@ -76,7 +74,7 @@ public class RMConnectionHelper {
userPreferences.put(sessionPreferencesId,sessionId);
LOGGER.info("Connected to RM");
}
}catch (Exception NAE){
} catch (Exception NAE){
// Exception is triggered when the sessionId is equal to ""
sessionId = rmRestInterface.rmConnect(username, password);
userPreferences.put(sessionPreferencesId,sessionId);
......@@ -90,12 +88,10 @@ public class RMConnectionHelper {
*
*/
public static synchronized void disconnect() {
boolean isActive;
try{
sessionId = userPreferences.get(sessionPreferencesId,"");
// Check if the session still active
isActive = rmRestInterface.isActive(sessionId);
if(isActive){
if(isActive()){
try {
LOGGER.debug("Disconnecting from RM...");
rmRestInterface.rmDisconnect(sessionId);
......@@ -121,6 +117,10 @@ public class RMConnectionHelper {
}
}
public static Boolean isActive() throws NotConnectedException {
return rmRestInterface.isActive(sessionId);
}
public static String getSessionId() {
return sessionId;
}
......
......@@ -92,6 +92,11 @@ class PAResourceManagerGatewayTest {
return null;
});
// For all methods of the PAResourceManagerGateway class that refresh the connexion
mb.when(RMConnectionHelper::isActive).thenAnswer(invocation -> {
mb.when(RMConnectionHelper::getSessionId).thenReturn(DUMMY_SESSION_ID);
return null;
});
// For the searchNodes method of the PAResourceManagerGateway class
try {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment