+@echo off
+:: Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+:: This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+:: Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+:: If a copy of the MPL was not distributed with this file, you can obtain one at
+:: https://www.mozilla.org/en-US/MPL/2.0/
+if not exist target\dependency cmd /C "mvn dependency:copy-dependencies"
+set JAVA_OPTS= -Djavax.net.ssl.trustStore=..\config-files\broker-truststore.p12 ^
+ -Djavax.net.ssl.trustStorePassword=melodic ^
+ -Djavax.net.ssl.trustStoreType=pkcs12
+rem -Djavax.net.debug=all
+rem -Djavax.net.debug=ssl,handshake,record
+java %JAVA_OPTS% -classpath "target\classes;target\dependency\*" eu.melodic.event.brokerclient.BrokerClientApp %*
+# Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+# This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+# Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+# If a copy of the MPL was not distributed with this file, you can obtain one at
+# https://www.mozilla.org/en-US/MPL/2.0/
+if [ ! -d "target/dependency" ]; then
+ mvn dependency:copy-dependencies
+JAVA_OPTS=-Djavax.net.ssl.trustStore=./broker-truststore.p12\ -Djavax.net.ssl.trustStorePassword=melodic\ -Djavax.net.ssl.trustStoreType=pkcs12
+# -Djavax.net.debug=all
+# -Djavax.net.debug=ssl,handshake,record
+java $JAVA_OPTS -classpath "target/classes:target/dependency/*" eu.melodic.event.brokerclient.BrokerClientApp $*
+# Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+# This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+# Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+# If a copy of the MPL was not distributed with this file, you can obtain one at
+# https://www.mozilla.org/en-US/MPL/2.0/
+# Broker Client settings
+# Broker authentication
+ 4.0.0
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.4.4
+ amq-message-java-library
+ AMQ message Java library
+ gr.ntua.imu.morphemic
+ 1.0.0
+ org.springframework.boot
+ spring-boot-starter-activemq
+ org.apache.activemq
+ activemq-broker
+ org.projectlombok
+ lombok
+ provided
+ org.apache.commons
+ commons-lang3
+ 3.8.1
+ eu.7bulls
+ Melodic 7bulls repository
+ https://nexus.7bulls.eu:8443/repository/maven-snapshots/
+ eu.7bulls
+ Melodic 7bulls repository
+ https://nexus.7bulls.eu:8443/repository/maven-releases/
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+ package
+ shade
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+package eu.melodic.event.brokerclient;
+import eu.melodic.event.brokerclient.event.EventMap;
+import eu.melodic.event.brokerclient.properties.BrokerClientProperties;
+import java.io.Serializable;
+import java.util.*;
+import javax.jms.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.advisory.DestinationSource;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+public class BrokerClient {
+ @Autowired
+ private BrokerClientProperties properties;
+ private Connection connection;
+ private Session session;
+ private HashMap listeners = new HashMap<>();
+ public BrokerClient() {
+ }
+ public BrokerClient(BrokerClientProperties bcp) {
+ properties = bcp;
+ }
+ public BrokerClient(Properties p) {
+ properties = new BrokerClientProperties(p);
+ }
+ // ------------------------------------------------------------------------
+ public static BrokerClient newClient(String broker_properties_configuration_file_location) throws java.io.IOException, JMSException {
+ log.info("BrokerClient: Initializing...");
+ /*
+ // get properties file
+ String configDir = System.getenv("MELODIC_CONFIG_DIR");
+ if (configDir == null || configDir.trim().isEmpty()) configDir = ".";
+ log.info("BrokerClient: config-dir: {}", configDir);
+ String configPropFile = configDir + "/" + "eu.melodic.event.brokerclient.properties";
+ log.info("BrokerClient: config-file: {}", configPropFile);
+ */
+ // load properties
+ Properties p = new Properties();
+ //ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ //try (java.io.InputStream in = loader.getClass().getResourceAsStream(configPropFile)) { p.load(in); }
+ try (java.io.InputStream in = new java.io.FileInputStream(broker_properties_configuration_file_location)) {
+ p.load(in);
+ }
+ log.info("BrokerClient: config-properties: {}", p);
+ // initialize broker client
+ BrokerClient client = new BrokerClient(p);
+ log.info("BrokerClient: Configuration:\n{}", client.properties);
+ return client;
+ }
+ public static BrokerClient newClient(String username, String password, String broker_properties_configuration_file_path) throws java.io.IOException, JMSException {
+ BrokerClient client = newClient(broker_properties_configuration_file_path);
+ if (username!=null && password!=null) {
+ client.getClientProperties().setBrokerUsername(username);
+ client.getClientProperties().setBrokerPassword(password);
+ }
+ return client;
+ }
+ public static BrokerClient newClient() {
+ return new BrokerClient();
+ }
+ // ------------------------------------------------------------------------
+ public BrokerClientProperties getClientProperties() {
+ checkProperties();
+ return properties;
+ }
+ protected void checkProperties() {
+ if (properties==null) {
+ //use defaults
+ properties = new BrokerClientProperties();
+ }
+ }
+ // ------------------------------------------------------------------------
+ public synchronized Set getDestinationNames(String connectionString) throws JMSException {
+ // open or reuse connection
+ checkProperties();
+ boolean _closeConn = false;
+ if (session==null) {
+ openConnection(connectionString);
+ _closeConn = ! properties.isPreserveConnection();
+ }
+ // Get destinations from Broker
+ log.info("BrokerClient.getDestinationNames(): Getting destinations: connection={}, username={}", connectionString, properties.getBrokerUsername());
+ ActiveMQConnection conn = (ActiveMQConnection)connection;
+ DestinationSource ds = conn.getDestinationSource();
+ Set queues = ds.getQueues();
+ Set topics = ds.getTopics();
+ Set tempQueues = ds.getTemporaryQueues();
+ Set tempTopics = ds.getTemporaryTopics();
+ log.info("BrokerClient.getDestinationNames(): Getting destinations: done");
+ // Get destination names
+ HashSet destinationNames = new HashSet<>();
+ for (ActiveMQQueue q : queues) destinationNames.add("QUEUE "+q.getQueueName());
+ for (ActiveMQTopic t : topics) destinationNames.add("TOPIC "+t.getTopicName());
+ for (ActiveMQTempQueue tq : tempQueues) destinationNames.add("Temp QUEUE "+tq.getQueueName());
+ for (ActiveMQTempTopic tt : tempTopics) destinationNames.add("Temp TOPIC "+tt.getTopicName());
+ // close connection
+ if (_closeConn) {
+ closeConnection();
+ }
+ return destinationNames;
+ }
+ // ------------------------------------------------------------------------
+ public synchronized void publishEvent(String connectionString, String destinationName, Map eventMap) throws JMSException {
+ _publishEvent(connectionString, destinationName, new EventMap(eventMap));
+ }
+ protected synchronized void _publishEvent(String connectionString, String destinationName, Serializable event) throws JMSException {
+ _publishEvent(connectionString,destinationName,event,false);
+ }
+ protected synchronized void _publishEvent(String connectionString, String destinationName, Serializable event, boolean persistent_connection_demanded) throws JMSException {
+ // open or reuse connection
+ checkProperties();
+ boolean _closeConn = false;
+ if (session==null) {
+ openConnection(connectionString);
+ _closeConn = ! properties.isPreserveConnection();
+ }
+ if (persistent_connection_demanded){
+ _closeConn = false;
+ }
+ // Create the destination (Topic or Queue)
+ //Destination destination = session.createQueue( destinationName );
+ Destination destination = session.createTopic(destinationName);
+ // Create a MessageProducer from the Session to the Topic or Queue
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
+ // Create a messages
+ //ObjectMessage message = session.createObjectMessage(event);
+ TextMessage message = session.createTextMessage(event.toString());
+ // Tell the producer to send the message
+ long hash = message.hashCode();
+ log.info("BrokerClient.publishEvent(): Sending message: connection={}, username={}, destination={}, hash={}, payload={}", connectionString, properties.getBrokerUsername(), destinationName, hash, event);
+ producer.send(message);
+ log.info("BrokerClient.publishEvent(): Message sent: connection={}, username={}, destination={}, hash={}, payload={}", connectionString, properties.getBrokerUsername(), destinationName, hash, event);
+ // close connection
+ if (_closeConn) {
+ closeConnection();
+ }
+ }
+ // ------------------------------------------------------------------------
+ public void subscribe(String connectionString, String destinationName, MessageListener listener) throws JMSException {
+ // Create or open connection
+ checkProperties();
+ if (session==null) {
+ openConnection(connectionString);
+ }
+ // Create the destination (Topic or Queue)
+ log.info("BrokerClient: Subscribing to destination: {}...", destinationName);
+ //Destination destination = session.createQueue( destinationName );
+ Destination destination = session.createTopic(destinationName);
+ // Create a MessageConsumer from the Session to the Topic or Queue
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(listener);
+ listeners.put(listener, consumer);
+ }
+ public void unsubscribe(MessageListener listener) throws JMSException {
+ MessageConsumer consumer = listeners.get(listener);
+ if (consumer!=null) {
+ consumer.close();
+ }
+ }
+ // ------------------------------------------------------------------------
+ public void receiveEvents(String connectionString, String destinationName, MessageListener listener) throws JMSException {
+ checkProperties();
+ MessageConsumer consumer = null;
+ boolean _closeConn = false;
+ try {
+ // Create or open connection
+ if (session==null) {
+ openConnection(connectionString);
+ _closeConn = ! properties.isPreserveConnection();
+ }
+ // Create the destination (Topic or Queue)
+ log.info("BrokerClient: Subscribing to destination: {}...", destinationName);
+ //Destination destination = session.createQueue( destinationName );
+ Destination destination = session.createTopic(destinationName);
+ // Create a MessageConsumer from the Session to the Topic or Queue
+ consumer = session.createConsumer(destination);
+ // Wait for messages
+ log.info("BrokerClient: Waiting for messages...");
+ while (true) {
+ Message message = consumer.receive();
+ listener.onMessage(message);
+ }
+ } finally {
+ // Clean up
+ log.info("BrokerClient: Closing connection...");
+ if (consumer != null) consumer.close();
+ if (_closeConn) {
+ closeConnection();
+ }
+ }
+ }
+ // ------------------------------------------------------------------------
+ public ActiveMQConnectionFactory createConnectionFactory() {
+ // Create connection factory based on Broker URL scheme
+ checkProperties();
+ final ActiveMQConnectionFactory connectionFactory;
+ String brokerUrl = properties.getBrokerUrl();
+ if (brokerUrl.startsWith("ssl")) {
+ log.info("BrokerClient.createConnectionFactory(): Creating new SSL connection factory instance: url={}", brokerUrl);
+ final ActiveMQSslConnectionFactory sslConnectionFactory = new ActiveMQSslConnectionFactory(brokerUrl);
+ try {
+ sslConnectionFactory.setTrustStore(properties.getTruststoreFile());
+ sslConnectionFactory.setTrustStoreType(properties.getTruststoreType());
+ sslConnectionFactory.setTrustStorePassword(properties.getTruststorePassword());
+ sslConnectionFactory.setKeyStore(properties.getKeystoreFile());
+ sslConnectionFactory.setKeyStoreType(properties.getKeystoreType());
+ sslConnectionFactory.setKeyStorePassword(properties.getKeystorePassword());
+ //sslConnectionFactory.setKeyStoreKeyPassword( properties........ );
+ connectionFactory = sslConnectionFactory;
+ } catch (final Exception theException) {
+ throw new Error(theException);
+ }
+ } else {
+ log.info("BrokerClient.createConnectionFactory(): Creating new non-SSL connection factory instance: url={}", brokerUrl);
+ connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+ }
+ // Other connection factory settings
+ //connectionFactory.setSendTimeout(....5000L);
+ //connectionFactory.setTrustedPackages(Arrays.asList("eu.melodic.event"));
+ connectionFactory.setTrustAllPackages(true);
+ connectionFactory.setWatchTopicAdvisories(true);
+ return connectionFactory;
+ }
+ // ------------------------------------------------------------------------
+ public synchronized void openConnection() throws JMSException {
+ checkProperties();
+ openConnection(properties.getBrokerUrl(), null, null);
+ }
+ public synchronized void openConnection(String connectionString) throws JMSException {
+ openConnection(connectionString, null, null);
+ }
+ public synchronized void openConnection(String connectionString, String username, String password) throws JMSException {
+ openConnection(connectionString, username, password, properties.isPreserveConnection());
+ }
+ public synchronized void openConnection(String connectionString, String username, String password, boolean preserveConnection) throws JMSException {
+ checkProperties();
+ if (connectionString == null) connectionString = properties.getBrokerUrl();
+ log.debug("BrokerClient: Credentials provided as arguments: username={}, password={}", username, password);
+ if (StringUtils.isBlank(username)) {
+ username = properties.getBrokerUsername();
+ password = properties.getBrokerPassword();
+ log.debug("BrokerClient: Credentials read from properties: username={}, password={}", username, password);
+ }
+ // Create connection factory
+ ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
+ connectionFactory.setBrokerURL(connectionString);
+ if (StringUtils.isNotBlank(username) && password != null) {
+ connectionFactory.setUserName(username);
+ connectionFactory.setPassword(password);
+ }
+ log.debug("BrokerClient: Connection credentials: username={}, password={}", username, password);
+ // Create a Connection
+ log.info("BrokerClient: Connecting to broker: {}...", connectionString);
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+ // Create a Session
+ log.info("BrokerClient: Opening session...");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ this.connection = connection;
+ this.session = session;
+ }
+ public synchronized void closeConnection() throws JMSException {
+ // Clean up
+ session.close();
+ connection.close();
+ session = null;
+ connection = null;
+ }
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+package eu.melodic.event.brokerclient;
+import eu.melodic.event.brokerclient.event.EventGenerator;
+import eu.melodic.event.brokerclient.event.EventMap;
+import javax.jms.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import java.util.HashMap;
+import java.util.Map;
+public class BrokerClientApp {
+ public static void main(String args[]) throws java.io.IOException, JMSException {
+ if (args.length==0) {
+ usage();
+ return;
+ }
+ int aa=0;
+ String command = args[aa++];
+ String username = args.length>aa && args[aa].startsWith("-U") ? args[aa++].substring(2) : null;
+ String password = username!=null && args.length>aa && args[aa].startsWith("-P") ? args[aa++].substring(2) : null;
+ if (StringUtils.isNotBlank(username) && password == null) {
+ password = new String(System.console().readPassword("Enter broker password: "));
+ }
+ String broker_properties_configuration_file_location = args.length>aa && args[aa].startsWith("-C") ? args[aa++].substring(2) : null;
+ if (broker_properties_configuration_file_location == null){
+ broker_properties_configuration_file_location = new String(System.console().readLine());
+ }
+ // list destinations
+ if ("list".equalsIgnoreCase(command)) {
+ String url = args[aa++];
+ log.info("BrokerClientApp: Listing destinations:");
+ BrokerClient client = BrokerClient.newClient(username, password,broker_properties_configuration_file_location);
+ client.getDestinationNames(url).stream().forEach(d -> log.info(" {}", d));
+ } else
+ // send an event
+ if ("publish".equalsIgnoreCase(command)) {
+ String url = args[aa++];
+ String topic = args[aa++];
+ String value = args[aa++];
+ String level = args[aa++];
+ EventMap event = new EventMap(Double.parseDouble(value), Integer.parseInt(level), System.currentTimeMillis());
+ log.info("BrokerClientApp: Publishing event: {}", event);
+ BrokerClient client = BrokerClient.newClient(username, password,broker_properties_configuration_file_location);
+ client.publishEvent(url, topic, event);
+ } else
+ //publish an event with custom information
+ if ("custom_publish".equalsIgnoreCase(command)) {
+ Map data = new HashMap<>();
+ String url = args[aa++];
+ String topic = args[aa++];
+ while (aa+1 [-P]] ");
+ log.info("BrokerClientApp: client publish [-U [-P]] ");
+ log.info("BrokerClientApp: client receive [-U [-P]] ");
+ log.info("BrokerClientApp: client subscribe [-U [-P]] ");
+ log.info("BrokerClientApp: client generator [-U [-P]] ");
+ }
+package eu.melodic.event.brokerclient;
+import lombok.extern.slf4j.Slf4j;
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.Map;
+public class BrokerPublisher {
+ private String topic;
+ private String url;
+ private String username;
+ private String password;
+ private String broker_configuration_file_location;
+ private BrokerClient client = null;
+ public BrokerPublisher(String topic, String url, String username, String password,String broker_configuration_file_location){
+ this.topic = topic;
+ this.url = url;
+ this.username = username;
+ this.password = password;
+ this.broker_configuration_file_location = broker_configuration_file_location;
+ }
+ public void publish(Map event_map) {
+ try {
+ log.info("BrokerClientApp: Publishing to topic: {}", topic);
+ if (client==null) {
+ client = BrokerClient.newClient(username, password, broker_configuration_file_location);
+ }
+ client.publishEvent(url, topic, event_map);
+ }catch (IOException | JMSException i){
+ i.printStackTrace();
+ }
+ }
+ public void publish(String s) {
+ try {
+ log.info("BrokerClientApp: Publishing to topic: {}", topic);
+ if(client == null) {
+ client = BrokerClient.newClient(username, password, broker_configuration_file_location);
+ }
+ client._publishEvent(url, topic, s);
+ }catch (IOException | JMSException i){
+ i.printStackTrace();
+ }
+ }
+ public void publish(String s,boolean persistent_connection_demanded) {
+ try {
+ log.info("BrokerClientApp: Publishing to topic: {}", topic);
+ if (client==null) {
+ client = BrokerClient.newClient(username, password, broker_configuration_file_location);
+ }
+ client._publishEvent(url, topic, s,persistent_connection_demanded);
+ }catch (IOException | JMSException i){
+ i.printStackTrace();
+ }
+ }
+package eu.melodic.event.brokerclient;
+import lombok.extern.slf4j.Slf4j;
+import javax.jms.*;
+import java.io.IOException;
+import java.util.function.BiFunction;
+public class BrokerSubscriber {
+ private String topic;
+ private String url;
+ private String username;
+ private String password;
+ private String broker_configuration_file_location;
+ private BrokerClient client = null;
+ public BrokerSubscriber(String topic, String url, String username, String password, String broker_configuration_file_location){
+ this.topic = topic;
+ this.url = url;
+ this.username = username;
+ this.password = password;
+ this.broker_configuration_file_location = broker_configuration_file_location;
+ }
+ public void subscribe(BiFunction function) {
+ try {
+ log.info("BrokerClientApp: Subscribing to topic: {}", topic);
+ BrokerClient client = BrokerClient.newClient(username, password, broker_configuration_file_location);
+ client.receiveEvents(url, topic, message -> {
+ try {
+ function.apply(topic,((TextMessage) message).getText());
+ } catch (JMSException j) {
+ log.info("Shutting down subscriber...");
+ j.printStackTrace();
+ }
+ });
+ }catch (IOException | JMSException i){
+ i.printStackTrace();
+ }
+ }
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+package eu.melodic.event.brokerclient.event;
+import eu.melodic.event.brokerclient.BrokerClient;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+public class EventGenerator implements Runnable {
+ private BrokerClient client;
+ private String brokerUrl;
+ private String destinationName;
+ private long interval;
+ private long howmany = -1;
+ private double lowerValue;
+ private double upperValue;
+ private int level;
+ private transient boolean keepRunning;
+ public void start() {
+ if (keepRunning) return;
+ Thread runner = new Thread(this);
+ runner.setDaemon(true);
+ runner.start();
+ }
+ public void stop() {
+ keepRunning = false;
+ }
+ public void run() {
+ log.info("EventGenerator.run(): Start sending events: event-generator: {}", this);
+ keepRunning = true;
+ double valueRangeWidth = upperValue - lowerValue;
+ long countSent = 0;
+ while (keepRunning) {
+ try {
+ double newValue = Math.random() * valueRangeWidth + lowerValue;
+ EventMap event = new EventMap(newValue, level, System.currentTimeMillis());
+ log.info("EventGenerator.run(): Sending event #{}: {}", countSent + 1, event);
+ client.publishEvent(brokerUrl, destinationName, event);
+ countSent++;
+ if (countSent == howmany) keepRunning = false;
+ log.info("EventGenerator.run(): Event sent #{}: {}", countSent, event);
+ } catch (Exception ex) {
+ log.warn("EventGenerator.run(): WHILE-EXCEPTION: {}", ex);
+ }
+ // sleep for 'interval' ms
+ try {
+ if (keepRunning) {
+ Thread.sleep(interval);
+ }
+ } catch (InterruptedException ex) {
+ log.warn("EventGenerator.run(): Sleep interrupted");
+ }
+ }
+ log.info("EventGenerator.run(): Stop sending events: event-generator: {}", this);
+ }
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+package eu.melodic.event.brokerclient.event;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+public class EventMap extends HashMap implements Serializable {
+ public EventMap() {
+ super();
+ }
+ public EventMap(Map map) {
+ super(map);
+ }
+ public EventMap(double metricValue, int level, long timestamp) {
+ put("metricValue", metricValue);
+ put("level", level);
+ put("timestamp", timestamp);
+ }
+ public static String[] getPropertyNames() {
+ return new String[]{"metricValue", "level", "timestamp"};
+ }
+ public static Class[] getPropertyClasses() {
+ return new Class[]{Double.class, Integer.class, Long.class};
+ }
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+package eu.melodic.event.brokerclient.properties;
+import lombok.Data;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+@ToString(exclude = {"truststorePassword", "keystorePassword", "brokerPassword"})
+@ConfigurationProperties(prefix = "brokerclient")
+public class BrokerClientProperties {
+ @Value("${broker-name:broker}")
+ private String brokerName;
+ @Value("${broker-url:ssl://localhost:61616}")
+ private String brokerUrl;
+ @Value("${broker-url-properties:}")
+ private String brokerUrlProperties;
+ @Value("${ssl.client-auth.required:false}")
+ private boolean clientAuthRequired;
+ @Value("${connector-port:-1}")
+ private int connectorPort;
+ @Value("${preserve-connection:false}")
+ private boolean preserveConnection;
+ @Value("${ssl.truststore.file:}")
+ private String truststoreFile;
+ @Value("${ssl.truststore.type:}")
+ private String truststoreType;
+ @Value("${ssl.truststore.password:}")
+ private String truststorePassword;
+ @Value("${ssl.keystore.file:}")
+ private String keystoreFile;
+ @Value("${ssl.keystore.type:}")
+ private String keystoreType;
+ @Value("${ssl.keystore.password:}")
+ private String keystorePassword;
+ @Value("${broker-username:}")
+ private String brokerUsername;
+ @Value("${broker-password:}")
+ private String brokerPassword;
+ public BrokerClientProperties() {
+ brokerName = "broker";
+ brokerUrl = "ssl://localhost:61616}";
+ brokerUrlProperties = "";
+ connectorPort = -1;
+ preserveConnection = true;
+ truststoreFile = "";
+ truststoreType = "";
+ truststorePassword = "";
+ keystoreFile = "";
+ keystoreType = "";
+ keystorePassword = "";
+ clientAuthRequired = false;
+ brokerUsername = "";
+ brokerPassword = "";
+ }
+ public BrokerClientProperties(java.util.Properties p) {
+ brokerName = p.getProperty("brokerclient.broker-name", "broker");
+ brokerUrl = p.getProperty("brokerclient.broker-url", "ssl://localhost:61616}");
+ brokerUrlProperties = p.getProperty("brokerclient.broker-url-properties", "");
+ connectorPort = Integer.parseInt(p.getProperty("brokerclient.connector-port", "-1"));
+ preserveConnection = Boolean.parseBoolean(p.getProperty("brokerclient.preserve-connection", "true"));
+ truststoreFile = p.getProperty("brokerclient.ssl.truststore.file", "");
+ truststoreType = p.getProperty("brokerclient.ssl.truststore.type", "");
+ truststorePassword = p.getProperty("brokerclient.ssl.truststore.password", "");
+ keystoreFile = p.getProperty("brokerclient.ssl.keystore.file", "");
+ keystoreType = p.getProperty("brokerclient.ssl.keystore.type", "");
+ keystorePassword = p.getProperty("brokerclient.ssl.keystore.password", "");
+ clientAuthRequired = Boolean.parseBoolean(p.getProperty("brokerclient.ssl.client-auth.required", "false"));
+ brokerUsername = p.getProperty("brokerclient.broker-username", "");
+ brokerPassword = p.getProperty("brokerclient.broker-password", "");
+ brokerUrlProperties = brokerUrlProperties.replace("${brokerclient.ssl.client-auth.required}", Boolean.toString(clientAuthRequired));
+ }