Commit 1899a175 authored by Andre Freyssinet's avatar Andre Freyssinet

Bug fix (JORAM-308):

 - Force a default WakeUP period.
 - Clone the JMS message before to push it in the JMS client (this client
can modify the message internal and cause issues).
parent 22d00e38
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 - 2017 ScalAgent Distributed Technologies
* Copyright (C) 2010 - 2018 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -90,6 +90,12 @@ public class DistributionQueue extends Queue {
* The initial set of properties.
*/
public void setProperties(Properties properties, boolean firstTime) throws Exception {
if (properties != null && ! properties.containsKey(DestinationConstants.WAKEUP_PERIOD)) {
logger.log(BasicLevel.WARN,
"DistributionQueue.setProperties, " + DestinationConstants.WAKEUP_PERIOD + "not defined, set to 1000");
properties.setProperty(DestinationConstants.WAKEUP_PERIOD, "1000");
}
super.setProperties(properties, firstTime);
if (logger.isLoggable(BasicLevel.DEBUG)) {
......@@ -302,7 +308,7 @@ public class DistributionQueue extends Queue {
nbMsgsDeliverSinceCreation++;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - removes " + id);
logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - removes " + id + ", " + message.order);
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": removes message " + id);
......
......@@ -77,14 +77,16 @@ public class JMSDistribution implements DistributionHandler {
updatePeriod = Long.parseLong(properties.getProperty(UPDATE_PERIOD_PROP));
}
} catch (NumberFormatException nfe) {
logger.log(BasicLevel.ERROR, "Property " + UPDATE_PERIOD_PROP
+ "could not be parsed properly, use default value.", nfe);
logger.log(BasicLevel.ERROR,
"Property " + UPDATE_PERIOD_PROP + "could not be parsed properly, use default value.", nfe);
}
if (properties.containsKey(ROUTING_PROP)) {
connectionNames = JMSConnectionService.convertToList(properties.getProperty(ROUTING_PROP));
}
}
long lastwarn = 0;
public void distribute(Message message) throws Exception {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "JMSDistribution.distribute(" + message + ')');
......@@ -110,6 +112,9 @@ public class JMSDistribution implements DistributionHandler {
List<JMSModule> connections = JMSConnectionService.getInstance().getConnections();
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "JMSDistribution.distribute: connections=" + connections.size());
} else if (logger.isLoggable(BasicLevel.INFO) && connections.isEmpty() && ((now - lastwarn) > 60000L)) {
logger.log(BasicLevel.INFO, "JMSDistribution.distribute: no available connection");
lastwarn = now;
}
for (final JMSModule connection : connections) {
......@@ -117,8 +122,8 @@ public class JMSDistribution implements DistributionHandler {
if (sap != null) {
// Verify that the connection still valid
if (sap.connection != connection.getCnx()) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO,
"JMSDistribution.distribute: remove outdated connection " + connection.getName());
sessions.remove(connection.getName());
sap = null;
......@@ -126,20 +131,19 @@ public class JMSDistribution implements DistributionHandler {
}
if (sap == null) { // !sessions.containsKey(connection.getCnxFactName()))
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO,
"JMSDistribution.distribute: Creates new connection for distribution, cf = " + connection.getName());
try {
Session session = connection.getCnx().createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = (Destination) connection.retrieveJndiObject(destName);
MessageProducer producer = session.createProducer(dest);
sessions.put(connection.getName(), new SessionAndProducer(connection.getCnx(), session, producer));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"JMSDistribution.distribute: New connection available.");
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "JMSDistribution.distribute: New connection available.");
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Connection is not usable.", exc);
if (logger.isLoggable(BasicLevel.INFO)) {
logger.log(BasicLevel.INFO, "Connection still not usable.", exc);
}
}
}
......@@ -168,13 +172,17 @@ public class JMSDistribution implements DistributionHandler {
// convert a Joram message because this message is modified on session send.
// And if we have an exception we must keep the original message.
session.producer.send(org.objectweb.joram.client.jms.Message.convertJMSMessage(org.objectweb.joram.client.jms.Message.wrapMomMessage(null, message)));
} else
session.producer.send(org.objectweb.joram.client.jms.Message.wrapMomMessage(null, message));
} else {
// JORAM-308: It seems that the original message could be modified by the provider even if this one is
// not Joram. So we have to create a new message from the target session and initialize it from the
// original message.
session.producer.send(org.objectweb.joram.client.jms.Message.convertJMSMessage(org.objectweb.joram.client.jms.Message.wrapMomMessage(null, message)));
}
sessions.get(cnxName); // Access the used connection to update the LRU map
return;
} catch (JMSException exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Session is not usable, remove from table.", exc);
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "Session is not usable, remove from table.", exc);
}
// TODO (AF): We should indicate that the corresponding connection is no longer
// available calling JMSModule.onException().
......@@ -184,15 +192,21 @@ public class JMSDistribution implements DistributionHandler {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"Message could not be sent, no usable connection/session found.");
"Message could not be sent, no usable connection/session found - " + sessions.size());
throw new Exception("Message could not be sent, no usable connection/session found.");
throw new Exception("Message could not be sent, no usable connection/session found - " + sessions.size());
}
public void close() {
for (SessionAndProducer session : sessions.values()) {
try {
session.producer.close();
} catch (JMSException exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Error while stopping JmsDistribution.", exc);
}
}
try {
session.session.close();
} catch (JMSException exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment