Unverified commit 432d8e10, authored by Fabien Viale, committed by GitHub
Browse files

Merge pull request #3977 from fviale/master

NodesRecoveryManager: fix down nodes lock restoration for down nodes
parents 65d20f88 5ecd3e80
...@@ -44,6 +44,7 @@ import org.ow2.proactive.resourcemanager.core.properties.PAResourceManagerProper ...@@ -44,6 +44,7 @@ import org.ow2.proactive.resourcemanager.core.properties.PAResourceManagerProper
import org.ow2.proactive.resourcemanager.exception.RMException; import org.ow2.proactive.resourcemanager.exception.RMException;
import org.ow2.proactive.resourcemanager.frontend.RMConnection; import org.ow2.proactive.resourcemanager.frontend.RMConnection;
import org.ow2.proactive.resourcemanager.frontend.RMMonitoring; import org.ow2.proactive.resourcemanager.frontend.RMMonitoring;
import org.ow2.proactive.resourcemanager.nodesource.NodeSource;
import org.ow2.proactive.resourcemanager.selection.SelectionManager; import org.ow2.proactive.resourcemanager.selection.SelectionManager;
import org.ow2.proactive.utils.FileUtils; import org.ow2.proactive.utils.FileUtils;
import org.ow2.proactive.utils.appenders.MultipleFileAppender; import org.ow2.proactive.utils.appenders.MultipleFileAppender;
...@@ -100,6 +101,7 @@ public class RMFactory { ...@@ -100,6 +101,7 @@ public class RMFactory {
null, null,
null); null);
String RMCoreName = RMConstants.NAME_ACTIVE_OBJECT_RMCORE; String RMCoreName = RMConstants.NAME_ACTIVE_OBJECT_RMCORE;
NodeSource.initThreadPools();
rmcore = (RMCore) PAActiveObject.newActive(RMCore.class.getName(), // the class to deploy rmcore = (RMCore) PAActiveObject.newActive(RMCore.class.getName(), // the class to deploy
new Object[] { RMCoreName, nodeRM }, new Object[] { RMCoreName, nodeRM },
nodeRM); nodeRM);
......
...@@ -451,7 +451,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive { ...@@ -451,7 +451,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
public void run() { public void run() {
if (!toShutDown) { if (!toShutDown) {
PAFuture.waitFor(rmcoreStub.shutdown(true), PAFuture.waitFor(rmcoreStub.shutdown(true),
PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt() * 1000); PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt() * 1000L);
} }
} }
}); });
...@@ -1724,6 +1724,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive { ...@@ -1724,6 +1724,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
if (PAResourceManagerProperties.RM_PRESERVE_NODES_ON_SHUTDOWN.getValueAsBoolean() || if (PAResourceManagerProperties.RM_PRESERVE_NODES_ON_SHUTDOWN.getValueAsBoolean() ||
this.deployedNodeSources.size() == 0) { this.deployedNodeSources.size() == 0) {
NodeSource.shutdownThreadPools();
finalizeShutdown(); finalizeShutdown();
} else { } else {
this.deployedNodeSources.forEach((nodeSourceName, nodeSource) -> { this.deployedNodeSources.forEach((nodeSourceName, nodeSource) -> {
...@@ -1731,6 +1732,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive { ...@@ -1731,6 +1732,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
nodeSource.shutdown(this.caller); nodeSource.shutdown(this.caller);
}); });
waitForAllNodeSourcesToBeShutdown(); waitForAllNodeSourcesToBeShutdown();
NodeSource.shutdownThreadPools();
finalizeShutdown(); finalizeShutdown();
} }
return new BooleanWrapper(true); return new BooleanWrapper(true);
...@@ -1738,16 +1740,15 @@ public class RMCore implements ResourceManager, InitActive, RunActive { ...@@ -1738,16 +1740,15 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
private void waitForAllNodeSourcesToBeShutdown() { private void waitForAllNodeSourcesToBeShutdown() {
boolean atLeastOneAlive = false; boolean atLeastOneAlive = false;
int millisBeforeHardShutdown = 0; int secondsBeforeHardShutdown = PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt();
try { try {
do { do {
millisBeforeHardShutdown++; Thread.sleep(1000);
Thread.sleep(100); secondsBeforeHardShutdown--;
for (Entry<String, NodeSource> entry : this.deployedNodeSources.entrySet()) { for (Entry<String, NodeSource> entry : this.deployedNodeSources.entrySet()) {
atLeastOneAlive = atLeastOneAlive || isNodeSourceAlive(entry); atLeastOneAlive = atLeastOneAlive || isNodeSourceAlive(entry);
} }
} while (atLeastOneAlive && } while (atLeastOneAlive && secondsBeforeHardShutdown > 0);
millisBeforeHardShutdown < PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt() * 10);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.interrupted(); Thread.interrupted();
logger.warn("", e); logger.warn("", e);
...@@ -2685,6 +2686,9 @@ public class RMCore implements ResourceManager, InitActive, RunActive { ...@@ -2685,6 +2686,9 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
* @return true if the client is an admin or provider, SecurityException otherwise * @return true if the client is an admin or provider, SecurityException otherwise
*/ */
private boolean checkNodeAdminOrProviderPermission(RMNode rmnode, Client client) { private boolean checkNodeAdminOrProviderPermission(RMNode rmnode, Client client) {
if (client == localClient) {
return true;
}
NodeSource nodeSource = rmnode.getNodeSource(); NodeSource nodeSource = rmnode.getNodeSource();
String errorMessage = client.getName() + " is not authorized to manage node " + rmnode.getNodeURL() + " from " + String errorMessage = client.getName() + " is not authorized to manage node " + rmnode.getNodeURL() + " from " +
......
...@@ -236,6 +236,7 @@ public class NodesRecoveryManager { ...@@ -236,6 +236,7 @@ public class NodesRecoveryManager {
} }
this.nodesLockRestorationManager.handle(rmNode, rmNodeData.getProvider()); this.nodesLockRestorationManager.handle(rmNode, rmNodeData.getProvider());
} else { } else {
this.nodesLockRestorationManager.handle(rmNode, rmNodeData.getProvider());
logger.info("Triggering down node notification for " + nodeUrl); logger.info("Triggering down node notification for " + nodeUrl);
this.triggerDownNodeHookIfNecessary(nodeSource, rmNodeData, nodeUrl, previousState); this.triggerDownNodeHookIfNecessary(nodeSource, rmNodeData, nodeUrl, previousState);
} }
......
...@@ -93,7 +93,7 @@ import com.google.common.annotations.VisibleForTesting; ...@@ -93,7 +93,7 @@ import com.google.common.annotations.VisibleForTesting;
* *
*/ */
@ActiveObject @ActiveObject
public class NodeSource implements InitActive, RunActive, EndActive { public class NodeSource implements InitActive, RunActive {
private static Logger logger = Logger.getLogger(NodeSource.class); private static Logger logger = Logger.getLogger(NodeSource.class);
...@@ -149,8 +149,6 @@ public class NodeSource implements InitActive, RunActive, EndActive { ...@@ -149,8 +149,6 @@ public class NodeSource implements InitActive, RunActive, EndActive {
private static ThreadPoolHolder threadPoolHolder; private static ThreadPoolHolder threadPoolHolder;
private static AtomicInteger nodeSourceCount = new AtomicInteger(0);
private NodeSource stub; private NodeSource stub;
private final Client administrator; private final Client administrator;
...@@ -252,7 +250,7 @@ public class NodeSource implements InitActive, RunActive, EndActive { ...@@ -252,7 +250,7 @@ public class NodeSource implements InitActive, RunActive, EndActive {
.orElse(new LinkedHashMap<>()); .orElse(new LinkedHashMap<>());
} }
private static void initThreadPools() { public static void initThreadPools() {
if (threadPoolHolder == null) { if (threadPoolHolder == null) {
try { try {
int maxThreads = PAResourceManagerProperties.RM_NODESOURCE_MAX_THREAD_NUMBER.getValueAsInt(); int maxThreads = PAResourceManagerProperties.RM_NODESOURCE_MAX_THREAD_NUMBER.getValueAsInt();
...@@ -281,6 +279,19 @@ public class NodeSource implements InitActive, RunActive, EndActive { ...@@ -281,6 +279,19 @@ public class NodeSource implements InitActive, RunActive, EndActive {
} }
} }
public static void shutdownThreadPools() {
if (threadPoolHolder != null) {
threadPoolHolder.shutdownNow(PINGER_POOL);
logger.info("Pinger Thread Pool terminated");
threadPoolHolder.shutdown(EXTERNAL_POOL,
PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt() - 1);
logger.info("External Thread Pool terminated");
threadPoolHolder.shutdownNow(INTERNAL_POOL);
logger.info("Internal Thread Pool terminated");
threadPoolHolder = null;
}
}
/** /**
* Initialization of node source. Creates and activates a pinger to monitor nodes. * Initialization of node source. Creates and activates a pinger to monitor nodes.
* *
...@@ -288,11 +299,6 @@ public class NodeSource implements InitActive, RunActive, EndActive { ...@@ -288,11 +299,6 @@ public class NodeSource implements InitActive, RunActive, EndActive {
*/ */
public void initActivity(Body body) { public void initActivity(Body body) {
synchronized (nodeSourceCount) {
nodeSourceCount.incrementAndGet();
initThreadPools();
}
this.stub = (NodeSource) PAActiveObject.getStubOnThis(); this.stub = (NodeSource) PAActiveObject.getStubOnThis();
this.infrastructureManager.setNodeSource(this); this.infrastructureManager.setNodeSource(this);
// Infrastructure has been configured and linked to the node source, so we can now persist the runtime // Infrastructure has been configured and linked to the node source, so we can now persist the runtime
...@@ -670,21 +676,6 @@ public class NodeSource implements InitActive, RunActive, EndActive { ...@@ -670,21 +676,6 @@ public class NodeSource implements InitActive, RunActive, EndActive {
this.activePolicy.reconfigure(updatedPolicyParams); this.activePolicy.reconfigure(updatedPolicyParams);
} }
@Override
public void endActivity(Body body) {
synchronized (nodeSourceCount) {
if (nodeSourceCount.decrementAndGet() == 0) {
threadPoolHolder.shutdown(PINGER_POOL);
logger.info("Pinger Thread Pool terminated");
threadPoolHolder.shutdown(EXTERNAL_POOL);
logger.info("External Thread Pool terminated");
threadPoolHolder.shutdown(INTERNAL_POOL);
logger.info("Internal Thread Pool terminated");
threadPoolHolder = null;
}
}
}
/** /**
* Looks up the node * Looks up the node
*/ */
...@@ -809,8 +800,8 @@ public class NodeSource implements InitActive, RunActive, EndActive { ...@@ -809,8 +800,8 @@ public class NodeSource implements InitActive, RunActive, EndActive {
if (this.nodes.size() == 0) { if (this.nodes.size() == 0) {
this.shutdownNodeSourceServices(initiator); this.shutdownNodeSourceServices(initiator);
} else { } else {
logger.debug("[" + this.name + "] actual shutdown is skipped, because there are still some nodes " + logger.warn("[" + this.name + "] actual shutdown is skipped, because there are still some alive nodes: " +
this.nodes.size()); this.nodes);
} }
return new BooleanWrapper(true); return new BooleanWrapper(true);
} }
...@@ -1038,11 +1029,13 @@ public class NodeSource implements InitActive, RunActive, EndActive { ...@@ -1038,11 +1029,13 @@ public class NodeSource implements InitActive, RunActive, EndActive {
logger.debug("Node " + nodeUrl + " is alive"); logger.debug("Node " + nodeUrl + " is alive");
} }
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Error occurred when trying to ping node " + nodeUrl, t); if (!this.toShutdown) {
try { logger.warn("Error occurred when trying to ping node " + nodeUrl, t);
stub.detectedPingedDownNode(nodeName, nodeUrl); try {
} catch (Exception e) { stub.detectedPingedDownNode(nodeName, nodeUrl);
logger.warn("Could not send detectedPingedDownNode message", e); } catch (Exception e) {
logger.warn("Could not send detectedPingedDownNode message", e);
}
} }
} }
}); });
......
...@@ -97,17 +97,21 @@ public class ThreadPoolHolder { ...@@ -97,17 +97,21 @@ public class ThreadPoolHolder {
pools[num].execute(task); pools[num].execute(task);
} }
public synchronized void shutdown(int num) { public synchronized void shutdown(int num, int totalMaxWaitTime) {
checkPoolNumber(num); checkPoolNumber(num);
pools[num].shutdown(); pools[num].shutdown();
try { try {
pools[num].awaitTermination(PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsLong() - 10, pools[num].awaitTermination(totalMaxWaitTime, TimeUnit.SECONDS);
TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.warn("Thread pool termination interrupted"); logger.warn("Thread pool " + num + " termination interrupted");
} }
} }
public synchronized void shutdownNow(int num) {
checkPoolNumber(num);
pools[num].shutdownNow();
}
private void checkPoolNumber(int num) { private void checkPoolNumber(int num) {
if (num > pools.length) { if (num > pools.length) {
throw new IllegalArgumentException("Incorrect thread pool number " + num); throw new IllegalArgumentException("Incorrect thread pool number " + num);
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment