From 726c46814208ca8573495bef67995387f146ae91 Mon Sep 17 00:00:00 2001 From: Andreas Tsagkaropoulos Date: Thu, 15 Apr 2021 15:30:14 +0300 Subject: [PATCH] First commit of new AMQ message Java library --- amq-message-java-library/client.bat | 24 ++ amq-message-java-library/client.sh | 21 ++ .../eu.melodic.event.brokerclient.properties | 18 + amq-message-java-library/pom.xml | 83 +++++ .../event/brokerclient/BrokerClient.java | 345 ++++++++++++++++++ .../event/brokerclient/BrokerClientApp.java | 176 +++++++++ .../event/brokerclient/BrokerPublisher.java | 61 ++++ .../event/brokerclient/BrokerSubscriber.java | 46 +++ .../brokerclient/event/EventGenerator.java | 71 ++++ .../event/brokerclient/event/EventMap.java | 42 +++ .../properties/BrokerClientProperties.java | 97 +++++ 11 files changed, 984 insertions(+) create mode 100644 amq-message-java-library/client.bat create mode 100644 amq-message-java-library/client.sh create mode 100644 amq-message-java-library/eu.melodic.event.brokerclient.properties create mode 100644 amq-message-java-library/pom.xml create mode 100644 amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClient.java create mode 100644 amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClientApp.java create mode 100644 amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerPublisher.java create mode 100644 amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerSubscriber.java create mode 100644 amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventGenerator.java create mode 100644 amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventMap.java create mode 100644 amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/properties/BrokerClientProperties.java diff --git a/amq-message-java-library/client.bat b/amq-message-java-library/client.bat new file mode 100644 index 00000000..32e3c121 --- /dev/null +++ b/amq-message-java-library/client.bat @@ -0,0 +1,24 @@ +@echo off +:: +:: Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr) +:: +:: This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless +:: Esper library is used, in which case it is subject to the terms of General Public License v2.0. +:: If a copy of the MPL was not distributed with this file, you can obtain one at +:: https://www.mozilla.org/en-US/MPL/2.0/ +:: + +if not exist target\dependency cmd /C "mvn dependency:copy-dependencies" + +set MELODIC_CONFIG_DIR=. + +setlocal +set JAVA_OPTS= -Djavax.net.ssl.trustStore=..\config-files\broker-truststore.p12 ^ + -Djavax.net.ssl.trustStorePassword=melodic ^ + -Djavax.net.ssl.trustStoreType=pkcs12 +rem -Djavax.net.debug=all +rem -Djavax.net.debug=ssl,handshake,record + +java %JAVA_OPTS% -classpath "target\classes;target\dependency\*" eu.melodic.event.brokerclient.BrokerClientApp %* + +endlocal diff --git a/amq-message-java-library/client.sh b/amq-message-java-library/client.sh new file mode 100644 index 00000000..64b3645f --- /dev/null +++ b/amq-message-java-library/client.sh @@ -0,0 +1,21 @@ +#!/bin/bash +# +# Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr) +# +# This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless +# Esper library is used, in which case it is subject to the terms of General Public License v2.0. +# If a copy of the MPL was not distributed with this file, you can obtain one at +# https://www.mozilla.org/en-US/MPL/2.0/ +# + +if [ ! -d "target/dependency" ]; then + mvn dependency:copy-dependencies +fi + +MELODIC_CONFIG_DIR=. + +JAVA_OPTS=-Djavax.net.ssl.trustStore=./broker-truststore.p12\ -Djavax.net.ssl.trustStorePassword=melodic\ -Djavax.net.ssl.trustStoreType=pkcs12 +# -Djavax.net.debug=all +# -Djavax.net.debug=ssl,handshake,record + +java $JAVA_OPTS -classpath "target/classes:target/dependency/*" eu.melodic.event.brokerclient.BrokerClientApp $* diff --git a/amq-message-java-library/eu.melodic.event.brokerclient.properties b/amq-message-java-library/eu.melodic.event.brokerclient.properties new file mode 100644 index 00000000..df01ca61 --- /dev/null +++ b/amq-message-java-library/eu.melodic.event.brokerclient.properties @@ -0,0 +1,18 @@ +# +# Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr) +# +# This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless +# Esper library is used, in which case it is subject to the terms of General Public License v2.0. +# If a copy of the MPL was not distributed with this file, you can obtain one at +# https://www.mozilla.org/en-US/MPL/2.0/ +# + +# Broker Client settings +brokerclient.broker-url=tcp://localhost:61616 +brokerclient.broker-url-properties=transport.daemon=true&transport.trace=false&transport.useKeepAlive=true&transport.useInactivityMonitor=false&transport.needClientAuth=${brokerclient.ssl.client-auth.required}&transport.verifyHostName=false +brokerclient.ssl.client-auth.required=false +brokerclient.preserve-connection=false + +# Broker authentication +brokerclient.broker-username= +brokerclient.broker-password= \ No newline at end of file diff --git a/amq-message-java-library/pom.xml b/amq-message-java-library/pom.xml new file mode 100644 index 00000000..1fe0c90e --- /dev/null +++ b/amq-message-java-library/pom.xml @@ -0,0 +1,83 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.4.4 + + + amq-message-java-library + AMQ message Java library + gr.ntua.imu.morphemic + 1.0.0 + + + + + + org.springframework.boot + spring-boot-starter-activemq + + + org.apache.activemq + activemq-broker + + + + + org.projectlombok + lombok + provided + + + + + org.apache.commons + commons-lang3 + 3.8.1 + + + + + + eu.7bulls + Melodic 7bulls repository + https://nexus.7bulls.eu:8443/repository/maven-snapshots/ + + + eu.7bulls + Melodic 7bulls repository + https://nexus.7bulls.eu:8443/repository/maven-releases/ + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + + + + + diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClient.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClient.java new file mode 100644 index 00000000..373c9f63 --- /dev/null +++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClient.java @@ -0,0 +1,345 @@ +/* + * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokerclient; + +import eu.melodic.event.brokerclient.event.EventMap; +import eu.melodic.event.brokerclient.properties.BrokerClientProperties; +import java.io.Serializable; +import java.util.*; +import javax.jms.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSslConnectionFactory; +import org.apache.activemq.advisory.DestinationSource; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class BrokerClient { + + @Autowired + private BrokerClientProperties properties; + private Connection connection; + private Session session; + private HashMap listeners = new HashMap<>(); + + public BrokerClient() { + } + + public BrokerClient(BrokerClientProperties bcp) { + properties = bcp; + } + + public BrokerClient(Properties p) { + properties = new BrokerClientProperties(p); + } + + // ------------------------------------------------------------------------ + + public static BrokerClient newClient(String broker_properties_configuration_file_location) throws java.io.IOException, JMSException { + log.info("BrokerClient: Initializing..."); + + /* + // get properties file + String configDir = System.getenv("MELODIC_CONFIG_DIR"); + if (configDir == null || configDir.trim().isEmpty()) configDir = "."; + log.info("BrokerClient: config-dir: {}", configDir); + String configPropFile = configDir + "/" + "eu.melodic.event.brokerclient.properties"; + log.info("BrokerClient: config-file: {}", configPropFile); + */ + + // load properties + Properties p = new Properties(); + //ClassLoader loader = Thread.currentThread().getContextClassLoader(); + //try (java.io.InputStream in = loader.getClass().getResourceAsStream(configPropFile)) { p.load(in); } + try (java.io.InputStream in = new java.io.FileInputStream(broker_properties_configuration_file_location)) { + p.load(in); + } + log.info("BrokerClient: config-properties: {}", p); + + // initialize broker client + BrokerClient client = new BrokerClient(p); + log.info("BrokerClient: Configuration:\n{}", client.properties); + + return client; + } + + public static BrokerClient newClient(String username, String password, String broker_properties_configuration_file_path) throws java.io.IOException, JMSException { + BrokerClient client = newClient(broker_properties_configuration_file_path); + if (username!=null && password!=null) { + client.getClientProperties().setBrokerUsername(username); + client.getClientProperties().setBrokerPassword(password); + } + return client; + } + + public static BrokerClient newClient() { + return new BrokerClient(); + } + + // ------------------------------------------------------------------------ + + public BrokerClientProperties getClientProperties() { + checkProperties(); + return properties; + } + + protected void checkProperties() { + if (properties==null) { + //use defaults + properties = new BrokerClientProperties(); + } + } + + // ------------------------------------------------------------------------ + + public synchronized Set getDestinationNames(String connectionString) throws JMSException { + // open or reuse connection + checkProperties(); + boolean _closeConn = false; + if (session==null) { + openConnection(connectionString); + _closeConn = ! properties.isPreserveConnection(); + } + + // Get destinations from Broker + log.info("BrokerClient.getDestinationNames(): Getting destinations: connection={}, username={}", connectionString, properties.getBrokerUsername()); + ActiveMQConnection conn = (ActiveMQConnection)connection; + DestinationSource ds = conn.getDestinationSource(); + Set queues = ds.getQueues(); + Set topics = ds.getTopics(); + Set tempQueues = ds.getTemporaryQueues(); + Set tempTopics = ds.getTemporaryTopics(); + log.info("BrokerClient.getDestinationNames(): Getting destinations: done"); + + // Get destination names + HashSet destinationNames = new HashSet<>(); + for (ActiveMQQueue q : queues) destinationNames.add("QUEUE "+q.getQueueName()); + for (ActiveMQTopic t : topics) destinationNames.add("TOPIC "+t.getTopicName()); + for (ActiveMQTempQueue tq : tempQueues) destinationNames.add("Temp QUEUE "+tq.getQueueName()); + for (ActiveMQTempTopic tt : tempTopics) destinationNames.add("Temp TOPIC "+tt.getTopicName()); + + // close connection + if (_closeConn) { + closeConnection(); + } + + return destinationNames; + } + + // ------------------------------------------------------------------------ + + public synchronized void publishEvent(String connectionString, String destinationName, Map eventMap) throws JMSException { + _publishEvent(connectionString, destinationName, new EventMap(eventMap)); + } + + protected synchronized void _publishEvent(String connectionString, String destinationName, Serializable event) throws JMSException { + _publishEvent(connectionString,destinationName,event,false); + } + protected synchronized void _publishEvent(String connectionString, String destinationName, Serializable event, boolean persistent_connection_demanded) throws JMSException { + // open or reuse connection + checkProperties(); + boolean _closeConn = false; + if (session==null) { + openConnection(connectionString); + _closeConn = ! properties.isPreserveConnection(); + } + if (persistent_connection_demanded){ + _closeConn = false; + } + + // Create the destination (Topic or Queue) + //Destination destination = session.createQueue( destinationName ); + Destination destination = session.createTopic(destinationName); + + // Create a MessageProducer from the Session to the Topic or Queue + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT); + + // Create a messages + //ObjectMessage message = session.createObjectMessage(event); + TextMessage message = session.createTextMessage(event.toString()); + + // Tell the producer to send the message + long hash = message.hashCode(); + log.info("BrokerClient.publishEvent(): Sending message: connection={}, username={}, destination={}, hash={}, payload={}", connectionString, properties.getBrokerUsername(), destinationName, hash, event); + producer.send(message); + log.info("BrokerClient.publishEvent(): Message sent: connection={}, username={}, destination={}, hash={}, payload={}", connectionString, properties.getBrokerUsername(), destinationName, hash, event); + + // close connection + if (_closeConn) { + closeConnection(); + } + } + + // ------------------------------------------------------------------------ + + public void subscribe(String connectionString, String destinationName, MessageListener listener) throws JMSException { + // Create or open connection + checkProperties(); + if (session==null) { + openConnection(connectionString); + } + + // Create the destination (Topic or Queue) + log.info("BrokerClient: Subscribing to destination: {}...", destinationName); + //Destination destination = session.createQueue( destinationName ); + Destination destination = session.createTopic(destinationName); + + // Create a MessageConsumer from the Session to the Topic or Queue + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(listener); + listeners.put(listener, consumer); + } + + public void unsubscribe(MessageListener listener) throws JMSException { + MessageConsumer consumer = listeners.get(listener); + if (consumer!=null) { + consumer.close(); + } + } + + // ------------------------------------------------------------------------ + + public void receiveEvents(String connectionString, String destinationName, MessageListener listener) throws JMSException { + checkProperties(); + MessageConsumer consumer = null; + boolean _closeConn = false; + try { + // Create or open connection + if (session==null) { + openConnection(connectionString); + _closeConn = ! properties.isPreserveConnection(); + } + + // Create the destination (Topic or Queue) + log.info("BrokerClient: Subscribing to destination: {}...", destinationName); + //Destination destination = session.createQueue( destinationName ); + Destination destination = session.createTopic(destinationName); + + // Create a MessageConsumer from the Session to the Topic or Queue + consumer = session.createConsumer(destination); + + // Wait for messages + log.info("BrokerClient: Waiting for messages..."); + while (true) { + Message message = consumer.receive(); + listener.onMessage(message); + } + + } finally { + // Clean up + log.info("BrokerClient: Closing connection..."); + if (consumer != null) consumer.close(); + if (_closeConn) { + closeConnection(); + } + } + } + + // ------------------------------------------------------------------------ + + public ActiveMQConnectionFactory createConnectionFactory() { + // Create connection factory based on Broker URL scheme + checkProperties(); + final ActiveMQConnectionFactory connectionFactory; + String brokerUrl = properties.getBrokerUrl(); + if (brokerUrl.startsWith("ssl")) { + log.info("BrokerClient.createConnectionFactory(): Creating new SSL connection factory instance: url={}", brokerUrl); + final ActiveMQSslConnectionFactory sslConnectionFactory = new ActiveMQSslConnectionFactory(brokerUrl); + try { + sslConnectionFactory.setTrustStore(properties.getTruststoreFile()); + sslConnectionFactory.setTrustStoreType(properties.getTruststoreType()); + sslConnectionFactory.setTrustStorePassword(properties.getTruststorePassword()); + sslConnectionFactory.setKeyStore(properties.getKeystoreFile()); + sslConnectionFactory.setKeyStoreType(properties.getKeystoreType()); + sslConnectionFactory.setKeyStorePassword(properties.getKeystorePassword()); + //sslConnectionFactory.setKeyStoreKeyPassword( properties........ ); + + connectionFactory = sslConnectionFactory; + } catch (final Exception theException) { + throw new Error(theException); + } + } else { + log.info("BrokerClient.createConnectionFactory(): Creating new non-SSL connection factory instance: url={}", brokerUrl); + connectionFactory = new ActiveMQConnectionFactory(brokerUrl); + } + + // Other connection factory settings + //connectionFactory.setSendTimeout(....5000L); + //connectionFactory.setTrustedPackages(Arrays.asList("eu.melodic.event")); + connectionFactory.setTrustAllPackages(true); + connectionFactory.setWatchTopicAdvisories(true); + + return connectionFactory; + } + + // ------------------------------------------------------------------------ + + public synchronized void openConnection() throws JMSException { + checkProperties(); + openConnection(properties.getBrokerUrl(), null, null); + } + + public synchronized void openConnection(String connectionString) throws JMSException { + openConnection(connectionString, null, null); + } + + public synchronized void openConnection(String connectionString, String username, String password) throws JMSException { + openConnection(connectionString, username, password, properties.isPreserveConnection()); + } + + public synchronized void openConnection(String connectionString, String username, String password, boolean preserveConnection) throws JMSException { + checkProperties(); + if (connectionString == null) connectionString = properties.getBrokerUrl(); + log.debug("BrokerClient: Credentials provided as arguments: username={}, password={}", username, password); + if (StringUtils.isBlank(username)) { + username = properties.getBrokerUsername(); + password = properties.getBrokerPassword(); + log.debug("BrokerClient: Credentials read from properties: username={}, password={}", username, password); + } + + // Create connection factory + ActiveMQConnectionFactory connectionFactory = createConnectionFactory(); + connectionFactory.setBrokerURL(connectionString); + if (StringUtils.isNotBlank(username) && password != null) { + connectionFactory.setUserName(username); + connectionFactory.setPassword(password); + } + log.debug("BrokerClient: Connection credentials: username={}, password={}", username, password); + + // Create a Connection + log.info("BrokerClient: Connecting to broker: {}...", connectionString); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + log.info("BrokerClient: Opening session..."); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + this.connection = connection; + this.session = session; + } + + public synchronized void closeConnection() throws JMSException { + // Clean up + session.close(); + connection.close(); + session = null; + connection = null; + } +} \ No newline at end of file diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClientApp.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClientApp.java new file mode 100644 index 00000000..0448fe2c --- /dev/null +++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerClientApp.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokerclient; + +import eu.melodic.event.brokerclient.event.EventGenerator; +import eu.melodic.event.brokerclient.event.EventMap; +import javax.jms.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class BrokerClientApp { + + public static void main(String args[]) throws java.io.IOException, JMSException { + if (args.length==0) { + usage(); + return; + } + + int aa=0; + String command = args[aa++]; + + String username = args.length>aa && args[aa].startsWith("-U") ? args[aa++].substring(2) : null; + String password = username!=null && args.length>aa && args[aa].startsWith("-P") ? args[aa++].substring(2) : null; + if (StringUtils.isNotBlank(username) && password == null) { + password = new String(System.console().readPassword("Enter broker password: ")); + } + String broker_properties_configuration_file_location = args.length>aa && args[aa].startsWith("-C") ? args[aa++].substring(2) : null; + if (broker_properties_configuration_file_location == null){ + broker_properties_configuration_file_location = new String(System.console().readLine()); + } + + // list destinations + if ("list".equalsIgnoreCase(command)) { + String url = args[aa++]; + log.info("BrokerClientApp: Listing destinations:"); + BrokerClient client = BrokerClient.newClient(username, password,broker_properties_configuration_file_location); + client.getDestinationNames(url).stream().forEach(d -> log.info(" {}", d)); + } else + // send an event + if ("publish".equalsIgnoreCase(command)) { + String url = args[aa++]; + String topic = args[aa++]; + String value = args[aa++]; + String level = args[aa++]; + EventMap event = new EventMap(Double.parseDouble(value), Integer.parseInt(level), System.currentTimeMillis()); + log.info("BrokerClientApp: Publishing event: {}", event); + BrokerClient client = BrokerClient.newClient(username, password,broker_properties_configuration_file_location); + client.publishEvent(url, topic, event); + } else + //publish an event with custom information + if ("custom_publish".equalsIgnoreCase(command)) { + Map data = new HashMap<>(); + String url = args[aa++]; + String topic = args[aa++]; + while (aa+1 [-P]] "); + log.info("BrokerClientApp: client publish [-U [-P]] "); + log.info("BrokerClientApp: client receive [-U [-P]] "); + log.info("BrokerClientApp: client subscribe [-U [-P]] "); + log.info("BrokerClientApp: client generator [-U [-P]] "); + } +} \ No newline at end of file diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerPublisher.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerPublisher.java new file mode 100644 index 00000000..3bc69774 --- /dev/null +++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerPublisher.java @@ -0,0 +1,61 @@ +package eu.melodic.event.brokerclient; + +import lombok.extern.slf4j.Slf4j; + +import javax.jms.JMSException; +import java.io.IOException; +import java.util.Map; + +@Slf4j +public class BrokerPublisher { + + private String topic; + private String url; + private String username; + private String password; + private String broker_configuration_file_location; + private BrokerClient client = null; + + public BrokerPublisher(String topic, String url, String username, String password,String broker_configuration_file_location){ + this.topic = topic; + this.url = url; + this.username = username; + this.password = password; + this.broker_configuration_file_location = broker_configuration_file_location; + } + + public void publish(Map event_map) { + try { + log.info("BrokerClientApp: Publishing to topic: {}", topic); + if (client==null) { + client = BrokerClient.newClient(username, password, broker_configuration_file_location); + } + client.publishEvent(url, topic, event_map); + }catch (IOException | JMSException i){ + i.printStackTrace(); + } + } + + public void publish(String s) { + try { + log.info("BrokerClientApp: Publishing to topic: {}", topic); + if(client == null) { + client = BrokerClient.newClient(username, password, broker_configuration_file_location); + } + client._publishEvent(url, topic, s); + }catch (IOException | JMSException i){ + i.printStackTrace(); + } + } + public void publish(String s,boolean persistent_connection_demanded) { + try { + log.info("BrokerClientApp: Publishing to topic: {}", topic); + if (client==null) { + client = BrokerClient.newClient(username, password, broker_configuration_file_location); + } + client._publishEvent(url, topic, s,persistent_connection_demanded); + }catch (IOException | JMSException i){ + i.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerSubscriber.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerSubscriber.java new file mode 100644 index 00000000..19351cb7 --- /dev/null +++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/BrokerSubscriber.java @@ -0,0 +1,46 @@ +package eu.melodic.event.brokerclient; + +import lombok.extern.slf4j.Slf4j; + +import javax.jms.*; +import java.io.IOException; +import java.util.function.BiFunction; + + +@Slf4j +public class BrokerSubscriber { + + private String topic; + private String url; + private String username; + private String password; + private String broker_configuration_file_location; + private BrokerClient client = null; + + public BrokerSubscriber(String topic, String url, String username, String password, String broker_configuration_file_location){ + this.topic = topic; + this.url = url; + this.username = username; + this.password = password; + this.broker_configuration_file_location = broker_configuration_file_location; + } + + + public void subscribe(BiFunction function) { + try { + log.info("BrokerClientApp: Subscribing to topic: {}", topic); + BrokerClient client = BrokerClient.newClient(username, password, broker_configuration_file_location); + client.receiveEvents(url, topic, message -> { + try { + function.apply(topic,((TextMessage) message).getText()); + } catch (JMSException j) { + log.info("Shutting down subscriber..."); + j.printStackTrace(); + } + }); + }catch (IOException | JMSException i){ + i.printStackTrace(); + } + } + +} \ No newline at end of file diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventGenerator.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventGenerator.java new file mode 100644 index 00000000..4aab576d --- /dev/null +++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventGenerator.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokerclient.event; + +import eu.melodic.event.brokerclient.BrokerClient; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +@Data +@Slf4j +public class EventGenerator implements Runnable { + private BrokerClient client; + private String brokerUrl; + private String destinationName; + private long interval; + private long howmany = -1; + private double lowerValue; + private double upperValue; + private int level; + + private transient boolean keepRunning; + + public void start() { + if (keepRunning) return; + Thread runner = new Thread(this); + runner.setDaemon(true); + runner.start(); + } + + public void stop() { + keepRunning = false; + } + + public void run() { + log.info("EventGenerator.run(): Start sending events: event-generator: {}", this); + + keepRunning = true; + double valueRangeWidth = upperValue - lowerValue; + long countSent = 0; + while (keepRunning) { + try { + double newValue = Math.random() * valueRangeWidth + lowerValue; + EventMap event = new EventMap(newValue, level, System.currentTimeMillis()); + log.info("EventGenerator.run(): Sending event #{}: {}", countSent + 1, event); + client.publishEvent(brokerUrl, destinationName, event); + countSent++; + if (countSent == howmany) keepRunning = false; + log.info("EventGenerator.run(): Event sent #{}: {}", countSent, event); + } catch (Exception ex) { + log.warn("EventGenerator.run(): WHILE-EXCEPTION: {}", ex); + } + // sleep for 'interval' ms + try { + if (keepRunning) { + Thread.sleep(interval); + } + } catch (InterruptedException ex) { + log.warn("EventGenerator.run(): Sleep interrupted"); + } + } + + log.info("EventGenerator.run(): Stop sending events: event-generator: {}", this); + } +} \ No newline at end of file diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventMap.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventMap.java new file mode 100644 index 00000000..a180508e --- /dev/null +++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/event/EventMap.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokerclient.event; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Getter +@Slf4j +public class EventMap extends HashMap implements Serializable { + public EventMap() { + super(); + } + + public EventMap(Map map) { + super(map); + } + + public EventMap(double metricValue, int level, long timestamp) { + put("metricValue", metricValue); + put("level", level); + put("timestamp", timestamp); + } + + public static String[] getPropertyNames() { + return new String[]{"metricValue", "level", "timestamp"}; + } + + public static Class[] getPropertyClasses() { + return new Class[]{Double.class, Integer.class, Long.class}; + } +} \ No newline at end of file diff --git a/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/properties/BrokerClientProperties.java b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/properties/BrokerClientProperties.java new file mode 100644 index 00000000..25974e81 --- /dev/null +++ b/amq-message-java-library/src/main/java/eu/melodic/event/brokerclient/properties/BrokerClientProperties.java @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2017-2019 Institute of Communication and Computer Systems (imu.iccs.gr) + * + * This Source Code Form is subject to the terms of the Mozilla Public License, v2.0, unless + * Esper library is used, in which case it is subject to the terms of General Public License v2.0. + * If a copy of the MPL was not distributed with this file, you can obtain one at + * https://www.mozilla.org/en-US/MPL/2.0/ + */ + +package eu.melodic.event.brokerclient.properties; + +import lombok.Data; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; + +@Data +@ToString(exclude = {"truststorePassword", "keystorePassword", "brokerPassword"}) +@Configuration +@ConfigurationProperties(prefix = "brokerclient") +@PropertySource("file:${MELODIC_CONFIG_DIR}/eu.melodic.event.brokerclient.properties") +@Slf4j +public class BrokerClientProperties { + @Value("${broker-name:broker}") + private String brokerName; + @Value("${broker-url:ssl://localhost:61616}") + private String brokerUrl; + @Value("${broker-url-properties:}") + private String brokerUrlProperties; + @Value("${ssl.client-auth.required:false}") + private boolean clientAuthRequired; + @Value("${connector-port:-1}") + private int connectorPort; + @Value("${preserve-connection:false}") + private boolean preserveConnection; + + @Value("${ssl.truststore.file:}") + private String truststoreFile; + @Value("${ssl.truststore.type:}") + private String truststoreType; + @Value("${ssl.truststore.password:}") + private String truststorePassword; + @Value("${ssl.keystore.file:}") + private String keystoreFile; + @Value("${ssl.keystore.type:}") + private String keystoreType; + @Value("${ssl.keystore.password:}") + private String keystorePassword; + + @Value("${broker-username:}") + private String brokerUsername; + @Value("${broker-password:}") + private String brokerPassword; + + public BrokerClientProperties() { + brokerName = "broker"; + brokerUrl = "ssl://localhost:61616}"; + brokerUrlProperties = ""; + connectorPort = -1; + preserveConnection = true; + + truststoreFile = ""; + truststoreType = ""; + truststorePassword = ""; + keystoreFile = ""; + keystoreType = ""; + keystorePassword = ""; + clientAuthRequired = false; + + brokerUsername = ""; + brokerPassword = ""; + } + + public BrokerClientProperties(java.util.Properties p) { + brokerName = p.getProperty("brokerclient.broker-name", "broker"); + brokerUrl = p.getProperty("brokerclient.broker-url", "ssl://localhost:61616}"); + brokerUrlProperties = p.getProperty("brokerclient.broker-url-properties", ""); + connectorPort = Integer.parseInt(p.getProperty("brokerclient.connector-port", "-1")); + preserveConnection = Boolean.parseBoolean(p.getProperty("brokerclient.preserve-connection", "true")); + + truststoreFile = p.getProperty("brokerclient.ssl.truststore.file", ""); + truststoreType = p.getProperty("brokerclient.ssl.truststore.type", ""); + truststorePassword = p.getProperty("brokerclient.ssl.truststore.password", ""); + keystoreFile = p.getProperty("brokerclient.ssl.keystore.file", ""); + keystoreType = p.getProperty("brokerclient.ssl.keystore.type", ""); + keystorePassword = p.getProperty("brokerclient.ssl.keystore.password", ""); + clientAuthRequired = Boolean.parseBoolean(p.getProperty("brokerclient.ssl.client-auth.required", "false")); + + brokerUsername = p.getProperty("brokerclient.broker-username", ""); + brokerPassword = p.getProperty("brokerclient.broker-password", ""); + + brokerUrlProperties = brokerUrlProperties.replace("${brokerclient.ssl.client-auth.required}", Boolean.toString(clientAuthRequired)); + } +} -- GitLab