Unverified commit 65d20f88, authored by Fabien Viale and committed by GitHub
Browse files

Merge pull request #3976 from fviale/master

NodesRecoveryManager: fix down nodes after restart
parents a2174b59 6a8c875f
...@@ -27,6 +27,8 @@ package org.ow2.proactive.resourcemanager.core.recovery; ...@@ -27,6 +27,8 @@ package org.ow2.proactive.resourcemanager.core.recovery;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.net.InetAddress;
import java.rmi.dgc.VMID;
import java.util.Objects; import java.util.Objects;
import org.objectweb.proactive.ActiveObjectCreationException; import org.objectweb.proactive.ActiveObjectCreationException;
...@@ -55,9 +57,55 @@ public class FakeDownNodeForRecovery implements Node, Serializable { ...@@ -55,9 +57,55 @@ public class FakeDownNodeForRecovery implements Node, Serializable {
private String url; private String url;
public FakeDownNodeForRecovery(String name, String url) { private String hostname;
private VMInformation vmInformation;
public FakeDownNodeForRecovery(String name, String url, String hostname) {
this.name = name; this.name = name;
this.url = url; this.url = url;
this.hostname = hostname;
this.vmInformation = new VMInformation() {
@Override
public VMID getVMID() {
return null;
}
@Override
public InetAddress getInetAddress() {
return null;
}
@Override
public String getName() {
return null;
}
@Override
public String getHostName() {
return FakeDownNodeForRecovery.this.hostname;
}
@Override
public String getDescriptorVMName() {
return null;
}
@Override
public long getCapacity() {
return 0;
}
@Override
public long getDeploymentId() {
return 0;
}
@Override
public long getTopologyId() {
return 0;
}
};
} }
@Override @Override
...@@ -75,14 +123,14 @@ public class FakeDownNodeForRecovery implements Node, Serializable { ...@@ -75,14 +123,14 @@ public class FakeDownNodeForRecovery implements Node, Serializable {
@Override @Override
public VMInformation getVMInformation() { public VMInformation getVMInformation() {
return null; return vmInformation;
} }
}; };
} }
@Override @Override
public VMInformation getVMInformation() { public VMInformation getVMInformation() {
return null; return vmInformation;
} }
@Override @Override
......
...@@ -228,7 +228,7 @@ public class NodesRecoveryManager { ...@@ -228,7 +228,7 @@ public class NodesRecoveryManager {
RMNodeData rmNodeData, String nodeUrl, Node node, NodeState previousState) { RMNodeData rmNodeData, String nodeUrl, Node node, NodeState previousState) {
RMNode rmNode = nodeSource.internalAddNodeAfterRecovery(node, rmNodeData); RMNode rmNode = nodeSource.internalAddNodeAfterRecovery(node, rmNodeData);
this.rmCore.registerAvailableNode(rmNode); this.rmCore.registerAvailableNode(rmNode);
if (node != null) { if (!(node instanceof FakeDownNodeForRecovery)) {
try { try {
RMCore.topologyManager.addNode(rmNode.getNode()); RMCore.topologyManager.addNode(rmNode.getNode());
} catch (Exception e) { } catch (Exception e) {
...@@ -236,6 +236,7 @@ public class NodesRecoveryManager { ...@@ -236,6 +236,7 @@ public class NodesRecoveryManager {
} }
this.nodesLockRestorationManager.handle(rmNode, rmNodeData.getProvider()); this.nodesLockRestorationManager.handle(rmNode, rmNodeData.getProvider());
} else { } else {
logger.info("Triggering down node notification for " + nodeUrl);
this.triggerDownNodeHookIfNecessary(nodeSource, rmNodeData, nodeUrl, previousState); this.triggerDownNodeHookIfNecessary(nodeSource, rmNodeData, nodeUrl, previousState);
} }
this.updateRecoveredNodeStateCounter(nodeStates, rmNode.getState()); this.updateRecoveredNodeStateCounter(nodeStates, rmNode.getState());
...@@ -293,7 +294,7 @@ public class NodesRecoveryManager { ...@@ -293,7 +294,7 @@ public class NodesRecoveryManager {
} while (!connected && isPAMR && (System.currentTimeMillis() - initialTime) < Agent.MAXIMUM_RETRY_DELAY_MS); } while (!connected && isPAMR && (System.currentTimeMillis() - initialTime) < Agent.MAXIMUM_RETRY_DELAY_MS);
if (!connected) { if (!connected) {
node = new FakeDownNodeForRecovery(rmNodeData.getName(), rmNodeData.getNodeUrl()); node = new FakeDownNodeForRecovery(rmNodeData.getName(), rmNodeData.getNodeUrl(), rmNodeData.getHostname());
rmNodeData.setState(NodeState.DOWN); rmNodeData.setState(NodeState.DOWN);
if (isPAMR) { if (isPAMR) {
logger.error("Node " + nodeUrl + " could not be looked up."); logger.error("Node " + nodeUrl + " could not be looked up.");
......
...@@ -149,8 +149,6 @@ public class TopologyManager { ...@@ -149,8 +149,6 @@ public class TopologyManager {
logger.debug("Adding Node " + node.getNodeInformation().getURL() + " to topology"); logger.debug("Adding Node " + node.getNodeInformation().getURL() + " to topology");
} }
InetAddress inetAddress = node.getVMInformation().getInetAddress();
String hostName = node.getVMInformation().getHostName(); String hostName = node.getVMInformation().getHostName();
if (topology.knownHost(hostName)) { if (topology.knownHost(hostName)) {
...@@ -163,6 +161,8 @@ public class TopologyManager { ...@@ -163,6 +161,8 @@ public class TopologyManager {
return; return;
} }
InetAddress inetAddress = node.getVMInformation().getInetAddress();
// unknown host => start pinging process // unknown host => start pinging process
NodeSet toPing = new NodeSet(); NodeSet toPing = new NodeSet();
HashMap<String, Long> hostsTopology = new HashMap<>(); HashMap<String, Long> hostsTopology = new HashMap<>();
......
...@@ -43,6 +43,7 @@ import org.ow2.proactive.resourcemanager.common.event.RMNodeSourceEvent; ...@@ -43,6 +43,7 @@ import org.ow2.proactive.resourcemanager.common.event.RMNodeSourceEvent;
import org.ow2.proactive.resourcemanager.core.properties.PAResourceManagerProperties; import org.ow2.proactive.resourcemanager.core.properties.PAResourceManagerProperties;
import org.ow2.proactive.resourcemanager.frontend.ResourceManager; import org.ow2.proactive.resourcemanager.frontend.ResourceManager;
import org.ow2.proactive.resourcemanager.nodesource.infrastructure.SSHInfrastructureV2; import org.ow2.proactive.resourcemanager.nodesource.infrastructure.SSHInfrastructureV2;
import org.ow2.proactive.resourcemanager.nodesource.policy.RestartDownNodesPolicy;
import org.ow2.proactive.resourcemanager.nodesource.policy.StaticPolicy; import org.ow2.proactive.resourcemanager.nodesource.policy.StaticPolicy;
import functionaltests.monitor.RMMonitorEventReceiver; import functionaltests.monitor.RMMonitorEventReceiver;
...@@ -69,7 +70,6 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest { ...@@ -69,7 +70,6 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest {
public void setup() throws Exception { public void setup() throws Exception {
TestSSHInfrastructureV2.startSSHServer(); TestSSHInfrastructureV2.startSSHServer();
RMTHelper.log("SSH server started"); RMTHelper.log("SSH server started");
startRmAndCheckInitialState();
} }
@After @After
...@@ -88,6 +88,7 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest { ...@@ -88,6 +88,7 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest {
@Test @Test
public void testRecoverSSHInfrastructureV2WithAliveNodes() throws Exception { public void testRecoverSSHInfrastructureV2WithAliveNodes() throws Exception {
startRmAndCheckInitialState(false);
// kill only the RM by sending a SIGKILL and leave node processes alive // kill only the RM by sending a SIGKILL and leave node processes alive
RecoverInfrastructureTestHelper.killRmWithStrongSigKill(); RecoverInfrastructureTestHelper.killRmWithStrongSigKill();
// nodes should be re-taken into account by the restarted RM // nodes should be re-taken into account by the restarted RM
...@@ -96,19 +97,34 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest { ...@@ -96,19 +97,34 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest {
@Test @Test
public void testRecoverSSHInfrastructureV2WithDownNodes() throws Exception { public void testRecoverSSHInfrastructureV2WithDownNodes() throws Exception {
startRmAndCheckInitialState(false);
// kill RM and nodes with SIGKILL // kill RM and nodes with SIGKILL
RecoverInfrastructureTestHelper.killRmAndNodesWithStrongSigKill(); RecoverInfrastructureTestHelper.killRmAndNodesWithStrongSigKill();
// nodes should be re-deployed by the restarted RM // nodes should be re-deployed by the restarted RM
restartRmAndCheckFinalState(0, TestSSHInfrastructureV2.NB_NODES); restartRmAndCheckFinalState(0, TestSSHInfrastructureV2.NB_NODES);
} }
/**
 * Recovery scenario in which both the RM and its nodes are killed while the
 * node source was created with {@code RestartDownNodesPolicy} (selected by
 * passing {@code true} to {@code startRmAndCheckInitialState}).
 *
 * After the RM restart, the test polls the monitor state until at least
 * {@code TestSSHInfrastructureV2.NB_NODES} nodes are reported alive. The wait
 * is bounded: if the nodes are not back before the deadline the test fails
 * with an explicit message instead of blocking forever.
 *
 * @throws Exception if the RM cannot be started/restarted, if the wait is
 *                   interrupted, or if the nodes are not redeployed in time
 */
@Test
public void testRecoverSSHInfrastructureV2WithDownNodesRestartPolicy() throws Exception {
    startRmAndCheckInitialState(true);
    // kill RM and nodes with SIGKILL
    RecoverInfrastructureTestHelper.killRmAndNodesWithStrongSigKill();
    // nodes should be re-deployed by the restarted RM
    // NOTE(review): the (0, NB_NODES) arguments presumably mean "0 alive, NB_NODES down"
    // right after restart -- confirm against restartRmAndCheckFinalState.
    restartRmAndCheckFinalState(0, TestSSHInfrastructureV2.NB_NODES);
    RMMonitorEventReceiver resourceManagerMonitor = (RMMonitorEventReceiver) resourceManager;

    // Poll every 5 seconds (same period as before), but give up after a generous
    // upper bound so a policy that never redeploys cannot hang the test suite.
    final long pollPeriodMillis = 5000L;
    final long maxWaitMillis = 10L * 60L * 1000L;
    final long deadline = System.currentTimeMillis() + maxWaitMillis;
    while (resourceManagerMonitor.getState().getAliveNodes().size() < TestSSHInfrastructureV2.NB_NODES) {
        if (System.currentTimeMillis() >= deadline) {
            throw new AssertionError("Restart down nodes policy did not redeploy " +
                                     TestSSHInfrastructureV2.NB_NODES + " nodes within " + maxWaitMillis +
                                     " ms after restart, alive nodes: " +
                                     resourceManagerMonitor.getState().getAliveNodes().size());
        }
        Thread.sleep(pollPeriodMillis);
    }
    RMTHelper.log("Restart down nodes policy redeployed nodes after restart");
}
private void startRmWithConfig(String configurationFilePath) throws Exception { private void startRmWithConfig(String configurationFilePath) throws Exception {
String rmconf = new File(RMTHelper.class.getResource(configurationFilePath).toURI()).getAbsolutePath(); String rmconf = new File(RMTHelper.class.getResource(configurationFilePath).toURI()).getAbsolutePath();
rmHelper.startRM(rmconf); rmHelper.startRM(rmconf);
resourceManager = rmHelper.getResourceManager(); resourceManager = rmHelper.getResourceManager();
} }
private void startRmAndCheckInitialState() throws Exception { private void startRmAndCheckInitialState(boolean restartDownNodesPolicy) throws Exception {
// start RM // start RM
startRmWithConfig(START_CONFIG); startRmWithConfig(START_CONFIG);
assertThat(PAResourceManagerProperties.RM_PRESERVE_NODES_ON_SHUTDOWN.getValueAsBoolean()).isTrue(); assertThat(PAResourceManagerProperties.RM_PRESERVE_NODES_ON_SHUTDOWN.getValueAsBoolean()).isTrue();
...@@ -120,7 +136,8 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest { ...@@ -120,7 +136,8 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest {
resourceManager.createNodeSource(NODE_SOURCE_NAME, resourceManager.createNodeSource(NODE_SOURCE_NAME,
SSHInfrastructureV2.class.getName(), SSHInfrastructureV2.class.getName(),
TestSSHInfrastructureV2.infraParams, TestSSHInfrastructureV2.infraParams,
StaticPolicy.class.getName(), restartDownNodesPolicy ? RestartDownNodesPolicy.class.getName()
: StaticPolicy.class.getName(),
TestSSHInfrastructureV2.policyParameters, TestSSHInfrastructureV2.policyParameters,
NODES_RECOVERABLE); NODES_RECOVERABLE);
RMTHelper.waitForNodeSourceCreation(NODE_SOURCE_NAME, RMTHelper.waitForNodeSourceCreation(NODE_SOURCE_NAME,
...@@ -149,7 +166,7 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest { ...@@ -149,7 +166,7 @@ public class RecoverSSHInfrastructureV2Test extends RMFunctionalTest {
assertThat(nodeSourceEvent.size()).isEqualTo(1); assertThat(nodeSourceEvent.size()).isEqualTo(1);
assertThat(nodeSourceEvent.get(0).getSourceName()).isEqualTo(NODE_SOURCE_NAME); assertThat(nodeSourceEvent.get(0).getSourceName()).isEqualTo(NODE_SOURCE_NAME);
// the nodes should have been recovered too, and should be alive // the nodes should have been recovered too, and should be alive or down
Set<String> allNodes = resourceManagerMonitor.getState().getAllNodes(); Set<String> allNodes = resourceManagerMonitor.getState().getAllNodes();
assertThat(allNodes.size()).isEqualTo(TestSSHInfrastructureV2.NB_NODES); assertThat(allNodes.size()).isEqualTo(TestSSHInfrastructureV2.NB_NODES);
Set<String> nodeSourceNames = new HashSet<>(); Set<String> nodeSourceNames = new HashSet<>();
......
Supports Markdown
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment