Commit d8057502 authored by afreyssin's avatar afreyssin
Browse files

Use DestinationConstants shared definition.

Fix issue in WakeUp handling with asynchronous behavior. Previously only one message is added to daemon queue each time a wakeup is done, so the distribution process is very slow after a restart if there is multiples messages to send.
parent 48a70cfa
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 - 2015 ScalAgent Distributed Technologies
* Copyright (C) 2010 - 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -29,6 +29,7 @@ import java.util.Properties;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.excepts.RequestException;
......@@ -46,16 +47,11 @@ import fr.dyade.aaa.common.Debug;
* behavior, delivering messages via the {@link DistributionModule}.
*/
public class DistributionQueue extends Queue {
public static Logger logger = Debug.getLogger(DistributionQueue.class.getName());
/** Default period used to clean queue and re-distribute failing messages. */
public static final long DEFAULT_PERIOD = 1000;
public static final String BATCH_DISTRIBUTION_OPTION = "distribution.batch";
public static final String ASYNC_DISTRIBUTION_OPTION = "distribution.async";
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
......@@ -106,9 +102,9 @@ public class DistributionQueue extends Queue {
isAsyncDistribution = false;
if (properties != null) {
if (properties.containsKey(BATCH_DISTRIBUTION_OPTION)) {
if (properties.containsKey(DestinationConstants.BATCH_DISTRIBUTION_OPTION)) {
try {
batchDistribution = ConversionHelper.toBoolean(properties.get(BATCH_DISTRIBUTION_OPTION));
batchDistribution = ConversionHelper.toBoolean(properties.get(DestinationConstants.BATCH_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: can't parse batch option.", exc);
}
......@@ -120,11 +116,11 @@ public class DistributionQueue extends Queue {
if (firstTime) {
if (properties != null) {
distributionClassName = properties.getProperty(DistributionModule.CLASS_NAME);
properties.remove(DistributionModule.CLASS_NAME);
distributionClassName = properties.getProperty(DestinationConstants.DISTRIBUTION_CLASS_NAME);
properties.remove(DestinationConstants.DISTRIBUTION_CLASS_NAME);
}
if (distributionClassName == null) {
throw new RequestException("Distribution class name not found: " + DistributionModule.CLASS_NAME
throw new RequestException("Distribution class name not found: " + DestinationConstants.DISTRIBUTION_CLASS_NAME
+ " property must be set on queue creation.");
}
......@@ -146,9 +142,9 @@ public class DistributionQueue extends Queue {
// stop distributionDaemon
distributionDaemon.close();
distributionDaemon = null;
if (properties.containsKey(BATCH_DISTRIBUTION_OPTION)) {
if (properties.containsKey(DestinationConstants.BATCH_DISTRIBUTION_OPTION)) {
try {
batchDistribution = ConversionHelper.toBoolean(properties.get(BATCH_DISTRIBUTION_OPTION));
batchDistribution = ConversionHelper.toBoolean(properties.get(DestinationConstants.BATCH_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionQueue: can't parse batch option.", exc);
}
......@@ -161,9 +157,9 @@ public class DistributionQueue extends Queue {
}
private boolean isAsyncDistribution(Properties properties) {
if (properties.containsKey(ASYNC_DISTRIBUTION_OPTION)) {
if (properties.containsKey(DestinationConstants.ASYNC_DISTRIBUTION_OPTION)) {
try {
return ConversionHelper.toBoolean(properties.get(ASYNC_DISTRIBUTION_OPTION));
return ConversionHelper.toBoolean(properties.get(DestinationConstants.ASYNC_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionQueue: can't parse DaemonDistribution option.", exc);
}
......@@ -416,27 +412,29 @@ public class DistributionQueue extends Queue {
// after a distribution exception
distributionDaemon.notify();
}
}
if (logger.isLoggable(BasicLevel.DEBUG) && distributionDaemon != null) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot distributionDaemon = " + distributionDaemon +
", distributionDaemon.isEmpty() = " + distributionDaemon.isEmpty());
}
if (distributionDaemon != null) {
if (!distributionDaemon.isEmpty()) {
// needless to push an other message
// because the distributionDaemon can't distribute message now.
break;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot distributionDaemon = " + distributionDaemon +
", distributionDaemon.isEmpty() = " + distributionDaemon.isEmpty());
// TODO (AF): verify with JMS and AMQP bridge tests
// if (!distributionDaemon.isEmpty()) {
// // needless to push an other message
// // because the distributionDaemon can't distribute message now.
// break;
// } else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot " + msg.getId());
// Bug fix (JORAM-200): Avoid to duplicate a message already known by the daemon (either in
// the distributeQueue or the ackedQueue).
if (! distributionDaemon.isHandling(msg.getId())) {
distributionDaemon.push(msg.getFullMessage());
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot " + msg.getId());
// Bug fix (JORAM-200): Avoid to duplicate a message already known by the daemon (either in
// the distributeQueue or the ackedQueue).
if (! distributionDaemon.isHandling(msg.getId()))
distributionDaemon.push(msg.getFullMessage());
// The current message is already in daemon queue, no need to parse the end of list.
break;
}
// }
}
if (!batchDistribution) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment