diff --git a/event-management/bin/run.sh b/event-management/bin/run.sh index 143c0d2ee3972e4199059611061e91c0b8f33386..db892e3831a16efc39cd35385e4ef3015c35fcad 100755 --- a/event-management/bin/run.sh +++ b/event-management/bin/run.sh @@ -67,6 +67,9 @@ if [[ -z ${EMS_SKIP_WAIT_CDO+x} ]] && [[ -f $MELODIC_CONFIG_DIR/wait-for-cdo.sh $MELODIC_CONFIG_DIR/wait-for-cdo.sh fi +# Setup TERM & INT signal handler +trap 'echo "Signaling EMS to exit"; kill -TERM "${emsPid}"; wait "${emsPid}"; ' SIGTERM SIGINT + # Run EMS server # Uncomment next line to set JAVA runtime options #JAVA_OPTS=-Djavax.net.debug=all @@ -83,7 +86,10 @@ while :; do # java $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Duser.timezone=Europe/Athens -Djava.security.egd=file:/dev/urandom -jar $JARS_DIR/control-service/target/control-service.jar "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE" # Use when Esper is NOT packaged in control-service.jar - java $EMS_DEBUG_OPTS $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Djava.security.egd=file:/dev/urandom -cp ${JARS_DIR}/control-service.jar -Dloader.path=${JARS_DIR}/esper-7.1.0.jar org.springframework.boot.loader.PropertiesLauncher "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE" $* + java $EMS_DEBUG_OPTS $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Djava.security.egd=file:/dev/urandom -cp ${JARS_DIR}/control-service.jar -Dloader.path=${JARS_DIR}/esper-7.1.0.jar org.springframework.boot.loader.PropertiesLauncher "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE" $* & + emsPid=$! + echo "EMS Pid: $emsPid" + wait $emsPid retCode=$? if [[ $retCode -eq $RESTART_EXIT_CODE ]]; then echo "Restarting EMS server..."; else break; fi @@ -94,4 +100,5 @@ echo "EMS server exited" # e.g. --spring.config.location=$MELODIC_CONFIG_DIR # e.g. --spring.config.name=application.properties -cd $PREVWORKDIR \ No newline at end of file +cd $PREVWORKDIR +exit $retCode \ No newline at end of file diff --git a/event-management/broker-cep/pom.xml b/event-management/broker-cep/pom.xml index 1deeeca75eff7aa20028ceee69cfdaba93fc2897..191f87417b14b4416eebf1685632083a03b7e010 100644 --- a/event-management/broker-cep/pom.xml +++ b/event-management/broker-cep/pom.xml @@ -88,6 +88,12 @@ 3.8.1 + + + org.apache.commons + commons-csv + 1.7 + diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java index bec9fccf1c140e552995020691f421902568b495..dbb54cd74304db6bb65d631e234e5b44ee44190b 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java @@ -21,9 +21,13 @@ import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; import javax.jms.*; +import java.time.Instant; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; @@ -33,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j @Service @RequiredArgsConstructor -public class BrokerCepConsumer implements MessageListener, InitializingBean { +public class BrokerCepConsumer implements MessageListener, InitializingBean, ApplicationListener { private final static AtomicLong eventCounter = new AtomicLong(0); private final static AtomicLong textEventCounter = new AtomicLong(0); private final static AtomicLong objectEventCounter = new AtomicLong(0); @@ -49,6 +53,9 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean { private Session session; private final Map addedDestinations = new HashMap<>(); + private final TaskScheduler scheduler; + private boolean shuttingDown; + @Override public void afterPropertiesSet() { initialize(); @@ -71,8 +78,10 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean { ? connectionFactory.createConnection(brokerConfig.getBrokerLocalAdminUsername(), brokerConfig.getBrokerLocalAdminPassword()) : connectionFactory.createConnection(); connection.setExceptionListener(e -> { - log.warn("BrokerCepConsumer: Connection exception listener: Exception caught: ", e); - initialize(); + if (!shuttingDown) { + log.warn("BrokerCepConsumer: Connection exception listener: Exception caught: ", e); + scheduler.schedule(this::initialize, Instant.now()); + } }); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -82,6 +91,12 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean { } } + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("BrokerCepConsumer is shutting down"); + shuttingDown = true; + } + private void closeConnection() { // close previous session and connection try { @@ -162,6 +177,11 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean { // Log message logMessage(message); + // Record message + + if (brokerConfig.getEventRecorder()!=null) + brokerConfig.getEventRecorder().recordRegisteredEvent(message); + // Handle message try { log.trace("BrokerCepConsumer.onMessage(): {}", message); diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java index db3063ff91faf20d7f301f79dddf3d3b7269da4b..63d0283b33bb78824d14af031f9d07d489f1d222 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java @@ -21,6 +21,8 @@ import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.DestinationInfo; import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; @@ -31,7 +33,7 @@ import java.time.Instant; @Service @ConditionalOnProperty(name="brokercep.enable-advisory-watcher", matchIfMissing = true) @RequiredArgsConstructor -public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean { +public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean, ApplicationListener { private final BrokerService brokerService; // Added in order to ensure that BrokerService will be instantiated first private final BrokerConfig brokerConfig; private final BrokerCepService brokerCepService; @@ -44,6 +46,7 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean private Connection connection; private Session session; + private boolean shuttingDown; @Override public void afterPropertiesSet() { @@ -74,8 +77,10 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean ? connectionFactory.createConnection(username, password) : connectionFactory.createConnection(); connection.setExceptionListener(e -> { - log.warn("BrokerAdvisoryWatcher: Connection exception listener: Exception caught: ", e); - initialize(); + if (!shuttingDown) { + log.warn("BrokerAdvisoryWatcher: Connection exception listener: Exception caught: ", e); + initialize(); + } }); this.connection.start(); @@ -93,6 +98,12 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean } } + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("BrokerAdvisoryWatcher is shutting down"); + shuttingDown = true; + } + private void closeConnection() { // close previous session and connection try { diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java index e6b9b1d266a81aecfe2c606ff5a7bdb328be21ee..9813fe21c7f3af24b489fabdaf6bfdb53e4d5c52 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java @@ -10,9 +10,11 @@ package eu.melodic.event.brokercep.broker; import eu.melodic.event.brokercep.broker.interceptor.AbstractMessageInterceptor; +import eu.melodic.event.brokercep.event.EventRecorder; import eu.melodic.event.brokercep.properties.BrokerCepProperties; import eu.melodic.event.util.KeystoreUtil; import eu.melodic.event.util.PasswordUtil; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.ActiveMQConnectionFactory; @@ -35,6 +37,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.FileSystemResource; import org.springframework.jms.annotation.EnableJms; +import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; import javax.jms.ConnectionFactory; @@ -42,6 +45,7 @@ import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; +import java.io.IOException; import java.security.KeyStore; import java.util.*; import java.util.stream.Collectors; @@ -80,9 +84,14 @@ public class BrokerConfig implements InitializingBean { private final HashMap connectionFactoryCache = new HashMap<>(); + private final TaskScheduler scheduler; + @Getter + private EventRecorder eventRecorder; + @Override public void afterPropertiesSet() throws Exception { _initializeSecurity(); + _initializeEventRecorder(); } protected synchronized void _initializeSecurity() throws Exception { @@ -174,6 +183,20 @@ public class BrokerConfig implements InitializingBean { log.info("BrokerConfig.initializeKeyAndCert(): Initializing keystore, truststore and certificate for Broker-SSL... done"); } + private void _initializeEventRecorder() throws IOException { + // clear previous event recorder (if any) + if (eventRecorder!=null && !eventRecorder.isClosed()) + eventRecorder.close(); + + // create new event recorder + if (properties.getEventRecorder()!=null) { + if (properties.getEventRecorder().isEnabled()) { + eventRecorder = new EventRecorder(properties.getEventRecorder(), scheduler); + eventRecorder.startRecording(); + } + } + } + public String getBrokerName() { log.trace("BrokerConfig.getBrokerName(): broker-name: {}", properties.getBrokerName()); return properties.getBrokerName(); diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java new file mode 100644 index 0000000000000000000000000000000000000000..6d8a796067d56254ef85f92c35ee5c594a1571cc --- /dev/null +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokercep.broker.interceptor; + +import eu.melodic.event.brokercep.broker.BrokerConfig; +import eu.melodic.event.brokercep.event.EventRecorder; +import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.Message; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +@Slf4j +@Lazy +@Component +public class LogMessageUpdateInterceptor extends AbstractMessageInterceptor { + private EventRecorder eventRecorder; + + @Override + public void initialized() { + this.eventRecorder = applicationContext.getBean(BrokerConfig.class).getEventRecorder(); + log.debug("LogMessageUpdateInterceptor: Enabled: {}", eventRecorder!=null); + eventRecorder.startRecording(); + } + + @Override + public void intercept(Message message) { + try { + if (eventRecorder!=null && message instanceof ActiveMQMessage) + eventRecorder.recordEvent((ActiveMQMessage)message); + } catch (Exception e) { + log.error("LogMessageUpdateInterceptor: EXCEPTION: ", e); + } + } +} diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java new file mode 100644 index 0000000000000000000000000000000000000000..78e60693c0df1926b7f64a04f8ae9748a6028aef --- /dev/null +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java @@ -0,0 +1,293 @@ +/* + * Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokercep.event; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import eu.melodic.event.brokercep.properties.BrokerCepProperties; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.springframework.scheduling.TaskScheduler; + +import javax.jms.*; +import javax.jms.Queue; +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Serializable; +import java.lang.IllegalStateException; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ScheduledFuture; + +@Slf4j +public class EventRecorder extends LinkedHashMap implements Runnable { + public enum FORMAT { JSON, CSV } + + private final static Object staticLock = new Object(); + public static Set activeEventRecorders; + + @Getter + private final FORMAT recordFormat; + @Getter + private final String recordFilePattern; + @Getter + private final BrokerCepProperties.EVENT_RECORDER_FILTER_MODE filterMode; + @Getter + private final List allowedDestinations; + + @Getter + private String recordFile; + @Getter + private boolean closed; + @Getter + private boolean recording; + + private BufferedWriter recordWriter; + private CSVPrinter csvPrinter; + private JsonGenerator jsonGenerator; + + private final Deque eventQueue; + private final TaskScheduler scheduler; + private ScheduledFuture runnerFuture; + + public EventRecorder(@NonNull BrokerCepProperties.EventRecorderProperties properties, @NonNull TaskScheduler scheduler) throws IOException { + this(properties.getFormat(), properties.getFile(), properties.getFilterMode(), properties.getAllowedDestinations(), scheduler); + } + + public EventRecorder(@NonNull FORMAT recordFormat, @NonNull String recordFilePattern, BrokerCepProperties.EVENT_RECORDER_FILTER_MODE filterMode, List allowedDestinations, @NonNull TaskScheduler scheduler) throws IOException { + this.recordFormat = recordFormat; + this.recordFilePattern = recordFilePattern; + this.filterMode = filterMode; + this.allowedDestinations = allowedDestinations==null ? Collections.emptyList() : Collections.unmodifiableList(allowedDestinations); + this.scheduler = scheduler; + this.eventQueue = new ConcurrentLinkedDeque<>(); + + registerShutdownHook(); + rotate(); + } + + public static void registerShutdownHook() { + if (activeEventRecorders==null) { + synchronized (staticLock) { + if (activeEventRecorders==null) { + activeEventRecorders = new HashSet<>(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("EventRecorder: closing active recorders: {}", activeEventRecorders.size()); + for (EventRecorder eventRecorder : activeEventRecorders) { + if (!eventRecorder.isClosed()) + eventRecorder.close(); + } + log.info("EventRecorder: Closed active recorders"); + })); + } + } + } + } + + public synchronized void rotate() throws IOException { + // Close current recording file + if (recordFile!=null && !isClosed()) { + close(); + } + closed = false; + + // Create new recording file + this.recordFile = recordFilePattern + .replace("%T", "" + System.currentTimeMillis()) + .replace("%S", getSuffix()); + this.recordWriter = new BufferedWriter(new FileWriter(recordFile)); + + log.info("EventRecorder: Record format: {}", recordFormat); + log.info("EventRecorder: Record file: {}", recordFile); + + if (recordFormat==FORMAT.CSV) { + csvPrinter = new CSVPrinter(recordWriter, CSVFormat.DEFAULT + .withHeader("Timestamp", "Destination", "Mime", "Type", "Contents", "Properties")); + csvPrinter.flush(); + } + if (recordFormat==FORMAT.JSON) { + jsonGenerator = new JsonFactory() + .createGenerator(recordWriter) + .setPrettyPrinter(new DefaultPrettyPrinter()); + jsonGenerator.writeStartArray(); + jsonGenerator.flush(); + } + + // Start processing loop + runnerFuture = scheduler.scheduleAtFixedRate(this, Duration.ofMillis(1000)); + activeEventRecorders.add(this); + + startRecording(); + } + + private String getSuffix() { + if (recordFormat==FORMAT.JSON) return "json"; + if (recordFormat==FORMAT.CSV) return "csv"; + throw new IllegalStateException("No suffix for FORMAT: "+recordFormat); + } + + public synchronized void close() { + if (closed) throw new IllegalStateException("EventRecorder has already been closed"); + if (recording) stopRecording(); + this.closed = true; + runnerFuture.cancel(false); + activeEventRecorders.remove(this); + + // wait until all records are written in the file + while (!eventQueue.isEmpty()) { + run(); + } + + // close record file + try { + if (recordFormat == FORMAT.CSV) { + csvPrinter.close(true); + } + if (recordFormat == FORMAT.JSON) { + jsonGenerator.writeEndArray(); + jsonGenerator.close(); + } + recordWriter.close(); + } catch (Exception ex) { + log.warn("EventRecorder: Exception while closing: ", ex); + } + } + + public void startRecording() { + if (closed) throw new IllegalStateException("EventRecorder has been closed"); + if (!recording) { + log.info("EventRecorder: Start recording..."); + recording = true; + } + } + + public void stopRecording() { + if (closed) throw new IllegalStateException("EventRecorder has been closed"); + if (recording) { + log.info("EventRecorder: Stop recording..."); + recording = false; + } + } + + public void recordEvent(@NonNull ActiveMQMessage message) throws JMSException { + recordAllowedEvent(message); + } + + public void recordAllowedEvent(@NonNull Message message) throws JMSException { + if (filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.ALL + || filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.ALLOWED + && allowedDestinations.stream().anyMatch(getDestinationName(message)::equalsIgnoreCase)) + { + eventQueue.addLast(message); + } + } + + public void recordRegisteredEvent(@NonNull Message message) { + if (filterMode==BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.REGISTERED) { + eventQueue.addLast(message); + } + } + + public void run() { + if (!closed) { + while (!eventQueue.isEmpty()) { + try { + processEvent(eventQueue.removeLast()); + } catch (Exception ex) { + log.warn("EventRecorder: Exception while processing event queue: ", ex); + } + } + } + } + + protected void processEvent(Message message) throws IOException, JMSException { + String messageId = message.getJMSMessageID(); + long timestamp = message.getJMSTimestamp(); + String destinationName = getDestinationName(message); + String mime = message.getJMSType(); + + // Extract event payload and type + PayloadAndType payloadAndType = extractPayloadAndType(message); + String content = payloadAndType.payload; + String type = payloadAndType.type; + + // Extract event properties + String properties = extractProperties(message); + + if (recordFormat==FORMAT.CSV) { + csvPrinter.printRecord(timestamp, destinationName, mime, type, content, properties); + csvPrinter.flush(); + } + if (recordFormat==FORMAT.JSON) { + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField("id", messageId); + jsonGenerator.writeNumberField("timestamp", timestamp); + jsonGenerator.writeStringField("destination", destinationName); + jsonGenerator.writeStringField("mime", mime); + jsonGenerator.writeStringField("type", type); + jsonGenerator.writeStringField("content", content); + jsonGenerator.writeStringField("properties", properties); + jsonGenerator.writeEndObject(); + jsonGenerator.flush(); + } + } + + protected String getDestinationName(Message message) throws JMSException { + Destination d = message.getJMSDestination(); + if (d instanceof Topic) { + return ((Topic)d).getTopicName(); + } else + if (d instanceof Queue) { + return ((Queue)d).getQueueName(); + } else + throw new IllegalArgumentException("Argument is not a JMS destination: "+d); + } + + protected PayloadAndType extractPayloadAndType(Message message) throws JMSException { + if (message instanceof TextMessage) { + return new PayloadAndType("TEXT", ((TextMessage)message).getText()); + } else + if (message instanceof ObjectMessage) { + Serializable o = ((ObjectMessage) message).getObject(); + return new PayloadAndType("OBJECT", o==null ? null : o.toString()); + } else + throw new IllegalArgumentException("Unsupported message type: "+message.getClass().getName()); + } + + protected String extractProperties(Message message) throws JMSException { + Enumeration en = message.getPropertyNames(); + StringBuilder properties = new StringBuilder("{"); + boolean first = true; + while (en.hasMoreElements()) { + Object k = en.nextElement(); + if (k!=null) { + String v = message.getStringProperty(k.toString()); + if (first) first = false; else properties.append(", "); + properties.append(k).append("=").append(v); + } + } + properties.append(" }"); + return properties.toString(); + } + + @AllArgsConstructor + class PayloadAndType { + public String type; + public String payload; + } +} \ No newline at end of file diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java index 296e9af7c00a870452581208a32424de38acb0ae..bb1f4efdb2ba945c9a6891dd269371800e027717 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java @@ -9,6 +9,7 @@ package eu.melodic.event.brokercep.properties; +import eu.melodic.event.brokercep.event.EventRecorder; import eu.melodic.event.util.EmsConstant; import eu.melodic.event.util.KeystoreAndCertificateProperties; import lombok.Data; @@ -80,6 +81,8 @@ public class BrokerCepProperties implements InitializingBean { private boolean logBrokerMessages = true; private boolean logBrokerMessagesFull = false; + private EventRecorderProperties eventRecorder = new EventRecorderProperties(); + @Data public static class Usage { private Memory memory = new Memory(); @@ -107,4 +110,15 @@ public class BrokerCepProperties implements InitializingBean { @ToString.Exclude private String password; } + + public enum EVENT_RECORDER_FILTER_MODE { ALL, REGISTERED, ALLOWED } + + @Data + public static class EventRecorderProperties { + private boolean enabled; + private EventRecorder.FORMAT format = EventRecorder.FORMAT.CSV; + private String file; + private EVENT_RECORDER_FILTER_MODE filterMode = EVENT_RECORDER_FILTER_MODE.REGISTERED; + private List allowedDestinations; + } } diff --git a/event-management/config-files/ems-server.properties.sample b/event-management/config-files/ems-server.properties.sample index f0dd7cee1d36ce02c823723232c22e7045ad6bf1..290347a770d41b477222fb4d151f63e394da8c86 100644 --- a/event-management/config-files/ems-server.properties.sample +++ b/event-management/config-files/ems-server.properties.sample @@ -333,9 +333,10 @@ brokercep.broker-populate-jmsx-user-id = true # Message interceptors brokercep.message-interceptors[0].destination = > brokercep.message-interceptors[0].className = eu.melodic.event.brokercep.broker.interceptor.SequentialCompositeInterceptor -brokercep.message-interceptors[0].params = #SourceAddressMessageUpdateInterceptor, #MessageForwarderInterceptor +brokercep.message-interceptors[0].params = #SourceAddressMessageUpdateInterceptor, #LogMessageUpdateInterceptor, #MessageForwarderInterceptor brokercep.message-interceptors-specs.SourceAddressMessageUpdateInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.SourceAddressMessageUpdateInterceptor +brokercep.message-interceptors-specs.LogMessageUpdateInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.LogMessageUpdateInterceptor brokercep.message-interceptors-specs.MessageForwarderInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.MessageForwarderInterceptor # Message forward destinations (MessageForwarderInterceptor must be included in 'message-interceptors' property) @@ -357,6 +358,13 @@ brokercep.usage.memory.jvm-heap-percentage = 20 #brokercep.maxEventForwardRetries: -1 #brokercep.maxEventForwardDuration: -1 +# Event recorder settings +event-recorder.enabled=true +#event-recorder.format=JSON +event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.%S +#event-recorder.filterMode: ALL | REGISTERED (default) | ALLOWED +#event-recorder.allowed-destinations: + ################################################################################ ### EMS - Baguette Server properties ### diff --git a/event-management/config-files/ems-server.yml b/event-management/config-files/ems-server.yml index e878defa83550afce638c3dcdab0e85982f2ed62..209f25392160946cb16f9cb7d6d97bbad531fba9 100644 --- a/event-management/config-files/ems-server.yml +++ b/event-management/config-files/ems-server.yml @@ -343,11 +343,14 @@ brokercep: className: 'eu.melodic.event.brokercep.broker.interceptor.SequentialCompositeInterceptor' params: - '#SourceAddressMessageUpdateInterceptor' + - '#LogMessageUpdateInterceptor' - '#MessageForwarderInterceptor' message-interceptors-specs: SourceAddressMessageUpdateInterceptor: className: eu.melodic.event.brokercep.broker.interceptor.SourceAddressMessageUpdateInterceptor + LogMessageUpdateInterceptor: + className: eu.melodic.event.brokercep.broker.interceptor.LogMessageUpdateInterceptor MessageForwarderInterceptor: className: eu.melodic.event.brokercep.broker.interceptor.MessageForwarderInterceptor @@ -373,6 +376,14 @@ brokercep: #maxEventForwardRetries: -1 #maxEventForwardDuration: -1 + # Event recorder settings + event-recorder: + enabled: true + #format: JSON + file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.%S + #filter-mode: ALL | REGISTERED (default) | ALLOWED + #allowed-destinations: + ################################################################################ ### EMS - Baguette Server properties ###