Commit f7762a62 authored by maciek riedl's avatar maciek riedl
Browse files

Merge branch 'mqadapterlogging' into 'morphemic-rc2.0'

Fixed errors in Mq-Adapter, shortened pom, added new Extractor, Changed InstanceInfo extractor

See merge request !141
parents 79b8a578 bdc560ef
Pipeline #21743 passed with stages
in 88 minutes
......@@ -15,6 +15,7 @@
<properties>
<!--DOCKER plugin properties-->
<docker.imageName>mq-http-adapter</docker.imageName>
<lombok.version>1.18.16</lombok.version>
</properties>
<dependencies>
......@@ -35,23 +36,12 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
......
......@@ -17,6 +17,10 @@ public class MorphemicTopicsMatcher {
return "topic://training_models".equals(topic);
}
public static boolean isStartEnsembler(String topic) {
return "topic://start_ensembler".equals(topic);
}
public static boolean isMetricsToPredict(String topic) {
return "topic://metrics_to_predict".equals(topic);
}
......@@ -31,6 +35,9 @@ public class MorphemicTopicsMatcher {
public static boolean isMetricTopic(String topic) {
Pattern pattern = Pattern.compile("topic://\\w+");
return (!isTrainingModels(topic)) && (!isMetricsToPredict(topic)) && pattern.matcher(topic).matches();
return (!isTrainingModels(topic)) && (!isMetricsToPredict(topic)) &&
(!isThresholdTopic(topic)) && (!isInstanceInfoTopic(topic)) &&
(!isStartEnsembler(topic)) &&
pattern.matcher(topic).matches();
}
}
......@@ -20,4 +20,5 @@ public class MqConstants {
public static final String META_MESSAGE_IDENTIFIER = "message=";
public static final String META_TIMESTAMP_IDENTIFIER = ", timestamp=";
public static final String META_MESSAGE_END_IDENTIFIER = "}}";
}
......@@ -4,7 +4,6 @@ import eu.melodic.event.brokerclient.BrokerClient;
import eu.melodic.upperware.activemqtorest.MelodicConfiguration;
import eu.melodic.upperware.activemqtorest.activemq.extraction.IMqDataEntryExtractor;
import eu.melodic.upperware.activemqtorest.entry.MqBaseEntry;
import eu.melodic.upperware.activemqtorest.entry.MqDefaultMetricEntry;
import eu.melodic.upperware.activemqtorest.influxdb.InfluxDbConnector;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
......
package eu.melodic.upperware.activemqtorest.activemq.extraction;
import java.util.Arrays;
import java.util.Optional;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import eu.melodic.upperware.activemqtorest.MelodicConfiguration;
import eu.melodic.upperware.activemqtorest.activemq.MqAdapterStatusHolder;
import eu.melodic.upperware.activemqtorest.activemq.MqConstants;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class MqDataEntryBaseExtractor {
@Autowired
MelodicConfiguration melodicConfiguration;
@Autowired
MqAdapterStatusHolder mqAdapterStatusHolder;
String extractUsedSeparator(String[] keyValuePairs) {
int delimiterConsistentCounter = (int) Arrays.stream(keyValuePairs).filter(string -> string.contains(MqConstants.VALUE_SEPARATOR_JSON)).count();
if (delimiterConsistentCounter == keyValuePairs.length) {
return MqConstants.VALUE_SEPARATOR_JSON;
}
return MqConstants.VALUE_SEPARATOR_DEFAULT;
}
String normalizeMqString(String mqString) {
return mqString.trim().replaceAll("\"", "");
}
String extractPayload(String rawPayload) {
return StringUtils.substringBetween(rawPayload, "{", "}");
}
Optional<JsonObject> extractJsonPayload(ActiveMQMessage activeMQMessage) {
String rawPayload = new String(activeMQMessage.getContent().getData());
int messageBegin = rawPayload.indexOf(MqConstants.META_MESSAGE_IDENTIFIER);
int messageEnd = rawPayload.indexOf(MqConstants.META_TIMESTAMP_IDENTIFIER);
String payload = rawPayload.substring(messageBegin + MqConstants.META_MESSAGE_IDENTIFIER.length(), messageEnd);
try {
JsonObject jsonObject = new JsonParser().parse(payload).getAsJsonObject();
return Optional.of(jsonObject);
} catch (JsonSyntaxException x) {
return Optional.empty();
}
}
}
package eu.melodic.upperware.activemqtorest.activemq.extraction;
import java.util.Arrays;
import java.util.Optional;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import eu.melodic.upperware.activemqtorest.MelodicConfiguration;
import eu.melodic.upperware.activemqtorest.activemq.MqAdapterStatusHolder;
import eu.melodic.upperware.activemqtorest.activemq.MqConstants;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class MqDataEntryBaseExtractor {
@Autowired
MelodicConfiguration melodicConfiguration;
@Autowired
MqAdapterStatusHolder mqAdapterStatusHolder;
String extractUsedSeparator(String[] keyValuePairs) {
int delimiterConsistentCounter = (int) Arrays.stream(keyValuePairs).filter(string -> string.contains(MqConstants.VALUE_SEPARATOR_JSON)).count();
if (delimiterConsistentCounter == keyValuePairs.length) {
return MqConstants.VALUE_SEPARATOR_JSON;
}
return MqConstants.VALUE_SEPARATOR_DEFAULT;
}
String normalizeMqString(String mqString) {
return mqString.trim().replaceAll("\"", "");
}
String extractPayload(String rawPayload) {
return StringUtils.substringBetween(rawPayload, "{", "}");
}
Optional<JsonObject> extractJsonPayload(ActiveMQMessage activeMQMessage) {
String rawPayload = new String(activeMQMessage.getContent().getData());
int messageBegin = rawPayload.indexOf(MqConstants.META_MESSAGE_IDENTIFIER);
int messageEnd = rawPayload.indexOf(MqConstants.META_MESSAGE_END_IDENTIFIER);
String payload = rawPayload.substring(messageBegin + MqConstants.META_MESSAGE_IDENTIFIER.length(), messageEnd + 1);
log.debug("Payload: {}", payload);
try {
JsonObject jsonObject = new JsonParser().parse(payload).getAsJsonObject();
return Optional.of(jsonObject);
} catch (JsonSyntaxException x) {
return Optional.empty();
}
}
}
......@@ -32,7 +32,6 @@ public class MqDefaultMetricExtractor extends MqDataEntryBaseExtractor implement
@Override
public Optional<MqBaseEntry> extractMqDataEntry(ActiveMQMessage activeMQMessage) {
log.debug("Extracting MqBaseEntry");
String rawMqContent = extractPayload(new String(activeMQMessage.getContent().getData()));
String[] keyValuePairsAsStrings = rawMqContent.split(MqConstants.KEY_VALUE_PAIR_SEPARATOR);
String keyValueEncoding = extractUsedSeparator(keyValuePairsAsStrings);
......
......@@ -23,17 +23,14 @@ public class MqEmsInstanceInfoExtractor extends MqDataEntryBaseExtractor impleme
public Optional<MqBaseEntry> extractMqDataEntry(ActiveMQMessage activeMQMessage) {
return extractJsonPayload(activeMQMessage).map(jsonObject -> {
MqInstanceInfoEntry mqInstanceInfoEntry = new MqInstanceInfoEntry();
mqInstanceInfoEntry.setBaguetteClientId(jsonObject.get("baguette-client-id").getAsString());
mqInstanceInfoEntry.setOs(jsonObject.get("operatingSystem").getAsString());
mqInstanceInfoEntry.setType(jsonObject.get("type").getAsString());
mqInstanceInfoEntry.setName(jsonObject.get("name").getAsString());
mqInstanceInfoEntry.setIpAddress(jsonObject.get("ip").getAsString());
mqInstanceInfoEntry.setRandom(jsonObject.get("random").getAsString());
mqInstanceInfoEntry.setInstanceId(jsonObject.get("id").getAsString());
mqInstanceInfoEntry.setProviderId(jsonObject.get("providerId").getAsString());
mqInstanceInfoEntry.setClientId(jsonObject.get("clientId").getAsString());
mqInstanceInfoEntry.setIpAddress(jsonObject.get("ipAddress").getAsString());
mqInstanceInfoEntry.setReference(jsonObject.get("reference").getAsString());
mqInstanceInfoEntry.setState(jsonObject.get("state").getAsString());
mqInstanceInfoEntry.setStateLastUpdate(jsonObject.get("stateLastUpdate").getAsString());
mqInstanceInfoEntry.setTimestamp(String.valueOf(activeMQMessage.getTimestamp()));
mqAdapterStatusHolder.addExtractedMetricDescription(mqInstanceInfoEntry.getName(), new ExtractedMetricsDescriptions(mqInstanceInfoEntry.getClass().toString(), mqInstanceInfoEntry.toString()));
mqAdapterStatusHolder.addExtractedMetricDescription(mqInstanceInfoEntry.getReference(), new ExtractedMetricsDescriptions(mqInstanceInfoEntry.getClass().toString(), mqInstanceInfoEntry.toString()));
return mqInstanceInfoEntry;
});
......
package eu.melodic.upperware.activemqtorest.activemq.extraction;
import eu.melodic.upperware.activemqtorest.MorphemicTopicsMatcher;
import eu.melodic.upperware.activemqtorest.entry.MqBaseEntry;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Slf4j
@Component
public class MqStartEnsemblerExtractor extends MqDataEntryBaseExtractor implements IMqDataEntryExtractor {
@Override
public boolean isApplicable(ActiveMQMessage activeMQMessage) {
return MorphemicTopicsMatcher.isStartEnsembler(activeMQMessage.getJMSDestination().toString());
}
@Override
public Optional<MqBaseEntry> extractMqDataEntry(ActiveMQMessage activeMQMessage) {
log.warn("Saving start ensembler messages to InfluxDb is not supported");
return Optional.empty();
}
}
\ No newline at end of file
package eu.melodic.upperware.activemqtorest.entry;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.Point;
import com.google.common.base.Strings;
import eu.melodic.upperware.activemqtorest.influxdb.InfluxDataRetainer;
import eu.melodic.upperware.activemqtorest.influxdb.geolocation.IIpGeoCoder;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ToString
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
public class MqInstanceInfoEntry extends MqBaseEntry {
@EqualsAndHashCode.Exclude
private long id = 42l;
@EqualsAndHashCode.Exclude
private String random;
private String name;
private String ipAddress;
@EqualsAndHashCode.Exclude
private String type;
@EqualsAndHashCode.Exclude
private String os;
private String baguetteClientId;
@EqualsAndHashCode.Exclude
private String timestamp;
@EqualsAndHashCode.Exclude
private String providerId;
private String instanceId;
@Override
public Point getInfluxDbDataPoint(IIpGeoCoder ipGeoCoder) {
String timestamp = normalizeTimestamp(getTimestamp());
Point point = Point.measurement("_Instances")
.time(Long.valueOf(timestamp), TimeUnit.MILLISECONDS)
.addField("name", Strings.nullToEmpty(getName()))
.addField("random", Strings.nullToEmpty(getRandom()))
.addField("ipAddress", Strings.nullToEmpty(getIpAddress()))
.addField("countryCode", ipGeoCoder.getCountryCode(getIpAddress()))
.addField("type", Strings.nullToEmpty(getType()))
.addField("os", Strings.nullToEmpty(getOs()))
.addField("baguetteClientId", Strings.nullToEmpty(getBaguetteClientId()))
.addField("providerId", Strings.nullToEmpty(getProviderId()))
.addField("instanceId", Strings.nullToEmpty(getInstanceId()))
.build();
return point;
}
@Override
public boolean mustRetain(InfluxDataRetainer influxDataRetainer) {
return influxDataRetainer.getInstanceInfoEntryCache().containsKey(this.hashCode());
}
@Override
public void updateRetained(InfluxDataRetainer influxDataRetainer) {
influxDataRetainer.getInstanceInfoEntryCache().put(this.hashCode(), this);
}
}
package eu.melodic.upperware.activemqtorest.entry;
import java.util.concurrent.TimeUnit;
import org.influxdb.dto.Point;
import com.google.common.base.Strings;
import eu.melodic.upperware.activemqtorest.influxdb.InfluxDataRetainer;
import eu.melodic.upperware.activemqtorest.influxdb.geolocation.IIpGeoCoder;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ToString
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
public class MqInstanceInfoEntry extends MqBaseEntry {
@EqualsAndHashCode.Exclude
private long id = 42l;
private String ipAddress;
private String clientId;
@EqualsAndHashCode.Exclude
private String timestamp;
@EqualsAndHashCode.Exclude
private String reference;
private String state;
@EqualsAndHashCode.Exclude
private String stateLastUpdate;
@Override
public Point getInfluxDbDataPoint(IIpGeoCoder ipGeoCoder) {
String timestamp = normalizeTimestamp(getTimestamp());
Point point = Point.measurement("_Instances")
.time(Long.valueOf(timestamp), TimeUnit.MILLISECONDS)
.addField("ipAddress", Strings.nullToEmpty(getIpAddress()))
.addField("clientId", Strings.nullToEmpty(getClientId()))
.addField("reference", Strings.nullToEmpty(getReference()))
.addField("state", Strings.nullToEmpty(getState()))
.addField("stateLastUpdate", Strings.nullToEmpty(getStateLastUpdate()))
.build();
return point;
}
@Override
public boolean mustRetain(InfluxDataRetainer influxDataRetainer) {
return influxDataRetainer.getInstanceInfoEntryCache().containsKey(this.hashCode());
}
@Override
public void updateRetained(InfluxDataRetainer influxDataRetainer) {
influxDataRetainer.getInstanceInfoEntryCache().put(this.hashCode(), this);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment