From d6e970f1df6c73a37394a3ecba1c31894b2d330c Mon Sep 17 00:00:00 2001 From: ipatini Date: Fri, 26 May 2023 14:18:18 +0300 Subject: [PATCH 1/7] EMS: bin: Fixed the 'run.sh' script to forward TERM and INT signals to the EMS java process --- event-management/bin/run.sh | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/event-management/bin/run.sh b/event-management/bin/run.sh index 143c0d2ee..db892e383 100755 --- a/event-management/bin/run.sh +++ b/event-management/bin/run.sh @@ -67,6 +67,9 @@ if [[ -z ${EMS_SKIP_WAIT_CDO+x} ]] && [[ -f $MELODIC_CONFIG_DIR/wait-for-cdo.sh $MELODIC_CONFIG_DIR/wait-for-cdo.sh fi +# Setup TERM & INT signal handler +trap 'echo "Signaling EMS to exit"; kill -TERM "${emsPid}"; wait "${emsPid}"; ' SIGTERM SIGINT + # Run EMS server # Uncomment next line to set JAVA runtime options #JAVA_OPTS=-Djavax.net.debug=all @@ -83,7 +86,10 @@ while :; do # java $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Duser.timezone=Europe/Athens -Djava.security.egd=file:/dev/urandom -jar $JARS_DIR/control-service/target/control-service.jar "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE" # Use when Esper is NOT packaged in control-service.jar - java $EMS_DEBUG_OPTS $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Djava.security.egd=file:/dev/urandom -cp ${JARS_DIR}/control-service.jar -Dloader.path=${JARS_DIR}/esper-7.1.0.jar org.springframework.boot.loader.PropertiesLauncher "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE" $* + java $EMS_DEBUG_OPTS $JAVA_OPTS -Djasypt.encryptor.password=$JASYPT_PASSWORD -Djava.security.egd=file:/dev/urandom -cp ${JARS_DIR}/control-service.jar -Dloader.path=${JARS_DIR}/esper-7.1.0.jar org.springframework.boot.loader.PropertiesLauncher "--spring.config.location=${EMS_CONFIG_LOCATION}" "--logging.config=file:$LOG_CONFIG_FILE" $* & + emsPid=$! + echo "EMS Pid: $emsPid" + wait $emsPid retCode=$? if [[ $retCode -eq $RESTART_EXIT_CODE ]]; then echo "Restarting EMS server..."; else break; fi @@ -94,4 +100,5 @@ echo "EMS server exited" # e.g. --spring.config.location=$MELODIC_CONFIG_DIR # e.g. --spring.config.name=application.properties -cd $PREVWORKDIR \ No newline at end of file +cd $PREVWORKDIR +exit $retCode \ No newline at end of file -- GitLab From 234f6109bddc8260b7751e83aff25abc19f30bb5 Mon Sep 17 00:00:00 2001 From: ipatini Date: Fri, 26 May 2023 15:38:56 +0300 Subject: [PATCH 2/7] EMS: Broker-CEP: Added check for app. shutdown to avoid re-initializing connections during exit (and avoid the related exceptions) --- .../event/brokercep/BrokerCepConsumer.java | 20 ++++++++++++++++--- .../broker/BrokerAdvisoryWatcher.java | 17 +++++++++++++--- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java index bec9fccf1..87f7b4c91 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java @@ -21,6 +21,9 @@ import org.apache.activemq.command.ActiveMQObjectMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; import javax.jms.*; @@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong; @Slf4j @Service @RequiredArgsConstructor -public class BrokerCepConsumer implements MessageListener, InitializingBean { +public class BrokerCepConsumer implements MessageListener, InitializingBean, ApplicationListener { private final static AtomicLong eventCounter = new AtomicLong(0); private final static AtomicLong textEventCounter = new AtomicLong(0); private final static AtomicLong objectEventCounter = new AtomicLong(0); @@ -49,6 +52,9 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean { private Session session; private final Map addedDestinations = new HashMap<>(); + private final TaskScheduler scheduler; + private boolean shuttingDown; + @Override public void afterPropertiesSet() { initialize(); @@ -71,8 +77,10 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean { ? connectionFactory.createConnection(brokerConfig.getBrokerLocalAdminUsername(), brokerConfig.getBrokerLocalAdminPassword()) : connectionFactory.createConnection(); connection.setExceptionListener(e -> { - log.warn("BrokerCepConsumer: Connection exception listener: Exception caught: ", e); - initialize(); + if (!shuttingDown) { + log.warn("BrokerCepConsumer: Connection exception listener: Exception caught: ", e); + scheduler.schedule(this::initialize, Instant.now()); + } }); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -82,6 +90,12 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean { } } + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("BrokerCepConsumer is shutting down"); + shuttingDown = true; + } + private void closeConnection() { // close previous session and connection try { diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java index db3063ff9..63d0283b3 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerAdvisoryWatcher.java @@ -21,6 +21,8 @@ import org.apache.activemq.command.DataStructure; import org.apache.activemq.command.DestinationInfo; import org.springframework.beans.factory.InitializingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextClosedEvent; import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; @@ -31,7 +33,7 @@ import java.time.Instant; @Service @ConditionalOnProperty(name="brokercep.enable-advisory-watcher", matchIfMissing = true) @RequiredArgsConstructor -public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean { +public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean, ApplicationListener { private final BrokerService brokerService; // Added in order to ensure that BrokerService will be instantiated first private final BrokerConfig brokerConfig; private final BrokerCepService brokerCepService; @@ -44,6 +46,7 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean private Connection connection; private Session session; + private boolean shuttingDown; @Override public void afterPropertiesSet() { @@ -74,8 +77,10 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean ? connectionFactory.createConnection(username, password) : connectionFactory.createConnection(); connection.setExceptionListener(e -> { - log.warn("BrokerAdvisoryWatcher: Connection exception listener: Exception caught: ", e); - initialize(); + if (!shuttingDown) { + log.warn("BrokerAdvisoryWatcher: Connection exception listener: Exception caught: ", e); + initialize(); + } }); this.connection.start(); @@ -93,6 +98,12 @@ public class BrokerAdvisoryWatcher implements MessageListener, InitializingBean } } + @Override + public void onApplicationEvent(ContextClosedEvent event) { + log.info("BrokerAdvisoryWatcher is shutting down"); + shuttingDown = true; + } + private void closeConnection() { // close previous session and connection try { -- GitLab From 0cc69a80b7d92b255798de079f4268593b1e1a84 Mon Sep 17 00:00:00 2001 From: ipatini Date: Fri, 26 May 2023 16:02:06 +0300 Subject: [PATCH 3/7] EMS: Broker-CEP: Added EventRecorder class and wired it to BrokerCepConsumer. Extended BrokerCepProperties and ems-config.* files with EventRecorder settings. --- event-management/broker-cep/pom.xml | 6 + .../event/brokercep/BrokerCepConsumer.java | 19 ++ .../event/brokercep/event/EventRecorder.java | 238 ++++++++++++++++++ .../properties/BrokerCepProperties.java | 10 + .../config-files/ems-server.properties.sample | 5 + event-management/config-files/ems-server.yml | 6 + 6 files changed, 284 insertions(+) create mode 100644 event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java diff --git a/event-management/broker-cep/pom.xml b/event-management/broker-cep/pom.xml index 1deeeca75..191f87417 100644 --- a/event-management/broker-cep/pom.xml +++ b/event-management/broker-cep/pom.xml @@ -88,6 +88,12 @@ 3.8.1 + + + org.apache.commons + commons-csv + 1.7 + diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java index 87f7b4c91..aa464e868 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java @@ -12,7 +12,9 @@ package eu.melodic.event.brokercep; import eu.melodic.event.brokercep.broker.BrokerConfig; import eu.melodic.event.brokercep.cep.CepService; import eu.melodic.event.brokercep.event.EventMap; +import eu.melodic.event.brokercep.event.EventRecorder; import eu.melodic.event.brokercep.properties.BrokerCepProperties; +import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.broker.BrokerService; @@ -27,6 +29,7 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; import javax.jms.*; +import java.time.Instant; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; @@ -52,6 +55,7 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean, App private Session session; private final Map addedDestinations = new HashMap<>(); + private EventRecorder eventRecorder; private final TaskScheduler scheduler; private boolean shuttingDown; @@ -69,6 +73,19 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean, App // clear added destinations list addedDestinations.clear(); + // clear previous event recorder (if any) + if (eventRecorder!=null && !eventRecorder.isClosed()) + eventRecorder.close(); + + // create new event recorder + if (properties.getEventRecorder()!=null) { + if (!shuttingDown && properties.getEventRecorder().isEnabled()) { + @NonNull String recordFile = properties.getEventRecorder().getFile().replace("%T", "" + System.currentTimeMillis()); + eventRecorder = new EventRecorder(properties.getEventRecorder().getFormat(), recordFile, scheduler); + eventRecorder.startRecording(); + } + } + // If an alternative Broker URL is provided for consumer, it will be used ConnectionFactory connectionFactory = brokerConfig.getConnectionFactoryForConsumer(); @@ -175,6 +192,8 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean, App public void onMessage(Message message) { // Log message logMessage(message); + // Record message + if (eventRecorder!=null) eventRecorder.recordEvent(message); // Handle message try { diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java new file mode 100644 index 000000000..3cc9d3130 --- /dev/null +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java @@ -0,0 +1,238 @@ +/* + * Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokercep.event; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.springframework.scheduling.TaskScheduler; + +import javax.jms.*; +import javax.jms.Queue; +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Serializable; +import java.lang.IllegalStateException; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ScheduledFuture; + +@Slf4j +public class EventRecorder extends LinkedHashMap implements Runnable { + public enum FORMAT { JSON, CSV } + + private final static Object staticLock = new Object(); + public static Set activeEventRecorders; + + @Getter + private final FORMAT recordFormat; + @Getter + private final String recordFile; + @Getter + private final Deque eventQueue; + @Getter + private boolean closed; + private boolean recording; + private final BufferedWriter recordWriter; + private CSVPrinter csvPrinter; + private JsonGenerator jsonGenerator; + + private final TaskScheduler scheduler; + private final ScheduledFuture runnerFuture; + + public EventRecorder(@NonNull FORMAT recordFormat, @NonNull String recordFile, @NonNull TaskScheduler scheduler) throws IOException { + log.info("EventRecorder: Record format: {}", recordFormat); + log.info("EventRecorder: Record file: {}", recordFile); + this.recordWriter = new BufferedWriter(new FileWriter(recordFile)); + + if (recordFormat==FORMAT.CSV) { + csvPrinter = new CSVPrinter(recordWriter, CSVFormat.DEFAULT + .withHeader("Timestamp", "Destination", "Mime", "Type", "Contents", "Properties")); + csvPrinter.flush(); + } + if (recordFormat==FORMAT.JSON) { + jsonGenerator = new JsonFactory() + .createGenerator(recordWriter) + .setPrettyPrinter(new DefaultPrettyPrinter()); + jsonGenerator.writeStartArray(); + jsonGenerator.flush(); + } + this.recordFormat = recordFormat; + this.recordFile = recordFile; + this.scheduler = scheduler; + this.eventQueue = new ConcurrentLinkedDeque<>(); + + runnerFuture = scheduler.scheduleAtFixedRate(this, Duration.ofMillis(1000)); + + registerShutdownHook(); + activeEventRecorders.add(this); + } + + public static void registerShutdownHook() { + if (activeEventRecorders==null) { + synchronized (staticLock) { + if (activeEventRecorders==null) { + activeEventRecorders = new HashSet<>(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("EventRecorder: closing active recorders: {}", activeEventRecorders.size()); + for (EventRecorder eventRecorder : activeEventRecorders) { + if (!eventRecorder.isClosed()) + eventRecorder.close(); + } + log.info("EventRecorder: Closed active recorders"); + })); + } + } + } + } + + public synchronized void close() { + if (closed) throw new IllegalStateException("EventRecorder has been closed"); + if (recording) stopRecording(); + this.closed = true; + runnerFuture.cancel(false); + activeEventRecorders.remove(this); + + // wait until all records are written in the file + while (!eventQueue.isEmpty()) { + run(); + } + + // close record file + try { + if (recordFormat == FORMAT.CSV) { + csvPrinter.close(true); + } + if (recordFormat == FORMAT.JSON) { + jsonGenerator.writeEndArray(); + jsonGenerator.close(); + } + recordWriter.close(); + } catch (Exception ex) { + log.warn("EventRecorder: Exception while closing: ", ex); + } + } + + public void startRecording() { + if (closed) throw new IllegalStateException("EventRecorder has been closed"); + if (!recording) { + log.info("EventRecorder: Start recording..."); + recording = true; + } + } + + public void stopRecording() { + if (closed) throw new IllegalStateException("EventRecorder has been closed"); + if (recording) { + log.info("EventRecorder: Stop recording..."); + recording = false; + } + } + + public void recordEvent(@NonNull Message message) { + eventQueue.addLast(message); + } + + public void run() { + if (!closed) { + while (!eventQueue.isEmpty()) { + try { + processEvent(eventQueue.removeLast()); + } catch (Exception ex) { + log.warn("EventRecorder: Exception while processing event queue: ", ex); + } + } + } + } + + protected void processEvent(Message message) throws IOException, JMSException { + String messageId = message.getJMSMessageID(); + long timestamp = message.getJMSTimestamp(); + String destinationName = getDestinationName(message); + String mime = message.getJMSType(); + + // Extract event payload and type + PayloadAndType payloadAndType = extractPayloadAndType(message); + String content = payloadAndType.payload; + String type = payloadAndType.type; + + // Extract event properties + String properties = extractProperties(message); + + if (recordFormat==FORMAT.CSV) { + csvPrinter.printRecord(timestamp, destinationName, mime, type, content, properties); + csvPrinter.flush(); + } + if (recordFormat==FORMAT.JSON) { + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField("id", messageId); + jsonGenerator.writeNumberField("timestamp", timestamp); + jsonGenerator.writeStringField("destination", destinationName); + jsonGenerator.writeStringField("mime", mime); + jsonGenerator.writeStringField("type", type); + jsonGenerator.writeStringField("content", content); + jsonGenerator.writeStringField("properties", properties); + jsonGenerator.writeEndObject(); + jsonGenerator.flush(); + } + } + + protected String getDestinationName(Message message) throws JMSException { + Destination d = message.getJMSDestination(); + if (d instanceof Topic) { + return ((Topic)d).getTopicName(); + } else + if (d instanceof Queue) { + return ((Queue)d).getQueueName(); + } else + throw new IllegalArgumentException("Argument is not a JMS destination: "+d); + } + + protected PayloadAndType extractPayloadAndType(Message message) throws JMSException { + if (message instanceof TextMessage) { + return new PayloadAndType("TEXT", ((TextMessage)message).getText()); + } else + if (message instanceof ObjectMessage) { + Serializable o = ((ObjectMessage) message).getObject(); + return new PayloadAndType("OBJECT", o==null ? null : o.toString()); + } else + throw new IllegalArgumentException("Unsupported message type: "+message.getClass().getName()); + } + + protected String extractProperties(Message message) throws JMSException { + Enumeration en = message.getPropertyNames(); + StringBuilder properties = new StringBuilder("{"); + boolean first = true; + while (en.hasMoreElements()) { + Object k = en.nextElement(); + if (k!=null) { + String v = message.getStringProperty(k.toString()); + if (first) first = false; else properties.append(", "); + properties.append(k).append("=").append(v); + } + } + properties.append(" }"); + return properties.toString(); + } + + @AllArgsConstructor + class PayloadAndType { + public String type; + public String payload; + } +} \ No newline at end of file diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java index 296e9af7c..b1bca1228 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java @@ -9,6 +9,7 @@ package eu.melodic.event.brokercep.properties; +import eu.melodic.event.brokercep.event.EventRecorder; import eu.melodic.event.util.EmsConstant; import eu.melodic.event.util.KeystoreAndCertificateProperties; import lombok.Data; @@ -80,6 +81,8 @@ public class BrokerCepProperties implements InitializingBean { private boolean logBrokerMessages = true; private boolean logBrokerMessagesFull = false; + private EventRecorderProperties eventRecorder = new EventRecorderProperties(); + @Data public static class Usage { private Memory memory = new Memory(); @@ -107,4 +110,11 @@ public class BrokerCepProperties implements InitializingBean { @ToString.Exclude private String password; } + + @Data + public static class EventRecorderProperties { + private boolean enabled; + private EventRecorder.FORMAT format = EventRecorder.FORMAT.CSV; + private String file; + } } diff --git a/event-management/config-files/ems-server.properties.sample b/event-management/config-files/ems-server.properties.sample index f0dd7cee1..969124914 100644 --- a/event-management/config-files/ems-server.properties.sample +++ b/event-management/config-files/ems-server.properties.sample @@ -357,6 +357,11 @@ brokercep.usage.memory.jvm-heap-percentage = 20 #brokercep.maxEventForwardRetries: -1 #brokercep.maxEventForwardDuration: -1 +# Event recorder settings +event-recorder.enabled=true +event-recorder.format=JSON +event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.json + ################################################################################ ### EMS - Baguette Server properties ### diff --git a/event-management/config-files/ems-server.yml b/event-management/config-files/ems-server.yml index e878defa8..5dd02a3d3 100644 --- a/event-management/config-files/ems-server.yml +++ b/event-management/config-files/ems-server.yml @@ -373,6 +373,12 @@ brokercep: #maxEventForwardRetries: -1 #maxEventForwardDuration: -1 + # Event recorder settings + event-recorder: + enabled: true + format: JSON + file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.json + ################################################################################ ### EMS - Baguette Server properties ### -- GitLab From 916043e71552f8af7f9c967b0b0973cbefbb825b Mon Sep 17 00:00:00 2001 From: ipatini Date: Sat, 27 May 2023 19:35:09 +0300 Subject: [PATCH 4/7] EMS: Broker-CEP: Improved EventRecorder: Moved initialization to BrokerConfig. Added destination filtering modes (ALL, REGISTERED (default), CUSTOM) and allowed destinations (used with CUSTOM mode). Updated EMS configuration. --- .../event/brokercep/BrokerCepConsumer.java | 21 +--- .../event/brokercep/broker/BrokerConfig.java | 23 ++++ .../LogMessageUpdateInterceptor.java | 42 ++++++++ .../event/brokercep/event/EventRecorder.java | 101 +++++++++++++----- .../properties/BrokerCepProperties.java | 4 + .../config-files/ems-server.properties.sample | 5 +- event-management/config-files/ems-server.yml | 5 + 7 files changed, 156 insertions(+), 45 deletions(-) create mode 100644 event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java index aa464e868..dbb54cd74 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepConsumer.java @@ -12,9 +12,7 @@ package eu.melodic.event.brokercep; import eu.melodic.event.brokercep.broker.BrokerConfig; import eu.melodic.event.brokercep.cep.CepService; import eu.melodic.event.brokercep.event.EventMap; -import eu.melodic.event.brokercep.event.EventRecorder; import eu.melodic.event.brokercep.properties.BrokerCepProperties; -import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.broker.BrokerService; @@ -55,7 +53,6 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean, App private Session session; private final Map addedDestinations = new HashMap<>(); - private EventRecorder eventRecorder; private final TaskScheduler scheduler; private boolean shuttingDown; @@ -73,19 +70,6 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean, App // clear added destinations list addedDestinations.clear(); - // clear previous event recorder (if any) - if (eventRecorder!=null && !eventRecorder.isClosed()) - eventRecorder.close(); - - // create new event recorder - if (properties.getEventRecorder()!=null) { - if (!shuttingDown && properties.getEventRecorder().isEnabled()) { - @NonNull String recordFile = properties.getEventRecorder().getFile().replace("%T", "" + System.currentTimeMillis()); - eventRecorder = new EventRecorder(properties.getEventRecorder().getFormat(), recordFile, scheduler); - eventRecorder.startRecording(); - } - } - // If an alternative Broker URL is provided for consumer, it will be used ConnectionFactory connectionFactory = brokerConfig.getConnectionFactoryForConsumer(); @@ -192,8 +176,11 @@ public class BrokerCepConsumer implements MessageListener, InitializingBean, App public void onMessage(Message message) { // Log message logMessage(message); + // Record message - if (eventRecorder!=null) eventRecorder.recordEvent(message); + + if (brokerConfig.getEventRecorder()!=null) + brokerConfig.getEventRecorder().recordRegisteredEvent(message); // Handle message try { diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java index e6b9b1d26..9813fe21c 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/BrokerConfig.java @@ -10,9 +10,11 @@ package eu.melodic.event.brokercep.broker; import eu.melodic.event.brokercep.broker.interceptor.AbstractMessageInterceptor; +import eu.melodic.event.brokercep.event.EventRecorder; import eu.melodic.event.brokercep.properties.BrokerCepProperties; import eu.melodic.event.util.KeystoreUtil; import eu.melodic.event.util.PasswordUtil; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.activemq.ActiveMQConnectionFactory; @@ -35,6 +37,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.FileSystemResource; import org.springframework.jms.annotation.EnableJms; +import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; import javax.jms.ConnectionFactory; @@ -42,6 +45,7 @@ import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; +import java.io.IOException; import java.security.KeyStore; import java.util.*; import java.util.stream.Collectors; @@ -80,9 +84,14 @@ public class BrokerConfig implements InitializingBean { private final HashMap connectionFactoryCache = new HashMap<>(); + private final TaskScheduler scheduler; + @Getter + private EventRecorder eventRecorder; + @Override public void afterPropertiesSet() throws Exception { _initializeSecurity(); + _initializeEventRecorder(); } protected synchronized void _initializeSecurity() throws Exception { @@ -174,6 +183,20 @@ public class BrokerConfig implements InitializingBean { log.info("BrokerConfig.initializeKeyAndCert(): Initializing keystore, truststore and certificate for Broker-SSL... done"); } + private void _initializeEventRecorder() throws IOException { + // clear previous event recorder (if any) + if (eventRecorder!=null && !eventRecorder.isClosed()) + eventRecorder.close(); + + // create new event recorder + if (properties.getEventRecorder()!=null) { + if (properties.getEventRecorder().isEnabled()) { + eventRecorder = new EventRecorder(properties.getEventRecorder(), scheduler); + eventRecorder.startRecording(); + } + } + } + public String getBrokerName() { log.trace("BrokerConfig.getBrokerName(): broker-name: {}", properties.getBrokerName()); return properties.getBrokerName(); diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java new file mode 100644 index 000000000..6d8a79606 --- /dev/null +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/broker/interceptor/LogMessageUpdateInterceptor.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2017-2023 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokercep.broker.interceptor; + +import eu.melodic.event.brokercep.broker.BrokerConfig; +import eu.melodic.event.brokercep.event.EventRecorder; +import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.Message; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +@Slf4j +@Lazy +@Component +public class LogMessageUpdateInterceptor extends AbstractMessageInterceptor { + private EventRecorder eventRecorder; + + @Override + public void initialized() { + this.eventRecorder = applicationContext.getBean(BrokerConfig.class).getEventRecorder(); + log.debug("LogMessageUpdateInterceptor: Enabled: {}", eventRecorder!=null); + eventRecorder.startRecording(); + } + + @Override + public void intercept(Message message) { + try { + if (eventRecorder!=null && message instanceof ActiveMQMessage) + eventRecorder.recordEvent((ActiveMQMessage)message); + } catch (Exception e) { + log.error("LogMessageUpdateInterceptor: EXCEPTION: ", e); + } + } +} diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java index 3cc9d3130..c51048957 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java @@ -12,10 +12,12 @@ package eu.melodic.event.brokercep.event; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; +import eu.melodic.event.brokercep.properties.BrokerCepProperties; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; import org.springframework.scheduling.TaskScheduler; @@ -42,45 +44,41 @@ public class EventRecorder extends LinkedHashMap implements Runn @Getter private final FORMAT recordFormat; @Getter - private final String recordFile; + private final String recordFilePattern; @Getter - private final Deque eventQueue; + private final BrokerCepProperties.EVENT_RECORDER_FILTER_MODE filterMode; + @Getter + private final List allowedDestinations; + + @Getter + private String recordFile; @Getter private boolean closed; + @Getter private boolean recording; - private final BufferedWriter recordWriter; + + private BufferedWriter recordWriter; private CSVPrinter csvPrinter; private JsonGenerator jsonGenerator; + private final Deque eventQueue; private final TaskScheduler scheduler; - private final ScheduledFuture runnerFuture; + private ScheduledFuture runnerFuture; - public EventRecorder(@NonNull FORMAT recordFormat, @NonNull String recordFile, @NonNull TaskScheduler scheduler) throws IOException { - log.info("EventRecorder: Record format: {}", recordFormat); - log.info("EventRecorder: Record file: {}", recordFile); - this.recordWriter = new BufferedWriter(new FileWriter(recordFile)); + public EventRecorder(@NonNull BrokerCepProperties.EventRecorderProperties properties, @NonNull TaskScheduler scheduler) throws IOException { + this(properties.getFormat(), properties.getFile(), properties.getFilterMode(), properties.getAllowedDestinations(), scheduler); + } - if (recordFormat==FORMAT.CSV) { - csvPrinter = new CSVPrinter(recordWriter, CSVFormat.DEFAULT - .withHeader("Timestamp", "Destination", "Mime", "Type", "Contents", "Properties")); - csvPrinter.flush(); - } - if (recordFormat==FORMAT.JSON) { - jsonGenerator = new JsonFactory() - .createGenerator(recordWriter) - .setPrettyPrinter(new DefaultPrettyPrinter()); - jsonGenerator.writeStartArray(); - jsonGenerator.flush(); - } + public EventRecorder(@NonNull FORMAT recordFormat, @NonNull String recordFilePattern, BrokerCepProperties.EVENT_RECORDER_FILTER_MODE filterMode, List allowedDestinations, @NonNull TaskScheduler scheduler) throws IOException { this.recordFormat = recordFormat; - this.recordFile = recordFile; + this.recordFilePattern = recordFilePattern; + this.filterMode = filterMode; + this.allowedDestinations = allowedDestinations==null ? Collections.emptyList() : Collections.unmodifiableList(allowedDestinations); this.scheduler = scheduler; this.eventQueue = new ConcurrentLinkedDeque<>(); - runnerFuture = scheduler.scheduleAtFixedRate(this, Duration.ofMillis(1000)); - registerShutdownHook(); - activeEventRecorders.add(this); + rotate(); } public static void registerShutdownHook() { @@ -101,8 +99,42 @@ public class EventRecorder extends LinkedHashMap implements Runn } } + public synchronized void rotate() throws IOException { + // Close current recording file + if (recordFile!=null && !isClosed()) { + close(); + } + closed = false; + + // Create new recording file + this.recordFile = recordFilePattern.replace("%T", "" + System.currentTimeMillis()); + this.recordWriter = new BufferedWriter(new FileWriter(recordFile)); + + log.info("EventRecorder: Record format: {}", recordFormat); + log.info("EventRecorder: Record file: {}", recordFile); + + if (recordFormat==FORMAT.CSV) { + csvPrinter = new CSVPrinter(recordWriter, CSVFormat.DEFAULT + .withHeader("Timestamp", "Destination", "Mime", "Type", "Contents", "Properties")); + csvPrinter.flush(); + } + if (recordFormat==FORMAT.JSON) { + jsonGenerator = new JsonFactory() + .createGenerator(recordWriter) + .setPrettyPrinter(new DefaultPrettyPrinter()); + jsonGenerator.writeStartArray(); + jsonGenerator.flush(); + } + + // Start processing loop + runnerFuture = scheduler.scheduleAtFixedRate(this, Duration.ofMillis(1000)); + activeEventRecorders.add(this); + + startRecording(); + } + public synchronized void close() { - if (closed) throw new IllegalStateException("EventRecorder has been closed"); + if (closed) throw new IllegalStateException("EventRecorder has already been closed"); if (recording) stopRecording(); this.closed = true; runnerFuture.cancel(false); @@ -144,8 +176,23 @@ public class EventRecorder extends LinkedHashMap implements Runn } } - public void recordEvent(@NonNull Message message) { - eventQueue.addLast(message); + public void recordEvent(@NonNull ActiveMQMessage message) throws JMSException { + recordAllowedEvent(message); + } + + public void recordAllowedEvent(@NonNull Message message) throws JMSException { + if (filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.ALL + || filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.CUSTOM + && allowedDestinations.stream().anyMatch(getDestinationName(message)::equalsIgnoreCase)) + { + eventQueue.addLast(message); + } + } + + public void recordRegisteredEvent(@NonNull Message message) { + if (filterMode==BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.REGISTERED) { + eventQueue.addLast(message); + } } public void run() { diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java index b1bca1228..0aede4bf0 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java @@ -111,10 +111,14 @@ public class BrokerCepProperties implements InitializingBean { private String password; } + public enum EVENT_RECORDER_FILTER_MODE { ALL, REGISTERED, CUSTOM } + @Data public static class EventRecorderProperties { private boolean enabled; private EventRecorder.FORMAT format = EventRecorder.FORMAT.CSV; private String file; + private EVENT_RECORDER_FILTER_MODE filterMode = EVENT_RECORDER_FILTER_MODE.REGISTERED; + private List allowedDestinations; } } diff --git a/event-management/config-files/ems-server.properties.sample b/event-management/config-files/ems-server.properties.sample index 969124914..10d8378d1 100644 --- a/event-management/config-files/ems-server.properties.sample +++ b/event-management/config-files/ems-server.properties.sample @@ -333,9 +333,10 @@ brokercep.broker-populate-jmsx-user-id = true # Message interceptors brokercep.message-interceptors[0].destination = > brokercep.message-interceptors[0].className = eu.melodic.event.brokercep.broker.interceptor.SequentialCompositeInterceptor -brokercep.message-interceptors[0].params = #SourceAddressMessageUpdateInterceptor, #MessageForwarderInterceptor +brokercep.message-interceptors[0].params = #SourceAddressMessageUpdateInterceptor, #LogMessageUpdateInterceptor, #MessageForwarderInterceptor brokercep.message-interceptors-specs.SourceAddressMessageUpdateInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.SourceAddressMessageUpdateInterceptor +brokercep.message-interceptors-specs.LogMessageUpdateInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.LogMessageUpdateInterceptor brokercep.message-interceptors-specs.MessageForwarderInterceptor.className = eu.melodic.event.brokercep.broker.interceptor.MessageForwarderInterceptor # Message forward destinations (MessageForwarderInterceptor must be included in 'message-interceptors' property) @@ -361,6 +362,8 @@ brokercep.usage.memory.jvm-heap-percentage = 20 event-recorder.enabled=true event-recorder.format=JSON event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.json +# event-recorder.filterMode: ALL | REGISTERED (default) | CUSTOM +# event-recorder.allowed-destinations: ################################################################################ diff --git a/event-management/config-files/ems-server.yml b/event-management/config-files/ems-server.yml index 5dd02a3d3..5e63abf35 100644 --- a/event-management/config-files/ems-server.yml +++ b/event-management/config-files/ems-server.yml @@ -343,11 +343,14 @@ brokercep: className: 'eu.melodic.event.brokercep.broker.interceptor.SequentialCompositeInterceptor' params: - '#SourceAddressMessageUpdateInterceptor' + - '#LogMessageUpdateInterceptor' - '#MessageForwarderInterceptor' message-interceptors-specs: SourceAddressMessageUpdateInterceptor: className: eu.melodic.event.brokercep.broker.interceptor.SourceAddressMessageUpdateInterceptor + LogMessageUpdateInterceptor: + className: eu.melodic.event.brokercep.broker.interceptor.LogMessageUpdateInterceptor MessageForwarderInterceptor: className: eu.melodic.event.brokercep.broker.interceptor.MessageForwarderInterceptor @@ -378,6 +381,8 @@ brokercep: enabled: true format: JSON file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.json +# filter-mode: ALL | REGISTERED (default) | CUSTOM +# allowed-destinations: ################################################################################ -- GitLab From 1819987c18aab8e2e04d31184341e833bb12e428 Mon Sep 17 00:00:00 2001 From: ipatini Date: Mon, 29 May 2023 07:44:21 +0300 Subject: [PATCH 5/7] EMS: Broker-CEP: Updated ems-config.yml and set event recorder format to CSV. Renamed event recorder filter mode from CUSTOM to ALLOWED --- .../eu/melodic/event/brokercep/event/EventRecorder.java | 2 +- .../event/brokercep/properties/BrokerCepProperties.java | 2 +- event-management/config-files/ems-server.properties.sample | 6 +++--- event-management/config-files/ems-server.yml | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java index c51048957..9b157108e 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java @@ -182,7 +182,7 @@ public class EventRecorder extends LinkedHashMap implements Runn public void recordAllowedEvent(@NonNull Message message) throws JMSException { if (filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.ALL - || filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.CUSTOM + || filterMode == BrokerCepProperties.EVENT_RECORDER_FILTER_MODE.ALLOWED && allowedDestinations.stream().anyMatch(getDestinationName(message)::equalsIgnoreCase)) { eventQueue.addLast(message); diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java index 0aede4bf0..bb1f4efdb 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/properties/BrokerCepProperties.java @@ -111,7 +111,7 @@ public class BrokerCepProperties implements InitializingBean { private String password; } - public enum EVENT_RECORDER_FILTER_MODE { ALL, REGISTERED, CUSTOM } + public enum EVENT_RECORDER_FILTER_MODE { ALL, REGISTERED, ALLOWED } @Data public static class EventRecorderProperties { diff --git a/event-management/config-files/ems-server.properties.sample b/event-management/config-files/ems-server.properties.sample index 10d8378d1..9313cbeb6 100644 --- a/event-management/config-files/ems-server.properties.sample +++ b/event-management/config-files/ems-server.properties.sample @@ -360,10 +360,10 @@ brokercep.usage.memory.jvm-heap-percentage = 20 # Event recorder settings event-recorder.enabled=true -event-recorder.format=JSON +#event-recorder.format=JSON event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.json -# event-recorder.filterMode: ALL | REGISTERED (default) | CUSTOM -# event-recorder.allowed-destinations: +#event-recorder.filterMode: ALL | REGISTERED (default) | ALLOWED +#event-recorder.allowed-destinations: ################################################################################ diff --git a/event-management/config-files/ems-server.yml b/event-management/config-files/ems-server.yml index 5e63abf35..cf237fe01 100644 --- a/event-management/config-files/ems-server.yml +++ b/event-management/config-files/ems-server.yml @@ -379,10 +379,10 @@ brokercep: # Event recorder settings event-recorder: enabled: true - format: JSON + #format: JSON file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.json -# filter-mode: ALL | REGISTERED (default) | CUSTOM -# allowed-destinations: + #filter-mode: ALL | REGISTERED (default) | ALLOWED + #allowed-destinations: ################################################################################ -- GitLab From a7e2e3a64e1d1d335daad0a70020e161dcdb1eb3 Mon Sep 17 00:00:00 2001 From: ipatini Date: Mon, 29 May 2023 07:59:01 +0300 Subject: [PATCH 6/7] EMS: Broker-CEP: Fixed event recorder file suffix --- event-management/config-files/ems-server.properties.sample | 2 +- event-management/config-files/ems-server.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/event-management/config-files/ems-server.properties.sample b/event-management/config-files/ems-server.properties.sample index 9313cbeb6..b279a1b94 100644 --- a/event-management/config-files/ems-server.properties.sample +++ b/event-management/config-files/ems-server.properties.sample @@ -361,7 +361,7 @@ brokercep.usage.memory.jvm-heap-percentage = 20 # Event recorder settings event-recorder.enabled=true #event-recorder.format=JSON -event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.json +event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.csv #event-recorder.filterMode: ALL | REGISTERED (default) | ALLOWED #event-recorder.allowed-destinations: diff --git a/event-management/config-files/ems-server.yml b/event-management/config-files/ems-server.yml index cf237fe01..ebb20a8f2 100644 --- a/event-management/config-files/ems-server.yml +++ b/event-management/config-files/ems-server.yml @@ -380,7 +380,7 @@ brokercep: event-recorder: enabled: true #format: JSON - file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.json + file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.csv #filter-mode: ALL | REGISTERED (default) | ALLOWED #allowed-destinations: -- GitLab From dd243ed39c8851c9718fc8751d0ba66c32211ab4 Mon Sep 17 00:00:00 2001 From: ipatini Date: Mon, 29 May 2023 08:23:22 +0300 Subject: [PATCH 7/7] EMS: Broker-CEP: Added %S placeholder in event recorder file pattern, that is resolved to the correct suffix based on selected format. Changed event recorder file suffix --- .../melodic/event/brokercep/event/EventRecorder.java | 10 +++++++++- .../config-files/ems-server.properties.sample | 2 +- event-management/config-files/ems-server.yml | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java index 9b157108e..78e60693c 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/event/EventRecorder.java @@ -107,7 +107,9 @@ public class EventRecorder extends LinkedHashMap implements Runn closed = false; // Create new recording file - this.recordFile = recordFilePattern.replace("%T", "" + System.currentTimeMillis()); + this.recordFile = recordFilePattern + .replace("%T", "" + System.currentTimeMillis()) + .replace("%S", getSuffix()); this.recordWriter = new BufferedWriter(new FileWriter(recordFile)); log.info("EventRecorder: Record format: {}", recordFormat); @@ -133,6 +135,12 @@ public class EventRecorder extends LinkedHashMap implements Runn startRecording(); } + private String getSuffix() { + if (recordFormat==FORMAT.JSON) return "json"; + if (recordFormat==FORMAT.CSV) return "csv"; + throw new IllegalStateException("No suffix for FORMAT: "+recordFormat); + } + public synchronized void close() { if (closed) throw new IllegalStateException("EventRecorder has already been closed"); if (recording) stopRecording(); diff --git a/event-management/config-files/ems-server.properties.sample b/event-management/config-files/ems-server.properties.sample index b279a1b94..290347a77 100644 --- a/event-management/config-files/ems-server.properties.sample +++ b/event-management/config-files/ems-server.properties.sample @@ -361,7 +361,7 @@ brokercep.usage.memory.jvm-heap-percentage = 20 # Event recorder settings event-recorder.enabled=true #event-recorder.format=JSON -event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.csv +event-recorder.file=${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.%S #event-recorder.filterMode: ALL | REGISTERED (default) | ALLOWED #event-recorder.allowed-destinations: diff --git a/event-management/config-files/ems-server.yml b/event-management/config-files/ems-server.yml index ebb20a8f2..209f25392 100644 --- a/event-management/config-files/ems-server.yml +++ b/event-management/config-files/ems-server.yml @@ -380,7 +380,7 @@ brokercep: event-recorder: enabled: true #format: JSON - file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.csv + file: ${LOGS_DIR:${MELODIC_CONFIG_DIR}/../logs}/events-%T.%S #filter-mode: ALL | REGISTERED (default) | ALLOWED #allowed-destinations: -- GitLab