diff --git a/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java b/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java index 0788f2b645e99f3fbc7e347e87792504570dcdb0..905431a0d2e7db6434657a9e364d9000f2a89456 100644 --- a/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java +++ b/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java @@ -581,10 +581,10 @@ public class Queue extends Destination implements QueueMBean { */ protected void initialize(boolean firstTime) throws Exception { cleanWaitingRequest(System.currentTimeMillis()); - + receiving = false; - -// averageLoadTask = new QueueAverageLoadTask(AgentServer.getTimer(), this); + + // averageLoadTask = new QueueAverageLoadTask(AgentServer.getTimer(), this); String arrivalStateTxName = ARRIVAL_STATE_PREFIX + getId().toString(); String deliveryTableTxName = DELIVERY_TABLE_PREFIX + getId().toString(); @@ -603,50 +603,50 @@ public class Queue extends Destination implements QueueMBean { long currentTime = System.currentTimeMillis(); if (logmsg.isLoggable(BasicLevel.INFO)) logmsg.log(BasicLevel.INFO, getName() + ", start retrieves messages"); - + for (int index=0; index < messages.size(); ) { Message persistedMsg = messages.get(index); - if (logmsg.isLoggable(BasicLevel.INFO)) - logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId() + " -> " + persistedMsg.getDeliveryTime()); + if (logmsg.isLoggable(BasicLevel.INFO)) + logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId() + " -> " + persistedMsg.getDeliveryTime()); QueueDelivery queueDelivery = deliveryTable.get(persistedMsg.getId()); - if (queueDelivery == null) { + if (queueDelivery == null) { if (persistedMsg.hasExpiration()) nbExpirations += 1; - if (persistedMsg.getDeliveryTime() > currentTime) { - if (logger.isLoggable(BasicLevel.DEBUG)) + if (persistedMsg.getDeliveryTime() > currentTime) { + if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, getName() + ": schedule delayed message " + persistedMsg.getId()); - delayedMessageCount += 1; + delayedMessageCount += 1; // TODO (AF): Be careful, this way of handling timed messages is not scalable. We should maintain an // ordered list of timed messages, and set a timer for the first timeout (see Scheduler class). - AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), persistedMsg, false), new Date(persistedMsg.getDeliveryTime())); + AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), persistedMsg, false), new Date(persistedMsg.getDeliveryTime())); // Remove message from the list of messages to deliver. messages.remove(index); // Do not increment index. continue; - } else { - if (logger.isLoggable(BasicLevel.DEBUG)) - logger.log(BasicLevel.DEBUG, + } else { + if (logger.isLoggable(BasicLevel.DEBUG)) + logger.log(BasicLevel.DEBUG, getName() + ": Adds message " + persistedMsg.getId() + " in the list of messages to deliver."); - } - } else { + } + } else { // The message has been delivered before stop. - queueDelivery.setMessage(persistedMsg); - if (isLocal(queueDelivery.getConsumerId())) { + queueDelivery.setMessage(persistedMsg); + if (isLocal(queueDelivery.getConsumerId())) { // The delivery is aborted. - if (logger.isLoggable(BasicLevel.DEBUG)) - logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId()); - deliveryTable.remove(persistedMsg.getId()); + if (logger.isLoggable(BasicLevel.DEBUG)) + logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId()); + deliveryTable.remove(persistedMsg.getId()); if (persistedMsg.hasExpiration()) nbExpirations += 1; - } else { + } else { // The delivery is always active, remove message from the list of messages to deliver. messages.remove(index); // Do not increment index. continue; - } - } + } + } index += 1; } @@ -657,7 +657,7 @@ public class Queue extends Destination implements QueueMBean { priority = messages.get(0).getPriority(); } } - + if (logmsg.isLoggable(BasicLevel.INFO)) logmsg.log(BasicLevel.INFO, getName() + ", end retrieves messages -> " + (System.currentTimeMillis() - currentTime)); }