Commit 78b51e2d authored by I Patini's avatar I Patini
Browse files

EMS: Baguette Client, Common: Moved EMS client/Netdata recovery code from...

EMS: Baguette Client, Common: Moved EMS client/Netdata recovery code from 'baguette-client' to 'common' package to make it available to EMS server too. Added 'SshClient' and 'SshClientProperties' interfaces in order to generify client recovery code. 'Sshc' and 'BaguetteClientProperties' classes in baguette-client are now implementing the new SshClient and SshClientProperties interfaces. Added purging of failed node addresses from NetdataCollector's ignoredNodes list, when Client Configuration is updated (it contains the nodes without EMS client). A few minor improvements.
parent 6555f46b
......@@ -9,6 +9,7 @@
package eu.melodic.event.baguette.client;
import eu.melodic.event.common.client.SshClientProperties;
import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
......@@ -21,12 +22,8 @@ import java.util.List;
@Configuration
@ConfigurationProperties
@PropertySource("file:${MELODIC_CONFIG_DIR}/baguette-client.properties")
@ToString(exclude = "serverPassword")
public class BaguetteClientProperties {
private long authTimeout = 60000;
private long execTimeout = 120000;
private long retryPeriod = 60000;
@ToString(callSuper = true)
public class BaguetteClientProperties extends SshClientProperties {
private boolean exitCommandAllowed = false;
private int killDelay = 5;
......
......@@ -18,6 +18,7 @@ import eu.melodic.event.brokercep.cep.CepService;
import eu.melodic.event.brokercep.event.EventMap;
import eu.melodic.event.brokerclient.event.EventGenerator;
import eu.melodic.event.brokerclient.properties.BrokerClientProperties;
import eu.melodic.event.common.misc.EventConstant;
import eu.melodic.event.common.misc.SystemResourceMonitor;
import eu.melodic.event.util.*;
import io.atomix.cluster.ClusterMembershipEvent;
......@@ -752,6 +753,10 @@ public class CommandExecutor {
clientConfiguration = config;
}
log.info("New client config.: {}", config);
HashMap<String,ClientConfiguration> payload = new HashMap<>();
payload.put("new", clientConfiguration);
payload.put("old", oldConfig);
eventBus.send(EventConstant.EVENT_CLIENT_CONFIG_UPDATED, payload, this);
} catch (Exception ex) {
log.error("Exception while deserializing received Client configuration: ", ex);
......
......@@ -46,7 +46,7 @@ import java.util.Optional;
*/
@Slf4j
@Service
public class Sshc {
public class Sshc implements eu.melodic.event.common.client.SshClient<BaguetteClientProperties> {
private BaguetteClientProperties config;
private SshClient client;
private SimpleClient simple;
......@@ -62,15 +62,17 @@ public class Sshc {
private InputStream in;
@Getter
private PrintStream out;
//@Getter
//private PrintStream err;
@Getter
private PrintStream err;
@Getter
private String clientId;
@Getter @Setter
private boolean useServerKeyVerifier = true;
public void setConfiguration(BaguetteClientProperties config) throws IOException {
@Override
public void setConfiguration(BaguetteClientProperties config) {
log.trace("Sshc: New config: {}", config);
this.config = config;
this.clientId = config.getClientId();
log.trace("Sshc: cmd-exec: {}", commandExecutor);
......@@ -102,6 +104,7 @@ public class Sshc {
if (started) log.trace("Client started");
}
@Override
public synchronized void start() throws IOException {
if (started) return;
log.info("Connecting to server...");
......@@ -201,6 +204,7 @@ public class Sshc {
this.started = true;
}
@Override
public synchronized void stop() throws IOException {
if (!started) return;
this.started = false;
......
......@@ -9,8 +9,11 @@
package eu.melodic.event.baguette.client.collector;
import eu.melodic.event.baguette.client.BaguetteClientProperties;
import eu.melodic.event.baguette.client.CommandExecutor;
import eu.melodic.event.baguette.client.Sshc;
import eu.melodic.event.brokercep.event.EventMap;
import eu.melodic.event.common.client.SshClient;
import eu.melodic.event.common.collector.CollectorContext;
import eu.melodic.event.util.ClientConfiguration;
import eu.melodic.event.util.GroupingConfiguration;
......@@ -27,7 +30,7 @@ import java.util.Set;
@Slf4j
@Component
@RequiredArgsConstructor
public class ClientCollectorContext implements CollectorContext {
public class ClientCollectorContext implements CollectorContext<BaguetteClientProperties> {
private final CommandExecutor commandExecutor;
public Map<String, GroupingConfiguration> getGroupings() {
......@@ -54,4 +57,14 @@ public class ClientCollectorContext implements CollectorContext {
/**
 * Publishes an event by handing it over, unchanged, to the {@code CommandExecutor}.
 *
 * @param connectionString  passed through to {@code CommandExecutor.sendEvent}
 * @param destinationName   passed through to {@code CommandExecutor.sendEvent}
 * @param event             the event to publish
 * @param createDestination passed through to {@code CommandExecutor.sendEvent}
 * @return whatever {@code CommandExecutor.sendEvent} reports for this call
 */
public boolean sendEvent(String connectionString, String destinationName, EventMap event, boolean createDestination) {
    final boolean outcome =
            commandExecutor.sendEvent(connectionString, destinationName, event, createDestination);
    return outcome;
}
/**
 * Supplies a brand-new, not yet configured {@link Sshc} every time it is called.
 *
 * NOTE(review): Sshc is annotated as a Spring service, yet this instance is built
 * with {@code new} and therefore does not come from the application context --
 * confirm that no injected collaborators are required by the callers of this client.
 *
 * @return a new SSH client instance (never {@code null})
 */
@Override
public SshClient<BaguetteClientProperties> getSshClient() {
    final SshClient<BaguetteClientProperties> freshClient = new Sshc();
    return freshClient;
}
/**
 * Supplies a brand-new {@link BaguetteClientProperties} instance on every call.
 * The object is created with its no-args constructor, so it carries only the
 * field defaults until the caller fills it in.
 *
 * @return a new properties object (never {@code null})
 */
@Override
public BaguetteClientProperties getSshClientProperties() {
    final BaguetteClientProperties blankProperties = new BaguetteClientProperties();
    return blankProperties;
}
}
......@@ -10,23 +10,16 @@
package eu.melodic.event.baguette.client.plugin.recovery;
import com.google.gson.Gson;
import eu.melodic.event.baguette.client.BaguetteClientProperties;
import eu.melodic.event.baguette.client.CommandExecutor;
import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
import lombok.Data;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
/**
* Node Info helper -- Retrieves node info from EMS server and caches them
......
......@@ -12,7 +12,7 @@ package eu.melodic.event.baguette.client.plugin.recovery;
import eu.melodic.event.baguette.client.BaguetteClientProperties;
import eu.melodic.event.baguette.client.CommandExecutor;
import eu.melodic.event.baguette.client.collector.netdata.NetdataCollector;
import eu.melodic.event.common.recovery.RecoveryConstant;
import eu.melodic.event.common.recovery.*;
import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
import eu.melodic.event.util.Plugin;
......
/*
* Copyright (C) 2017-2022 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.common.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
/**
 * Contract of an SSH client that is configured with a properties object,
 * can be started and stopped, and exposes its input/output/error streams.
 *
 * @param <C> concrete type of the configuration accepted by {@link #setConfiguration}
 */
public interface SshClient<C extends SshClientProperties> {

    // ----- Setup -----

    /**
     * Hands the connection settings to the client.
     *
     * @param config the settings to use
     */
    void setConfiguration(C config);

    /**
     * Sets the "use server key verifier" flag.
     * Presumably controls whether the server's key is verified on connect -- confirm in implementations.
     *
     * @param useServerKeyVerifier the new flag value
     */
    void setUseServerKeyVerifier(boolean useServerKeyVerifier);

    // ----- Lifecycle -----

    /**
     * Starts the client.
     *
     * @throws IOException if starting fails
     */
    void start() throws IOException;

    /**
     * Stops the client.
     *
     * @throws IOException if stopping fails
     */
    void stop() throws IOException;

    // ----- Streams -----

    /** @return the client's input stream */
    InputStream getIn();

    /** @return the client's output stream */
    PrintStream getOut();

    /** @return the client's error stream */
    PrintStream getErr();
}
/*
* Copyright (C) 2017-2022 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.common.client;
import lombok.Data;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Settings needed to open an SSH connection: timeouts, client id, server endpoint,
 * server key material and login credentials.
 *
 * Accessors, equals/hashCode and toString are generated by Lombok; the generated
 * toString() leaves out {@code serverPassword}.
 */
@Data
@ToString(exclude = "serverPassword")
@Configuration
@ConfigurationProperties
public class SshClientProperties {

    // --- Timing settings (units are not stated here; presumably milliseconds -- TODO confirm) ---
    private long authTimeout = 60_000L;
    private long execTimeout = 120_000L;
    private long retryPeriod = 60_000L;

    // --- Identity of this client ---
    private String clientId;

    // --- Server endpoint and key material ---
    private String serverAddress;
    private int serverPort = 22;
    private String serverPubkey;
    private String serverFingerprint;

    // --- Login credentials (password is kept out of toString()) ---
    private String serverUsername;
    private String serverPassword;
}
......@@ -10,15 +10,19 @@
package eu.melodic.event.common.collector;
import eu.melodic.event.brokercep.event.EventMap;
import eu.melodic.event.common.client.SshClient;
import eu.melodic.event.common.client.SshClientProperties;
import eu.melodic.event.util.ClientConfiguration;
import java.io.Serializable;
import java.util.List;
import java.util.Set;
public interface CollectorContext {
/**
 * Environment a collector runs in: node information, event publishing and,
 * optionally, access to an SSH client and its properties.
 *
 * @param <P> type of SSH client properties this context works with
 */
public interface CollectorContext<P extends SshClientProperties> {

    /** @return the available node configurations */
    List<ClientConfiguration> getNodeConfigurations();

    /** @return the nodes that have no EMS client (their string form is used as the node address) */
    Set<Serializable> getNodesWithoutClient();

    /** @return the aggregator flag of this context */
    boolean isAggregator();

    /**
     * Publishes an event.
     *
     * @return the implementation's result for this send
     */
    boolean sendEvent(String connectionString, String destinationName, EventMap event, boolean createDestination);

    /**
     * Optional capability: an SSH client for this context.
     *
     * @return {@code null} unless the implementation overrides this method
     */
    default SshClient<P> getSshClient() {
        return null;
    }

    /**
     * Optional capability: a properties object for the SSH client.
     *
     * @return {@code null} unless the implementation overrides this method
     */
    default P getSshClientProperties() {
        return null;
    }
}
......@@ -11,6 +11,8 @@ package eu.melodic.event.common.collector.netdata;
import eu.melodic.event.brokercep.event.EventMap;
import eu.melodic.event.common.collector.CollectorContext;
import eu.melodic.event.common.misc.EventConstant;
import eu.melodic.event.common.recovery.RecoveryConstant;
import eu.melodic.event.util.EmsConstant;
import eu.melodic.event.util.EventBus;
import lombok.NonNull;
......@@ -24,14 +26,11 @@ import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.web.client.RestTemplate;
import java.io.Serializable;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import eu.melodic.event.common.recovery.RecoveryConstant;
/**
* Collects measurements from Netdata http server
*/
......@@ -105,6 +104,7 @@ public class NetdataCollector implements InitializingBean, Runnable, EventBus.Ev
// Subscribe for SELF-HEALING plugin events (recovery COMPLETED / GIVE_UP) and for Client configuration updates
eventBus.subscribe(RecoveryConstant.SELF_HEALING_RECOVERY_COMPLETED, this);
eventBus.subscribe(RecoveryConstant.SELF_HEALING_RECOVERY_GIVE_UP, this);
eventBus.subscribe(EventConstant.EVENT_CLIENT_CONFIG_UPDATED, this);
// Schedule collection execution
errorsMap.clear();
......@@ -122,6 +122,7 @@ public class NetdataCollector implements InitializingBean, Runnable, EventBus.Ev
}
// Unsubscribe from Client configuration updates and from SELF-HEALING plugin events (recovery COMPLETED / GIVE_UP)
eventBus.unsubscribe(EventConstant.EVENT_CLIENT_CONFIG_UPDATED, this);
eventBus.unsubscribe(RecoveryConstant.SELF_HEALING_RECOVERY_COMPLETED, this);
eventBus.unsubscribe(RecoveryConstant.SELF_HEALING_RECOVERY_GIVE_UP, this);
......@@ -151,6 +152,14 @@ public class NetdataCollector implements InitializingBean, Runnable, EventBus.Ev
log.info("Collectors::Netdata: Giving up collection from Node: {}", nodeAddress);
ignoredNodes.put(nodeAddress, null);
}
} else
if (EventConstant.EVENT_CLIENT_CONFIG_UPDATED.equals(topic)) {
log.info("Collectors::Netdata: Client configuration updated. Purging nodes without recovery task from ignore list: Old ignore list nodes: {}", ignoredNodes.keySet());
List<String> nodesToPurge = ignoredNodes.entrySet().stream().filter(e -> e.getValue() == null).map(Map.Entry::getKey).collect(Collectors.toList());
nodesToPurge.forEach(node -> {
ignoredNodes.remove(node);
log.info("Collectors::Netdata: Client configuration updated. Node purged from ignore list: {}", node);
});
} else
log.warn("Collectors::Netdata: onMessage: Event from unexpected topic received. Ignoring it: {}", topic);
}
......@@ -179,7 +188,7 @@ public class NetdataCollector implements InitializingBean, Runnable, EventBus.Ev
if (collectorContext.getNodesWithoutClient().size()>0) {
log.info("Collectors::Netdata: Collecting metrics from remote nodes (without EMS client): {}",
collectorContext.getNodesWithoutClient());
for (Serializable nodeAddress : collectorContext.getNodesWithoutClient()) {
for (Object nodeAddress : collectorContext.getNodesWithoutClient()) {
// collect data from remote node
collectAndPublishData(nodeAddress.toString());
}
......
/*
* Copyright (C) 2017-2022 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.common.misc;
/**
 * Common Event Constants.
 *
 * Holder for event topic names shared across modules. The class only carries
 * constants, so it is final and cannot be instantiated.
 */
public final class EventConstant {
    /** Topic used to announce that the client configuration has been updated. */
    public static final String EVENT_CLIENT_CONFIG_UPDATED = "EVENT_CLIENT_CONFIG_UPDATED";

    private EventConstant() {
        // Constants holder: no instances are needed.
    }
}
\ No newline at end of file
......@@ -7,8 +7,10 @@
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.client.plugin.recovery;
package eu.melodic.event.common.recovery;
import eu.melodic.event.common.client.SshClientProperties;
import eu.melodic.event.common.collector.CollectorContext;
import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
import lombok.Getter;
......@@ -24,11 +26,11 @@ import java.util.Collections;
import java.util.List;
/**
* EMS client (client-side) Self-Healing
* EMS client Self-Healing
*/
@Slf4j
@Component
public class EmsClientRecoveryTask extends VmNodeRecoveryTask {
public class EmsClientRecoveryTask<P extends SshClientProperties> extends VmNodeRecoveryTask<P> {
@Getter
private final List<RECOVERY_COMMAND> recoveryCommands = Collections.unmodifiableList(Arrays.asList(
new RECOVERY_COMMAND("Initial wait...",
......@@ -42,8 +44,8 @@ public class EmsClientRecoveryTask extends VmNodeRecoveryTask {
@Value("${self.healing.recovery.file.baguette:}")
private String emsRecoveryFile;
public EmsClientRecoveryTask(@NonNull EventBus<String, Object, Object> eventBus, @NonNull PasswordUtil passwordUtil, @NonNull TaskScheduler taskScheduler) {
super(eventBus, passwordUtil, taskScheduler);
/**
 * Builds the EMS client recovery task; every collaborator is forwarded
 * as-is to the {@code VmNodeRecoveryTask} constructor.
 *
 * @param eventBus         event bus handed to the parent task
 * @param passwordUtil     password utility handed to the parent task
 * @param taskScheduler    scheduler handed to the parent task
 * @param collectorContext collector context handed to the parent task
 */
public EmsClientRecoveryTask(
        @NonNull EventBus<String, Object, Object> eventBus,
        @NonNull PasswordUtil passwordUtil,
        @NonNull TaskScheduler taskScheduler,
        @NonNull CollectorContext<P> collectorContext) {
    super(eventBus, passwordUtil, taskScheduler, collectorContext);
}
public void runNodeRecovery() throws Exception {
......
......@@ -7,7 +7,7 @@
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.client.plugin.recovery;
package eu.melodic.event.common.recovery;
import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
......@@ -24,7 +24,7 @@ import java.util.Collections;
import java.util.List;
/**
* Netdata agent (client-side) Self-Healing
* Local Netdata agent Self-Healing
*/
@Slf4j
@Component
......
......@@ -7,8 +7,10 @@
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.client.plugin.recovery;
package eu.melodic.event.common.recovery;
import eu.melodic.event.common.client.SshClientProperties;
import eu.melodic.event.common.collector.CollectorContext;
import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
import lombok.Getter;
......@@ -24,11 +26,11 @@ import java.util.Collections;
import java.util.List;
/**
* Netdata agent (client-side) Self-Healing
* Remote Netdata agent Self-Healing using an SSH connection
*/
@Slf4j
@Component
public class NetdataAgentRecoveryTask extends VmNodeRecoveryTask {
public class NetdataAgentRecoveryTask<P extends SshClientProperties> extends VmNodeRecoveryTask<P> {
@Getter
private final List<RECOVERY_COMMAND> recoveryCommands = Collections.unmodifiableList(Arrays.asList(
new RECOVERY_COMMAND("Initial wait...",
......@@ -42,8 +44,8 @@ public class NetdataAgentRecoveryTask extends VmNodeRecoveryTask {
@Value("${self.healing.recovery.file.netdata:}")
private String netdataRecoveryFile;
public NetdataAgentRecoveryTask(@NonNull EventBus<String, Object, Object> eventBus, @NonNull PasswordUtil passwordUtil, @NonNull TaskScheduler taskScheduler) {
super(eventBus, passwordUtil, taskScheduler);
/**
 * Builds the remote Netdata agent recovery task; every collaborator is
 * forwarded as-is to the {@code VmNodeRecoveryTask} constructor.
 *
 * @param eventBus         event bus handed to the parent task
 * @param passwordUtil     password utility handed to the parent task
 * @param taskScheduler    scheduler handed to the parent task
 * @param collectorContext collector context handed to the parent task
 */
public NetdataAgentRecoveryTask(
        @NonNull EventBus<String, Object, Object> eventBus,
        @NonNull PasswordUtil passwordUtil,
        @NonNull TaskScheduler taskScheduler,
        @NonNull CollectorContext<P> collectorContext) {
    super(eventBus, passwordUtil, taskScheduler, collectorContext);
}
public void runNodeRecovery() throws Exception {
......
......@@ -7,12 +7,12 @@
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.client.plugin.recovery;
package eu.melodic.event.common.recovery;
import lombok.Data;
@Data
class RECOVERY_COMMAND {
public class RECOVERY_COMMAND {
private final String name;
private final String command;
private final long waitBefore;
......
......@@ -7,7 +7,7 @@
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.client.plugin.recovery;
package eu.melodic.event.common.recovery;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
......@@ -19,7 +19,7 @@ import java.util.List;
import java.util.Map;
/**
* Client-side Self-Healing task
* Self-Healing task
*/
public interface RecoveryTask {
Map getNodeInfo();
......
......@@ -7,7 +7,7 @@
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.client.plugin.recovery;
package eu.melodic.event.common.recovery;
import eu.melodic.event.util.EventBus;
import lombok.*;
......@@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static eu.melodic.event.common.recovery.RecoveryConstant.SELF_HEALING_RECOVERY_COMPLETED;
/**
* Client-side, Local-node Self-Healing
* Local-node Self-Healing using Shell
*/
@Slf4j
@Component
......
......@@ -7,10 +7,11 @@
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.client.plugin.recovery;
package eu.melodic.event.common.recovery;
import eu.melodic.event.baguette.client.BaguetteClientProperties;
import eu.melodic.event.baguette.client.Sshc;
import eu.melodic.event.common.client.SshClient;
import eu.melodic.event.common.client.SshClientProperties;
import eu.melodic.event.common.collector.CollectorContext;
import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
import lombok.*;
......@@ -31,24 +32,25 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static eu.melodic.event.common.recovery.RecoveryConstant.SELF_HEALING_RECOVERY_COMPLETED;
/**
* Client-side, VM-node Self-Healing
* VM-node Self-Healing using an SSH connection
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class VmNodeRecoveryTask implements RecoveryTask {
public class VmNodeRecoveryTask <P extends SshClientProperties> implements RecoveryTask {
@NonNull private final EventBus<String,Object,Object> eventBus;
@NonNull private final PasswordUtil passwordUtil;
@NonNull private final TaskScheduler taskScheduler;
@NonNull private final CollectorContext<P> collectorContext;
@Getter @Setter
@Getter
private Map nodeInfo;
private BaguetteClientProperties baguetteClientProperties;
private P sshClientProperties;
public void setNodeInfo(@NonNull Map nodeInfo) {
this.nodeInfo = nodeInfo;
this.baguetteClientProperties = createBaguetteClientProperties();
this.sshClientProperties = createSshClientProperties();
}
@SneakyThrows
......@@ -64,7 +66,7 @@ public class VmNodeRecoveryTask implements RecoveryTask {
log.debug("VmNodeRecoveryTask: runNodeRecovery(): BEGIN: recovery-command: {}", recoveryCommands);
// Connect to Node (VM)
Sshc sshc = connectToNode();
SshClient<? extends SshClientProperties> sshc = connectToNode();
// Redirect SSH output to standard output
final AtomicBoolean closed = new AtomicBoolean(false);
......@@ -86,7 +88,7 @@ public class VmNodeRecoveryTask implements RecoveryTask {
disconnectFromNode(sshc, closed);
// Send recovery complete event
eventBus.send(SELF_HEALING_RECOVERY_COMPLETED, baguetteClientProperties.getServerAddress());
eventBus.send(SELF_HEALING_RECOVERY_COMPLETED, sshClientProperties.getServerAddress());
}
private String str(Object o) {
......@@ -101,8 +103,8 @@ public class VmNodeRecoveryTask implements RecoveryTask {
}
}
private BaguetteClientProperties createBaguetteClientProperties() {
log.debug("VmNodeRecoveryTask: createBaguetteClientProperties(): BEGIN:");
private P createSshClientProperties() {
log.debug("VmNodeRecoveryTask: createSshClientProperties(): BEGIN:");
// Extract connection info and credentials
String os = str(nodeInfo.get("operatingSystem"));
......@@ -121,12 +123,12 @@ public class VmNodeRecoveryTask implements RecoveryTask {
port = 22;
} catch (Exception e) {}
log.debug("VmNodeRecoveryTask: createBaguetteClientProperties(): os={}, address={}, type={}", os, address, type);
log.debug("VmNodeRecoveryTask: createBaguetteClientProperties(): username={}, password={}", username, passwordUtil.encodePassword(password));
log.debug("VmNodeRecoveryTask: createBaguetteClientProperties(): fingerprint={}, key={}", fingerprint, passwordUtil.encodePassword(key));
log.debug("VmNodeRecoveryTask: createSshClientProperties(): os={}, address={}, type={}", os, address, type);
log.debug("VmNodeRecoveryTask: createSshClientProperties(): username={}, password={}", username, passwordUtil.encodePassword(password));
log.debug("VmNodeRecoveryTask: createSshClientProperties(): fingerprint={}, key={}", fingerprint, passwordUtil.encodePassword(key));
// Connect to node and restart Baguette Client
BaguetteClientProperties config = new BaguetteClientProperties();
// Connect to node and restart EMS client
P config = collectorContext.getSshClientProperties();
config.setServerAddress(address);
config.setServerPort(port);