Commit 80c50fa8 authored by ipatini's avatar ipatini
Browse files

EMS: Common: Added 'AbstractRecoveryTask' abstract class containing the common...

EMS: Common: Added 'AbstractRecoveryTask' abstract class containing the common method implementations of ShellRecoveryTask and VmNodeRecoveryTask. ShellRecoveryTask and VmNodeRecoveryTask are now sub-classes of AbstractRecoveryTask.
parent 74f5d34b
/*
* Copyright (C) 2017-2022 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.common.recovery;
import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
@RequiredArgsConstructor
public abstract class AbstractRecoveryTask implements RecoveryTask {
@NonNull protected final EventBus<String,Object,Object> eventBus;
@NonNull protected final PasswordUtil passwordUtil;
@NonNull protected final TaskScheduler taskScheduler;
@NonNull protected final SelfHealingProperties selfHealingProperties;
@NonNull
@Getter @Setter
protected Map nodeInfo = Collections.emptyMap();
public abstract List<RECOVERY_COMMAND> getRecoveryCommands();
public abstract void runNodeRecovery() throws Exception;
public abstract void runNodeRecovery(List<RECOVERY_COMMAND> recoveryCommands) throws Exception;
protected void waitFor(long millis, String description) {
if (millis>0) {
log.warn("############## Waiting for {}ms after {}...", millis, description);
try { Thread.sleep(millis); } catch (InterruptedException e) { }
}
}
protected void redirectOutput(InputStream in, String id, AtomicBoolean closed, String connectionClosedMessageFormatter, String exceptionMessageFormatter) {
taskScheduler.schedule(() -> {
try {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
while (reader.ready()) {
log.info(" {}> {}", id, reader.readLine());
}
}
} catch (IOException e) {
if (closed.get()) {
log.info(connectionClosedMessageFormatter, id);
} else {
log.error(exceptionMessageFormatter, id, e);
}
}
},
Instant.now()
);
}
}
......@@ -10,19 +10,15 @@
package eu.melodic.event.common.recovery;
import eu.melodic.event.util.EventBus;
import lombok.*;
import eu.melodic.event.util.PasswordUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static eu.melodic.event.common.recovery.RecoveryConstant.SELF_HEALING_RECOVERY_COMPLETED;
......@@ -32,16 +28,9 @@ import static eu.melodic.event.common.recovery.RecoveryConstant.SELF_HEALING_REC
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ShellRecoveryTask implements RecoveryTask {
@NonNull private final EventBus<String,Object,Object> eventBus;
@NonNull private final TaskScheduler taskScheduler;
@Getter @Setter
private Map nodeInfo;
public void setNodeInfo(@NonNull Map nodeInfo) {
this.nodeInfo = nodeInfo;
public class ShellRecoveryTask extends AbstractRecoveryTask {
public ShellRecoveryTask(EventBus<String,Object,Object> eventBus, PasswordUtil passwordUtil, TaskScheduler taskScheduler, SelfHealingProperties selfHealingProperties) {
super(eventBus, passwordUtil, taskScheduler, selfHealingProperties);
}
@SneakyThrows
......@@ -83,31 +72,10 @@ public class ShellRecoveryTask implements RecoveryTask {
eventBus.send(SELF_HEALING_RECOVERY_COMPLETED, "");
}
private void waitFor(long millis, String description) {
if (millis>0) {
log.warn("############## Waiting for {}ms after {}...", millis, description);
try { Thread.sleep(millis); } catch (InterruptedException e) { }
}
}
private void redirectShellOutput(InputStream in, String id, AtomicBoolean closed) {
taskScheduler.schedule(() -> {
try {
//IoUtils.copy(in, System.out);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
while (reader.ready()) {
log.info(" {}> {}", id, reader.readLine());
}
}
} catch (IOException e) {
if (closed.get()) {
log.info("ShellRecoveryTask: redirectShellOutput(): Connection closed: id={}", id);
} else {
log.error("ShellRecoveryTask: redirectShellOutput(): Exception while copying Process IN stream: id={}\n", id, e);
}
}
},
Instant.now()
);
redirectOutput(in, id, closed,
"ShellRecoveryTask: redirectShellOutput(): Connection closed: id={}",
"ShellRecoveryTask: redirectShellOutput(): Exception while copying Process IN stream: id={}\n");
//IoUtils.copy(in, System.out);
}
}
......@@ -14,17 +14,15 @@ import eu.melodic.event.common.client.SshClientProperties;
import eu.melodic.event.common.collector.CollectorContext;
import eu.melodic.event.util.EventBus;
import eu.melodic.event.util.PasswordUtil;
import lombok.*;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -36,20 +34,18 @@ import static eu.melodic.event.common.recovery.RecoveryConstant.SELF_HEALING_REC
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class VmNodeRecoveryTask <P extends SshClientProperties> implements RecoveryTask {
@NonNull private final EventBus<String,Object,Object> eventBus;
@NonNull private final PasswordUtil passwordUtil;
@NonNull private final TaskScheduler taskScheduler;
public class VmNodeRecoveryTask <P extends SshClientProperties> extends AbstractRecoveryTask {
@NonNull private final CollectorContext<P> collectorContext;
@Getter
private Map nodeInfo;
private P sshClientProperties;
public VmNodeRecoveryTask(EventBus<String,Object,Object> eventBus, PasswordUtil passwordUtil, TaskScheduler taskScheduler, SelfHealingProperties selfHealingProperties, CollectorContext<P> collectorContext) {
super(eventBus, passwordUtil, taskScheduler, selfHealingProperties);
this.collectorContext = collectorContext;
}
public void setNodeInfo(@NonNull Map nodeInfo) {
this.nodeInfo = nodeInfo;
super.setNodeInfo(nodeInfo);
this.sshClientProperties = createSshClientProperties();
}
......@@ -96,13 +92,6 @@ public class VmNodeRecoveryTask <P extends SshClientProperties> implements Recov
return o.toString();
}
private void waitFor(long millis, String description) {
if (millis>0) {
log.warn("############## Waiting for {}ms after {}...", millis, description);
try { Thread.sleep(millis); } catch (InterruptedException e) { }
}
}
private P createSshClientProperties() {
log.debug("VmNodeRecoveryTask: createSshClientProperties(): BEGIN:");
......@@ -169,23 +158,9 @@ public class VmNodeRecoveryTask <P extends SshClientProperties> implements Recov
}
private void redirectSshOutput(InputStream in, String id, AtomicBoolean closed) {
taskScheduler.schedule(() -> {
try {
//IoUtils.copy(sshc.getIn(), System.out);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
while (reader.ready()) {
log.info(" {}> {}", id, reader.readLine());
}
}
} catch (IOException e) {
if (closed.get()) {
log.info("VmNodeRecoveryTask: redirectSshOutput(): Connection closed: id={}", id);
} else {
log.error("VmNodeRecoveryTask: redirectSshOutput(): Exception while copying SSH IN stream: id={}\n", id, e);
}
}
},
Instant.now()
);
redirectOutput(in, id, closed,
"VmNodeRecoveryTask: redirectSshOutput(): Connection closed: id={}",
"VmNodeRecoveryTask: redirectSshOutput(): Exception while copying SSH IN stream: id={}\n");
//IoUtils.copy(sshc.getIn(), System.out);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment