diff --git a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepService.java b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepService.java index 4466b0116ad24ea59a65eec268a969c647158c69..ca31e62881e77b15312afc66cc021ad2adbe2c99 100644 --- a/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepService.java +++ b/event-management/broker-cep/src/main/java/eu/melodic/event/brokercep/BrokerCepService.java @@ -203,25 +203,25 @@ public class BrokerCepService { public synchronized void publishEvent(String connectionString, String destinationName, Map eventMap) throws JMSException { if (properties.isBypassLocalBroker() && _publishLocalEvent(connectionString, destinationName, new EventMap(eventMap))) return; - _publishEvent(connectionString, destinationName, EventMap.toEventMap(eventMap)); + _publishEvent(connectionString, destinationName, EventMap.toEventMap(eventMap), true); } public synchronized void publishEvent(String connectionString, String username, String password, String destinationName, Map eventMap) throws JMSException { if (properties.isBypassLocalBroker() && _publishLocalEvent(connectionString, destinationName, new EventMap(eventMap))) return; - _publishEvent(connectionString, username, password, destinationName, new EventMap(eventMap)); + _publishEvent(connectionString, username, password, destinationName, new EventMap(eventMap), true); } - public synchronized void publishSerializable(String connectionString, String destinationName, Serializable event) throws JMSException { + public synchronized void publishSerializable(String connectionString, String destinationName, Serializable event, boolean convertToJson) throws JMSException { if (properties.isBypassLocalBroker() && _publishLocalEvent(connectionString, destinationName, event)) return; - _publishEvent(connectionString, destinationName, event); + _publishEvent(connectionString, destinationName, event, convertToJson); } - public synchronized void publishSerializable(String connectionString, String username, String password, String destinationName, Serializable event) throws JMSException { + public synchronized void publishSerializable(String connectionString, String username, String password, String destinationName, Serializable event, boolean convertToJson) throws JMSException { if (properties.isBypassLocalBroker() && _publishLocalEvent(connectionString, destinationName, event)) return; - _publishEvent(connectionString, username, password, destinationName, event); + _publishEvent(connectionString, username, password, destinationName, event, convertToJson); } // When destination is the local broker then hand event to (local) CEP engine, bypassing local broker @@ -250,7 +250,7 @@ public class BrokerCepService { return true; } - private synchronized void _publishEvent(String connectionString, String destinationName, Serializable event) throws JMSException { + private synchronized void _publishEvent(String connectionString, String destinationName, Serializable event, boolean convertToJson) throws JMSException { // Get username/password for local broker service String username = null; String password = null; @@ -260,10 +260,10 @@ public class BrokerCepService { log.debug("BrokerCepService._publishEvent(): Setting LOCAL BROKER credentials: {} / {}", username, passwordUtil.encodePassword(password)); } - _publishEvent(connectionString, username, password, destinationName, event); + _publishEvent(connectionString, username, password, destinationName, event, convertToJson); } - private synchronized void _publishEvent(String connectionString, String username, String password, String destinationName, Serializable event) throws JMSException { + private synchronized void _publishEvent(String connectionString, String username, String password, String destinationName, Serializable event, boolean convertToJson) throws JMSException { // Clone connection factory if (connectionString == null) connectionString = properties.getBrokerUrlForConsumer(); ConnectionFactory connectionFactory = brokerConfig.getConnectionFactoryFor(connectionString); @@ -277,26 +277,26 @@ public class BrokerCepService { connection.start(); // Publish event - _publishEvent(connection, destinationName, event); + _publishEvent(connection, destinationName, event, convertToJson); // Clean up connection.close(); } - private synchronized void _publishEvent(Connection connection, String destinationName, Serializable event) throws JMSException { + private synchronized void _publishEvent(Connection connection, String destinationName, Serializable event, boolean convertToJson) throws JMSException { log.trace("BrokerCepService._publishEvent(): Connection given: {}", connection); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Publish event - _publishEvent(session, destinationName, event); + _publishEvent(session, destinationName, event, convertToJson); // Clean up session.close(); } - private synchronized void _publishEvent(Session session, String destinationName, Serializable event) throws JMSException { + private synchronized void _publishEvent(Session session, String destinationName, Serializable event, boolean convertToJson) throws JMSException { log.trace("BrokerCepService._publishEvent(): Session: {}", session); // Create the destination (Topic or Queue) @@ -310,8 +310,8 @@ public class BrokerCepService { // Create a message //ObjectMessage message = session.createObjectMessage(event); - String payload = gson.toJson(event); - log.trace("BrokerCepService.publishEvent(): Message payload: topic={}, payload={}", destination, payload); + String payload = convertToJson ? gson.toJson(event) : (event!=null ? event.toString() : null); + log.trace("BrokerCepService.publishEvent(): Message payload: topic={}, convert-to-json={}, payload={}", destination, convertToJson, payload); TextMessage message = session.createTextMessage(payload); // Set message properties diff --git a/event-management/control-service/src/main/java/eu/melodic/event/control/util/TopicBeacon.java b/event-management/control-service/src/main/java/eu/melodic/event/control/util/TopicBeacon.java index da7fc25f5cf79252e496c2f4eab7628dd67f46cc..6030df0b7d3b7b9f5d215a6ab7052d7a3a17d38f 100644 --- a/event-management/control-service/src/main/java/eu/melodic/event/control/util/TopicBeacon.java +++ b/event-management/control-service/src/main/java/eu/melodic/event/control/util/TopicBeacon.java @@ -232,7 +232,8 @@ public class TopicBeacon implements InitializingBean { brokerCepService.getBrokerUsername(), brokerCepService.getBrokerPassword(), topicName, - event); + event, + false); log.debug("Topic Beacon: Event sent to topic: event={}, topic={}", event, topicName); } }