diff --git a/amq-message-java-library/client.bat b/amq-message-java-library/client.bat
new file mode 100644
index 0000000000000000000000000000000000000000..32e3c121f5ef6e7a6ed0855eacdad8a6daeaaf0a
--- /dev/null
+++ b/amq-message-java-library/client.bat
@@ -0,0 +1,24 @@
+@echo off
+::
+:: Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+::
+:: This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+:: Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+:: If a copy of the MPL was not distributed with this file, you can obtain one at
+:: https://www.mozilla.org/en-US/MPL/2.0/
+::
+
+if not exist target\dependency cmd /C "mvn dependency:copy-dependencies"
+
+set MELODIC_CONFIG_DIR=.
+
+setlocal
+set JAVA_OPTS= -Djavax.net.ssl.trustStore=..\config-files\broker-truststore.p12 ^
+ -Djavax.net.ssl.trustStorePassword=melodic ^
+ -Djavax.net.ssl.trustStoreType=pkcs12
+rem -Djavax.net.debug=all
+rem -Djavax.net.debug=ssl,handshake,record
+
+java %JAVA_OPTS% -classpath "target\classes;target\dependency\*" eu.melodic.event.brokerclient.BrokerClientApp %*
+
+endlocal
diff --git a/amq-message-java-library/client.sh b/amq-message-java-library/client.sh
new file mode 100644
index 0000000000000000000000000000000000000000..64b3645ff9bcb6f758bd7117586e571c29414f27
--- /dev/null
+++ b/amq-message-java-library/client.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+#
+# This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+# Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+# If a copy of the MPL was not distributed with this file, you can obtain one at
+# https://www.mozilla.org/en-US/MPL/2.0/
+#
+
+if [ ! -d "target/dependency" ]; then
+ mvn dependency:copy-dependencies
+fi
+
+MELODIC_CONFIG_DIR=.
+
+JAVA_OPTS=-Djavax.net.ssl.trustStore=./broker-truststore.p12\ -Djavax.net.ssl.trustStorePassword=melodic\ -Djavax.net.ssl.trustStoreType=pkcs12
+# -Djavax.net.debug=all
+# -Djavax.net.debug=ssl,handshake,record
+
+java $JAVA_OPTS -classpath "target/classes:target/dependency/*" eu.melodic.event.brokerclient.BrokerClientApp $*
diff --git a/amq-message-java-library/eu.melodic.event.brokerclient.properties b/amq-message-java-library/eu.melodic.event.brokerclient.properties
new file mode 100644
index 0000000000000000000000000000000000000000..df01ca619da1daeba32613fe995fce152594a909
--- /dev/null
+++ b/amq-message-java-library/eu.melodic.event.brokerclient.properties
@@ -0,0 +1,18 @@
+#
+# Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+#
+# This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+# Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+# If a copy of the MPL was not distributed with this file, you can obtain one at
+# https://www.mozilla.org/en-US/MPL/2.0/
+#
+
+# Broker Client settings
+brokerclient.broker-url=tcp://localhost:61616
+brokerclient.broker-url-properties=transport.daemon=true&transport.trace=false&transport.useKeepAlive=true&transport.useInactivityMonitor=false&transport.needClientAuth=${brokerclient.ssl.client-auth.required}&transport.verifyHostName=false
+brokerclient.ssl.client-auth.required=false
+brokerclient.preserve-connection=false
+
+# Broker authentication
+brokerclient.broker-username=
+brokerclient.broker-password=
\ No newline at end of file
diff --git a/amq-message-java-library/pom.xml b/amq-message-java-library/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..1fe0c90ebca7bf04e50eb28e6d662ad1c42cb286
--- /dev/null
+++ b/amq-message-java-library/pom.xml
@@ -0,0 +1,83 @@
+
+
+ 4.0.0
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.4.4
+
+
+ amq-message-java-library
+ AMQ message Java library
+ gr.ntua.imu.morphemic
+ 1.0.0
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-activemq
+
+
+ org.apache.activemq
+ activemq-broker
+
+
+
+
+ org.projectlombok
+ lombok
+ provided
+
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.8.1
+
+
+
+
+
+ eu.7bulls
+ Melodic 7bulls repository
+ https://nexus.7bulls.eu:8443/repository/maven-snapshots/
+
+
+ eu.7bulls
+ Melodic 7bulls repository
+ https://nexus.7bulls.eu:8443/repository/maven-releases/
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+ package
+
+ shade
+
+
+
+
+
+
+
+
+
diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClient.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..373c9f6396c4b1e1d5d940fdbee5e2f4d1add6f3
--- /dev/null
+++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClient.java
@@ -0,0 +1,345 @@
+/*
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+
+package eu.melodic.event.brokerclient;
+
+import eu.melodic.event.brokerclient.event.EventMap;
+import eu.melodic.event.brokerclient.properties.BrokerClientProperties;
+import java.io.Serializable;
+import java.util.*;
+import javax.jms.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSslConnectionFactory;
+import org.apache.activemq.advisory.DestinationSource;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class BrokerClient {
+
+ @Autowired
+ private BrokerClientProperties properties;
+ private Connection connection;
+ private Session session;
+ private HashMap listeners = new HashMap<>();
+
+ public BrokerClient() {
+ }
+
+ public BrokerClient(BrokerClientProperties bcp) {
+ properties = bcp;
+ }
+
+ public BrokerClient(Properties p) {
+ properties = new BrokerClientProperties(p);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public static BrokerClient newClient(String broker_properties_configuration_file_location) throws java.io.IOException, JMSException {
+ log.info("BrokerClient: Initializing...");
+
+ /*
+ // get properties file
+ String configDir = System.getenv("MELODIC_CONFIG_DIR");
+ if (configDir == null || configDir.trim().isEmpty()) configDir = ".";
+ log.info("BrokerClient: config-dir: {}", configDir);
+ String configPropFile = configDir + "/" + "eu.melodic.event.brokerclient.properties";
+ log.info("BrokerClient: config-file: {}", configPropFile);
+ */
+
+ // load properties
+ Properties p = new Properties();
+ //ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ //try (java.io.InputStream in = loader.getClass().getResourceAsStream(configPropFile)) { p.load(in); }
+ try (java.io.InputStream in = new java.io.FileInputStream(broker_properties_configuration_file_location)) {
+ p.load(in);
+ }
+ log.info("BrokerClient: config-properties: {}", p);
+
+ // initialize broker client
+ BrokerClient client = new BrokerClient(p);
+ log.info("BrokerClient: Configuration:\n{}", client.properties);
+
+ return client;
+ }
+
+ public static BrokerClient newClient(String username, String password, String broker_properties_configuration_file_path) throws java.io.IOException, JMSException {
+ BrokerClient client = newClient(broker_properties_configuration_file_path);
+ if (username!=null && password!=null) {
+ client.getClientProperties().setBrokerUsername(username);
+ client.getClientProperties().setBrokerPassword(password);
+ }
+ return client;
+ }
+
+ public static BrokerClient newClient() {
+ return new BrokerClient();
+ }
+
+ // ------------------------------------------------------------------------
+
+ public BrokerClientProperties getClientProperties() {
+ checkProperties();
+ return properties;
+ }
+
+ protected void checkProperties() {
+ if (properties==null) {
+ //use defaults
+ properties = new BrokerClientProperties();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ public synchronized Set getDestinationNames(String connectionString) throws JMSException {
+ // open or reuse connection
+ checkProperties();
+ boolean _closeConn = false;
+ if (session==null) {
+ openConnection(connectionString);
+ _closeConn = ! properties.isPreserveConnection();
+ }
+
+ // Get destinations from Broker
+ log.info("BrokerClient.getDestinationNames(): Getting destinations: connection={}, username={}", connectionString, properties.getBrokerUsername());
+ ActiveMQConnection conn = (ActiveMQConnection)connection;
+ DestinationSource ds = conn.getDestinationSource();
+ Set queues = ds.getQueues();
+ Set topics = ds.getTopics();
+ Set tempQueues = ds.getTemporaryQueues();
+ Set tempTopics = ds.getTemporaryTopics();
+ log.info("BrokerClient.getDestinationNames(): Getting destinations: done");
+
+ // Get destination names
+ HashSet destinationNames = new HashSet<>();
+ for (ActiveMQQueue q : queues) destinationNames.add("QUEUE "+q.getQueueName());
+ for (ActiveMQTopic t : topics) destinationNames.add("TOPIC "+t.getTopicName());
+ for (ActiveMQTempQueue tq : tempQueues) destinationNames.add("Temp QUEUE "+tq.getQueueName());
+ for (ActiveMQTempTopic tt : tempTopics) destinationNames.add("Temp TOPIC "+tt.getTopicName());
+
+ // close connection
+ if (_closeConn) {
+ closeConnection();
+ }
+
+ return destinationNames;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public synchronized void publishEvent(String connectionString, String destinationName, Map eventMap) throws JMSException {
+ _publishEvent(connectionString, destinationName, new EventMap(eventMap));
+ }
+
+ protected synchronized void _publishEvent(String connectionString, String destinationName, Serializable event) throws JMSException {
+ _publishEvent(connectionString,destinationName,event,false);
+ }
+ protected synchronized void _publishEvent(String connectionString, String destinationName, Serializable event, boolean persistent_connection_demanded) throws JMSException {
+ // open or reuse connection
+ checkProperties();
+ boolean _closeConn = false;
+ if (session==null) {
+ openConnection(connectionString);
+ _closeConn = ! properties.isPreserveConnection();
+ }
+ if (persistent_connection_demanded){
+ _closeConn = false;
+ }
+
+ // Create the destination (Topic or Queue)
+ //Destination destination = session.createQueue( destinationName );
+ Destination destination = session.createTopic(destinationName);
+
+ // Create a MessageProducer from the Session to the Topic or Queue
+ MessageProducer producer = session.createProducer(destination);
+ producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
+
+ // Create a messages
+ //ObjectMessage message = session.createObjectMessage(event);
+ TextMessage message = session.createTextMessage(event.toString());
+
+ // Tell the producer to send the message
+ long hash = message.hashCode();
+ log.info("BrokerClient.publishEvent(): Sending message: connection={}, username={}, destination={}, hash={}, payload={}", connectionString, properties.getBrokerUsername(), destinationName, hash, event);
+ producer.send(message);
+ log.info("BrokerClient.publishEvent(): Message sent: connection={}, username={}, destination={}, hash={}, payload={}", connectionString, properties.getBrokerUsername(), destinationName, hash, event);
+
+ // close connection
+ if (_closeConn) {
+ closeConnection();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ public void subscribe(String connectionString, String destinationName, MessageListener listener) throws JMSException {
+ // Create or open connection
+ checkProperties();
+ if (session==null) {
+ openConnection(connectionString);
+ }
+
+ // Create the destination (Topic or Queue)
+ log.info("BrokerClient: Subscribing to destination: {}...", destinationName);
+ //Destination destination = session.createQueue( destinationName );
+ Destination destination = session.createTopic(destinationName);
+
+ // Create a MessageConsumer from the Session to the Topic or Queue
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(listener);
+ listeners.put(listener, consumer);
+ }
+
+ public void unsubscribe(MessageListener listener) throws JMSException {
+ MessageConsumer consumer = listeners.get(listener);
+ if (consumer!=null) {
+ consumer.close();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ public void receiveEvents(String connectionString, String destinationName, MessageListener listener) throws JMSException {
+ checkProperties();
+ MessageConsumer consumer = null;
+ boolean _closeConn = false;
+ try {
+ // Create or open connection
+ if (session==null) {
+ openConnection(connectionString);
+ _closeConn = ! properties.isPreserveConnection();
+ }
+
+ // Create the destination (Topic or Queue)
+ log.info("BrokerClient: Subscribing to destination: {}...", destinationName);
+ //Destination destination = session.createQueue( destinationName );
+ Destination destination = session.createTopic(destinationName);
+
+ // Create a MessageConsumer from the Session to the Topic or Queue
+ consumer = session.createConsumer(destination);
+
+ // Wait for messages
+ log.info("BrokerClient: Waiting for messages...");
+ while (true) {
+ Message message = consumer.receive();
+ listener.onMessage(message);
+ }
+
+ } finally {
+ // Clean up
+ log.info("BrokerClient: Closing connection...");
+ if (consumer != null) consumer.close();
+ if (_closeConn) {
+ closeConnection();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ public ActiveMQConnectionFactory createConnectionFactory() {
+ // Create connection factory based on Broker URL scheme
+ checkProperties();
+ final ActiveMQConnectionFactory connectionFactory;
+ String brokerUrl = properties.getBrokerUrl();
+ if (brokerUrl.startsWith("ssl")) {
+ log.info("BrokerClient.createConnectionFactory(): Creating new SSL connection factory instance: url={}", brokerUrl);
+ final ActiveMQSslConnectionFactory sslConnectionFactory = new ActiveMQSslConnectionFactory(brokerUrl);
+ try {
+ sslConnectionFactory.setTrustStore(properties.getTruststoreFile());
+ sslConnectionFactory.setTrustStoreType(properties.getTruststoreType());
+ sslConnectionFactory.setTrustStorePassword(properties.getTruststorePassword());
+ sslConnectionFactory.setKeyStore(properties.getKeystoreFile());
+ sslConnectionFactory.setKeyStoreType(properties.getKeystoreType());
+ sslConnectionFactory.setKeyStorePassword(properties.getKeystorePassword());
+ //sslConnectionFactory.setKeyStoreKeyPassword( properties........ );
+
+ connectionFactory = sslConnectionFactory;
+ } catch (final Exception theException) {
+ throw new Error(theException);
+ }
+ } else {
+ log.info("BrokerClient.createConnectionFactory(): Creating new non-SSL connection factory instance: url={}", brokerUrl);
+ connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+ }
+
+ // Other connection factory settings
+ //connectionFactory.setSendTimeout(....5000L);
+ //connectionFactory.setTrustedPackages(Arrays.asList("eu.melodic.event"));
+ connectionFactory.setTrustAllPackages(true);
+ connectionFactory.setWatchTopicAdvisories(true);
+
+ return connectionFactory;
+ }
+
+ // ------------------------------------------------------------------------
+
+ public synchronized void openConnection() throws JMSException {
+ checkProperties();
+ openConnection(properties.getBrokerUrl(), null, null);
+ }
+
+ public synchronized void openConnection(String connectionString) throws JMSException {
+ openConnection(connectionString, null, null);
+ }
+
+ public synchronized void openConnection(String connectionString, String username, String password) throws JMSException {
+ openConnection(connectionString, username, password, properties.isPreserveConnection());
+ }
+
+ public synchronized void openConnection(String connectionString, String username, String password, boolean preserveConnection) throws JMSException {
+ checkProperties();
+ if (connectionString == null) connectionString = properties.getBrokerUrl();
+ log.debug("BrokerClient: Credentials provided as arguments: username={}, password={}", username, password);
+ if (StringUtils.isBlank(username)) {
+ username = properties.getBrokerUsername();
+ password = properties.getBrokerPassword();
+ log.debug("BrokerClient: Credentials read from properties: username={}, password={}", username, password);
+ }
+
+ // Create connection factory
+ ActiveMQConnectionFactory connectionFactory = createConnectionFactory();
+ connectionFactory.setBrokerURL(connectionString);
+ if (StringUtils.isNotBlank(username) && password != null) {
+ connectionFactory.setUserName(username);
+ connectionFactory.setPassword(password);
+ }
+ log.debug("BrokerClient: Connection credentials: username={}, password={}", username, password);
+
+ // Create a Connection
+ log.info("BrokerClient: Connecting to broker: {}...", connectionString);
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ // Create a Session
+ log.info("BrokerClient: Opening session...");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ this.connection = connection;
+ this.session = session;
+ }
+
+ public synchronized void closeConnection() throws JMSException {
+ // Clean up
+ session.close();
+ connection.close();
+ session = null;
+ connection = null;
+ }
+}
\ No newline at end of file
diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClientApp.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClientApp.java
new file mode 100644
index 0000000000000000000000000000000000000000..0448fe2cd58bb6244b5a525c9f566484cb2dbdef
--- /dev/null
+++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClientApp.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+
+package eu.melodic.event.brokerclient;
+
+import eu.melodic.event.brokerclient.event.EventGenerator;
+import eu.melodic.event.brokerclient.event.EventMap;
+import javax.jms.*;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class BrokerClientApp {
+
+ public static void main(String args[]) throws java.io.IOException, JMSException {
+ if (args.length==0) {
+ usage();
+ return;
+ }
+
+ int aa=0;
+ String command = args[aa++];
+
+ String username = args.length>aa && args[aa].startsWith("-U") ? args[aa++].substring(2) : null;
+ String password = username!=null && args.length>aa && args[aa].startsWith("-P") ? args[aa++].substring(2) : null;
+ if (StringUtils.isNotBlank(username) && password == null) {
+ password = new String(System.console().readPassword("Enter broker password: "));
+ }
+ String broker_properties_configuration_file_location = args.length>aa && args[aa].startsWith("-C") ? args[aa++].substring(2) : null;
+ if (broker_properties_configuration_file_location == null){
+ broker_properties_configuration_file_location = new String(System.console().readLine());
+ }
+
+ // list destinations
+ if ("list".equalsIgnoreCase(command)) {
+ String url = args[aa++];
+ log.info("BrokerClientApp: Listing destinations:");
+ BrokerClient client = BrokerClient.newClient(username, password,broker_properties_configuration_file_location);
+ client.getDestinationNames(url).stream().forEach(d -> log.info(" {}", d));
+ } else
+ // send an event
+ if ("publish".equalsIgnoreCase(command)) {
+ String url = args[aa++];
+ String topic = args[aa++];
+ String value = args[aa++];
+ String level = args[aa++];
+ EventMap event = new EventMap(Double.parseDouble(value), Integer.parseInt(level), System.currentTimeMillis());
+ log.info("BrokerClientApp: Publishing event: {}", event);
+ BrokerClient client = BrokerClient.newClient(username, password,broker_properties_configuration_file_location);
+ client.publishEvent(url, topic, event);
+ } else
+ //publish an event with custom information
+ if ("custom_publish".equalsIgnoreCase(command)) {
+ Map data = new HashMap<>();
+ String url = args[aa++];
+ String topic = args[aa++];
+ while (aa+1 [-P]] ");
+ log.info("BrokerClientApp: client publish [-U [-P]] ");
+ log.info("BrokerClientApp: client receive [-U [-P]] ");
+ log.info("BrokerClientApp: client subscribe [-U [-P]] ");
+ log.info("BrokerClientApp: client generator [-U [-P]] ");
+ }
+}
\ No newline at end of file
diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerPublisher.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerPublisher.java
new file mode 100644
index 0000000000000000000000000000000000000000..3bc697749777b4c5c1854fed5461947faa8c2ca2
--- /dev/null
+++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerPublisher.java
@@ -0,0 +1,61 @@
+package eu.melodic.event.brokerclient;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.Map;
+
+@Slf4j
+public class BrokerPublisher {
+
+ private String topic;
+ private String url;
+ private String username;
+ private String password;
+ private String broker_configuration_file_location;
+ private BrokerClient client = null;
+
+ public BrokerPublisher(String topic, String url, String username, String password,String broker_configuration_file_location){
+ this.topic = topic;
+ this.url = url;
+ this.username = username;
+ this.password = password;
+ this.broker_configuration_file_location = broker_configuration_file_location;
+ }
+
+ public void publish(Map event_map) {
+ try {
+ log.info("BrokerClientApp: Publishing to topic: {}", topic);
+ if (client==null) {
+ client = BrokerClient.newClient(username, password, broker_configuration_file_location);
+ }
+ client.publishEvent(url, topic, event_map);
+ }catch (IOException | JMSException i){
+ i.printStackTrace();
+ }
+ }
+
+ public void publish(String s) {
+ try {
+ log.info("BrokerClientApp: Publishing to topic: {}", topic);
+ if(client == null) {
+ client = BrokerClient.newClient(username, password, broker_configuration_file_location);
+ }
+ client._publishEvent(url, topic, s);
+ }catch (IOException | JMSException i){
+ i.printStackTrace();
+ }
+ }
+ public void publish(String s,boolean persistent_connection_demanded) {
+ try {
+ log.info("BrokerClientApp: Publishing to topic: {}", topic);
+ if (client==null) {
+ client = BrokerClient.newClient(username, password, broker_configuration_file_location);
+ }
+ client._publishEvent(url, topic, s,persistent_connection_demanded);
+ }catch (IOException | JMSException i){
+ i.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerSubscriber.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerSubscriber.java
new file mode 100644
index 0000000000000000000000000000000000000000..19351cb7197445521a1b513cf1362e56d59eea57
--- /dev/null
+++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerSubscriber.java
@@ -0,0 +1,46 @@
+package eu.melodic.event.brokerclient;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.jms.*;
+import java.io.IOException;
+import java.util.function.BiFunction;
+
+
+@Slf4j
+public class BrokerSubscriber {
+
+ private String topic;
+ private String url;
+ private String username;
+ private String password;
+ private String broker_configuration_file_location;
+ private BrokerClient client = null;
+
+ public BrokerSubscriber(String topic, String url, String username, String password, String broker_configuration_file_location){
+ this.topic = topic;
+ this.url = url;
+ this.username = username;
+ this.password = password;
+ this.broker_configuration_file_location = broker_configuration_file_location;
+ }
+
+
+ public void subscribe(BiFunction function) {
+ try {
+ log.info("BrokerClientApp: Subscribing to topic: {}", topic);
+ BrokerClient client = BrokerClient.newClient(username, password, broker_configuration_file_location);
+ client.receiveEvents(url, topic, message -> {
+ try {
+ function.apply(topic,((TextMessage) message).getText());
+ } catch (JMSException j) {
+ log.info("Shutting down subscriber...");
+ j.printStackTrace();
+ }
+ });
+ }catch (IOException | JMSException i){
+ i.printStackTrace();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventGenerator.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventGenerator.java
new file mode 100644
index 0000000000000000000000000000000000000000..4aab576df18732103fd46935a14be3078c7eeb25
--- /dev/null
+++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventGenerator.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+
+package eu.melodic.event.brokerclient.event;
+
+import eu.melodic.event.brokerclient.BrokerClient;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+@Data
+@Slf4j
+public class EventGenerator implements Runnable {
+ private BrokerClient client;
+ private String brokerUrl;
+ private String destinationName;
+ private long interval;
+ private long howmany = -1;
+ private double lowerValue;
+ private double upperValue;
+ private int level;
+
+ private transient boolean keepRunning;
+
+ public void start() {
+ if (keepRunning) return;
+ Thread runner = new Thread(this);
+ runner.setDaemon(true);
+ runner.start();
+ }
+
+ public void stop() {
+ keepRunning = false;
+ }
+
+ public void run() {
+ log.info("EventGenerator.run(): Start sending events: event-generator: {}", this);
+
+ keepRunning = true;
+ double valueRangeWidth = upperValue - lowerValue;
+ long countSent = 0;
+ while (keepRunning) {
+ try {
+ double newValue = Math.random() * valueRangeWidth + lowerValue;
+ EventMap event = new EventMap(newValue, level, System.currentTimeMillis());
+ log.info("EventGenerator.run(): Sending event #{}: {}", countSent + 1, event);
+ client.publishEvent(brokerUrl, destinationName, event);
+ countSent++;
+ if (countSent == howmany) keepRunning = false;
+ log.info("EventGenerator.run(): Event sent #{}: {}", countSent, event);
+ } catch (Exception ex) {
+ log.warn("EventGenerator.run(): WHILE-EXCEPTION: {}", ex);
+ }
+ // sleep for 'interval' ms
+ try {
+ if (keepRunning) {
+ Thread.sleep(interval);
+ }
+ } catch (InterruptedException ex) {
+ log.warn("EventGenerator.run(): Sleep interrupted");
+ }
+ }
+
+ log.info("EventGenerator.run(): Stop sending events: event-generator: {}", this);
+ }
+}
\ No newline at end of file
diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventMap.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventMap.java
new file mode 100644
index 0000000000000000000000000000000000000000..a180508ead865c0767623296e7df3a231c6004d0
--- /dev/null
+++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventMap.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+
+package eu.melodic.event.brokerclient.event;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Getter
+@Slf4j
+public class EventMap extends HashMap implements Serializable {
+ public EventMap() {
+ super();
+ }
+
+ public EventMap(Map map) {
+ super(map);
+ }
+
+ public EventMap(double metricValue, int level, long timestamp) {
+ put("metricValue", metricValue);
+ put("level", level);
+ put("timestamp", timestamp);
+ }
+
+ public static String[] getPropertyNames() {
+ return new String[]{"metricValue", "level", "timestamp"};
+ }
+
+ public static Class[] getPropertyClasses() {
+ return new Class[]{Double.class, Integer.class, Long.class};
+ }
+}
\ No newline at end of file
diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/properties/BrokerClientProperties.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/properties/BrokerClientProperties.java
new file mode 100644
index 0000000000000000000000000000000000000000..25974e81992ef3bd8a63aa50285a5d5cb1fe4a0d
--- /dev/null
+++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/properties/BrokerClientProperties.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless
+ * Esper library is used, in which case it is subject to the terms of General Public License v2.0.
+ * If a copy of the MPL was not distributed with this file, you can obtain one at
+ * https://www.mozilla.org/en-US/MPL/2.0/
+ */
+
+package eu.melodic.event.brokerclient.properties;
+
+import lombok.Data;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+
+@Data
+@ToString(exclude = {"truststorePassword", "keystorePassword", "brokerPassword"})
+@Configuration
+@ConfigurationProperties(prefix = "brokerclient")
+@PropertySource("file:${MELODIC_CONFIG_DIR}/eu.melodic.event.brokerclient.properties")
+@Slf4j
+public class BrokerClientProperties {
+ @Value("${broker-name:broker}")
+ private String brokerName;
+ @Value("${broker-url:ssl://localhost:61616}")
+ private String brokerUrl;
+ @Value("${broker-url-properties:}")
+ private String brokerUrlProperties;
+ @Value("${ssl.client-auth.required:false}")
+ private boolean clientAuthRequired;
+ @Value("${connector-port:-1}")
+ private int connectorPort;
+ @Value("${preserve-connection:false}")
+ private boolean preserveConnection;
+
+ @Value("${ssl.truststore.file:}")
+ private String truststoreFile;
+ @Value("${ssl.truststore.type:}")
+ private String truststoreType;
+ @Value("${ssl.truststore.password:}")
+ private String truststorePassword;
+ @Value("${ssl.keystore.file:}")
+ private String keystoreFile;
+ @Value("${ssl.keystore.type:}")
+ private String keystoreType;
+ @Value("${ssl.keystore.password:}")
+ private String keystorePassword;
+
+ @Value("${broker-username:}")
+ private String brokerUsername;
+ @Value("${broker-password:}")
+ private String brokerPassword;
+
+ public BrokerClientProperties() {
+ brokerName = "broker";
+ brokerUrl = "ssl://localhost:61616}";
+ brokerUrlProperties = "";
+ connectorPort = -1;
+ preserveConnection = true;
+
+ truststoreFile = "";
+ truststoreType = "";
+ truststorePassword = "";
+ keystoreFile = "";
+ keystoreType = "";
+ keystorePassword = "";
+ clientAuthRequired = false;
+
+ brokerUsername = "";
+ brokerPassword = "";
+ }
+
+ public BrokerClientProperties(java.util.Properties p) {
+ brokerName = p.getProperty("brokerclient.broker-name", "broker");
+ brokerUrl = p.getProperty("brokerclient.broker-url", "ssl://localhost:61616}");
+ brokerUrlProperties = p.getProperty("brokerclient.broker-url-properties", "");
+ connectorPort = Integer.parseInt(p.getProperty("brokerclient.connector-port", "-1"));
+ preserveConnection = Boolean.parseBoolean(p.getProperty("brokerclient.preserve-connection", "true"));
+
+ truststoreFile = p.getProperty("brokerclient.ssl.truststore.file", "");
+ truststoreType = p.getProperty("brokerclient.ssl.truststore.type", "");
+ truststorePassword = p.getProperty("brokerclient.ssl.truststore.password", "");
+ keystoreFile = p.getProperty("brokerclient.ssl.keystore.file", "");
+ keystoreType = p.getProperty("brokerclient.ssl.keystore.type", "");
+ keystorePassword = p.getProperty("brokerclient.ssl.keystore.password", "");
+ clientAuthRequired = Boolean.parseBoolean(p.getProperty("brokerclient.ssl.client-auth.required", "false"));
+
+ brokerUsername = p.getProperty("brokerclient.broker-username", "");
+ brokerPassword = p.getProperty("brokerclient.broker-password", "");
+
+ brokerUrlProperties = brokerUrlProperties.replace("${brokerclient.ssl.client-auth.required}", Boolean.toString(clientAuthRequired));
+ }
+}