Commit cce7e94c authored by Andre Freyssinet's avatar Andre Freyssinet

Enhances message loading at startup (Jorma-358).

parent c51d3c7d
......@@ -29,6 +29,7 @@ import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
......@@ -54,7 +55,7 @@ import fr.dyade.aaa.util.Transaction;
* A message content is always wrapped as a bytes array, it is characterized
* by properties and "header" fields.
*/
public final class Message implements Comparable<Message>, Serializable, Encodable {
public final class Message implements Comparable<Message>, Serializable, Encodable, MessageView {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 2L;
......@@ -66,8 +67,9 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
@Override
public int compareTo(Message msg) {
// This method allows to sort messages list in reverse order (see JORAM-358).
return -Long.compare(order, msg.order);
// This method allows to sort messages list in the history order (see JORAM-358).
// Be careful, specific comparators are using in loadAll method.
return Long.compare(order, msg.order);
}
/**
......@@ -520,7 +522,10 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
}
}
/** Loads all persisted messages.
/**
* Loads all persisted messages.
* The returned list is sorted according to the message order and priority if any.
*
* If there is more message to restore than max, the Message structure is created but
* the message is not loaded
*
......@@ -533,64 +538,135 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
logger.log(BasicLevel.DEBUG, "Message.loadAll() " + msgTxname);
Vector<Message> messages = new Vector<Message>();
// Retrieving the names of the persistence message previously saved.
Transaction tx = AgentServer.getTransaction();
long start = System.currentTimeMillis();
String[] names = tx.getList(msgTxname);
long end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: retrieves messages list (" + names.length + ") -> " + (end - start));
logger.log(BasicLevel.DEBUG,
"Message.loadAll: names = " + Arrays.toString(names));
// Retrieving the messages individually persisted.
int nb = 0;
Message msg;
start = System.currentTimeMillis();
for (String name : names) {
if (name.charAt(name.length() - 1) == 'B')
continue;
try {
if (nb < max) {
msg = (Message) tx.load(name);
// Test the order get from txname
int idx = name.lastIndexOf('_');
if (logger.isLoggable(BasicLevel.DEBUG)) {
long order = Long.parseLong(name.substring(idx+1));
if (order != msg.order)
logger.log(BasicLevel.WARN,
"Message.loadAll: Message " + name + " -> " + msg.order + " != " + order);
// TODO (AF): Use DBTransaction optimized loadAll method to get all messages from a unique request.
// Currently this optimization results in an issue if messages swap is allowed, it needs to filter
// body objects that have the same prefix).
// if (! tx.useLoadAll()) {
// Retrieving the names of the persistence message previously saved.
long start = System.currentTimeMillis();
String[] names = tx.getList(msgTxname);
long end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: retrieves messages list (" + names.length + ") -> " + (end - start));
logger.log(BasicLevel.DEBUG,
"Message.loadAll: names = " + Arrays.toString(names));
// Retrieving the messages individually persisted.
int nb = 0;
Message msg;
int priority = -1;
boolean samePriorities = true;
start = System.currentTimeMillis();
for (String name : names) {
if (name.charAt(name.length() - 1) == 'B')
continue;
try {
if (nb < max) {
msg = (Message) tx.load(name);
if (msg.msg.priority != priority) {
if (priority == -1) {
// 1st message
priority = msg.msg.priority;
} else {
samePriorities = false;
}
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
// Verify that the order coded in txname is the right one.
int idx = name.lastIndexOf('_');
long order = Long.parseLong(name.substring(idx+1));
if (order != msg.order)
logger.log(BasicLevel.WARN,
"Message.loadAll: Message " + name + " -> " + msg.order + " != " + order);
}
} else {
msg = new Message();
// Get order from txname
int idx = name.lastIndexOf('_');
msg.order = Long.parseLong(name.substring(idx+1));
}
} else {
msg = new Message();
// Get order from txname
int idx = name.lastIndexOf('_');
msg.order = Long.parseLong(name.substring(idx+1));
msg.txname = name;
nb += 1;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"Message.loadAll: names[" + nb + "] = " + msg.txname);
messages.add(msg);
} catch (Exception exc) {
logger.log(BasicLevel.ERROR,
"Message.loadAll: Message named [" + name + "] can not be loaded", exc);
}
msg.txname = name;
nb += 1;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"Message.loadAll: names[" + nb + "] = " + msg.txname);
messages.add(msg);
} catch (Exception exc) {
logger.log(BasicLevel.ERROR,
"Message.loadAll: Message named [" + name + "] can not be loaded", exc);
}
}
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: all messages loaded -> " + (end - start));
start = System.currentTimeMillis();
// Sort messages list in reverse order to optimize queue restoration (JORAM-358)
Collections.sort(messages);
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: sort -> " + (end - start));
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: all messages loaded -> " + (end - start));
start = System.currentTimeMillis();
// Sort messages list in reverse order to optimize queue restoration (JORAM-358)
if (samePriorities) {
Collections.sort(messages, new Comparator<Message>() {
@Override
public int compare(Message msg1, Message msg2) {
return Long.compare(msg1.order, msg2.order);
}
});
} else {
Collections.sort(messages, new Comparator<Message>() {
@Override
public int compare(Message msg1, Message msg2) {
if ((msg1.msg != null) && (msg2.msg != null)) {
return Long.compare(msg1.order, msg2.order);
}
if ((msg1.msg != null) && (msg2.msg == null)) {
return -1;
}
if ((msg1.msg == null) && (msg2.msg != null)) {
return 1;
}
return Long.compare(msg1.order, msg2.order);
}
});
}
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: sort -> " + (end - start));
// TODO (AF): Use of Transaction of loadAll
// } else {
// // Retrieving the saved messages from persistency.
// long start = System.currentTimeMillis();
// Map<String, Message> msgsMap = new HashMap<String, Message>(32000);
// tx.loadAll(msgTxname, msgsMap);
// long end = System.currentTimeMillis();
//
// logger.log(BasicLevel.INFO,
// "Message.loadAll: all messages loaded (" + msgsMap.size() + ") -> " + (end - start));
//
// start = System.currentTimeMillis();
// for (Map.Entry<String, Message> entry : msgsMap.entrySet()) {
// String name = entry.getKey();
// if (name.charAt(name.length() - 1) == 'B')
// continue;
// Message msg = entry.getValue();
// msg.txname = name;
// if (msg.msg == null) {
// // TODO (AF): retrieve body in Map && msg.soft ?
// }
// messages.add(msg);
// }
// end = System.currentTimeMillis();
// logger.log(BasicLevel.INFO,
// "Message.loadAll: insert (" + messages.size() + ") -> " + (end - start));
//
// start = System.currentTimeMillis();
// // Sort messages list in reverse order to optimize queue restoration (JORAM-358)
// Collections.sort(messages);
// end = System.currentTimeMillis();
// logger.log(BasicLevel.INFO,
// "Message.loadAll: sort -> " + (end - start));
// }
return messages;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment