From 19e386fe3b443ba4207996610b48fe7315f30853 Mon Sep 17 00:00:00 2001 From: mklkun Date: Tue, 22 Jun 2021 17:23:10 +0100 Subject: [PATCH] Add reconnexion mechanism if SAL is disconnected from RM or Scheduler --- .../deployment/PASchedulerGateway.java | 42 +++++++------- .../deployment/PAResourceManagerGateway.java | 57 +++++++++++++++---- .../morphemic/service/RMConnectionHelper.java | 18 +++--- .../service/SchedulerConnectionHelper.java | 11 ++-- 4 files changed, 81 insertions(+), 47 deletions(-) diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/application/deployment/PASchedulerGateway.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/application/deployment/PASchedulerGateway.java index 21158d52..bdc0d256 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/application/deployment/PASchedulerGateway.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/application/deployment/PASchedulerGateway.java @@ -37,7 +37,7 @@ public class PASchedulerGateway { * @return JobId */ public JobId submit(Job job) { - reconnect(); + reconnectIfDisconnected(); JobId jobId = null; LOGGER.debug("Submitting job: " + job.toString()); try { @@ -60,7 +60,7 @@ public class PASchedulerGateway { * @return JobId */ public JobId submit(File xmlFile) { - reconnect(); + reconnectIfDisconnected(); JobId jobId = null; LOGGER.debug("Submitting job: " + xmlFile.toString()); try { @@ -84,7 +84,7 @@ public class PASchedulerGateway { * @return JobId */ public JobId submit(File xmlFile, Map variables) { - reconnect(); + reconnectIfDisconnected(); JobId jobId = null; LOGGER.debug("Submitting job: " + xmlFile.toString()); LOGGER.debug(" with variables: " + variables.toString()); @@ -108,14 +108,9 @@ public class PASchedulerGateway { * @return The job state */ public JobState getJobState(String jobId) { - reconnect(); + reconnectIfDisconnected(); JobState jobState = null; try { - if (!restSmartProxy.isConnected()) { - LOGGER.warn("WARNING: Not connected to the scheduler. Reconnecting ..."); - restSmartProxy.reconnect(); - LOGGER.info("Reconnected to scheduler!"); - } LOGGER.info("Getting job " + jobId + " state."); jobState = restSmartProxy.getJobState(jobId); LOGGER.info("Job " + jobId + " is in state: " + jobState.getStatus().toString()); @@ -125,10 +120,6 @@ public class PASchedulerGateway { LOGGER.error("ERROR: Not able to get the job state due to an unknown job ID: " + uje.toString()); } catch (PermissionException pe) { LOGGER.error("ERROR: Not able to submit the job due to a PermissionException: " + pe.toString()); - } catch (SchedulerException se) { - LOGGER.error("ERROR: Not able to reconnect to scheduler due to a SchedulerException: " + se.toString()); - } catch (LoginException le) { - LOGGER.error("ERROR: Not able to reconnect to scheduler due to a LoginException: " + le.toString()); } return jobState; } @@ -140,7 +131,7 @@ public class PASchedulerGateway { * @return The job result */ public JobResult waitForJob(String jobId, long timeout) { - reconnect(); + reconnectIfDisconnected(); JobResult jobResult = null; try { jobResult = restSmartProxy.waitForJob(jobId, timeout); @@ -162,7 +153,7 @@ public class PASchedulerGateway { * @return The jobs results map */ public Map> getJobResultMaps(List jobsId) { - reconnect(); + reconnectIfDisconnected(); Map> jobResults = null; try { jobResults = restSmartProxy.getJobResultMaps(jobsId); @@ -178,7 +169,7 @@ public class PASchedulerGateway { * @return The jobs results map */ public boolean killJob(String jobId) { - reconnect(); + reconnectIfDisconnected(); boolean result = false; try { result = restSmartProxy.killJob(jobId); @@ -200,7 +191,7 @@ public class PASchedulerGateway { * @return The task result */ public TaskResult waitForTask(String jobId, String taskName, long timeout) { - reconnect(); + reconnectIfDisconnected(); TaskResult taskResult = null; try { taskResult = restSmartProxy.waitForTask(jobId, taskName, timeout); @@ -225,7 +216,7 @@ public class PASchedulerGateway { * @return The task result */ public TaskResult getTaskResult(String jobId, String taskName) { - reconnect(); + reconnectIfDisconnected(); TaskResult taskResult = null; try { taskResult = restSmartProxy.getTaskResult(jobId, taskName); @@ -258,17 +249,28 @@ public class PASchedulerGateway { restSmartProxy = SchedulerConnectionHelper.disconnect(); } - private void reconnect() { + private void reconnectIfDisconnected() { if (!restSmartProxy.isConnected()) { try { + LOGGER.warn("WARNING: Not connected to the scheduler. Reconnecting to Scheduler ..."); restSmartProxy.reconnect(); - LOGGER.info("Connexion to ProActive Scheduler refreshed."); + LOGGER.info("Reconnected to ProActive Scheduler."); } catch (SchedulerException | LoginException e) { LOGGER.error("ERROR: Not able to reconnect to Scheduler due to: " + Arrays.toString(e.getStackTrace())); } } } + private void renewSession() { + try { + LOGGER.debug("Renewing connexion ..."); + restSmartProxy.renewSession(); + LOGGER.info("Connexion to ProActive Scheduler renewed."); + } catch (NotConnectedException nce) { + LOGGER.error("ERROR: Not able to renew connexion to Scheduler due to: " + Arrays.toString(nce.getStackTrace())); + } + } + // For testing purpose public RestSmartProxyImpl getRestSmartProxy() { return restSmartProxy; diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/infrastructure/deployment/PAResourceManagerGateway.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/infrastructure/deployment/PAResourceManagerGateway.java index 1b99c052..2523b469 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/infrastructure/deployment/PAResourceManagerGateway.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/infrastructure/deployment/PAResourceManagerGateway.java @@ -14,6 +14,7 @@ import org.ow2.proactive_grid_cloud_portal.scheduler.exception.PermissionRestExc import org.ow2.proactive_grid_cloud_portal.scheduler.exception.RestException; import javax.security.auth.login.LoginException; +import javax.ws.rs.NotAuthorizedException; import java.security.KeyException; import java.util.ArrayList; import java.util.Arrays; @@ -29,6 +30,10 @@ public class PAResourceManagerGateway { static final int INTERVAL = 10000; + private String username; + + private String password; + /** * Get, in an asynchronous way, deployed nodes names * @param nodeSource The name of the node source @@ -92,7 +97,7 @@ public class PAResourceManagerGateway { * @throws PermissionRestException In case the user does not have valid permissions */ public List getListOfNodesEvents() throws NotConnectedException, PermissionRestException { - reconnect(); + reconnectIfDisconnected(); RMStateFull rmStateFull = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId()); List rmNodeEvents = rmStateFull.getNodesEvents(); return rmNodeEvents; @@ -107,7 +112,9 @@ public class PAResourceManagerGateway { * @throws RMException In case an error happens in the RM */ public void connect(String username, String password) throws LoginException, KeyException, RMException { - RMConnectionHelper.connect(username,password); + this.username = username; + this.password = password; + RMConnectionHelper.connect(this.username, this.password); } /** @@ -125,7 +132,7 @@ public class PAResourceManagerGateway { * @throws PermissionRestException In case the user does not have valid permissions */ private List getDeployedNodesInformation(String nodeSource) throws NotConnectedException, PermissionRestException { - reconnect(); + reconnectIfDisconnected(); List deployedNodes = new ArrayList<>(); LOGGER.debug("Getting full RM state ..."); RMStateFull rmState = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId()); @@ -153,7 +160,7 @@ public class PAResourceManagerGateway { * @throws PermissionRestException In case the user does not have valid permissions */ public void deploySimpleAWSNodeSource(String awsUsername, String awsKey, String rmHostname, String nodeSourceName, Integer numberVMs) throws NotConnectedException, PermissionRestException { - reconnect(); + reconnectIfDisconnected(); // Getting NS configuration settings String infrastructureType = "org.ow2.proactive.resourcemanager.nodesource.infrastructure.AWSEC2Infrastructure"; String[] infrastructureParameters = {awsUsername, //username @@ -200,7 +207,7 @@ public class PAResourceManagerGateway { * @throws RestException In case a Rest exception is thrown */ public List searchNodes(List tags, boolean all) throws NotConnectedException, RestException { - reconnect(); + reconnectIfDisconnected(); LOGGER.debug("Search for nodes with tags " + tags + " ..."); List nodesUrls = new ArrayList<>(rmRestInterface.searchNodes(RMConnectionHelper.getSessionId(), tags, all)); LOGGER.debug("Nodes found: " + nodesUrls); @@ -216,7 +223,7 @@ public class PAResourceManagerGateway { * @throws PermissionRestException In case the user does not have valid permissions */ public NSState undeployNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException { - reconnect(); + reconnectIfDisconnected(); LOGGER.debug("Undeploying node source ..."); NSState nsState = rmRestInterface.undeployNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName, preempt); LOGGER.info("Node source undeployed!"); @@ -232,7 +239,7 @@ public class PAResourceManagerGateway { * @throws PermissionRestException In case the user does not have valid permissions */ public Boolean removeNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException { - reconnect(); + reconnectIfDisconnected(); LOGGER.debug("Removing node source ..."); Boolean result = rmRestInterface.removeNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName, preempt); LOGGER.info("Node source removed!"); @@ -248,7 +255,7 @@ public class PAResourceManagerGateway { * @throws RMNodeException In case the RM throws a Node exception */ public Boolean releaseNode(String nodeUrl) throws NotConnectedException, PermissionRestException, RMNodeException { - reconnect(); + reconnectIfDisconnected(); LOGGER.debug("Releasing node ..."); Boolean result = rmRestInterface.releaseNode(RMConnectionHelper.getSessionId(), nodeUrl); LOGGER.info("Node released!"); @@ -264,16 +271,42 @@ public class PAResourceManagerGateway { * @throws PermissionRestException In case the user does not have valid permissions */ public Boolean removeNode(String nodeUrl, Boolean preempt) throws NotConnectedException, PermissionRestException { - reconnect(); + reconnectIfDisconnected(); LOGGER.debug("Removing node \'" + nodeUrl + "\' ..."); Boolean result = rmRestInterface.removeNode(RMConnectionHelper.getSessionId(), nodeUrl, preempt); LOGGER.info("Node removed!"); return result; } - private void reconnect() throws NotConnectedException { - if (!RMConnectionHelper.isActive()) { - throw new Error("ProActive Resource Manager is not reachable."); + void reconnectIfDisconnected() { + try { + if (RMConnectionHelper.isActive()) { + LOGGER.info("Connexion to ProActive RM is active."); + } else { + LOGGER.warn("WARNING: ProActive Resource Manager is not reachable."); + } + LOGGER.debug("Connexion to ProActive RM renewed."); + } catch (NotConnectedException | RuntimeException nce) { + try { + LOGGER.info("Reconnecting to ProActive RM ..."); + RMConnectionHelper.connect(this.username, this.password); + } catch (LoginException | KeyException | RMException e) { + LOGGER.error("ERROR: Not able to reconnect to RM due to: " + Arrays.toString(e.getStackTrace())); + } + } + } + + private void renewSession() { + try { + LOGGER.debug("Renewing connexion ..."); + if (RMConnectionHelper.isActive()) { + LOGGER.debug("Connexion to ProActive RM is active."); + } else { + LOGGER.warn("WARNING: ProActive Resource Manager is not reachable."); + } + LOGGER.info("Connexion to ProActive RM renewed."); + } catch (NotConnectedException | RuntimeException nce) { + LOGGER.error("ERROR: Not able to renew connexion to RM due to: " + Arrays.toString(nce.getStackTrace())); } } diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/RMConnectionHelper.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/RMConnectionHelper.java index af860b0b..b942205c 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/RMConnectionHelper.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/RMConnectionHelper.java @@ -33,11 +33,11 @@ public class RMConnectionHelper { * @return The initialized RM Interface to be used for sending request to the platform */ public static RMRestInterface init(String paURL) { - if(paURL.contains("trydev2.activeeon")){ + if (paURL.contains("trydev2.activeeon")) { sessionPreferencesId = "RM_sessionId_trydev2"; - }else if(paURL.contains("trydev.activeeon")){ + } else if (paURL.contains("trydev.activeeon")) { sessionPreferencesId = "RM_sessionId"; - }else{ + } else { sessionPreferencesId = "TESTING_PREF"; } // Initialize the client @@ -67,17 +67,17 @@ public class RMConnectionHelper { // If the sessionId is invalid we create a new session by establishing a new connection to the RM if(isActive()){ LOGGER.info("Already Connected to RM"); - }else{ + } else { // Connect and create a new session sessionId = rmRestInterface.rmConnect(username, password); // Save the session - userPreferences.put(sessionPreferencesId,sessionId); + userPreferences.put(sessionPreferencesId, sessionId); LOGGER.info("Connected to RM"); } } catch (Exception NAE){ // Exception is triggered when the sessionId is equal to "" sessionId = rmRestInterface.rmConnect(username, password); - userPreferences.put(sessionPreferencesId,sessionId); + userPreferences.put(sessionPreferencesId, sessionId); LOGGER.info("Connected to RM"); } } @@ -91,7 +91,7 @@ public class RMConnectionHelper { try{ sessionId = userPreferences.get(sessionPreferencesId,""); // Check if the session still active - if(isActive()){ + if (isActive()) { try { LOGGER.debug("Disconnecting from RM..."); rmRestInterface.rmDisconnect(sessionId); @@ -100,14 +100,14 @@ public class RMConnectionHelper { } catch (NotConnectedException nce) { LOGGER.warn("WARNING: Not able to disconnect due to: " + nce.toString()); } - }else{ + } else { LOGGER.info("Already disconnected from RM"); } // Clear local session sessionId = ""; // Remove the stored session userPreferences.remove(sessionPreferencesId); - }catch (Exception e){ + } catch (Exception e) { // Exception will trigger if the sessionId is empty LOGGER.info("Already disconnected from RM"); // Clear local session diff --git a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/SchedulerConnectionHelper.java b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/SchedulerConnectionHelper.java index 65c81856..2a404e90 100644 --- a/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/SchedulerConnectionHelper.java +++ b/scheduling-abstraction-layer/src/main/java/org/activeeon/morphemic/service/SchedulerConnectionHelper.java @@ -47,14 +47,13 @@ public class SchedulerConnectionHelper { true); // Check if the proxy is connected // If not make a new connection - if(!restSmartProxy.isConnected()) { + if (!restSmartProxy.isConnected()) { restSmartProxy.init(connectionInfo); LOGGER.info("Connected to Scheduler"); - isActive = true; - }else{ + } else { LOGGER.info("Already connected to Scheduler"); - isActive = true; } + isActive = true; return restSmartProxy; } @@ -64,11 +63,11 @@ public class SchedulerConnectionHelper { */ public static synchronized RestSmartProxyImpl disconnect() { try { - if(restSmartProxy.isConnected()) { + if (restSmartProxy.isConnected()) { restSmartProxy.disconnect(); isActive = false; LOGGER.info("Disconnected from Scheduler"); - }else{ + } else { LOGGER.info("Already disconnected from Scheduler"); } } catch (PermissionException e) { -- GitLab