Commit 5ecd3e80 authored by Fabien Viale
Browse files

Improve ThreadPoolHolder shutdown

parent 23c41dcb
......@@ -44,6 +44,7 @@ import org.ow2.proactive.resourcemanager.core.properties.PAResourceManagerProper
import org.ow2.proactive.resourcemanager.exception.RMException;
import org.ow2.proactive.resourcemanager.frontend.RMConnection;
import org.ow2.proactive.resourcemanager.frontend.RMMonitoring;
import org.ow2.proactive.resourcemanager.nodesource.NodeSource;
import org.ow2.proactive.resourcemanager.selection.SelectionManager;
import org.ow2.proactive.utils.FileUtils;
import org.ow2.proactive.utils.appenders.MultipleFileAppender;
......@@ -100,6 +101,7 @@ public class RMFactory {
null,
null);
String RMCoreName = RMConstants.NAME_ACTIVE_OBJECT_RMCORE;
NodeSource.initThreadPools();
rmcore = (RMCore) PAActiveObject.newActive(RMCore.class.getName(), // the class to deploy
new Object[] { RMCoreName, nodeRM },
nodeRM);
......
......@@ -451,7 +451,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
public void run() {
if (!toShutDown) {
PAFuture.waitFor(rmcoreStub.shutdown(true),
PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt() * 1000);
PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt() * 1000L);
}
}
});
......@@ -1724,6 +1724,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
if (PAResourceManagerProperties.RM_PRESERVE_NODES_ON_SHUTDOWN.getValueAsBoolean() ||
this.deployedNodeSources.size() == 0) {
NodeSource.shutdownThreadPools();
finalizeShutdown();
} else {
this.deployedNodeSources.forEach((nodeSourceName, nodeSource) -> {
......@@ -1731,6 +1732,7 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
nodeSource.shutdown(this.caller);
});
waitForAllNodeSourcesToBeShutdown();
NodeSource.shutdownThreadPools();
finalizeShutdown();
}
return new BooleanWrapper(true);
......@@ -1738,16 +1740,15 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
private void waitForAllNodeSourcesToBeShutdown() {
boolean atLeastOneAlive = false;
int millisBeforeHardShutdown = 0;
int secondsBeforeHardShutdown = PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt();
try {
do {
millisBeforeHardShutdown++;
Thread.sleep(100);
Thread.sleep(1000);
secondsBeforeHardShutdown--;
for (Entry<String, NodeSource> entry : this.deployedNodeSources.entrySet()) {
atLeastOneAlive = atLeastOneAlive || isNodeSourceAlive(entry);
}
} while (atLeastOneAlive &&
millisBeforeHardShutdown < PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt() * 10);
} while (atLeastOneAlive && secondsBeforeHardShutdown > 0);
} catch (InterruptedException e) {
Thread.interrupted();
logger.warn("", e);
......@@ -2685,6 +2686,9 @@ public class RMCore implements ResourceManager, InitActive, RunActive {
* @return true if the client is an admin or provider, SecurityException otherwise
*/
private boolean checkNodeAdminOrProviderPermission(RMNode rmnode, Client client) {
if (client == localClient) {
return true;
}
NodeSource nodeSource = rmnode.getNodeSource();
String errorMessage = client.getName() + " is not authorized to manage node " + rmnode.getNodeURL() + " from " +
......
......@@ -93,7 +93,7 @@ import com.google.common.annotations.VisibleForTesting;
*
*/
@ActiveObject
public class NodeSource implements InitActive, RunActive, EndActive {
public class NodeSource implements InitActive, RunActive {
private static Logger logger = Logger.getLogger(NodeSource.class);
......@@ -149,8 +149,6 @@ public class NodeSource implements InitActive, RunActive, EndActive {
private static ThreadPoolHolder threadPoolHolder;
private static AtomicInteger nodeSourceCount = new AtomicInteger(0);
private NodeSource stub;
private final Client administrator;
......@@ -252,7 +250,7 @@ public class NodeSource implements InitActive, RunActive, EndActive {
.orElse(new LinkedHashMap<>());
}
private static void initThreadPools() {
public static void initThreadPools() {
if (threadPoolHolder == null) {
try {
int maxThreads = PAResourceManagerProperties.RM_NODESOURCE_MAX_THREAD_NUMBER.getValueAsInt();
......@@ -281,6 +279,19 @@ public class NodeSource implements InitActive, RunActive, EndActive {
}
}
/**
 * Shuts down the three thread pools managed by the shared {@code threadPoolHolder}
 * and then clears the holder reference.
 * <p>
 * The pinger and internal pools are stopped through {@code shutdownNow}; the external
 * pool is stopped through {@code shutdown} with a wait budget equal to the configured
 * {@code RM_SHUTDOWN_TIMEOUT} value minus one. The call is a no-op when the holder is
 * currently {@code null}.
 */
public static void shutdownThreadPools() {
    // Nothing to stop when the holder is not set.
    if (threadPoolHolder == null) {
        return;
    }

    threadPoolHolder.shutdownNow(PINGER_POOL);
    logger.info("Pinger Thread Pool terminated");

    // The external pool is the only one given a wait budget for termination.
    final int externalPoolMaxWait = PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsInt() - 1;
    threadPoolHolder.shutdown(EXTERNAL_POOL, externalPoolMaxWait);
    logger.info("External Thread Pool terminated");

    threadPoolHolder.shutdownNow(INTERNAL_POOL);
    logger.info("Internal Thread Pool terminated");

    // Drop the reference so that the null checks see the pools as gone.
    threadPoolHolder = null;
}
/**
* Initialization of node source. Creates and activates a pinger to monitor nodes.
*
......@@ -288,11 +299,6 @@ public class NodeSource implements InitActive, RunActive, EndActive {
*/
public void initActivity(Body body) {
synchronized (nodeSourceCount) {
nodeSourceCount.incrementAndGet();
initThreadPools();
}
this.stub = (NodeSource) PAActiveObject.getStubOnThis();
this.infrastructureManager.setNodeSource(this);
// Infrastructure has been configured and linked to the node source, so we can now persist the runtime
......@@ -670,21 +676,6 @@ public class NodeSource implements InitActive, RunActive, EndActive {
this.activePolicy.reconfigure(updatedPolicyParams);
}
/**
 * Decrements the shared node source counter and, when it reaches zero, shuts down
 * the pinger, external and internal thread pools and clears the shared holder.
 * The whole sequence runs while holding the monitor of {@code nodeSourceCount}.
 *
 * @param body the body passed by the caller (not used by this method)
 */
@Override
public void endActivity(Body body) {
    synchronized (nodeSourceCount) {
        final int remainingNodeSources = nodeSourceCount.decrementAndGet();
        if (remainingNodeSources != 0) {
            // The counter has not reached zero: leave the pools untouched.
            return;
        }

        threadPoolHolder.shutdown(PINGER_POOL);
        logger.info("Pinger Thread Pool terminated");

        threadPoolHolder.shutdown(EXTERNAL_POOL);
        logger.info("External Thread Pool terminated");

        threadPoolHolder.shutdown(INTERNAL_POOL);
        logger.info("Internal Thread Pool terminated");

        threadPoolHolder = null;
    }
}
/**
* Looks up the node
*/
......@@ -809,8 +800,8 @@ public class NodeSource implements InitActive, RunActive, EndActive {
if (this.nodes.size() == 0) {
this.shutdownNodeSourceServices(initiator);
} else {
logger.debug("[" + this.name + "] actual shutdown is skipped, because there are still some nodes " +
this.nodes.size());
logger.warn("[" + this.name + "] actual shutdown is skipped, because there are still some alive nodes: " +
this.nodes);
}
return new BooleanWrapper(true);
}
......@@ -1038,11 +1029,13 @@ public class NodeSource implements InitActive, RunActive, EndActive {
logger.debug("Node " + nodeUrl + " is alive");
}
} catch (Throwable t) {
logger.warn("Error occurred when trying to ping node " + nodeUrl, t);
try {
stub.detectedPingedDownNode(nodeName, nodeUrl);
} catch (Exception e) {
logger.warn("Could not send detectedPingedDownNode message", e);
if (!this.toShutdown) {
logger.warn("Error occurred when trying to ping node " + nodeUrl, t);
try {
stub.detectedPingedDownNode(nodeName, nodeUrl);
} catch (Exception e) {
logger.warn("Could not send detectedPingedDownNode message", e);
}
}
}
});
......
......@@ -97,17 +97,21 @@ public class ThreadPoolHolder {
pools[num].execute(task);
}
public synchronized void shutdown(int num) {
public synchronized void shutdown(int num, int totalMaxWaitTime) {
checkPoolNumber(num);
pools[num].shutdown();
try {
pools[num].awaitTermination(PAResourceManagerProperties.RM_SHUTDOWN_TIMEOUT.getValueAsLong() - 10,
TimeUnit.SECONDS);
pools[num].awaitTermination(totalMaxWaitTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("Thread pool termination interrupted");
logger.warn("Thread pool " + num + " termination interrupted");
}
}
/**
 * Calls {@code shutdownNow()} on the pool stored at index {@code num}.
 * <p>
 * The index is first passed to {@code checkPoolNumber}. This method does not call
 * {@code awaitTermination}, so it returns without waiting for the pool to finish.
 *
 * @param num index of the pool inside {@code pools}
 * @throws IllegalArgumentException if {@code checkPoolNumber} rejects {@code num}
 */
public synchronized void shutdownNow(int num) {
// validate the index before touching the pools array
checkPoolNumber(num);
pools[num].shutdownNow();
}
private void checkPoolNumber(int num) {
if (num > pools.length) {
throw new IllegalArgumentException("Incorrect thread pool number " + num);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment