Commit ff1d25c4 authored by Andre Freyssinet's avatar Andre Freyssinet
Browse files

Enhances message loading at startup (Joram-358).

parent 2eb133b7
......@@ -594,7 +594,7 @@ public class Queue extends Destination implements QueueMBean {
messages = Message.loadAll(getMsgTxPrefix().toString());
long currentTime = System.currentTimeMillis();
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ", start retrieves messages " + currentTime);
logmsg.log(BasicLevel.INFO, getName() + ", start retrieves messages");
for (int index=0; index < messages.size(); ) {
Message persistedMsg = messages.get(index);
......@@ -603,6 +603,9 @@ public class Queue extends Destination implements QueueMBean {
QueueDelivery queueDelivery = deliveryTable.get(persistedMsg.getId());
if (queueDelivery == null) {
if (persistedMsg.hasExpiration())
nbExpirations += 1;
if (persistedMsg.getDeliveryTime() > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, getName() + ": schedule delayed message " + persistedMsg.getId());
......@@ -620,12 +623,15 @@ public class Queue extends Destination implements QueueMBean {
getName() + ": Adds message " + persistedMsg.getId() + " in the list of messages to deliver.");
}
} else {
// The message has been delivered before stop.
queueDelivery.setMessage(persistedMsg);
if (isLocal(queueDelivery.getConsumerId())) {
// The delivery is aborted.
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId());
deliveryTable.remove(persistedMsg.getId());
if (persistedMsg.hasExpiration())
nbExpirations += 1;
} else {
// The delivery is always active, remove message from the list of messages to deliver.
messages.remove(index);
......@@ -637,9 +643,15 @@ public class Queue extends Destination implements QueueMBean {
index += 1;
}
currentTime = System.currentTimeMillis();
if (! messages.isEmpty()) {
if (messages.get(0).getPriority() == messages.get(messages.size() -1).getPriority()) {
samePriorities = true;
priority = messages.get(0).getPriority();
}
}
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ", end retrieves messages " + currentTime);
logmsg.log(BasicLevel.INFO, getName() + ", end retrieves messages -> " + (System.currentTimeMillis() - currentTime));
}
/**
......
......@@ -68,6 +68,7 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
@Override
public int compareTo(Message msg) {
// This method allows to sort messages list in the history order (see JORAM-358).
// Be careful, specific comparators are using in loadAll method.
return Long.compare(order, msg.order);
}
......@@ -94,6 +95,18 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
*/
private transient org.objectweb.joram.shared.messages.Message msg;
/**
* Defines if the queue restoration at startup must use the loadAll transaction
* mechanism when it is implemented.
* <p>
* Default value is false.
* <p>
* This property can be fixed either from <code>java</code> launching
* command, or in <code>a3servers.xml</code> configuration file.
*/
private static final boolean useLoadALL = true;
// AgentServer.getBoolean("org.objectweb.joram.mom.messages.USELOADALL");
/** SoftReference to the body of the MOM message. */
private transient SoftReference bodySoftRef = null;
......@@ -109,7 +122,7 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
* Note: the message swapping can be finely configured using the
* <code>JMS_JORAM_SWAPALLOWED</code> property of the JMS message.
* <p>
* Theses properties can be fixed either from <code>java</code> launching
* This property can be fixed either from <code>java</code> launching
* command, or in <code>a3servers.xml</code> configuration file.
*/
private static final boolean globalUseSoftRef =
......@@ -517,6 +530,43 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
Vector<Message> messages = new Vector<Message>();
Transaction tx = AgentServer.getTransaction();
// TODO (AF): Use DBTransaction optimized loadAll method to get all messages from a unique request.
// Currently this optimization results in an issue if messages swap is allowed, it needs to filter
// body objects that have the same prefix).
if (tx.useLoadAll() && useLoadALL) {
// Retrieving the saved messages from persistency.
long start = System.currentTimeMillis();
Map<String, Message> msgsMap = new HashMap<String, Message>(32000);
tx.loadAll(msgTxname + "", msgsMap);
long end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: all messages loaded (" + msgsMap.size() + ") -> " + (end - start));
start = System.currentTimeMillis();
for (Map.Entry<String, Message> entry : msgsMap.entrySet()) {
String name = entry.getKey();
if (name.charAt(name.length() - 1) == 'B')
// Never happen, the corresponding objects are bytes arrays and throw exceptions in loadAll
continue;
Message msg = entry.getValue();
msg.txname = name;
if (msg.msg == null) {
// TODO (AF): retrieve body in Map && msg.soft ?
}
messages.add(msg);
}
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: insert (" + messages.size() + ") -> " + (end - start));
start = System.currentTimeMillis();
// Sort messages list in reverse order to optimize queue restoration (JORAM-358)
Collections.sort(messages);
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: sort -> " + (end - start));
} else {
// Retrieving the names of the persistence message previously saved.
long start = System.currentTimeMillis();
String[] names = tx.getList(msgTxname);
......@@ -599,6 +649,7 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: sort -> " + (end - start));
}
return messages;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment