Commit 02678d3b authored by I Patini's avatar I Patini
Browse files

EMS: Control Service: Fixed TopicBeacon to *not* send slo-violation-detector...

EMS: Control Service: Fixed TopicBeacon to *not* send slo-violation-detector events, if they are going to be empty. Added model version in prediction orchestrator and slo-violation-detector events.
parent eba1dd0d
Pipeline #19864 passed with stages
in 41 minutes and 3 seconds
......@@ -686,7 +686,7 @@ public class ControlServiceCoordinator implements InitializingBean {
.collect(Collectors.toSet());
}*/
public Object getSLOMetricDecomposition(String camelModelId) {
public @NonNull Map<String,Object> getSLOMetricDecomposition(String camelModelId) {
List<Object> slos = _getSLOMetricDecomposition(camelModelId);
Map<String,Object> result = new HashMap<>();
result.put("name", "_");
......@@ -695,7 +695,7 @@ public class ControlServiceCoordinator implements InitializingBean {
return result;
}
public List<Object> _getSLOMetricDecomposition(String camelModelId) {
public @NonNull List<Object> _getSLOMetricDecomposition(String camelModelId) {
TranslationContext _tc = camelToTcCache.get(camelModelId);
if (_tc==null) return Collections.emptyList();
......@@ -760,7 +760,7 @@ public class ControlServiceCoordinator implements InitializingBean {
return null;
}
public Set<TranslationContext.MetricContext> getMetricContextsForPrediction(String camelModelId) {
public @NonNull Set<TranslationContext.MetricContext> getMetricContextsForPrediction(String camelModelId) {
log.debug("getMetricContextsForPrediction: BEGIN: {}", camelModelId);
TranslationContext _tc = camelToTcCache.get(camelModelId);
if (_tc==null) {
......
......@@ -18,6 +18,7 @@ import eu.melodic.event.control.ControlServiceCoordinator;
import eu.melodic.event.translate.TranslationContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.SetUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
......@@ -28,6 +29,7 @@ import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@Slf4j
......@@ -64,6 +66,8 @@ public class TopicBeacon implements InitializingBean {
private TaskScheduler scheduler;
private Gson gson;
private String previousModelId = "";
private final AtomicLong modelVersion = new AtomicLong(0);
@Override
public void afterPropertiesSet() throws Exception {
......@@ -91,6 +95,7 @@ public class TopicBeacon implements InitializingBean {
public void transmitInfo() throws JMSException {
log.debug("Topic Beacon: Start transmitting info: {}", new Date());
updateModelVersion();
transmitHeartbeat();
transmitThresholdInfo();
transmitInstanceInfo();
......@@ -152,14 +157,14 @@ public class TopicBeacon implements InitializingBean {
//log.debug("Topic Beacon: transmitPredictionInfo: DAG Global-Level Metrics: {}", topLevelMetrics);
Set<TranslationContext.MetricContext> metricContexts = coordinator.getMetricContextsForPrediction(modelId);
log.debug("Topic Beacon: transmitPredictionInfo: Metric Contexts for prediction: {}", metricContexts);
if (metricContexts==null)
return;
// Convert to Translator-to-Forecasting Methods event format
final long currVersion = modelVersion.get();
List<HashMap<String, Object>> payload = metricContexts.stream().map(s -> {
HashMap<String, Object> map = new HashMap<>();
map.put("metric", s.getName());
map.put("level", 3);
map.put("version", currVersion);
map.put("publish_rate", s.getSchedule()!=null
? s.getSchedule().getIntervalInMillis() :
beaconPredictionRate);
......@@ -169,7 +174,7 @@ public class TopicBeacon implements InitializingBean {
// Skip event sending if payload is empty
if (payload.size()==0) {
log.debug("Topic Beacon: Transmitting Prediction info: Event is empty. Not sending anything");
log.debug("Topic Beacon: transmitSloViolatorInfo: Payload is empty. Not sending event");
return;
}
......@@ -190,17 +195,16 @@ public class TopicBeacon implements InitializingBean {
String modelId = coordinator.getCurrentCamelModelId();
log.trace("Topic Beacon: transmitSloViolatorInfo: current-camel-model-id: {}", modelId);
//List<Object> sloMetricDecompositions = coordinator.getSLOMetricDecomposition(modelId);
Object sloMetricDecompositions = coordinator.getSLOMetricDecomposition(modelId);
if (sloMetricDecompositions==null)
return;
Map<String, Object> sloMetricDecompositions = coordinator.getSLOMetricDecomposition(modelId);
log.debug("Topic Beacon: transmitSloViolatorInfo: SLO metric decompositions: {}", sloMetricDecompositions);
// Skip event sending if payload is empty
if (!(sloMetricDecompositions instanceof Map) || ((Map) sloMetricDecompositions).size() == 0) {
log.debug("Topic Beacon: transmitSloViolatorInfo: Event is empty. Not sending anything");
if (sloMetricDecompositions.get("constraints") == null || ((List) sloMetricDecompositions.get("constraints")).size() == 0) {
log.debug("Topic Beacon: transmitSloViolatorInfo: Payload is empty. Not sending event");
return;
}
sloMetricDecompositions.put("version", modelVersion.get());
String eventPayload = gson.toJson(sloMetricDecompositions);
log.debug("Topic Beacon: Transmitting SLO Violator info: event={}, topics={}", eventPayload, beaconSloViolationDetectorTopics);
......@@ -232,4 +236,17 @@ public class TopicBeacon implements InitializingBean {
log.debug("Topic Beacon: Event sent to topic: event={}, topic={}", event, topicName);
}
}
private synchronized boolean updateModelVersion() {
String modelId = coordinator.getCurrentCamelModelId();
boolean versionChanged = ! StringUtils.defaultIfBlank(modelId, "").equals(previousModelId);
log.trace("Topic Beacon: updateModelVersion: previousModelId='{}', modelId='{}', version={}, version-changed={}",
previousModelId, modelId, modelVersion.get(), versionChanged);
if (versionChanged) {
long newVersion = modelVersion.incrementAndGet();
log.info("Topic Beacon: updateModelVersion: Model changed: {} -> {}, version: {}", previousModelId, modelId, newVersion);
previousModelId = modelId;
}
return versionChanged;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment