Commit 1b1c0e33 authored by afreyssin's avatar afreyssin

Adds a particular logger allowing to follow the messages flows with a limited amount of traces.

Bug fixes: Avoids potential duplication of messages (JORAM-233, JORAM-234).
parent d7fbc858
......@@ -38,6 +38,7 @@ import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.common.Debug;
/**
......@@ -139,7 +140,7 @@ public class DistributionQueue extends Queue {
if (distributionDaemon == null && isAsyncDistribution) {
// start distributionDaemon
distributionDaemon = new DistributionDaemon(distributionModule.getDistributionHandler(), getName(), this);
distributionDaemon = new DistributionDaemon(distributionModule.getDistributionHandler(), getAgentId(), getName(), this);
distributionDaemon.start();
} else if (distributionDaemon != null && !isAsyncDistribution) {
// stop distributionDaemon
......@@ -149,7 +150,7 @@ public class DistributionQueue extends Queue {
try {
batchDistribution = ConversionHelper.toBoolean(properties.get(BATCH_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: can't parse batch option.", exc);
logger.log(BasicLevel.ERROR, "DistributionQueue: can't parse batch option.", exc);
}
}
}
......@@ -164,7 +165,7 @@ public class DistributionQueue extends Queue {
try {
return ConversionHelper.toBoolean(properties.get(ASYNC_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: can't parse DaemonDistribution option.", exc);
logger.log(BasicLevel.ERROR, "DistributionQueue: can't parse DaemonDistribution option.", exc);
}
}
return false;
......@@ -172,13 +173,30 @@ public class DistributionQueue extends Queue {
public void initialize(boolean firstTime) throws Exception {
super.initialize(firstTime);
try {
String list = (String) AgentServer.getTransaction().load(getAgentId() + "AL");
if (logmsg.isLoggable(BasicLevel.DEBUG))
logmsg.log(BasicLevel.DEBUG, getName() + ": removes " + list);
if ((list != null) && (list.length() > 0)) {
String[] ids = list.split(",");
for (int i=0; i<ids.length; i++) {
removeAndDeleteMessage(ids[i]);
}
}
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "DistributionQueue: error during initialisation.", exc);
}
if (distributionModule == null) {
distributionModule = new DistributionModule(distributionClassName, properties, firstTime);
}
if (properties != null)
isAsyncDistribution = isAsyncDistribution(properties);
if (distributionDaemon == null && isAsyncDistribution) {
distributionDaemon = new DistributionDaemon(distributionModule.getDistributionHandler(), getName(), this);
distributionDaemon = new DistributionDaemon(distributionModule.getDistributionHandler(), getAgentId(), getName(), this);
distributionDaemon.start();
}
}
......@@ -193,6 +211,19 @@ public class DistributionQueue extends Queue {
}
}
// Bug fix (JORAM-232): Adapts the counters algorithmic to fit with the behavior of the
// Distribution Queue.
protected long nbMsgsReceiveSinceCreation = 0;
public final long getNbMsgsReceiveSinceCreation() {
return nbMsgsReceiveSinceCreation;
}
public final long getNbMsgsDeliverSinceCreation() {
return nbMsgsReceiveSinceCreation - nbMsgsSentToDMQSinceCreation - getPendingMessageCount();
}
/**
* @see DistributionModule#processMessages(ClientMessages)
* @see Destination#preProcess(AgentId, ClientMessages)
......@@ -201,6 +232,9 @@ public class DistributionQueue extends Queue {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.preProcess(" + from + ", " + cm + ')');
// This method modifies the counters (Bug fix JORAM-232).
setSave();
if (!batchDistribution && messages.size() > 0) {
// we already have an Exception because messages.size>0
// so return immediately the new client messages
......@@ -210,6 +244,8 @@ public class DistributionQueue extends Queue {
if (! isAsyncDistribution) {
List msgs = cm.getMessages();
for (Iterator ite = msgs.iterator(); ite.hasNext();) {
// Bug fix JORAM-232: now counts received messages.
nbMsgsReceiveSinceCreation++;
Message msg = (Message) ite.next();
try {
distributionModule.processMessage(msg);
......@@ -235,40 +271,49 @@ public class DistributionQueue extends Queue {
}
private void removeAndDeleteMessages(List ackList) {
logger.log(BasicLevel.DEBUG,
"DistributionQueue.wakeUpNot() - Handles AckList: " + ackList);
String id = null;
Iterator itMessages = ackList.iterator();
while (itMessages.hasNext()) {
id = (String) itMessages.next();
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - Acked: " + id);
int i = 0;
org.objectweb.joram.mom.messages.Message message = null;
while (i < messages.size()) {
message = (org.objectweb.joram.mom.messages.Message) messages.get(i);
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - handles: " + message.getId());
private void removeAndDeleteMessages() {
// This method modifies the counters (Bug fix JORAM-232).
setSave();
if (logmsg.isLoggable(BasicLevel.DEBUG))
logmsg.log(BasicLevel.DEBUG, getName() + ": removeAndDeleteMessages..");
if (id.equals(message.getId())) {
messages.remove(i);
message.delete();
nbMsgsDeliverSinceCreation++;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - removes " + id);
break;
}
// Bug fix (JORAM-199): avoid infinite loop!!
i++;
String id = distributionDaemon.getNextAck();
while (id != null) {
removeAndDeleteMessage(id);
id = distributionDaemon.getNextAck();
}
}
private void removeAndDeleteMessage(String id) {
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - Acked: " + id);
int i = 0;
org.objectweb.joram.mom.messages.Message message = null;
while (i < messages.size()) {
message = (org.objectweb.joram.mom.messages.Message) messages.get(i);
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - handles: " + message.getId());
if (id.equals(message.getId())) {
messages.remove(i);
message.delete();
// Now this counter is no longer used (JORAM-232).
nbMsgsDeliverSinceCreation++;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - removes " + id);
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": removes message " + id);
break;
}
// Bug fix (JORAM-199): avoid infinite loop!!
i++;
}
}
......@@ -301,13 +346,7 @@ public class DistributionQueue extends Queue {
if (distributionDaemon != null) {
// Cleans message list using ackList from daemon.
List ackList = distributionDaemon.getAckList();
if (ackList != null) {
// Bug fix (JORAM-74): delete anew the forwarded messages, replacing the call to removeMessages(ackList)
// by a similar code deleting the related messages. Since the fix of JORAM-198 it should not be longer
// useful.
removeAndDeleteMessages(ackList);
}
removeAndDeleteMessages();
}
}
......@@ -325,15 +364,12 @@ public class DistributionQueue extends Queue {
// Cleans outdated waiting messages
super.wakeUpNot(not);
// delete the ackQueue
// This method modifies the counters (Bug fix JORAM-232).
setSave();
if (distributionDaemon != null) {
List ackList = distributionDaemon.getAckList();
if (ackList != null) {
// Bug fix (JORAM-74): delete anew the forwarded messages, replacing the call to removeMessages(ackList)
// by a similar code deleting the related messages. Since the fix of JORAM-198 it should not be longer
// useful.
removeAndDeleteMessages(ackList);
}
// Cleans message list using ackList from daemon.
removeAndDeleteMessages();
}
for (Iterator ite = messages.iterator(); ite.hasNext();) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment