Commit 040c330f authored by ipatini's avatar ipatini
Browse files

EMS: Baguette Client: Fixed a bug that caused EMS client recovery tasks to be...

EMS: Baguette Client: Fixed a bug that caused EMS client recovery tasks to be cancelled by Netdata self-healing checks
parent c9cd391b
......@@ -17,9 +17,7 @@ import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
import eu.melodic.event.util.Plugin;
import io.atomix.cluster.ClusterMembershipEvent;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
......@@ -54,7 +52,7 @@ public class SelfHealingPlugin implements Plugin, InitializingBean, EventBus.Eve
private boolean started;
private final HashMap<String,ScheduledFuture<?>> waitingTasks = new HashMap<>();
private final HashMap<NodeKey,ScheduledFuture<?>> waitingTasks = new HashMap<>();
private final TaskScheduler taskScheduler;
@Override
......@@ -89,7 +87,7 @@ public class SelfHealingPlugin implements Plugin, InitializingBean, EventBus.Eve
eventBus.unsubscribe(NetdataCollector.NETDATA_NODE_FAILED, this);
// Cancel all waiting recovery tasks
waitingTasks.forEach((nodeAddress,future) -> {
waitingTasks.forEach((nodeKey,future) -> {
future.cancel(true);
});
waitingTasks.clear();
......@@ -163,7 +161,7 @@ public class SelfHealingPlugin implements Plugin, InitializingBean, EventBus.Eve
}
// Cancel any waiting recovery task
cancelRecoveryTask(nodeId, nodeAddress, false);
cancelRecoveryTask(nodeId, nodeAddress, EmsClientRecoveryTask.class, false);
} else {
log.warn("SelfHealingPlugin: processClusterNodeAddedEvent(): Message is not a {} object. Will ignore it.", ClusterMembershipEvent.class.getSimpleName());
}
......@@ -227,19 +225,24 @@ public class SelfHealingPlugin implements Plugin, InitializingBean, EventBus.Eve
}*/
// Cancel any waiting recovery task
cancelRecoveryTask(null, nodeAddress, false);
@NonNull Class<? extends RecoveryTask> recoverTaskClass =
StringUtils.isNotBlank(nodeAddress)
? NetdataAgentRecoveryTask.class
: NetdataAgentLocalRecoveryTask.class;
cancelRecoveryTask(null, nodeAddress, recoverTaskClass, false);
}
// ------------------------------------------------------------------------
private void createRecoveryTask(String nodeId, @NonNull String nodeAddress, @NonNull Class<? extends RecoveryTask> recoveryTaskClass) {
// Check if a recovery task has already been scheduled
NodeKey nodeKey = new NodeKey(nodeAddress, recoveryTaskClass);
synchronized (waitingTasks) {
if (waitingTasks.containsKey(nodeAddress)) {
if (waitingTasks.containsKey(nodeKey)) {
log.warn("SelfHealingPlugin: createRecoveryTask(): Recovery has already been scheduled for Node: id={}, address={}", nodeId, nodeAddress);
return;
}
waitingTasks.put(nodeAddress, null);
waitingTasks.put(nodeKey, null);
}
// Get node info and credentials from EMS server
......@@ -271,20 +274,21 @@ public class SelfHealingPlugin implements Plugin, InitializingBean, EventBus.Eve
}
if (retries.getAndIncrement() >= selfHealingProperties.getRecovery().getMaxRetries()) {
log.warn("SelfHealingPlugin: Max retries reached. No more recovery retries for node: id={}, address={}", nodeId, nodeAddress);
cancelRecoveryTask(nodeId, nodeAddress, true);
cancelRecoveryTask(nodeId, nodeAddress, recoveryTaskClass, true);
eventBus.send(RecoveryConstant.SELF_HEALING_RECOVERY_GIVE_UP, nodeAddress);
// Notify EMS server about giving up recovery due to permanent failure
commandExecutor.notifyEmsServer("RECOVERY GIVE_UP "+nodeId+" @ "+nodeAddress);
}
}, Instant.now().plusMillis(selfHealingProperties.getRecovery().getDelay()), Duration.ofMillis(selfHealingProperties.getRecovery().getRetryDelay()));
waitingTasks.put(nodeAddress, future);
waitingTasks.put(nodeKey, future);
log.info("SelfHealingPlugin: createRecoveryTask(): Created recovery task for Node: id={}, address={}", nodeId, nodeAddress);
}
private void cancelRecoveryTask(String nodeId, @NonNull String nodeAddress, boolean retainAddress) {
private void cancelRecoveryTask(String nodeId, @NonNull String nodeAddress, @NonNull Class<? extends RecoveryTask> recoveryTaskClass, boolean retainNodeKey) {
NodeKey nodeKey = new NodeKey(nodeAddress, recoveryTaskClass);
synchronized (waitingTasks) {
ScheduledFuture<?> future = retainAddress ? waitingTasks.put(nodeAddress, null) : waitingTasks.remove(nodeAddress);
ScheduledFuture<?> future = retainNodeKey ? waitingTasks.put(nodeKey, null) : waitingTasks.remove(nodeKey);
if (future != null) {
future.cancel(true);
nodeInfoHelper.remove(nodeId, nodeAddress);
......@@ -293,4 +297,11 @@ public class SelfHealingPlugin implements Plugin, InitializingBean, EventBus.Eve
log.debug("SelfHealingPlugin: cancelRecoveryTask(): No recovery task is scheduled for Node: id={}, address={}", nodeId, nodeAddress);
}
}
@Data
@AllArgsConstructor
protected static class NodeKey {
private String address;
@NonNull private Class<?> recoveryTaskClass;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment