Commit 7edf71d3 authored by ipatini's avatar ipatini
Browse files

EMS: Baguette Server: Added 'IClusterZoneDetector' interface. Moved...

EMS: Baguette Server: Added  'IClusterZoneDetector' interface. Moved 'getZoneIdFor()' methods from 'IZoneManagementStrategy' to the newly added 'IClusterZoneDetector' interface. Removed 'getZoneIdFor()' methods from the classes implementing 'IZoneManagementStrategy'. Added 'ClusterZoneDetector' class, implementing 'IClusterZoneDetector', that determines the cluster/zone id of a node based on node's preregistration info, using a configurable set of rules. Updated 'ClusteringCoordinator' to initialize and use a 'ClusterZoneDetector' instance for determining cluster/zone id, instead of using 'IZoneManagementStrategy' instance. Added sample settings for cluster detector in 'eu.melodic.event.baguette-server.properties'. A few more code and setting improvements.
parent 211b240d
......@@ -39,53 +39,6 @@ public class AtLeastTwoZoneManagementStrategy implements IZoneManagementStrategy
log.warn("AtLeastTwoZoneManagementStrategy: Node connection from an already registered IP address: {} @ {}", csc.getId(), csc.getClientIpAddress());
}
@Override
public String getZoneIdFor(ClientShellCommand c) {
String nodeAddress = c.getClientIpAddress();
String hostname = c.getClientHostname();
log.debug("getZoneIdFor: {}: address: {}", c.getId(), nodeAddress);
log.debug("getZoneIdFor: {}: hostname: {}", c.getId(), hostname);
String zoneName = null;
NodeRegistryEntry entry = c.getNodeRegistryEntry();
if (entry!=null) {
String zoneId = getZoneIdFor(c.getNodeRegistryEntry());
if (StringUtils.isNotBlank(zoneId)) {
return zoneId;
}
}
if (StringUtils.isNotBlank(hostname) && !InetAddresses.isUriInetAddress(hostname)) {
int p = hostname.indexOf(".");
if (p>0)
zoneName = hostname.substring(p+1);
}
if (StringUtils.isBlank(zoneName) && StringUtils.isNotBlank(nodeAddress)) {
int p = nodeAddress.lastIndexOf(".");
if (p<0) p = nodeAddress.lastIndexOf(":");
if (p>0)
zoneName = nodeAddress.substring(0, p);
}
return StringUtils.isBlank(zoneName)
? UUID.randomUUID().toString()
: zoneName;
}
@Override
public String getZoneIdFor(@NonNull NodeRegistryEntry entry) {
log.debug("getZoneIdFor: {}: NRE: {}", entry.getClientId(), entry);
Map<String, String> preregInfo = entry.getPreregistration();
log.trace("getZoneIdFor: {}: Preregistration-Info: {}", entry.getClientId(), preregInfo);
if (preregInfo!=null) {
String zoneId = preregInfo.get("zone-id");
log.debug("getZoneIdFor: {}: Zone-Id: {}", entry.getClientId(), zoneId);
if (StringUtils.isNotBlank(zoneId)) {
log.debug("getZoneIdFor: {}: Found Zone-Id in Preregistration-Info: {}", entry.getClientId(), zoneId);
return zoneId;
}
}
log.debug("getZoneIdFor: {}: Not found Zone-Id in Preregistration-Info. Returning null", entry.getClientId());
return null;
}
@Override
public synchronized void nodeAdded(ClientShellCommand csc, ClusteringCoordinator coordinator, IClusterZone zone) {
if (zone.getNodes().size() < 2)
......
/*
* Copyright (C) 2017-2022 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.server.coordinator.cluster;
import eu.melodic.event.baguette.server.ClientShellCommand;
import eu.melodic.event.baguette.server.NodeRegistryEntry;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Detects the Cluster/Zone the given node must be added,
* using node's pre-registration info and a set of configured rules
*/
@Slf4j
public class ClusterZoneDetector implements IClusterZoneDetector {
private final static List<String> DEFAULT_ZONE_DETECTION_RULES = Arrays.asList(
"'${zone:-}'",
"'${zone-id:-}'",
"'${region:-}'",
"'${region-id:-}'",
"'${cloud:-}'",
"'${cloud-id:-}'",
"'${provider:-}'",
"'${provider-id:-}'",
// "'Cluster_'+T(java.lang.System).currentTimeMillis()",
// "'Cluster_'+T(java.util.UUID).randomUUID()",
"'Cluster_'+T(java.time.OffsetDateTime).now().toString()",
""
);
private final static RULE_TYPE DEFAULT_RULES_TYPE = RULE_TYPE.SPEL;
private final static List<String> DEFAULT_ZONES = Collections.singletonList("DEFAULT_CLUSTER");
private final static ASSIGNMENT_TO_DEFAULT_CLUSTERS DEFAULT_ASSIGNMENT_TO_DEFAULT_CLUSTERS = ASSIGNMENT_TO_DEFAULT_CLUSTERS.RANDOM;
enum RULE_TYPE { SPEL, MAP }
enum ASSIGNMENT_TO_DEFAULT_CLUSTERS { RANDOM, SEQUENTIAL }
private RULE_TYPE clusterDetectionRulesType = DEFAULT_RULES_TYPE;
private List<String> clusterDetectionRules = DEFAULT_ZONE_DETECTION_RULES;
private List<String> defaultClusters = DEFAULT_ZONES;
private ASSIGNMENT_TO_DEFAULT_CLUSTERS assignmentToDefaultClusters = DEFAULT_ASSIGNMENT_TO_DEFAULT_CLUSTERS;
private SpelExpressionParser parser = new SpelExpressionParser();
private AtomicInteger currentDefaultCluster = new AtomicInteger(0);
@Override
public void setProperties(Map<String, String> zoneConfig) {
log.debug("ClusterZoneDetector: setProperties: BEGIN: config: {}", zoneConfig);
// Get rules type (Map keys or SpEL expressions)
RULE_TYPE rulesType = RULE_TYPE.valueOf(
zoneConfig.getOrDefault("cluster-detector-rules-type", DEFAULT_RULES_TYPE.toString()).toUpperCase());
// Get rules texts and separator
String separator = zoneConfig.getOrDefault("cluster-detector-rules-separator", ",");
String rulesStr = zoneConfig.getOrDefault("cluster-detector-rules", null);
if (StringUtils.isNotBlank(rulesStr)) {
List<String> rulesList = Arrays.stream(rulesStr.split(separator))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.map(String::trim)
.collect(Collectors.toList());
clusterDetectionRules = (rulesList.size()>0) ? rulesList : DEFAULT_ZONE_DETECTION_RULES;
clusterDetectionRulesType = (rulesList.size()>0) ? rulesType : DEFAULT_RULES_TYPE;
}
// Get the default cluster(s)
List<String> defaultsList = Arrays.stream(zoneConfig.getOrDefault("default-clusters", "").split(","))
.filter(StringUtils::isNotBlank)
.map(String::trim)
.collect(Collectors.toList());
defaultClusters = (defaultsList.size()>0) ? defaultsList : DEFAULT_ZONES;
// Get assignment method to default clusters
assignmentToDefaultClusters = ASSIGNMENT_TO_DEFAULT_CLUSTERS.valueOf(
zoneConfig.getOrDefault("assignment-to-default-clusters", DEFAULT_ASSIGNMENT_TO_DEFAULT_CLUSTERS.toString().toUpperCase()));
log.debug("ClusterZoneDetector: setProperties: clusterDetectionRulesType: {}", clusterDetectionRulesType);
log.debug("ClusterZoneDetector: setProperties: clusterDetectionRules: {}", clusterDetectionRules);
log.debug("ClusterZoneDetector: setProperties: defaultClusters: {}", defaultClusters);
log.debug("ClusterZoneDetector: setProperties: assignmentToDefaultClusters: {}", assignmentToDefaultClusters);
}
@Override
public String getZoneIdFor(ClientShellCommand csc) {
log.trace("ClusterZoneDetector: getZoneIdFor: BEGIN: CSC: {}", csc);
return csc.getClientZone()==null || StringUtils.isBlank(csc.getClientZone().getId())
? getZoneIdFor(csc.getNodeRegistryEntry())
: csc.getClientZone().getId();
}
@Override
public String getZoneIdFor(NodeRegistryEntry entry) {
log.trace("ClusterZoneDetector: getZoneIdFor: BEGIN: NRE: {}", entry);
final Map<String, String> info = entry.getPreregistration();
// Select and initialize the right valueMapper based on rules type
log.trace("ClusterZoneDetector: getZoneIdFor: PREREGISTRATION-INFO: {}", info);
Function<String,String> valueMapper;
switch (clusterDetectionRulesType) {
case SPEL:
StandardEvaluationContext context = new StandardEvaluationContext(info);
context.addPropertyAccessor(new MapAccessor());
valueMapper = expression -> {
log.trace("ClusterZoneDetector: getZoneIdFor: Expression: {}", expression);
expression = StringSubstitutor.replace(expression, info);
expression = StringSubstitutor.replaceSystemProperties(expression);
log.trace("ClusterZoneDetector: getZoneIdFor: SpEL expr.: {}", expression);
String result = parser.parseRaw(expression).getValue(context, String.class);
log.trace("ClusterZoneDetector: getZoneIdFor: Result: {}", result);
return StringUtils.isBlank(result) ? null : result.trim();
};
break;
case MAP:
valueMapper = info::get;
break;
default:
throw new IllegalArgumentException("Unsupported RULE_TYPE: "+ clusterDetectionRulesType);
}
// Process rules one-by-one, using valueMapper, until one rule yields a non-blank value
String zoneId = clusterDetectionRules.stream()
.filter(StringUtils::isNotBlank)
.peek(s -> log.trace("ClusterZoneDetector: getZoneIdFor: RULE: {}", s))
.map(valueMapper)
.peek(s -> log.trace("ClusterZoneDetector: getZoneIdFor: RESULT: {}", s))
.filter(StringUtils::isNotBlank)
.findFirst()
.orElse(null);
log.debug("ClusterZoneDetector: getZoneIdFor: Intermediate: zoneId: {}", zoneId);
// If all rules yielded blank values then a default cluster id will be selected, using the assignment method
if (StringUtils.isBlank(zoneId)) {
switch (assignmentToDefaultClusters) {
case RANDOM:
zoneId = defaultClusters.get((int) (Math.random() * defaultClusters.size()));
break;
case SEQUENTIAL:
zoneId = defaultClusters.get(currentDefaultCluster.getAndUpdate(operand -> (operand + 1) % defaultClusters.size()));
break;
default:
throw new IllegalArgumentException("Unsupported ASSIGNMENT_TO_DEFAULT_CLUSTERS: "+assignmentToDefaultClusters);
}
}
log.debug("ClusterZoneDetector: getZoneIdFor: END: zoneId: {}", zoneId);
return zoneId;
}
}
......@@ -33,6 +33,7 @@ public class ClusteringCoordinator extends NoopCoordinator {
private final Map<String, ClusterZone> topologyMap = new HashMap<>();
private IClusterZoneDetector clusterZoneDetector;
private IZoneManagementStrategy zoneManagementStrategy;
private int zoneStartPort = 1200;
private int zoneEndPort = 65535;
......@@ -42,7 +43,7 @@ public class ClusteringCoordinator extends NoopCoordinator {
private GROUPING aggregatorGrouping;
private GROUPING lastLevelGrouping;
private Map<String,NodeRegistryEntry> ignoredNodes = new LinkedHashMap<>();
private final Map<String,NodeRegistryEntry> ignoredNodes = new LinkedHashMap<>();
public Collection<String> getClusterIdSet() { return topologyMap.keySet(); }
public Collection<IClusterZone> getClusters() { return topologyMap.values().stream().map(c->(IClusterZone)c).collect(Collectors.toList()); }
......@@ -97,6 +98,20 @@ public class ClusteringCoordinator extends NoopCoordinator {
? Integer.parseInt(zoneConfig.get("zone-port-end")) : zoneEndPort;
zoneKeystoreFileNameFormatter = zoneConfig.containsKey("zone-keystore-file-name-formatter")
? zoneConfig.get("zone-keystore-file-name-formatter") : zoneKeystoreFileNameFormatter;
// Initialize Cluster Detector
String clusterDetectorClass = zoneConfig.get("cluster-detector-class");
if (StringUtils.isNotBlank(clusterDetectorClass)) {
Class<?> clazz = Class.forName(clusterDetectorClass);
if (clazz.isAssignableFrom(IClusterZoneDetector.class))
clusterZoneDetector = (IClusterZoneDetector) clazz.newInstance();
else
throw new IllegalArgumentException("Invalid Cluster Detector class. Not implementing IClusterZoneDetector interface: "+clazz.getName());
} else {
clusterZoneDetector = new ClusterZoneDetector();
}
clusterZoneDetector.setProperties(zoneConfig);
log.info("Cluster Detector class: {}", clusterZoneDetector.getClass().getName());
}
@Override
......@@ -108,7 +123,10 @@ public class ClusteringCoordinator extends NoopCoordinator {
String clientId1 = csc.getId();
String clientId2 = csc.getClientId();
String clientId3 = args[2];
log.trace("processClientInput: csc.zone: {}", csc.getClientZone()!=null ? csc.getClientZone().getId() : null);
log.trace("processClientInput: topology-map: {}", topologyMap.keySet());
ClusterZone zone = findZone(csc);
log.trace("processClientInput: zone={}", zone);
zone.setAggregator(csc);
log.info("Updated aggregator of zone: {} -- New aggregator: {} @ {} ({})",
zone.getId(), clientId1, csc.getClientIpAddress(), clientId2);
......@@ -117,7 +135,7 @@ public class ClusteringCoordinator extends NoopCoordinator {
}
private ClusterZone findZone(ClientShellCommand csc) {
String zoneId = zoneManagementStrategy.getZoneIdFor(csc);
String zoneId = clusterZoneDetector.getZoneIdFor(csc);
return topologyMap.get(zoneId);
}
......@@ -159,7 +177,7 @@ public class ClusteringCoordinator extends NoopCoordinator {
log.debug("ClusteringCoordinator: preregister: Adding node without EMS client: node={}, state={}", entry.getNodeIdAndAddress(), entry.getState());
// Assign node-without-client in a zone
String zoneId = zoneManagementStrategy.getZoneIdFor(entry);
String zoneId = clusterZoneDetector.getZoneIdFor(entry);
log.debug("ClusteringCoordinator: preregister: New entry: node={}, zone-id={}", entry.getNodeIdAndAddress(), zoneId);
if (log.isTraceEnabled()) {
log.trace("preregister: topologyMap: BEFORE: keys={}", topologyMap.keySet());
......@@ -281,7 +299,7 @@ public class ClusteringCoordinator extends NoopCoordinator {
private synchronized void addNodeInTopology(ClientShellCommand csc) {
// Assign client in a zone
String zoneId = zoneManagementStrategy.getZoneIdFor(csc);
String zoneId = clusterZoneDetector.getZoneIdFor(csc);
log.debug("addNodeInTopology: New client: id={}, address={}, zone-id={}", csc.getId(), csc.getClientIpAddress(), zoneId);
ClusterZone zone = topologyMap.computeIfAbsent(zoneId, this::createClusterZone);
log.trace("addNodeInTopology: Zone members: BEFORE: {}", zone.getNodes());
......@@ -314,7 +332,7 @@ public class ClusteringCoordinator extends NoopCoordinator {
private synchronized void removeNodeFromTopology(ClientShellCommand csc) {
// Assign client in a zone
String zoneId = zoneManagementStrategy.getZoneIdFor(csc);
String zoneId = clusterZoneDetector.getZoneIdFor(csc);
ClusterZone zone = topologyMap.get(zoneId);
if (zone == null) {
log.warn("removeNodeFromTopology: Not Registered client removed: client={}, address={}", csc.getId(), csc.getClientIpAddress());
......
......@@ -36,39 +36,6 @@ public class DefaultZoneManagementStrategy implements IZoneManagementStrategy {
log.warn("DefaultZoneManagementStrategy: Node connection from an already registered IP address: {} @ {}", csc.getId(), csc.getClientIpAddress());
}
@Override
public String getZoneIdFor(ClientShellCommand c) {
String nodeAddress = c.getClientIpAddress();
String hostname = c.getClientHostname();
log.debug("getZoneIdFor: {}: address: {}", c.getId(), nodeAddress);
log.debug("getZoneIdFor: {}: hostname: {}", c.getId(), hostname);
log.debug("getZoneIfFor: {}: NRE = {}", c.getId(), c.getNodeRegistryEntry());
String zoneName = getZoneIdFor(c.getNodeRegistryEntry());
log.debug("getZoneIfFor: {}: zoneName = {}", c.getId(), zoneName);
if (StringUtils.isBlank(zoneName) && StringUtils.isNotBlank(hostname) && !InetAddresses.isUriInetAddress(hostname)) {
int p = hostname.indexOf(".");
if (p>0)
zoneName = hostname.substring(p+1);
}
if (StringUtils.isBlank(zoneName) && StringUtils.isNotBlank(nodeAddress)) {
int p = nodeAddress.lastIndexOf(".");
if (p<0) p = nodeAddress.lastIndexOf(":");
if (p>0)
zoneName = nodeAddress.substring(0, p);
}
return StringUtils.isBlank(zoneName)
? UUID.randomUUID().toString()
: zoneName;
}
@Override
public String getZoneIdFor(NodeRegistryEntry entry) {
if (entry==null) return null;
String zoneId = entry.getPreregistration().get("zone-id");
log.debug("getZoneIdFor: {} @ {}: Zone-Id in Preregistration-Info: {}", entry.getClientId(), entry.getIpAddress(), zoneId);
return zoneId;
}
@Override
public synchronized void nodeAdded(ClientShellCommand csc, ClusteringCoordinator coordinator, IClusterZone zone) {
// Instruct new node to join cluster
......
/*
* Copyright (C) 2017-2022 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.baguette.server.coordinator.cluster;
import eu.melodic.event.baguette.server.ClientShellCommand;
import eu.melodic.event.baguette.server.NodeRegistryEntry;
import java.util.Map;
public interface IClusterZoneDetector {
String getZoneIdFor(ClientShellCommand csc);
String getZoneIdFor(NodeRegistryEntry entry);
void setProperties(Map<String,String> zoneConfig);
}
......@@ -15,9 +15,6 @@ import eu.melodic.event.baguette.server.NodeRegistryEntry;
import java.util.Map;
public interface IZoneManagementStrategy {
String getZoneIdFor(ClientShellCommand csc);
String getZoneIdFor(NodeRegistryEntry entry);
default boolean allowAlreadyPreregisteredNode(Map<String,Object> nodeInfo) { return true; }
default boolean allowAlreadyPreregisteredNode(NodeRegistryEntry entry) { return true; }
default boolean allowAlreadyRegisteredNode(ClientShellCommand csc) { return true; }
......
......@@ -27,6 +27,12 @@ baguette.server.coordinatorConfig.clustering.parameters.zone-management-strategy
baguette.server.coordinatorConfig.clustering.parameters.zone-port-start = 2000
baguette.server.coordinatorConfig.clustering.parameters.zone-port-end = 2999
baguette.server.coordinatorConfig.clustering.parameters.zone-keystore-file-name-formatter = ${LOGS_DIR:logs}/cluster_${TIMESTAMP}_${ZONE_ID}.p12
#baguette.server.coordinatorConfig.clustering.parameters.cluster-detector-class = eu.melodic.event.baguette.server.coordinator.cluster.ClusterZoneDetector
#baguette.server.coordinatorConfig.clustering.parameters.cluster-detector-rules-type = MAP
#baguette.server.coordinatorConfig.clustering.parameters.cluster-detector-rules-separator = ,
#baguette.server.coordinatorConfig.clustering.parameters.cluster-detector-rules = zone, zone-id, region, region-id, cloud, cloud-id, provider, provider-id
#baguette.server.coordinatorConfig.clustering.parameters.default-clusters = DEFAULT_CLUSTER_A, DEFAULT_CLUSTER_B
#baguette.server.coordinatorConfig.clustering.parameters.assignment-to-default-clusters = RANDOM
baguette.server.coordinatorConfig.2level.coordinatorClass = eu.melodic.event.baguette.server.coordinator.TwoLevelCoordinator
baguette.server.coordinatorConfig.noop.coordinatorClass = eu.melodic.event.baguette.server.coordinator.NoopCoordinator
......@@ -46,4 +52,4 @@ baguette.server.credentials.bb=yy
#baguette.server.debug.client-address-override-allowed=true
baguette.server.client-id-format.escape = ~
#baguette.server.client-id-format = ~{type}-~{operatingSystem}-~{id}-~{name}-~{providerId}-~{ip}-~{random}
baguette.server.client-id-format = ~{type}-~{operatingSystem}-~{id}-~{name}-~{provider}-~{address}-~{random}
baguette.server.client-id-format = ~{type:-_}-~{operatingSystem:-_}-~{id:-_}-~{name:-_}-~{provider:-_}-~{address:-_}-~{random:-_}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment