Commit 9680a018 authored by Andre Freyssinet's avatar Andre Freyssinet
Browse files

Enhances message loading at startup (Joram-358).

parent 05724ecc
......@@ -382,7 +382,7 @@ public class Queue extends Destination implements QueueMBean {
protected transient boolean receiving = false;
/** List holding the messages before delivery. */
protected transient List messages;
protected transient List<Message> messages;
/**
* Removes all messages that the time-to-live is expired.
......@@ -591,63 +591,55 @@ public class Queue extends Destination implements QueueMBean {
}
// Retrieving the persisted messages, if any.
List<Message> persistedMsgs = Message.loadAll(getMsgTxPrefix().toString());
messages = Message.loadAll(getMsgTxPrefix().toString());
long currentTime = System.currentTimeMillis();
if (logmsg.isLoggable(BasicLevel.DEBUG))
logmsg.log(BasicLevel.DEBUG, getName() + ", retrieves messages " + currentTime);
messages = new Vector<Message>(persistedMsgs.size());
if (persistedMsgs != null) {
Message persistedMsg;
QueueDelivery queueDelivery;
while (! persistedMsgs.isEmpty()) {
persistedMsg = (Message) persistedMsgs.remove(0);
queueDelivery = deliveryTable.get(persistedMsg.getId());
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ", start retrieves messages " + currentTime);
for (int index=0; index < messages.size(); ) {
Message persistedMsg = messages.get(index);
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId() + " -> " + persistedMsg.getDeliveryTime());
QueueDelivery queueDelivery = deliveryTable.get(persistedMsg.getId());
if (queueDelivery == null) {
if (persistedMsg.getDeliveryTime() > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, getName() + ": schedule " + persistedMsg.getId());
logger.log(BasicLevel.DEBUG, getName() + ": schedule delayed message " + persistedMsg.getId());
delayedMessageCount += 1;
// TODO (AF): Be careful, this way of handling timed messages is not scalable. We should maintain an
// ordered list of timed messages, and set a timer for the first timeout (see Scheduler class).
AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), persistedMsg, false), new Date(persistedMsg.getDeliveryTime()));
// Remove message from the list of messages to deliver.
messages.remove(index);
// Do not increment index.
continue;
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
getName() + ": delay expire, Adds a message " + persistedMsg.getId() + " in the list of messages to deliver.");
// Adds a message in the list of messages to deliver.
addMessage(persistedMsg, false);
//TODO: delete persistedMsg if the queue is full ?
//persistedMsg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
}
continue;
}
try {
if (queueDelivery == null) {
if (!addMessage(persistedMsg, false)) {
persistedMsg.delete();
getName() + ": Adds message " + persistedMsg.getId() + " in the list of messages to deliver.");
}
} else {
queueDelivery.setMessage(persistedMsg);
if (isLocal(queueDelivery.getConsumerId())) {
// The delivery is aborted.
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId());
deliveryTable.remove(persistedMsg.getId());
if (!addMessage(persistedMsg, false)) {
persistedMsg.delete();
}
} else {
queueDelivery.setMessage(persistedMsg);
// The delivery is always active, remove message from the list of messages to deliver.
messages.remove(index);
// Do not increment index.
continue;
}
}
} catch (AccessException e) {/*never happens*/}
}
index += 1;
}
currentTime = System.currentTimeMillis();
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ", end retrieves messages " + currentTime);
}
/**
......@@ -1859,7 +1851,7 @@ public class Queue extends Destination implements QueueMBean {
return MessageJMXWrapper.createTabularDataSupport(messages);
}
public List<MessageView> getMessagesView() {
public List<? extends MessageView> getMessagesView() {
return messages;
}
......
......@@ -29,6 +29,7 @@ import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
......@@ -54,7 +55,7 @@ import fr.dyade.aaa.util.Transaction;
* A message content is always wrapped as a bytes array, it is characterized
* by properties and "header" fields.
*/
public final class Message implements Comparable<Message>, Serializable, Encodable {
public final class Message implements Comparable<Message>, Serializable, Encodable, MessageView {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 2L;
......@@ -66,8 +67,8 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
@Override
public int compareTo(Message msg) {
// This method allows to sort messages list in reverse order (see JORAM-358).
return -Long.compare(order, msg.order);
// This method allows to sort messages list in the history order (see JORAM-358).
return Long.compare(order, msg.order);
}
/**
......@@ -499,15 +500,24 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
}
}
/** Loads all persisted messages. */
public static Vector loadAll(String msgTxname) {
/**
* Loads all persisted messages.
* The returned list is sorted according to the message order and priority if any.
*
* If there is more message to restore than max, the Message structure is created but
* the message is not loaded
*
* @param msgTxname prefix of messages to load.
* @return a vector containing the loaded messages.
*/
public static Vector<Message> loadAll(String msgTxname) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Message.loadAll() " + msgTxname);
Vector<Message> messages = new Vector<Message>();
// Retrieving the names of the persistence message previously saved.
Transaction tx = AgentServer.getTransaction();
// Retrieving the names of the persistence message previously saved.
long start = System.currentTimeMillis();
String[] names = tx.getList(msgTxname);
long end = System.currentTimeMillis();
......@@ -519,28 +529,41 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
// Retrieving the messages individually persisted.
int nb = 0;
Message msg;
int priority = -1;
boolean samePriorities = true;
start = System.currentTimeMillis();
for (int i = 0; i < names.length; i++) {
if (names[i].charAt(names[i].length() - 1) == 'B')
for (String name : names) {
if (name.charAt(name.length() - 1) == 'B')
continue;
try {
msg = (Message) tx.load(names[i]);
// Test the order get from txname
int idx = names[i].lastIndexOf('_');
long order = Long.parseLong(names[i].substring(idx+1));
try {
msg = (Message) tx.load(name);
if (msg.msg.priority != priority) {
if (priority == -1) {
// 1st message
priority = msg.msg.priority;
} else {
samePriorities = false;
}
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
// Verify that the order coded in txname is the right one.
int idx = name.lastIndexOf('_');
long order = Long.parseLong(name.substring(idx+1));
if (order != msg.order)
logger.log(BasicLevel.WARN,
"Message.loadAll: Message " + names[i] + " -> " + msg.order + " != " + order);
msg.txname = names[i];
"Message.loadAll: Message " + name + " -> " + msg.order + " != " + order);
}
msg.txname = name;
nb += 1;
if (logger.isLoggable(BasicLevel.DEBUG))
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"Message.loadAll: names[" + i + "] = " + msg.txname);
messages.add(msg);
} catch (Exception exc) {
"Message.loadAll: names[" + nb + "] = " + msg.txname);
messages.add(msg);
} catch (Exception exc) {
logger.log(BasicLevel.ERROR,
"Message.loadAll: Message named [" + names[i] + "] can not be loaded", exc);
"Message.loadAll: Message named [" + name + "] can not be loaded", exc);
}
}
end = System.currentTimeMillis();
......@@ -549,7 +572,30 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
start = System.currentTimeMillis();
// Sort messages list in reverse order to optimize queue restoration (JORAM-358)
Collections.sort(messages);
if (samePriorities) {
Collections.sort(messages, new Comparator<Message>() {
@Override
public int compare(Message msg1, Message msg2) {
return Long.compare(msg1.order, msg2.order);
}
});
} else {
Collections.sort(messages, new Comparator<Message>() {
@Override
public int compare(Message msg1, Message msg2) {
if ((msg1.msg != null) && (msg2.msg != null)) {
return Long.compare(msg1.order, msg2.order);
}
if ((msg1.msg != null) && (msg2.msg == null)) {
return -1;
}
if ((msg1.msg == null) && (msg2.msg != null)) {
return 1;
}
return Long.compare(msg1.order, msg2.order);
}
});
}
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: sort -> " + (end - start));
......
......@@ -31,7 +31,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Scanner;
import org.objectweb.joram.mom.dest.AdminTopicMBean;
......@@ -892,7 +891,7 @@ public class MOMCommandsImpl implements MOMCommands {
String category = args[0].toLowerCase();
String range = null;
List<MessageView> msgs = null;
List<? extends MessageView> msgs = null;
if(category.equals("queue")) {
if(args.length > 3) {
help("lsMsg");
......@@ -1189,10 +1188,10 @@ public class MOMCommandsImpl implements MOMCommands {
help("receiveMsg");
}
private List<MessageView> getQueueMessages(String queueName) throws QueueNotFoundException {
private List<? extends MessageView> getQueueMessages(String queueName) throws QueueNotFoundException {
QueueMBean queue = findQueue(queueName);
@SuppressWarnings("unchecked")
List<MessageView> msgs = ((org.objectweb.joram.mom.dest.Queue) queue).getMessagesView();
List<? extends MessageView> msgs = ((org.objectweb.joram.mom.dest.Queue) queue).getMessagesView();
return msgs==null?new ArrayList<MessageView>():msgs;
}
......@@ -1219,7 +1218,7 @@ public class MOMCommandsImpl implements MOMCommands {
return null;
}
private List<MessageView> getMessageRange(List<MessageView> msgs, int start, int end) {
private List<? extends MessageView> getMessageRange(List<? extends MessageView> msgs, int start, int end) {
if(start<0)
start=0;
if(end >= msgs.size())
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment