Commit 9ccf294a authored by afreyssin's avatar afreyssin
Browse files

Adds a particular logger allowing to follow the messages flows with a limited amount of traces.

Allows to override the messages counters in order to fix a bug with distribution destination (JORAM-232).
parent 081bb457
......@@ -106,6 +106,7 @@ public class Queue extends Destination implements QueueMBean {
private static final long serialVersionUID = 1L;
public static Logger logger = Debug.getLogger(Queue.class.getName());
public static Logger logmsg = Debug.getLogger(Queue.class.getName() + ".TraceMsg");
public static final String DELIVERY_TABLE_PREFIX = "DT_";
public static final String ARRIVAL_STATE_PREFIX = "AS_";
......@@ -404,11 +405,11 @@ public class Queue extends Destination implements QueueMBean {
return nbMsgsDeniedSinceCreation;
}
public final long getNbMsgsDeliverSinceCreation() {
public long getNbMsgsDeliverSinceCreation() {
return nbMsgsDeliverSinceCreation - nbMsgsDeniedSinceCreation;
}
public final long getNbMsgsReceiveSinceCreation() {
public long getNbMsgsReceiveSinceCreation() {
return nbMsgsSentToDMQSinceCreation + nbMsgsDeliverSinceCreation + getPendingMessageCount() - nbMsgsDeniedSinceCreation;
}
......@@ -471,6 +472,9 @@ public class Queue extends Destination implements QueueMBean {
while (! persistedMsgs.isEmpty()) {
persistedMsg = (Message) persistedMsgs.remove(0);
queueDelivery = deliveryTable.get(persistedMsg.getId());
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId());
try {
if (queueDelivery == null) {
......@@ -526,7 +530,10 @@ public class Queue extends Destination implements QueueMBean {
// Cleaning the possibly expired messages.
DMQManager dmqManager = cleanPendingMessage(current);
// If needed, sending the dead messages to the DMQ:
if (dmqManager != null) dmqManager.sendToDMQ();
if (dmqManager != null) {
setSave();
dmqManager.sendToDMQ();
}
long prod = getNbMsgsReceiveSinceCreation();
long cons = getNbMsgsDeliverSinceCreation();
......@@ -673,6 +680,9 @@ public class Queue extends Destination implements QueueMBean {
}
private void acknowledge(String msgId) {
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": acknowledges message " + msgId);
QueueDelivery queueDelivery = deliveryTable.remove(msgId);
if (queueDelivery != null) {
// The DeliveryTable is saved outside the Queue agent
......@@ -686,9 +696,9 @@ public class Queue extends Destination implements QueueMBean {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Message " + msgId + " acknowledged.");
} else if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN,
"Message " + msgId + " not found for acknowledgement.");
} else if ((logger.isLoggable(BasicLevel.WARN) || logmsg.isLoggable(BasicLevel.WARN))) {
logger.log(BasicLevel.WARN, "Message " + msgId + " not found for acknowledgement.");
logmsg.log(BasicLevel.WARN, getName() + ": message " + msgId + " not found for acknowledgement.");
}
}
......@@ -760,6 +770,8 @@ public class Queue extends Destination implements QueueMBean {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Message " + msgId + " denied.");
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": denies message " + msgId);
}
}
}
......@@ -775,8 +787,10 @@ public class Queue extends Destination implements QueueMBean {
// the message from the queue - and in that case it also sends an
// individual denying.
if (queueDelivery == null) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, " -> already denied message " + msgId);
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, " -> already denied message " + msgId);
if (logmsg.isLoggable(BasicLevel.WARN))
logmsg.log(BasicLevel.WARN, getName() + ": already denied message " + msgId);
break;
}
......@@ -792,6 +806,8 @@ public class Queue extends Destination implements QueueMBean {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> deny " + msgId);
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": denies message " + msgId);
// state change, so save.
setSave();
......@@ -1297,6 +1313,9 @@ public class Queue extends Destination implements QueueMBean {
*/
protected final void storeMessage(Message msg, boolean throwsExceptionOnFullDest) throws AccessException {
if (addMessage(msg, throwsExceptionOnFullDest)) {
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": adds new message " + msg.getId() + ", " + msg.order);
if (msg.isPersistent()) {
// Persisting the message.
setMsgTxName(msg);
......@@ -1629,8 +1648,7 @@ public class Queue extends Destination implements QueueMBean {
lsMessages = getMessages(notRec.getMessageCount(), notRec.getSelector(), notRec.getAutoAck());
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.deliverMessages: notRec.getAutoAck() = " + notRec.getAutoAck()
+ ", lsMessages = " + lsMessages);
logger.log(BasicLevel.DEBUG, "Queue.deliverMessages: notRec.getAutoAck() = " + notRec.getAutoAck() + ", lsMessages = " + lsMessages);
Iterator itMessages = lsMessages.iterator();
while (itMessages.hasNext()) {
......@@ -1645,8 +1663,10 @@ public class Queue extends Destination implements QueueMBean {
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"Message " + message.getId() + " to " + notRec.requester +
" as reply to " + notRec.getRequestId());
"Message " + message.getId() + " to " + notRec.requester + " as reply to " + notRec.getRequestId());
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO,
getName() + ": delivers message " + message.getId() + " to " + notRec.requester + " / " + notRec.getRequestId());
}
if (isLocal(notRec.requester)) {
......@@ -1783,13 +1803,6 @@ public class Queue extends Destination implements QueueMBean {
dmqManager.sendToDMQ();
}
public String getTxName(String msgId) {
Message momMsg = getMomMessage(msgId);
if (momMsg != null)
return momMsg.getTxName();
return null;
}
// Get flow Control related informations.
// TODO AF: may be we can use generic Destination.getJMXStatistics method.
protected fr.dyade.aaa.common.stream.Properties getStats() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment