Commit 19e386fe authored by Mohamed Khalil Labidi's avatar Mohamed Khalil Labidi
Browse files

Add reconnexion mechanism if SAL is disconnected from RM or Scheduler

parent ae43c7a5
......@@ -37,7 +37,7 @@ public class PASchedulerGateway {
* @return JobId
*/
public JobId submit(Job job) {
reconnect();
reconnectIfDisconnected();
JobId jobId = null;
LOGGER.debug("Submitting job: " + job.toString());
try {
......@@ -60,7 +60,7 @@ public class PASchedulerGateway {
* @return JobId
*/
public JobId submit(File xmlFile) {
reconnect();
reconnectIfDisconnected();
JobId jobId = null;
LOGGER.debug("Submitting job: " + xmlFile.toString());
try {
......@@ -84,7 +84,7 @@ public class PASchedulerGateway {
* @return JobId
*/
public JobId submit(File xmlFile, Map<String, String> variables) {
reconnect();
reconnectIfDisconnected();
JobId jobId = null;
LOGGER.debug("Submitting job: " + xmlFile.toString());
LOGGER.debug(" with variables: " + variables.toString());
......@@ -108,14 +108,9 @@ public class PASchedulerGateway {
* @return The job state
*/
public JobState getJobState(String jobId) {
reconnect();
reconnectIfDisconnected();
JobState jobState = null;
try {
if (!restSmartProxy.isConnected()) {
LOGGER.warn("WARNING: Not connected to the scheduler. Reconnecting ...");
restSmartProxy.reconnect();
LOGGER.info("Reconnected to scheduler!");
}
LOGGER.info("Getting job " + jobId + " state.");
jobState = restSmartProxy.getJobState(jobId);
LOGGER.info("Job " + jobId + " is in state: " + jobState.getStatus().toString());
......@@ -125,10 +120,6 @@ public class PASchedulerGateway {
LOGGER.error("ERROR: Not able to get the job state due to an unknown job ID: " + uje.toString());
} catch (PermissionException pe) {
LOGGER.error("ERROR: Not able to submit the job due to a PermissionException: " + pe.toString());
} catch (SchedulerException se) {
LOGGER.error("ERROR: Not able to reconnect to scheduler due to a SchedulerException: " + se.toString());
} catch (LoginException le) {
LOGGER.error("ERROR: Not able to reconnect to scheduler due to a LoginException: " + le.toString());
}
return jobState;
}
......@@ -140,7 +131,7 @@ public class PASchedulerGateway {
* @return The job result
*/
public JobResult waitForJob(String jobId, long timeout) {
reconnect();
reconnectIfDisconnected();
JobResult jobResult = null;
try {
jobResult = restSmartProxy.waitForJob(jobId, timeout);
......@@ -162,7 +153,7 @@ public class PASchedulerGateway {
* @return The jobs results map
*/
public Map<Long, Map<String, Serializable>> getJobResultMaps(List<String> jobsId) {
reconnect();
reconnectIfDisconnected();
Map<Long, Map<String, Serializable>> jobResults = null;
try {
jobResults = restSmartProxy.getJobResultMaps(jobsId);
......@@ -178,7 +169,7 @@ public class PASchedulerGateway {
* @return The jobs results map
*/
public boolean killJob(String jobId) {
reconnect();
reconnectIfDisconnected();
boolean result = false;
try {
result = restSmartProxy.killJob(jobId);
......@@ -200,7 +191,7 @@ public class PASchedulerGateway {
* @return The task result
*/
public TaskResult waitForTask(String jobId, String taskName, long timeout) {
reconnect();
reconnectIfDisconnected();
TaskResult taskResult = null;
try {
taskResult = restSmartProxy.waitForTask(jobId, taskName, timeout);
......@@ -225,7 +216,7 @@ public class PASchedulerGateway {
* @return The task result
*/
public TaskResult getTaskResult(String jobId, String taskName) {
reconnect();
reconnectIfDisconnected();
TaskResult taskResult = null;
try {
taskResult = restSmartProxy.getTaskResult(jobId, taskName);
......@@ -258,17 +249,28 @@ public class PASchedulerGateway {
restSmartProxy = SchedulerConnectionHelper.disconnect();
}
private void reconnect() {
private void reconnectIfDisconnected() {
if (!restSmartProxy.isConnected()) {
try {
LOGGER.warn("WARNING: Not connected to the scheduler. Reconnecting to Scheduler ...");
restSmartProxy.reconnect();
LOGGER.info("Connexion to ProActive Scheduler refreshed.");
LOGGER.info("Reconnected to ProActive Scheduler.");
} catch (SchedulerException | LoginException e) {
LOGGER.error("ERROR: Not able to reconnect to Scheduler due to: " + Arrays.toString(e.getStackTrace()));
}
}
}
private void renewSession() {
try {
LOGGER.debug("Renewing connexion ...");
restSmartProxy.renewSession();
LOGGER.info("Connexion to ProActive Scheduler renewed.");
} catch (NotConnectedException nce) {
LOGGER.error("ERROR: Not able to renew connexion to Scheduler due to: " + Arrays.toString(nce.getStackTrace()));
}
}
// For testing purpose
public RestSmartProxyImpl getRestSmartProxy() {
return restSmartProxy;
......
......@@ -14,6 +14,7 @@ import org.ow2.proactive_grid_cloud_portal.scheduler.exception.PermissionRestExc
import org.ow2.proactive_grid_cloud_portal.scheduler.exception.RestException;
import javax.security.auth.login.LoginException;
import javax.ws.rs.NotAuthorizedException;
import java.security.KeyException;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -29,6 +30,10 @@ public class PAResourceManagerGateway {
static final int INTERVAL = 10000;
private String username;
private String password;
/**
* Get, in an asynchronous way, deployed nodes names
* @param nodeSource The name of the node source
......@@ -92,7 +97,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public List<RMNodeEvent> getListOfNodesEvents() throws NotConnectedException, PermissionRestException {
reconnect();
reconnectIfDisconnected();
RMStateFull rmStateFull = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId());
List<RMNodeEvent> rmNodeEvents = rmStateFull.getNodesEvents();
return rmNodeEvents;
......@@ -107,7 +112,9 @@ public class PAResourceManagerGateway {
* @throws RMException In case an error happens in the RM
*/
public void connect(String username, String password) throws LoginException, KeyException, RMException {
RMConnectionHelper.connect(username,password);
this.username = username;
this.password = password;
RMConnectionHelper.connect(this.username, this.password);
}
/**
......@@ -125,7 +132,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
private List<String> getDeployedNodesInformation(String nodeSource) throws NotConnectedException, PermissionRestException {
reconnect();
reconnectIfDisconnected();
List<String> deployedNodes = new ArrayList<>();
LOGGER.debug("Getting full RM state ...");
RMStateFull rmState = rmRestInterface.getRMStateFull(RMConnectionHelper.getSessionId());
......@@ -153,7 +160,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public void deploySimpleAWSNodeSource(String awsUsername, String awsKey, String rmHostname, String nodeSourceName, Integer numberVMs) throws NotConnectedException, PermissionRestException {
reconnect();
reconnectIfDisconnected();
// Getting NS configuration settings
String infrastructureType = "org.ow2.proactive.resourcemanager.nodesource.infrastructure.AWSEC2Infrastructure";
String[] infrastructureParameters = {awsUsername, //username
......@@ -200,7 +207,7 @@ public class PAResourceManagerGateway {
* @throws RestException In case a Rest exception is thrown
*/
public List<String> searchNodes(List<String> tags, boolean all) throws NotConnectedException, RestException {
reconnect();
reconnectIfDisconnected();
LOGGER.debug("Search for nodes with tags " + tags + " ...");
List<String> nodesUrls = new ArrayList<>(rmRestInterface.searchNodes(RMConnectionHelper.getSessionId(), tags, all));
LOGGER.debug("Nodes found: " + nodesUrls);
......@@ -216,7 +223,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public NSState undeployNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException {
reconnect();
reconnectIfDisconnected();
LOGGER.debug("Undeploying node source ...");
NSState nsState = rmRestInterface.undeployNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName, preempt);
LOGGER.info("Node source undeployed!");
......@@ -232,7 +239,7 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public Boolean removeNodeSource(String nodeSourceName, Boolean preempt) throws NotConnectedException, PermissionRestException {
reconnect();
reconnectIfDisconnected();
LOGGER.debug("Removing node source ...");
Boolean result = rmRestInterface.removeNodeSource(RMConnectionHelper.getSessionId(), nodeSourceName, preempt);
LOGGER.info("Node source removed!");
......@@ -248,7 +255,7 @@ public class PAResourceManagerGateway {
* @throws RMNodeException In case the RM throws a Node exception
*/
public Boolean releaseNode(String nodeUrl) throws NotConnectedException, PermissionRestException, RMNodeException {
reconnect();
reconnectIfDisconnected();
LOGGER.debug("Releasing node ...");
Boolean result = rmRestInterface.releaseNode(RMConnectionHelper.getSessionId(), nodeUrl);
LOGGER.info("Node released!");
......@@ -264,16 +271,42 @@ public class PAResourceManagerGateway {
* @throws PermissionRestException In case the user does not have valid permissions
*/
public Boolean removeNode(String nodeUrl, Boolean preempt) throws NotConnectedException, PermissionRestException {
reconnect();
reconnectIfDisconnected();
LOGGER.debug("Removing node \'" + nodeUrl + "\' ...");
Boolean result = rmRestInterface.removeNode(RMConnectionHelper.getSessionId(), nodeUrl, preempt);
LOGGER.info("Node removed!");
return result;
}
private void reconnect() throws NotConnectedException {
if (!RMConnectionHelper.isActive()) {
throw new Error("ProActive Resource Manager is not reachable.");
void reconnectIfDisconnected() {
try {
if (RMConnectionHelper.isActive()) {
LOGGER.info("Connexion to ProActive RM is active.");
} else {
LOGGER.warn("WARNING: ProActive Resource Manager is not reachable.");
}
LOGGER.debug("Connexion to ProActive RM renewed.");
} catch (NotConnectedException | RuntimeException nce) {
try {
LOGGER.info("Reconnecting to ProActive RM ...");
RMConnectionHelper.connect(this.username, this.password);
} catch (LoginException | KeyException | RMException e) {
LOGGER.error("ERROR: Not able to reconnect to RM due to: " + Arrays.toString(e.getStackTrace()));
}
}
}
private void renewSession() {
try {
LOGGER.debug("Renewing connexion ...");
if (RMConnectionHelper.isActive()) {
LOGGER.debug("Connexion to ProActive RM is active.");
} else {
LOGGER.warn("WARNING: ProActive Resource Manager is not reachable.");
}
LOGGER.info("Connexion to ProActive RM renewed.");
} catch (NotConnectedException | RuntimeException nce) {
LOGGER.error("ERROR: Not able to renew connexion to RM due to: " + Arrays.toString(nce.getStackTrace()));
}
}
......
......@@ -33,11 +33,11 @@ public class RMConnectionHelper {
* @return The initialized RM Interface to be used for sending request to the platform
*/
public static RMRestInterface init(String paURL) {
if(paURL.contains("trydev2.activeeon")){
if (paURL.contains("trydev2.activeeon")) {
sessionPreferencesId = "RM_sessionId_trydev2";
}else if(paURL.contains("trydev.activeeon")){
} else if (paURL.contains("trydev.activeeon")) {
sessionPreferencesId = "RM_sessionId";
}else{
} else {
sessionPreferencesId = "TESTING_PREF";
}
// Initialize the client
......@@ -67,17 +67,17 @@ public class RMConnectionHelper {
// If the sessionId is invalid we create a new session by establishing a new connection to the RM
if(isActive()){
LOGGER.info("Already Connected to RM");
}else{
} else {
// Connect and create a new session
sessionId = rmRestInterface.rmConnect(username, password);
// Save the session
userPreferences.put(sessionPreferencesId,sessionId);
userPreferences.put(sessionPreferencesId, sessionId);
LOGGER.info("Connected to RM");
}
} catch (Exception NAE){
// Exception is triggered when the sessionId is equal to ""
sessionId = rmRestInterface.rmConnect(username, password);
userPreferences.put(sessionPreferencesId,sessionId);
userPreferences.put(sessionPreferencesId, sessionId);
LOGGER.info("Connected to RM");
}
}
......@@ -91,7 +91,7 @@ public class RMConnectionHelper {
try{
sessionId = userPreferences.get(sessionPreferencesId,"");
// Check if the session still active
if(isActive()){
if (isActive()) {
try {
LOGGER.debug("Disconnecting from RM...");
rmRestInterface.rmDisconnect(sessionId);
......@@ -100,14 +100,14 @@ public class RMConnectionHelper {
} catch (NotConnectedException nce) {
LOGGER.warn("WARNING: Not able to disconnect due to: " + nce.toString());
}
}else{
} else {
LOGGER.info("Already disconnected from RM");
}
// Clear local session
sessionId = "";
// Remove the stored session
userPreferences.remove(sessionPreferencesId);
}catch (Exception e){
} catch (Exception e) {
// Exception will trigger if the sessionId is empty
LOGGER.info("Already disconnected from RM");
// Clear local session
......
......@@ -47,14 +47,13 @@ public class SchedulerConnectionHelper {
true);
// Check if the proxy is connected
// If not make a new connection
if(!restSmartProxy.isConnected()) {
if (!restSmartProxy.isConnected()) {
restSmartProxy.init(connectionInfo);
LOGGER.info("Connected to Scheduler");
isActive = true;
}else{
} else {
LOGGER.info("Already connected to Scheduler");
isActive = true;
}
isActive = true;
return restSmartProxy;
}
......@@ -64,11 +63,11 @@ public class SchedulerConnectionHelper {
*/
public static synchronized RestSmartProxyImpl disconnect() {
try {
if(restSmartProxy.isConnected()) {
if (restSmartProxy.isConnected()) {
restSmartProxy.disconnect();
isActive = false;
LOGGER.info("Disconnected from Scheduler");
}else{
} else {
LOGGER.info("Already disconnected from Scheduler");
}
} catch (PermissionException e) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment