Commit 1bef34aa authored by I Patini's avatar I Patini
Browse files

EMS: Control Service: Moved TopicBeacon settings into the newly added class...

EMS: Control Service: Moved TopicBeacon settings into the newly added class TopicBeaconProperties. Updated config. files accordingly.
parent 63a18cc9
......@@ -154,16 +154,16 @@ control.exit-grace-period = 10
control.exit-code = 0
### Topic Beacon settings
beacon.enable = true
beacon.enabled = true
beacon.initial-delay = 60000
beacon.delay = 60000
#beacon.rate = 60000
beacon.topics.heartbeat =
beacon.topics.threshold = _ui_threshold_info
beacon.topics.instance = _ui_instance_info
beacon.topics.prediction = metrics_to_predict
beacon.topics.prediction.rate = 60000
beacon.topics.slo-violation-detector = metric.metric_list
beacon.heartbeat-topics =
beacon.threshold-topics = _ui_threshold_info
beacon.instance-topics = _ui_instance_info
beacon.prediction-topics = metrics_to_predict
beacon.prediction-rate = 60000
beacon.slo-violation-detector-topics = metric.metric_list
################################################################################
### Management and Endpoint settings
......
/*
* Copyright (C) 2017-2022 Institute of Communication and Computer Systems (imu.iccs.gr)
*
* This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
* Esper library is used, in which case it is subject to the terms of General Public License v2.0.
* If a copy of the MPL was not distributed with this file, you can obtain one at
* https://www.mozilla.org/en-US/MPL/2.0/
*/
package eu.melodic.event.control.properties;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;
import javax.validation.constraints.Min;
import java.util.HashSet;
import java.util.Set;
@Slf4j
@Data
@Validated
@Configuration
@ConfigurationProperties(prefix = "beacon")
public class TopicBeaconProperties {
private boolean enabled = true;
@Min(0) private long initialDelay = 60000;
@Min(1) private long delay = 60000;
@Min(1) private long rate = 60000;
private Set<String> heartbeatTopics = new HashSet<>();
private Set<String> thresholdTopics = new HashSet<>();
private Set<String> instanceTopics = new HashSet<>();
private Set<String> predictionTopics = new HashSet<>();
@Min(1) private long predictionRate = 60000;
private Set<String> sloViolationDetectorTopics = new HashSet<>();
}
......@@ -15,13 +15,13 @@ import eu.melodic.event.baguette.server.NodeRegistryEntry;
import eu.melodic.event.brokercep.BrokerCepService;
import eu.melodic.event.brokercep.event.EventMap;
import eu.melodic.event.control.ControlServiceCoordinator;
import eu.melodic.event.control.properties.TopicBeaconProperties;
import eu.melodic.event.translate.TranslationContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.SetUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Service;
......@@ -36,27 +36,8 @@ import java.util.stream.Collectors;
@Service
@EnableScheduling
public class TopicBeacon implements InitializingBean {
// Topic Beacon settings
@Value("${beacon.enable:true}")
private boolean beaconEnable;
@Value("${beacon.initial-delay:60000}")
private long beaconInitialDelay;
@Value("${beacon.delay:60000}")
private long beaconDelay;
@Value("${beacon.rate:60000}")
private long beaconRate;
@Value("${beacon.topics.heartbeat:}")
private Set<String> beaconHeartbeatTopics;
@Value("${beacon.topics.threshold:}")
private Set<String> beaconThresholdTopics;
@Value("${beacon.topics.instance:}")
private Set<String> beaconInstanceTopics;
@Value("${beacon.topics.prediction:}")
private Set<String> beaconPredictionTopics;
@Value("${beacon.topics.prediction.rate:60000}")
private long beaconPredictionRate;
@Value("${beacon.topics.slo-violation-detector:}")
private Set<String> beaconSloViolationDetectorTopics;
@Autowired
private TopicBeaconProperties properties;
@Autowired
private ControlServiceCoordinator coordinator;
......@@ -71,7 +52,7 @@ public class TopicBeacon implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
if (!beaconEnable) {
if (!properties.isEnabled()) {
log.warn("Topic Beacon is disabled");
return;
}
......@@ -80,17 +61,18 @@ public class TopicBeacon implements InitializingBean {
gson = new GsonBuilder().disableHtmlEscaping().create();
// configure and start scheduler
Date startTime = new Date(System.currentTimeMillis()+beaconInitialDelay);
Date startTime = new Date(System.currentTimeMillis() + properties.getInitialDelay());
log.debug("Topic Beacon settings: init-delay={}, delay={}, heartbeat-topics={}, threshold-topics={}, instance-topics={}",
beaconInitialDelay, beaconDelay, beaconHeartbeatTopics, beaconThresholdTopics, beaconInstanceTopics);
properties.getInitialDelay(), properties.getDelay(), properties.getHeartbeatTopics(), properties.getThresholdTopics(),
properties.getInstanceTopics());
scheduler.scheduleWithFixedDelay(() -> {
try {
transmitInfo();
} catch (Exception e) {
log.error("Topic Beacon: Exception while sending info: ", e);
}
}, startTime, beaconDelay);
log.info("Topic Beacon started: init-delay={}ms", beaconInitialDelay);
}, startTime, properties.getDelay());
log.info("Topic Beacon started: init-delay={}ms", properties.getInitialDelay());
}
public void transmitInfo() throws JMSException {
......@@ -105,15 +87,15 @@ public class TopicBeacon implements InitializingBean {
}
public void transmitHeartbeat() throws JMSException {
if (SetUtils.emptyIfNull(beaconHeartbeatTopics).isEmpty()) return;
if (SetUtils.emptyIfNull(properties.getHeartbeatTopics()).isEmpty()) return;
String message = "TOPIC BEACON HEARTBEAT "+new Date();
log.debug("Topic Beacon: Transmitting Heartbeat info: message={}, topics={}", message, beaconHeartbeatTopics);
sendMessageToTopics(message, beaconHeartbeatTopics);
log.debug("Topic Beacon: Transmitting Heartbeat info: message={}, topics={}", message, properties.getHeartbeatTopics());
sendMessageToTopics(message, properties.getHeartbeatTopics());
}
public void transmitThresholdInfo() {
if (SetUtils.emptyIfNull(beaconThresholdTopics).isEmpty()) return;
if (SetUtils.emptyIfNull(properties.getThresholdTopics()).isEmpty()) return;
if (coordinator.getTranslationContextOfCamelModel(coordinator.getCurrentCamelModelId())==null)
return;
......@@ -121,35 +103,35 @@ public class TopicBeacon implements InitializingBean {
.getMetricConstraints()
.forEach(c -> {
String message = gson.toJson(c);
log.debug("Topic Beacon: Transmitting Metric Constraint threshold info: message={}, topics={}",message, beaconThresholdTopics);
log.debug("Topic Beacon: Transmitting Metric Constraint threshold info: message={}, topics={}",message, properties.getThresholdTopics());
try {
sendEventToTopics(message, beaconThresholdTopics);
sendEventToTopics(message, properties.getThresholdTopics());
} catch (JMSException e) {
log.error("Topic Beacon: EXCEPTION while transmitting Metric Constraint threshold info: message={}, topics={}, exception: ",
message, beaconThresholdTopics, e);
message, properties.getThresholdTopics(), e);
}
});
}
public void transmitInstanceInfo() throws JMSException {
if (SetUtils.emptyIfNull(beaconInstanceTopics).isEmpty()) return;
if (SetUtils.emptyIfNull(properties.getInstanceTopics()).isEmpty()) return;
if (coordinator.getBaguetteServer().isServerRunning()) {
log.debug("Topic Beacon: Transmitting Instance info: topics={}", beaconInstanceTopics);
log.debug("Topic Beacon: Transmitting Instance info: topics={}", properties.getInstanceTopics());
for (NodeRegistryEntry node : coordinator.getBaguetteServer().getNodeRegistry().getNodes()) {
String nodeName = node.getPreregistration().getOrDefault("name", "");
String nodeIp = node.getIpAddress();
//String nodeIp = node.getPreregistration().getOrDefault("ip","");
String message = gson.toJson(node);
log.debug("Topic Beacon: Transmitting Instance info for: instance={}, ip-address={}, message={}, topics={}",
nodeName, nodeIp, message, beaconInstanceTopics);
sendEventToTopics(message, beaconInstanceTopics);
nodeName, nodeIp, message, properties.getInstanceTopics());
sendEventToTopics(message, properties.getInstanceTopics());
}
}
}
public void transmitPredictionInfo() {
if (SetUtils.emptyIfNull(beaconPredictionTopics).isEmpty()) return;
if (SetUtils.emptyIfNull(properties.getPredictionTopics()).isEmpty()) return;
String modelId = coordinator.getCurrentCamelModelId();
log.trace("Topic Beacon: transmitPredictionInfo: current-camel-model-id: {}", modelId);
......@@ -167,7 +149,7 @@ public class TopicBeacon implements InitializingBean {
map.put("version", currVersion);
map.put("publish_rate", s.getSchedule()!=null
? s.getSchedule().getIntervalInMillis() :
beaconPredictionRate);
properties.getPredictionRate());
return map;
}).collect(Collectors.toList());
log.debug("Topic Beacon: Transmitting Prediction info: Metric Contexts in event format: {}", payload);
......@@ -180,17 +162,17 @@ public class TopicBeacon implements InitializingBean {
String eventPayload = gson.toJson(payload);
log.debug("Topic Beacon: Transmitting Prediction info: event={}, topics={}", eventPayload, beaconPredictionTopics);
log.debug("Topic Beacon: Transmitting Prediction info: event={}, topics={}", eventPayload, properties.getPredictionTopics());
try {
sendMessageToTopics(eventPayload, beaconPredictionTopics);
sendMessageToTopics(eventPayload, properties.getPredictionTopics());
} catch (JMSException e) {
log.error("Topic Beacon: EXCEPTION while transmitting Prediction info: event={}, topics={}, exception: ",
eventPayload, beaconPredictionTopics, e);
eventPayload, properties.getPredictionTopics(), e);
}
}
public void transmitSloViolatorInfo() {
if (SetUtils.emptyIfNull(beaconSloViolationDetectorTopics).isEmpty()) return;
if (SetUtils.emptyIfNull(properties.getSloViolationDetectorTopics()).isEmpty()) return;
String modelId = coordinator.getCurrentCamelModelId();
log.trace("Topic Beacon: transmitSloViolatorInfo: current-camel-model-id: {}", modelId);
......@@ -207,12 +189,12 @@ public class TopicBeacon implements InitializingBean {
sloMetricDecompositions.put("version", modelVersion.get());
String eventPayload = gson.toJson(sloMetricDecompositions);
log.debug("Topic Beacon: Transmitting SLO Violator info: event={}, topics={}", eventPayload, beaconSloViolationDetectorTopics);
log.debug("Topic Beacon: Transmitting SLO Violator info: event={}, topics={}", eventPayload, properties.getSloViolationDetectorTopics());
try {
sendMessageToTopics(eventPayload, beaconSloViolationDetectorTopics);
sendMessageToTopics(eventPayload, properties.getSloViolationDetectorTopics());
} catch (JMSException e) {
log.error("Topic Beacon: EXCEPTION while transmitting SLO Violator info: event={}, topics={}, exception: ",
eventPayload, beaconPredictionTopics, e);
eventPayload, properties.getSloViolationDetectorTopics(), e);
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment