diff --git a/event-management/bin/run.sh b/event-management/bin/run.sh
index 143c0d2ee3972e4199059611061e91c0b8f33386..db892e3831a16efc39cd35385e4ef3015c35fcad 100755
--- a/event-management/bin/run.sh
+++ b/event-management/bin/run.sh
@@ -67,6 +67,9 @@ if [[ -z ${EMS_SKIP_WAIT_CDO+x} ]] && [[ -f $MELODIC_CONFIG_DIR/wait-for-cdo.sh
$MELODIC_CONFIG_DIR/wait-for-cdo.sh
fi
+# Setup TERM & INT signal handler
+trap 'echo "Signaling EMS to exit"; kill -TERM "${emsPid}"; wait "${emsPid}"; ' SIGTERM SIGINT
+
# Run EMS server
# Uncomment next line to set JAVA runtime options
#JAVA_OPTS=-Djavax.net.debug=all
@@ -83,7 +86,10 @@ while :; do
# java $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Duser.timezone=Europe/Athens -Djava.security.egd=file:/dev/urandom -jar $JARS_DIR/control-service/target/control-service.jar "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE"
# Use when Esper is NOT packaged in control-service.jar
- java $EMS_DEBUG_OPTS $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Djava.security.egd=file:/dev/urandom -cp ${JARS_DIR}/control-service.jar -Dloader.path=${JARS_DIR}/esper-7.1.0.jar org.springframework.boot.loader.PropertiesLauncher "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE" $*
+ java $EMS_DEBUG_OPTS $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Djava.security.egd=file:/dev/urandom -cp ${JARS_DIR}/control-service.jar -Dloader.path=${JARS_DIR}/esper-7.1.0.jar org.springframework.boot.loader.PropertiesLauncher "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE" $* &
+ emsPid=$!
+ echo "EMS Pid: $emsPid"
+ wait $emsPid
retCode=$?
if [[ $retCode -eq $RESTART_EXIT_CODE ]]; then echo "Restarting EMS server..."; else break; fi
@@ -94,4 +100,5 @@ echo "EMS server exited"
# e.g. --spring.config.location=$MELODIC_CONFIG_DIR
# e.g. --spring.config.name=application.properties
-cd $PREVWORKDIR
\ No newline at end of file
+cd $PREVWORKDIR
+exit $retCode
\ No newline at end of file
diff --git a/event-management/broker-cep/pom.xml b/event-management/broker-cep/pom.xml
index 1deeeca75eff7aa20028ceee69cfdaba93fc2897..191f87417b14b4416eebf1685632083a03b7e010 100644
--- a/event-management/broker-cep/pom.xml
+++ b/event-management/broker-cep/pom.xml
@@ -88,6 +88,12 @@
3.8.1
+
+
+ org.apache.commons
+ commons-csv
+ 1.7
+
diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java
index bec9fccf1c140e552995020691f421902568b495..dbb54cd74304db6bb65d631e234e5b44ee44190b 100644
--- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java
+++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java
@@ -21,9 +21,13 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;
import javax.jms.*;
+import java.time.Instant;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -33,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@Service
@RequiredArgsConstructor
-public class BrokerCepConsumer implements MessageListener, InitializingBean {
+public class BrokerCepConsumer implements MessageListener, InitializingBean, ApplicationListener {
private final static AtomicLong eventCounter = new AtomicLong(0);
private final static AtomicLong textEventCounter = new AtomicLong(0);
private final static AtomicLong objectEventCounter = new AtomicLong(0);
@@ -49,6 +53,9 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean {
private Session session;
private final Map addedDestinations = new HashMap<>();
+ private final TaskScheduler scheduler;
+ private boolean shuttingDown;
+
@Override
public void afterPropertiesSet() {
initialize();
@@ -71,8 +78,10 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean {
? connectionFactory.createConnection(brokerConfig.getBrokerLocalAdminUsername(), brokerConfig.getBrokerLocalAdminPassword())
: connectionFactory.createConnection();
connection.setExceptionListener(e -> {
- log.warn("BrokerCepConsumer: Connection exception listener: Exception caught: ", e);
- initialize();
+ if (!shuttingDown) {
+ log.warn("BrokerCepConsumer: Connection exception listener: Exception caught: ", e);
+ scheduler.schedule(this::initialize, Instant.now());
+ }
});
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -82,6 +91,12 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean {
}
}
+ @Override
+ public void onApplicationEvent(ContextClosedEvent event) {
+ log.info("BrokerCepConsumer is shutting down");
+ shuttingDown = true;
+ }
+
private void closeConnection() {
// close previous session and connection
try {
@@ -162,6 +177,11 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean {
// Log message
logMessage(message);
+ // Record message
+
+ if (brokerConfig.getEventRecorder()!=null)
+ brokerConfig.getEventRecorder().recordRegisteredEvent(message);
+
// Handle message
try {
log.trace("BrokerCepConsumer.onMessage(): {}", message);
diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java
index db3063ff91faf20d7f301f79dddf3d3b7269da4b..63d0283b33bb78824d14af031f9d07d489f1d222 100644
--- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java
+++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java
@@ -21,6 +21,8 @@ import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.DestinationInfo;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;
@@ -31,7 +33,7 @@ import java.time.Instant;
@Service
@ConditionalOnProperty(name="brokercep.enable-advisory-watcher", matchIfMissing = true)
@RequiredArgsConstructor
-public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean {
+public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean, ApplicationListener {
private final BrokerService brokerService; // Added in order to ensure that BrokerService will be instantiated first
private final BrokerConfig brokerConfig;
private final BrokerCepService brokerCepService;
@@ -44,6 +46,7 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean
private Connection connection;
private Session session;
+ private boolean shuttingDown;
@Override
public void afterPropertiesSet() {
@@ -74,8 +77,10 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean
? connectionFactory.createConnection(username, password)
: connectionFactory.createConnection();
connection.setExceptionListener(e -> {
- log.warn("BrokerAdvisoryWatcher: Connection exception listener: Exception caught: ", e);
- initialize();
+ if (!shuttingDown) {
+ log.warn("BrokerAdvisoryWatcher: Connection exception listener: Exception caught: ", e);
+ initialize();
+ }
});
this.connection.start();
@@ -93,6 +98,12 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean
}
}
+ @Override
+ public void onApplicationEvent(ContextClosedEvent event) {
+ log.info("BrokerAdvisoryWatcher is shutting down");
+ shuttingDown = true;
+ }
+
private void closeConnection() {
// close previous session and connection
try {
diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java
index e6b9b1d266a81aecfe2c606ff5a7bdb328be21ee..9813fe21c7f3af24b489fabdaf6bfdb53e4d5c52 100644
--- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java
+++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java
@@ -10,9 +10,11 @@
package eu.melodic.event.brokercep.broker;
import eu.melodic.event.brokercep.broker.interceptor.AbstractMessageInterceptor;
+import eu.melodic.event.brokercep.event.EventRecorder;
import eu.melodic.event.brokercep.properties.BrokerCepProperties;
import eu.melodic.event.util.KeystoreUtil;
import eu.melodic.event.util.PasswordUtil;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -35,6 +37,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jms.annotation.EnableJms;
+import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service;
import javax.jms.ConnectionFactory;
@@ -42,6 +45,7 @@ import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
+import java.io.IOException;
import java.security.KeyStore;
import java.util.*;
import java.util.stream.Collectors;
@@ -80,9 +84,14 @@ public class BrokerConfig implements InitializingBean {
private final HashMap connectionFactoryCache = new HashMap<>();
+ private final TaskScheduler scheduler;
+ @Getter
+ private EventRecorder eventRecorder;
+
@Override
public void afterPropertiesSet() throws Exception {
_initializeSecurity();
+ _initializeEventRecorder();
}
protected synchronized void _initializeSecurity() throws Exception {
@@ -174,6 +183,20 @@ public class BrokerConfig implements InitializingBean {
log.info("BrokerConfig.initializeKeyAndCert(): Initializing keystore, truststore and certificate for Broker-SSL... done");
}
+ private void _initializeEventRecorder() throws IOException {
+ // clear previous event recorder (if any)
+ if (eventRecorder!=null && !eventRecorder.isClosed())
+ eventRecorder.close();
+
+ // create new event recorder
+ if (properties.getEventRecorder()!=null) {
+ if (properties.getEventRecorder().isEnabled()) {
+ eventRecorder = new EventRecorder(properties.getEventRecorder(), scheduler);
+ eventRecorder.startRecording();
+ }
+ }
+ }
+
public String getBrokerName() {
log.trace("BrokerConfig.getBrokerName(): broker-name: {}", properties.getBrokerName());
return properties.getBrokerName();
diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..6d8a796067d56254ef85f92c35ee5c594a1571cc
--- /dev/null
+++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+
+package eu.melodic.event.brokercep.broker.interceptor;
+
+import eu.melodic.event.brokercep.broker.BrokerConfig;
+import eu.melodic.event.brokercep.event.EventRecorder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.Message;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Lazy
+@Component
+public class LogMessageUpdateInterceptor extends AbstractMessageInterceptor {
+ private EventRecorder eventRecorder;
+
+ @Override
+ public void initialized() {
+ this.eventRecorder = applicationContext.getBean(BrokerConfig.class).getEventRecorder();
+ log.debug("LogMessageUpdateInterceptor: Enabled: {}", eventRecorder!=null);
+ eventRecorder.startRecording();
+ }
+
+ @Override
+ public void intercept(Message message) {
+ try {
+ if (eventRecorder!=null && message instanceof ActiveMQMessage)
+ eventRecorder.recordEvent((ActiveMQMessage)message);
+ } catch (Exception e) {
+ log.error("LogMessageUpdateInterceptor: EXCEPTION: ", e);
+ }
+ }
+}
diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java
new file mode 100644
index 0000000000000000000000000000000000000000..78e60693c0df1926b7f64a04f8ae9748a6028aef
--- /dev/null
+++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+
+package eu.melodic.event.brokercep.event;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import eu.melodic.event.brokercep.properties.BrokerCepProperties;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.springframework.scheduling.TaskScheduler;
+
+import javax.jms.*;
+import javax.jms.Queue;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.IllegalStateException;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ScheduledFuture;
+
+@Slf4j
+public class EventRecorder extends LinkedHashMap implements Runnable {
+ public enum FORMAT { JSON, CSV }
+
+ private final static Object staticLock = new Object();
+ public static Set activeEventRecorders;
+
+ @Getter
+ private final FORMAT recordFormat;
+ @Getter
+ private final String recordFilePattern;
+ @Getter
+ private final BrokerCepProperties.EVENT_RECORDER_FILTER_MODE filterMode;
+ @Getter
+ private final List allowedDestinations;
+
+ @Getter
+ private String recordFile;
+ @Getter
+ private boolean closed;
+ @Getter
+ private boolean recording;
+
+ private BufferedWriter recordWriter;
+ private CSVPrinter csvPrinter;
+ private JsonGenerator jsonGenerator;
+
+ private final Deque eventQueue;
+ private final TaskScheduler scheduler;
+ private ScheduledFuture> runnerFuture;
+
+ public EventRecorder(@NonNull BrokerCepProperties.EventRecorderProperties properties, @NonNull TaskScheduler scheduler) throws IOException {
+ this(properties.getFormat(), properties.getFile(), properties.getFilterMode(), properties.getAllowedDestinations(), scheduler);
+ }
+
+ public EventRecorder(@NonNull FORMAT recordFormat, @NonNull String recordFilePattern, BrokerCepProperties.EVENT_RECORDER_FILTER_MODE filterMode, List allowedDestinations, @NonNull TaskScheduler scheduler) throws IOException {
+ this.recordFormat = recordFormat;
+ this.recordFilePattern = recordFilePattern;
+ this.filterMode = filterMode;
+ this.allowedDestinations = allowedDestinations==null ? Collections.emptyList() : Collections.unmodifiableList(allowedDestinations);
+ this.scheduler = scheduler;
+ this.eventQueue = new ConcurrentLinkedDeque<>();
+
+ registerShutdownHook();
+ rotate();
+ }
+
+ public static void registerShutdownHook() {
+ if (activeEventRecorders==null) {
+ synchronized (staticLock) {
+ if (activeEventRecorders==null) {
+ activeEventRecorders = new HashSet<>();
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ log.info("EventRecorder: closing active recorders: {}", activeEventRecorders.size());
+ for (EventRecorder eventRecorder : activeEventRecorders) {
+ if (!eventRecorder.isClosed())
+ eventRecorder.close();
+ }
+ log.info("EventRecorder: Closed active recorders");
+ }));
+ }
+ }
+ }
+ }
+
+ public synchronized void rotate() throws IOException {
+ // Close current recording file
+ if (recordFile!=null && !isClosed()) {
+ close();
+ }
+ closed = false;
+
+ // Create new recording file
+ this.recordFile = recordFilePattern
+ .replace("%T", "" + System.currentTimeMillis())
+ .replace("%S", getSuffix());
+ this.recordWriter = new BufferedWriter(new FileWriter(recordFile));
+
+ log.info("EventRecorder: Record format: {}", recordFormat);
+ log.info("EventRecorder: Record file: {}", recordFile);
+
+ if (recordFormat==FORMAT.CSV) {
+ csvPrinter = new CSVPrinter(recordWriter, CSVFormat.DEFAULT
+ .withHeader("Timestamp", "Destination", "Mime", "Type", "Contents", "Properties"));
+ csvPrinter.flush();
+ }
+ if (recordFormat==FORMAT.JSON) {
+ jsonGenerator = new JsonFactory()
+ .createGenerator(recordWriter)
+ .setPrettyPrinter(new DefaultPrettyPrinter());
+ jsonGenerator.writeStartArray();
+ jsonGenerator.flush();
+ }
+
+ // Start processing loop
+ runnerFuture = scheduler.scheduleAtFixedRate(this, Duration.ofMillis(1000));
+ activeEventRecorders.add(this);
+
+ startRecording();
+ }
+
+ private String getSuffix() {
+ if (recordFormat==FORMAT.JSON) return "json";
+ if (recordFormat==FORMAT.CSV) return "csv";
+ throw new IllegalStateException("No suffix for FORMAT: "+recordFormat);
+ }
+
+ public synchronized void close() {
+ if (closed) throw new IllegalStateException("EventRecorder has already been closed");
+ if (recording) stopRecording();
+ this.closed = true;
+ runnerFuture.cancel(false);
+ activeEventRecorders.remove(this);
+
+ // wait until all records are written in the file
+ while (!eventQueue.isEmpty()) {
+ run();
+ }
+
+ // close record file
+ try {
+ if (recordFormat == FORMAT.CSV) {
+ csvPrinter.close(true);
+ }
+ if (recordFormat == FORMAT.JSON) {
+ jsonGenerator.writeEndArray();
+ jsonGenerator.close();
+ }
+ recordWriter.close();
+ } catch (Exception ex) {
+ log.warn("EventRecorder: Exception while closing: ", ex);
+ }
+ }
+
+ public void startRecording() {
+ if (closed) throw new IllegalStateException("EventRecorder has been closed");
+ if (!recording) {
+ log.info("EventRecorder: Start recording...");
+ recording = true;
+ }
+ }
+
+ public void stopRecording() {
+ if (closed) throw new IllegalStateException("EventRecorder has been closed");
+ if (recording) {
+ log.info("EventRecorder: Stop recording...");
+ recording = false;
+ }
+ }
+
+ public void recordEvent(@NonNull ActiveMQMessage message) throws JMSException {
+ recordAllowedEvent(message);
+ }
+
+ public void recordAllowedEvent(@NonNull Message message) throws JMSException {
+ if (filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.ALL
+ || filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.ALLOWED
+ && allowedDestinations.stream().anyMatch(getDestinationName(message)::equalsIgnoreCase))
+ {
+ eventQueue.addLast(message);
+ }
+ }
+
+ public void recordRegisteredEvent(@NonNull Message message) {
+ if (filterMode==BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.REGISTERED) {
+ eventQueue.addLast(message);
+ }
+ }
+
+ public void run() {
+ if (!closed) {
+ while (!eventQueue.isEmpty()) {
+ try {
+ processEvent(eventQueue.removeLast());
+ } catch (Exception ex) {
+ log.warn("EventRecorder: Exception while processing event queue: ", ex);
+ }
+ }
+ }
+ }
+
+ protected void processEvent(Message message) throws IOException, JMSException {
+ String messageId = message.getJMSMessageID();
+ long timestamp = message.getJMSTimestamp();
+ String destinationName = getDestinationName(message);
+ String mime = message.getJMSType();
+
+ // Extract event payload and type
+ PayloadAndType payloadAndType = extractPayloadAndType(message);
+ String content = payloadAndType.payload;
+ String type = payloadAndType.type;
+
+ // Extract event properties
+ String properties = extractProperties(message);
+
+ if (recordFormat==FORMAT.CSV) {
+ csvPrinter.printRecord(timestamp, destinationName, mime, type, content, properties);
+ csvPrinter.flush();
+ }
+ if (recordFormat==FORMAT.JSON) {
+ jsonGenerator.writeStartObject();
+ jsonGenerator.writeStringField("id", messageId);
+ jsonGenerator.writeNumberField("timestamp", timestamp);
+ jsonGenerator.writeStringField("destination", destinationName);
+ jsonGenerator.writeStringField("mime", mime);
+ jsonGenerator.writeStringField("type", type);
+ jsonGenerator.writeStringField("content", content);
+ jsonGenerator.writeStringField("properties", properties);
+ jsonGenerator.writeEndObject();
+ jsonGenerator.flush();
+ }
+ }
+
+ protected String getDestinationName(Message message) throws JMSException {
+ Destination d = message.getJMSDestination();
+ if (d instanceof Topic) {
+ return ((Topic)d).getTopicName();
+ } else
+ if (d instanceof Queue) {
+ return ((Queue)d).getQueueName();
+ } else
+ throw new IllegalArgumentException("Argument is not a JMS destination: "+d);
+ }
+
+ protected PayloadAndType extractPayloadAndType(Message message) throws JMSException {
+ if (message instanceof TextMessage) {
+ return new PayloadAndType("TEXT", ((TextMessage)message).getText());
+ } else
+ if (message instanceof ObjectMessage) {
+ Serializable o = ((ObjectMessage) message).getObject();
+ return new PayloadAndType("OBJECT", o==null ? null : o.toString());
+ } else
+ throw new IllegalArgumentException("Unsupported message type: "+message.getClass().getName());
+ }
+
+ protected String extractProperties(Message message) throws JMSException {
+ Enumeration en = message.getPropertyNames();
+ StringBuilder properties = new StringBuilder("{");
+ boolean first = true;
+ while (en.hasMoreElements()) {
+ Object k = en.nextElement();
+ if (k!=null) {
+ String v = message.getStringProperty(k.toString());
+ if (first) first = false; else properties.append(", ");
+ properties.append(k).append("=").append(v);
+ }
+ }
+ properties.append(" }");
+ return properties.toString();
+ }
+
+ @AllArgsConstructor
+ class PayloadAndType {
+ public String type;
+ public String payload;
+ }
+}
\ No newline at end of file
diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java
index 296e9af7c00a870452581208a32424de38acb0ae..bb1f4efdb2ba945c9a6891dd269371800e027717 100644
--- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java
+++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java
@@ -9,6 +9,7 @@
package eu.melodic.event.brokercep.properties;
+import eu.melodic.event.brokercep.event.EventRecorder;
import eu.melodic.event.util.EmsConstant;
import eu.melodic.event.util.KeystoreAndCertificateProperties;
import lombok.Data;
@@ -80,6 +81,8 @@ public class BrokerCepProperties implements InitializingBean {
private boolean logBrokerMessages = true;
private boolean logBrokerMessagesFull = false;
+ private EventRecorderProperties eventRecorder = new EventRecorderProperties();
+
@Data
public static class Usage {
private Memory memory = new Memory();
@@ -107,4 +110,15 @@ public class BrokerCepProperties implements InitializingBean {
@ToString.Exclude
private String password;
}
+
+ public enum EVENT_RECORDER_FILTER_MODE { ALL, REGISTERED, ALLOWED }
+
+ @Data
+ public static class EventRecorderProperties {
+ private boolean enabled;
+ private EventRecorder.FORMAT format = EventRecorder.FORMAT.CSV;
+ private String file;
+ private EVENT_RECORDER_FILTER_MODE filterMode = EVENT_RECORDER_FILTER_MODE.REGISTERED;
+ private List allowedDestinations;
+ }
}
diff --git a/event-management/config-files/ems-server.properties.sample b/event-management/config-files/ems-server.properties.sample
index f0dd7cee1d36ce02c823723232c22e7045ad6bf1..290347a770d41b477222fb4d151f63e394da8c86 100644
--- a/event-management/config-files/ems-server.properties.sample
+++ b/event-management/config-files/ems-server.properties.sample
@@ -333,9 +333,10 @@ brokercep.broker-populate-jmsx-user-id = true
# Message interceptors
brokercep.message-interceptors[0].destination = >
brokercep.message-interceptors[0].className = eu.melodic.event.brokercep.broker.interceptor.SequentialCompositeInterceptor
-brokercep.message-interceptors[0].params = #SourceAddressMessageUpdateInterceptor, #MessageForwarderInterceptor
+brokercep.message-interceptors[0].params = #SourceAddressMessageUpdateInterceptor, #LogMessageUpdateInterceptor, #MessageForwarderInterceptor
brokercep.message-interceptors-specs.SourceAddressMessageUpdateInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.SourceAddressMessageUpdateInterceptor
+brokercep.message-interceptors-specs.LogMessageUpdateInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.LogMessageUpdateInterceptor
brokercep.message-interceptors-specs.MessageForwarderInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.MessageForwarderInterceptor
# Message forward destinations (MessageForwarderInterceptor must be included in 'message-interceptors' property)
@@ -357,6 +358,13 @@ brokercep.usage.memory.jvm-heap-percentage = 20
#brokercep.maxEventForwardRetries: -1
#brokercep.maxEventForwardDuration: -1
+# Event recorder settings
+event-recorder.enabled=true
+#event-recorder.format=JSON
+event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.%S
+#event-recorder.filterMode: ALL | REGISTERED (default) | ALLOWED
+#event-recorder.allowed-destinations:
+
################################################################################
### EMS - Baguette Server properties ###
diff --git a/event-management/config-files/ems-server.yml b/event-management/config-files/ems-server.yml
index e878defa83550afce638c3dcdab0e85982f2ed62..209f25392160946cb16f9cb7d6d97bbad531fba9 100644
--- a/event-management/config-files/ems-server.yml
+++ b/event-management/config-files/ems-server.yml
@@ -343,11 +343,14 @@ brokercep:
className: 'eu.melodic.event.brokercep.broker.interceptor.SequentialCompositeInterceptor'
params:
- '#SourceAddressMessageUpdateInterceptor'
+ - '#LogMessageUpdateInterceptor'
- '#MessageForwarderInterceptor'
message-interceptors-specs:
SourceAddressMessageUpdateInterceptor:
className: eu.melodic.event.brokercep.broker.interceptor.SourceAddressMessageUpdateInterceptor
+ LogMessageUpdateInterceptor:
+ className: eu.melodic.event.brokercep.broker.interceptor.LogMessageUpdateInterceptor
MessageForwarderInterceptor:
className: eu.melodic.event.brokercep.broker.interceptor.MessageForwarderInterceptor
@@ -373,6 +376,14 @@ brokercep:
#maxEventForwardRetries: -1
#maxEventForwardDuration: -1
+ # Event recorder settings
+ event-recorder:
+ enabled: true
+ #format: JSON
+ file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.%S
+ #filter-mode: ALL | REGISTERED (default) | ALLOWED
+ #allowed-destinations:
+
################################################################################
### EMS - Baguette Server properties ###