Commit 3128fc3b authored by afreyssin's avatar afreyssin
Browse files

Improves preProcess code (JORAM-219).

parent 8735b939
......@@ -198,54 +198,41 @@ public class DistributionQueue extends Queue {
* @see Destination#preProcess(AgentId, ClientMessages)
*/
public ClientMessages preProcess(AgentId from, ClientMessages cm) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.preProcess(" + from + ", " + cm + ')');
}
if (!batchDistribution && messages.size() > 0) {
// we already have an Exception, because messages.size>0
// so return immediately the new client messages
// we already have an Exception because messages.size>0
// so return immediately the new client messages
return cm;
}
List msgs = cm.getMessages();
for (Iterator ite = msgs.iterator(); ite.hasNext();) {
Message msg = (Message) ite.next();
try {
// TODO (AF): if (!isAsyncDistribution) ..
distributionModule.processMessage(msg);
nbMsgsDeliverSinceCreation++;
ite.remove();
} catch (Exception exc) {
if (!isAsyncDistribution) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "DistributionQueue.preProcess: distribution error.", exc);
}
} else {
// Bug fix (JORAM-198): Avoid parallelism access during message serialization (save method).
// Do nothing, the handling of message is now in post-process.
// // a processMessage exception is normal with async mode.
// if (distributionDaemon != null) {
// // use msg.id ?
// distributionDaemon.push(msg);
// } else {
// if (logger.isLoggable(BasicLevel.WARN)) {
// logger.log(BasicLevel.WARN,
// "DistributionQueue.preProcess: distribution distributionDaemon = null in async mode.", exc);
// }
// }
}
// if we don't do batch distribution, stop on first error
if (!batchDistribution) {
break;
if (! isAsyncDistribution) {
List msgs = cm.getMessages();
for (Iterator ite = msgs.iterator(); ite.hasNext();) {
Message msg = (Message) ite.next();
try {
distributionModule.processMessage(msg);
nbMsgsDeliverSinceCreation++;
ite.remove();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "DistributionQueue.preProcess: distribution error.", exc);
// if we don't do batch distribution, stop on first error
if (!batchDistribution)
break;
}
}
}
if (msgs.size() > 0) {
if (msgs.size() > 0) {
return cm;
}
return null;
} else {
return cm;
}
return null;
}
private void removeAndDeleteMessages(List ackList) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment