Commit 86de4d34 authored by maciek riedl's avatar maciek riedl
Browse files

Merge branch 'ems/morphemic-rc2.0' into 'morphemic-rc2.0'

EMS: Broker-CEP, Control Service: Added an extra argument in...

See merge request !133
parents 35ed93c3 64ac1300
Pipeline #19986 passed with stages
in 18 minutes and 28 seconds
......@@ -203,25 +203,25 @@ public class BrokerCepService {
public synchronized void publishEvent(String connectionString, String destinationName, Map<String, Object> eventMap) throws JMSException {
if (properties.isBypassLocalBroker() && _publishLocalEvent(connectionString, destinationName, new EventMap(eventMap)))
return;
_publishEvent(connectionString, destinationName, EventMap.toEventMap(eventMap));
_publishEvent(connectionString, destinationName, EventMap.toEventMap(eventMap), true);
}
public synchronized void publishEvent(String connectionString, String username, String password, String destinationName, Map<String, Object> eventMap) throws JMSException {
if (properties.isBypassLocalBroker() && _publishLocalEvent(connectionString, destinationName, new EventMap(eventMap)))
return;
_publishEvent(connectionString, username, password, destinationName, new EventMap(eventMap));
_publishEvent(connectionString, username, password, destinationName, new EventMap(eventMap), true);
}
public synchronized void publishSerializable(String connectionString, String destinationName, Serializable event) throws JMSException {
public synchronized void publishSerializable(String connectionString, String destinationName, Serializable event, boolean convertToJson) throws JMSException {
if (properties.isBypassLocalBroker() && _publishLocalEvent(connectionString, destinationName, event))
return;
_publishEvent(connectionString, destinationName, event);
_publishEvent(connectionString, destinationName, event, convertToJson);
}
public synchronized void publishSerializable(String connectionString, String username, String password, String destinationName, Serializable event) throws JMSException {
public synchronized void publishSerializable(String connectionString, String username, String password, String destinationName, Serializable event, boolean convertToJson) throws JMSException {
if (properties.isBypassLocalBroker() && _publishLocalEvent(connectionString, destinationName, event))
return;
_publishEvent(connectionString, username, password, destinationName, event);
_publishEvent(connectionString, username, password, destinationName, event, convertToJson);
}
// When destination is the local broker then hand event to (local) CEP engine, bypassing local broker
......@@ -250,7 +250,7 @@ public class BrokerCepService {
return true;
}
private synchronized void _publishEvent(String connectionString, String destinationName, Serializable event) throws JMSException {
private synchronized void _publishEvent(String connectionString, String destinationName, Serializable event, boolean convertToJson) throws JMSException {
// Get username/password for local broker service
String username = null;
String password = null;
......@@ -260,10 +260,10 @@ public class BrokerCepService {
log.debug("BrokerCepService._publishEvent(): Setting LOCAL BROKER credentials: {} / {}",
username, passwordUtil.encodePassword(password));
}
_publishEvent(connectionString, username, password, destinationName, event);
_publishEvent(connectionString, username, password, destinationName, event, convertToJson);
}
private synchronized void _publishEvent(String connectionString, String username, String password, String destinationName, Serializable event) throws JMSException {
private synchronized void _publishEvent(String connectionString, String username, String password, String destinationName, Serializable event, boolean convertToJson) throws JMSException {
// Clone connection factory
if (connectionString == null) connectionString = properties.getBrokerUrlForConsumer();
ConnectionFactory connectionFactory = brokerConfig.getConnectionFactoryFor(connectionString);
......@@ -277,26 +277,26 @@ public class BrokerCepService {
connection.start();
// Publish event
_publishEvent(connection, destinationName, event);
_publishEvent(connection, destinationName, event, convertToJson);
// Clean up
connection.close();
}
private synchronized void _publishEvent(Connection connection, String destinationName, Serializable event) throws JMSException {
private synchronized void _publishEvent(Connection connection, String destinationName, Serializable event, boolean convertToJson) throws JMSException {
log.trace("BrokerCepService._publishEvent(): Connection given: {}", connection);
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Publish event
_publishEvent(session, destinationName, event);
_publishEvent(session, destinationName, event, convertToJson);
// Clean up
session.close();
}
private synchronized void _publishEvent(Session session, String destinationName, Serializable event) throws JMSException {
private synchronized void _publishEvent(Session session, String destinationName, Serializable event, boolean convertToJson) throws JMSException {
log.trace("BrokerCepService._publishEvent(): Session: {}", session);
// Create the destination (Topic or Queue)
......@@ -310,8 +310,8 @@ public class BrokerCepService {
// Create a message
//ObjectMessage message = session.createObjectMessage(event);
String payload = gson.toJson(event);
log.trace("BrokerCepService.publishEvent(): Message payload: topic={}, payload={}", destination, payload);
String payload = convertToJson ? gson.toJson(event) : (event!=null ? event.toString() : null);
log.trace("BrokerCepService.publishEvent(): Message payload: topic={}, convert-to-json={}, payload={}", destination, convertToJson, payload);
TextMessage message = session.createTextMessage(payload);
// Set message properties
......
......@@ -232,7 +232,8 @@ public class TopicBeacon implements InitializingBean {
brokerCepService.getBrokerUsername(),
brokerCepService.getBrokerPassword(),
topicName,
event);
event,
false);
log.debug("Topic Beacon: Event sent to topic: event={}, topic={}", event, topicName);
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment