Commit 375f1b9b authored by afreyssin's avatar afreyssin

[JORAM-262] - A JMS bridge destination allows to filter BridgeConnection used...

[JORAM-262] - A JMS bridge destination allows to filter BridgeConnection used to collect or distribute messages. No longer use ConnectionFactory name but rather the BridgeConnection name.
parent ab6fa50d
......@@ -164,11 +164,11 @@ public class JMSAcquisition implements AcquisitionDaemon {
List<JMSModule> connections = JMSConnectionService.getInstance().getConnections();
for (JMSModule connection : connections) {
if (!listeners.containsKey(connection.getCnxFactName())) {
if (connectionNames == null || connectionNames.contains(connection.getCnxFactName())) {
if (!listeners.containsKey(connection.getName())) {
if (connectionNames == null || connectionNames.contains(connection.getName())) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG,
"Creating a new consumer for connection: " + connection.getCnxFactName(), new Exception());
"Creating a new consumer for connection: " + connection.getName(), new Exception());
}
try {
dest = (Destination) connection.retrieveJndiObject(destName);
......@@ -192,7 +192,7 @@ public class JMSAcquisition implements AcquisitionDaemon {
consumer.setMessageListener(listener);
connection.getCnx().start();
connection.addExceptionListener(listener);
listeners.put(connection.getCnxFactName(), listener);
listeners.put(connection.getName(), listener);
} catch (Exception e) {
logger.log(BasicLevel.ERROR,
"Error while starting consumer on connection: " + connection.getCnxFactName(), e);
......@@ -217,7 +217,7 @@ public class JMSAcquisition implements AcquisitionDaemon {
*/
public void onMessage(javax.jms.Message jmsMessage) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, connection.getCnxFactName() + ".onMessage(" + jmsMessage + ')');
logger.log(BasicLevel.DEBUG, connection.getName() + ".onMessage(" + jmsMessage + ')');
try {
org.objectweb.joram.client.jms.Message clientMessage = null;
......@@ -228,7 +228,7 @@ public class JMSAcquisition implements AcquisitionDaemon {
// Conversion error: denying the message.
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
connection.getCnxFactName() + ".onMessage: rollback, can not convert message.",
connection.getName() + ".onMessage: rollback, can not convert message.",
conversionExc);
session.rollback();
......@@ -237,25 +237,25 @@ public class JMSAcquisition implements AcquisitionDaemon {
transmitter.transmit(clientMessage.getMomMsg(), jmsMessage.getJMSMessageID());
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, connection.getCnxFactName() + ".onMessage: Try to commit.");
logger.log(BasicLevel.DEBUG, connection.getName() + ".onMessage: Try to commit.");
session.commit();
} catch (JMSException exc) {
// Commit or rollback failed: nothing to do.
logger.log(BasicLevel.ERROR,
connection.getCnxFactName() + ".onMessage(" + jmsMessage + ')', exc);
connection.getName() + ".onMessage(" + jmsMessage + ')', exc);
} catch (Throwable t) {
logger.log(BasicLevel.ERROR,
connection.getCnxFactName() + ".onMessage(" + jmsMessage + ')', t);
connection.getName() + ".onMessage(" + jmsMessage + ')', t);
}
}
public void onException(JMSException exception) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, connection.getCnxFactName() + ": Consumer error for session " + session);
logger.log(BasicLevel.DEBUG, connection.getName() + ": Consumer error for session " + session);
}
if (!closing) {
listeners.remove(connection.getCnxFactName());
listeners.remove(connection.getName());
}
}
......
......@@ -109,18 +109,18 @@ public class JMSDistribution implements DistributionHandler {
}
List<JMSModule> connections = JMSConnectionService.getInstance().getConnections();
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "JMSDistribution.distribute: connections = " + connections);
logger.log(BasicLevel.DEBUG, "JMSDistribution.distribute: connections=" + connections.size());
}
for (final JMSModule connection : connections) {
SessionAndProducer sap = sessions.get(connection.getCnxFactName());
SessionAndProducer sap = sessions.get(connection.getName());
if (sap != null) {
// Verify that the connection still valid
if (sap.connection != connection.getCnx()) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"JMSDistribution.distribute: remove outdated connection " + connection.getCnxFactName());
sessions.remove(connection.getCnxFactName());
"JMSDistribution.distribute: remove outdated connection " + connection.getName());
sessions.remove(connection.getName());
sap = null;
}
}
......@@ -128,12 +128,15 @@ public class JMSDistribution implements DistributionHandler {
if (sap == null) { // !sessions.containsKey(connection.getCnxFactName()))
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
connection.getCnxFactName() + ": New connection factory available for distribution.");
"JMSDistribution.distribute: Creates new connection for distribution, cf = " + connection.getName());
try {
Session session = connection.getCnx().createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = (Destination) connection.retrieveJndiObject(destName);
MessageProducer producer = session.createProducer(dest);
sessions.put(connection.getCnxFactName(), new SessionAndProducer(connection.getCnx(), session, producer));
sessions.put(connection.getName(), new SessionAndProducer(connection.getCnx(), session, producer));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"JMSDistribution.distribute: New connection available.");
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Connection is not usable.", exc);
......@@ -146,11 +149,15 @@ public class JMSDistribution implements DistributionHandler {
// Send the message
Iterator<Map.Entry<String, SessionAndProducer>> iter = sessions.entrySet().iterator();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Try to send message, sessions available: " + sessions.size());
while (iter.hasNext()) {
Map.Entry<String, SessionAndProducer> entry = iter.next();
try {
SessionAndProducer session = entry.getValue();
String cnxName = entry.getKey();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Try session: " + cnxName);
if (connectionNames != null && !connectionNames.contains(cnxName)) {
continue;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment