Commit e453c161 authored by Andre Freyssinet's avatar Andre Freyssinet

Code cleaning.

parent c0fb60d8
...@@ -581,10 +581,10 @@ public class Queue extends Destination implements QueueMBean { ...@@ -581,10 +581,10 @@ public class Queue extends Destination implements QueueMBean {
*/ */
protected void initialize(boolean firstTime) throws Exception { protected void initialize(boolean firstTime) throws Exception {
cleanWaitingRequest(System.currentTimeMillis()); cleanWaitingRequest(System.currentTimeMillis());
receiving = false; receiving = false;
// averageLoadTask = new QueueAverageLoadTask(AgentServer.getTimer(), this); // averageLoadTask = new QueueAverageLoadTask(AgentServer.getTimer(), this);
String arrivalStateTxName = ARRIVAL_STATE_PREFIX + getId().toString(); String arrivalStateTxName = ARRIVAL_STATE_PREFIX + getId().toString();
String deliveryTableTxName = DELIVERY_TABLE_PREFIX + getId().toString(); String deliveryTableTxName = DELIVERY_TABLE_PREFIX + getId().toString();
...@@ -603,50 +603,50 @@ public class Queue extends Destination implements QueueMBean { ...@@ -603,50 +603,50 @@ public class Queue extends Destination implements QueueMBean {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
if (logmsg.isLoggable(BasicLevel.INFO)) if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ", start retrieves messages"); logmsg.log(BasicLevel.INFO, getName() + ", start retrieves messages");
for (int index=0; index < messages.size(); ) { for (int index=0; index < messages.size(); ) {
Message persistedMsg = messages.get(index); Message persistedMsg = messages.get(index);
if (logmsg.isLoggable(BasicLevel.INFO)) if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId() + " -> " + persistedMsg.getDeliveryTime()); logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId() + " -> " + persistedMsg.getDeliveryTime());
QueueDelivery queueDelivery = deliveryTable.get(persistedMsg.getId()); QueueDelivery queueDelivery = deliveryTable.get(persistedMsg.getId());
if (queueDelivery == null) { if (queueDelivery == null) {
if (persistedMsg.hasExpiration()) if (persistedMsg.hasExpiration())
nbExpirations += 1; nbExpirations += 1;
if (persistedMsg.getDeliveryTime() > currentTime) { if (persistedMsg.getDeliveryTime() > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, getName() + ": schedule delayed message " + persistedMsg.getId()); logger.log(BasicLevel.DEBUG, getName() + ": schedule delayed message " + persistedMsg.getId());
delayedMessageCount += 1; delayedMessageCount += 1;
// TODO (AF): Be careful, this way of handling timed messages is not scalable. We should maintain an // TODO (AF): Be careful, this way of handling timed messages is not scalable. We should maintain an
// ordered list of timed messages, and set a timer for the first timeout (see Scheduler class). // ordered list of timed messages, and set a timer for the first timeout (see Scheduler class).
AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), persistedMsg, false), new Date(persistedMsg.getDeliveryTime())); AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), persistedMsg, false), new Date(persistedMsg.getDeliveryTime()));
// Remove message from the list of messages to deliver. // Remove message from the list of messages to deliver.
messages.remove(index); messages.remove(index);
// Do not increment index. // Do not increment index.
continue; continue;
} else { } else {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, logger.log(BasicLevel.DEBUG,
getName() + ": Adds message " + persistedMsg.getId() + " in the list of messages to deliver."); getName() + ": Adds message " + persistedMsg.getId() + " in the list of messages to deliver.");
} }
} else { } else {
// The message has been delivered before stop. // The message has been delivered before stop.
queueDelivery.setMessage(persistedMsg); queueDelivery.setMessage(persistedMsg);
if (isLocal(queueDelivery.getConsumerId())) { if (isLocal(queueDelivery.getConsumerId())) {
// The delivery is aborted. // The delivery is aborted.
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId()); logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId());
deliveryTable.remove(persistedMsg.getId()); deliveryTable.remove(persistedMsg.getId());
if (persistedMsg.hasExpiration()) if (persistedMsg.hasExpiration())
nbExpirations += 1; nbExpirations += 1;
} else { } else {
// The delivery is always active, remove message from the list of messages to deliver. // The delivery is always active, remove message from the list of messages to deliver.
messages.remove(index); messages.remove(index);
// Do not increment index. // Do not increment index.
continue; continue;
} }
} }
index += 1; index += 1;
} }
...@@ -657,7 +657,7 @@ public class Queue extends Destination implements QueueMBean { ...@@ -657,7 +657,7 @@ public class Queue extends Destination implements QueueMBean {
priority = messages.get(0).getPriority(); priority = messages.get(0).getPriority();
} }
} }
if (logmsg.isLoggable(BasicLevel.INFO)) if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ", end retrieves messages -> " + (System.currentTimeMillis() - currentTime)); logmsg.log(BasicLevel.INFO, getName() + ", end retrieves messages -> " + (System.currentTimeMillis() - currentTime));
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment