Commit 563078b5 authored by Andre Freyssinet's avatar Andre Freyssinet

JORAM-358, JORAM-359: Optimizes the handling of messages list in Queue

both at startup and during operation.
parent 07eeec5f
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 - 2012 ScalAgent Distributed Technologies
* Copyright (C) 2011 - 2020 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -96,6 +96,7 @@ public class AliasQueue extends Queue {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "AliasQueue.preProcess(" + from + ", " + cm + ')');
}
if (messages.size() > 0) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Messages are already waiting, enqueue the new ones");
......@@ -130,6 +131,8 @@ public class AliasQueue extends Queue {
org.objectweb.joram.mom.messages.Message msg = (org.objectweb.joram.mom.messages.Message) ite.next();
cm.addMessage(msg.getFullMessage());
ite.remove();
if ((nbExpirations > 0 ) && msg.hasExpiration())
nbExpirations -= 1;
msg.delete();
}
......
......@@ -298,6 +298,8 @@ public class DistributionQueue extends Queue {
if (id.equals(message.getId())) {
messages.remove(i);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
message.delete();
// Now this counter is no longer used (JORAM-232).
......@@ -380,6 +382,8 @@ public class DistributionQueue extends Queue {
distributionModule.processMessage(msg.getFullMessage());
nbMsgsDeliverSinceCreation++;
ite.remove();
if ((nbExpirations > 0 ) && msg.hasExpiration())
nbExpirations -= 1;
msg.delete();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.DEBUG) && !isAsyncDistribution) {
......@@ -400,6 +404,8 @@ public class DistributionQueue extends Queue {
logger.log(BasicLevel.DEBUG, "Message can't be delivered, send to DMQ.");
}
ite.remove();
if ((nbExpirations > 0 ) && msg.hasExpiration())
nbExpirations -= 1;
msg.delete();
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
......
......@@ -242,6 +242,9 @@ public class Queue extends Destination implements QueueMBean {
/** <code>true</code> if all the stored messages have the same priority. */
private boolean samePriorities;
/** Number of stored messages with an expiration date. */
protected int nbExpirations;
/** Common priority value. */
private int priority;
......@@ -375,7 +378,7 @@ public class Queue extends Destination implements QueueMBean {
return 0;
}
/** <code>true</code> if the queue is currently receiving messages. */
/** <code>true</code> if the queue is currently handling a new received message. */
protected transient boolean receiving = false;
/** List holding the messages before delivery. */
......@@ -402,15 +405,20 @@ public class Queue extends Destination implements QueueMBean {
* <code>null</code> if there wasn't any.
*/
protected DMQManager cleanPendingMessage(long currentTime) {
int index = 0;
DMQManager dmqManager = null;
if (nbExpirations == 0)
return dmqManager;
// Be careful, browsing the message list is expensive when there are many messages waiting.
int index = 0;
Message message = null;
while (index < messages.size()) {
message = (Message) messages.get(index);
if (! message.isValid(currentTime)) {
messages.remove(index);
nbExpirations -= 1;
if (dmqManager == null)
dmqManager = new DMQManager(dmqId, getId());
......@@ -567,7 +575,6 @@ public class Queue extends Destination implements QueueMBean {
cleanWaitingRequest(System.currentTimeMillis());
receiving = false;
messages = new Vector();
// averageLoadTask = new QueueAverageLoadTask(AgentServer.getTimer(), this);
......@@ -576,6 +583,7 @@ public class Queue extends Destination implements QueueMBean {
if (firstTime) {
arrivalState = new QueueArrivalState(arrivalStateTxName);
deliveryTable = new QueueDeliveryTable(deliveryTableTxName);
messages = new Vector<Message>();
return;
} else {
arrivalState = QueueArrivalState.load(arrivalStateTxName);
......@@ -583,11 +591,13 @@ public class Queue extends Destination implements QueueMBean {
}
// Retrieving the persisted messages, if any.
List persistedMsgs = Message.loadAll(getMsgTxPrefix().toString());
List<Message> persistedMsgs = Message.loadAll(getMsgTxPrefix().toString(), Integer.MAX_VALUE);
long currentTime = System.currentTimeMillis();
if (logmsg.isLoggable(BasicLevel.DEBUG))
logmsg.log(BasicLevel.DEBUG, getName() + ", retrieves messages " + currentTime);
messages = new Vector<Message>(persistedMsgs.size());
if (persistedMsgs != null) {
Message persistedMsg;
QueueDelivery queueDelivery;
......@@ -1149,6 +1159,8 @@ public class Queue extends Destination implements QueueMBean {
Message message = (Message) messages.get(i);
if (message.getId().equals(request.getMessageId())) {
messages.remove(i);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
message.delete();
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
......@@ -1177,6 +1189,7 @@ public class Queue extends Destination implements QueueMBean {
}
dmqManager.sendToDMQ();
messages.clear();
nbExpirations = 0;
}
replyToTopic(new AdminReply(true, null), replyTo, requestMsgId, replyMsgId);
}
......@@ -1584,6 +1597,7 @@ public class Queue extends Destination implements QueueMBean {
/**
* Adds a message in the list of messages to deliver.
* This method take care of the message priority if needed.
*
* @param message the message to add.
* @param throwsExceptionOnFullDest true, can throws an exception on sending message on full destination
......@@ -1591,9 +1605,7 @@ public class Queue extends Destination implements QueueMBean {
* @throws AccessException If syncExceptionOnFullDest and the queue isFull
*/
protected final boolean addMessage(Message message, boolean throwsExceptionOnFullDest) throws AccessException {
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size() + getDelayedMessageCount())) {
if (throwsExceptionOnFullDest && isSyncExceptionOnFullDest()) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "addMessage " + message.getId() + " throws Exception: The queue \"" + getName() + "\" is full (syncExceptionOnFullDest).");
......@@ -1612,8 +1624,15 @@ public class Queue extends Destination implements QueueMBean {
if (messages.isEmpty()) {
samePriorities = true;
priority = message.getPriority();
} else if (samePriorities && priority != message.getPriority()) {
samePriorities = false;
if (message.hasExpiration())
nbExpirations = 1;
else
nbExpirations = 0;
} else {
if (samePriorities && priority != message.getPriority())
samePriorities = false;
if (message.hasExpiration())
nbExpirations += 1;
}
if (samePriorities) {
......@@ -1689,26 +1708,28 @@ public class Queue extends Destination implements QueueMBean {
*
* @param msgIds List of message id.
*/
protected void removeMessages(List msgIds) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.removeMessages(" + msgIds + ')');
String id = null;
Iterator itMessages = msgIds.iterator();
while (itMessages.hasNext()) {
id = (String) itMessages.next();
int i = 0;
Message message = null;
while (i < messages.size()) {
message = (Message) messages.get(i);
if (id.equals(message.getId())) {
messages.remove(i);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.removeMessages msgId = " + id);
break;
}
}
}
}
// protected void removeMessages(List msgIds) {
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "Queue.removeMessages(" + msgIds + ')');
// String id = null;
// Iterator itMessages = msgIds.iterator();
// while (itMessages.hasNext()) {
// id = (String) itMessages.next();
// int i = 0;
// Message message = null;
// while (i < messages.size()) {
// message = (Message) messages.get(i);
// if (id.equals(message.getId())) {
// messages.remove(i);
// if ((nbExpirations > 0 ) && message.hasExpiration())
// nbExpirations -= 1;
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "Queue.removeMessages msgId = " + id);
// break;
// }
// }
// }
// }
/**
* get messages, if it's possible.
......@@ -1731,8 +1752,7 @@ public class Queue extends Destination implements QueueMBean {
message = (Message) messages.get(j);
// If selector matches, sending the message:
if (Selector.matches(message.getHeaderMessage(), selector) &&
checkDelivery(message.getHeaderMessage())) {
if (Selector.matches(message.getHeaderMessage(), selector) && checkDelivery(message.getHeaderMessage())) {
message.incDeliveryCount();
nbMsgsDeliverSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
......@@ -1751,6 +1771,8 @@ public class Queue extends Destination implements QueueMBean {
if (remove) {
messages.remove(message);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
message.delete();
} else {
// message not remove: going on.
......@@ -1801,6 +1823,8 @@ public class Queue extends Destination implements QueueMBean {
if (remove) {
messages.remove(message);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
message.delete();
}
}
......@@ -1880,10 +1904,11 @@ public class Queue extends Destination implements QueueMBean {
notMsg.addMessage(message.getFullMessage());
if (!notRec.getAutoAck()) {
// putting the message in the delivered messages table:
QueueDelivery queueDelivery = new QueueDelivery(notRec.requester,
notRec.getClientContext(), message);
QueueDelivery queueDelivery = new QueueDelivery(notRec.requester, notRec.getClientContext(), message);
deliveryTable.put(message.getId(), queueDelivery);
messages.remove(message);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
......
......@@ -1282,7 +1282,7 @@ public final class UserAgent extends Agent implements UserAgentMBean, ProxyAgent
}
// Retrieving the subscriptions' messages.
List messages = Message.loadAll(getMsgTxname());
List messages = Message.loadAll(getMsgTxname(), Integer.MAX_VALUE);
if (subsTable.isEmpty()) {
// it is possible because we always save MessageSoftRef
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment