Commit cfe9e8ed authored by Andre Freyssinet's avatar Andre Freyssinet
Browse files

Optimizes the handling of messages list (JORAM-358, JORAM-359):

 - Sort the list of loaded messages to optimize insertion in Queue list.
 - Avoid costly message list cleaning if not needed.
parent d85ce245
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2012 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2012 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 2012 - 2013 Universite Joseph Fourier
*
* This library is free software; you can redistribute it and/or
......@@ -154,6 +154,7 @@ public class AliasInQueue extends Queue {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "ExpiredNot received, messages will be queued.");
}
Notification expiredNot = not.getExpiredNot();
if (expiredNot instanceof ClientMessages) {
nbMsgsDeliverSinceCreation -= ((ClientMessages) expiredNot).getMessageCount();
......@@ -169,6 +170,8 @@ public class AliasInQueue extends Queue {
org.objectweb.joram.mom.messages.Message msg = (org.objectweb.joram.mom.messages.Message) ite.next();
cm.addMessage(msg.getFullMessage());
ite.remove();
if ((nbExpirations > 0 ) && msg.hasExpiration())
nbExpirations -= 1;
msg.delete();
}
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 - 2012 ScalAgent Distributed Technologies
* Copyright (C) 2011 - 2020 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -96,6 +96,7 @@ public class AliasQueue extends Queue {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "AliasQueue.preProcess(" + from + ", " + cm + ')');
}
if (messages.size() > 0) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Messages are already waiting, enqueue the new ones");
......@@ -130,6 +131,8 @@ public class AliasQueue extends Queue {
org.objectweb.joram.mom.messages.Message msg = (org.objectweb.joram.mom.messages.Message) ite.next();
cm.addMessage(msg.getFullMessage());
ite.remove();
if ((nbExpirations > 0 ) && msg.hasExpiration())
nbExpirations -= 1;
msg.delete();
}
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 - 2018 ScalAgent Distributed Technologies
* Copyright (C) 2010 - 2020 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -226,10 +226,6 @@ public class DistributionQueue extends Queue {
return nbMsgsReceiveSinceCreation - nbMsgsSentToDMQSinceCreation - getPendingMessageCount();
}
/**
* @see DistributionModule#processMessages(ClientMessages)
* @see Destination#preProcess(AgentId, ClientMessages)
*/
public ClientMessages preProcess(AgentId from, ClientMessages cm) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.preProcess(" + from + ", " + cm + ')');
......@@ -302,6 +298,8 @@ public class DistributionQueue extends Queue {
if (id.equals(message.getId())) {
messages.remove(i);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
message.delete();
// Now this counter is no longer used (JORAM-232).
......@@ -384,6 +382,8 @@ public class DistributionQueue extends Queue {
distributionModule.processMessage(msg.getFullMessage());
nbMsgsDeliverSinceCreation++;
ite.remove();
if ((nbExpirations > 0 ) && msg.hasExpiration())
nbExpirations -= 1;
msg.delete();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.DEBUG) && !isAsyncDistribution) {
......@@ -404,6 +404,8 @@ public class DistributionQueue extends Queue {
logger.log(BasicLevel.DEBUG, "Message can't be delivered, send to DMQ.");
}
ite.remove();
if ((nbExpirations > 0 ) && msg.hasExpiration())
nbExpirations -= 1;
msg.delete();
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2019 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
......@@ -36,7 +36,6 @@ import java.util.Vector;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.objectweb.joram.mom.dest.AdminTopic.DestinationDesc;
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.mom.messages.MessageJMXWrapper;
import org.objectweb.joram.mom.messages.MessageView;
......@@ -180,7 +179,7 @@ public class Queue extends Destination implements QueueMBean {
* Sets the delay in seconds use to wait before re-delivering messages
* after a deny.
*
* @param reDeliveryDelay the reDeliveryDelay to set
* @param redeliveryDelay the reDeliveryDelay to set
*/
@Override
public final void setRedeliveryDelay(int redeliveryDelay) {
......@@ -244,6 +243,9 @@ public class Queue extends Destination implements QueueMBean {
/** <code>true</code> if all the stored messages have the same priority. */
private boolean samePriorities;
/** Number of stored messages with an expiration date. */
protected int nbExpirations;
/** Common priority value. */
private int priority;
......@@ -273,6 +275,7 @@ public class Queue extends Destination implements QueueMBean {
* @param properties
* The initial set of properties.
*/
@Override
public void setProperties(Properties properties, boolean firstTime) throws Exception {
super.setProperties(properties, firstTime);
......@@ -375,7 +378,7 @@ public class Queue extends Destination implements QueueMBean {
return 0;
}
/** <code>true</code> if the queue is currently receiving messages. */
/** <code>true</code> if the queue is currently handling a new received message. */
protected transient boolean receiving = false;
/** List holding the messages before delivery. */
......@@ -402,15 +405,20 @@ public class Queue extends Destination implements QueueMBean {
* <code>null</code> if there wasn't any.
*/
protected DMQManager cleanPendingMessage(long currentTime) {
int index = 0;
DMQManager dmqManager = null;
if (nbExpirations == 0)
return dmqManager;
// Be careful, browsing the message list is expensive when there are many messages waiting.
int index = 0;
Message message = null;
while (index < messages.size()) {
message = (Message) messages.get(index);
if (! message.isValid(currentTime)) {
messages.remove(index);
nbExpirations -= 1;
if (dmqManager == null)
dmqManager = new DMQManager(dmqId, getId());
......@@ -567,7 +575,6 @@ public class Queue extends Destination implements QueueMBean {
cleanWaitingRequest(System.currentTimeMillis());
receiving = false;
messages = new Vector();
// averageLoadTask = new QueueAverageLoadTask(AgentServer.getTimer(), this);
......@@ -576,6 +583,7 @@ public class Queue extends Destination implements QueueMBean {
if (firstTime) {
arrivalState = new QueueArrivalState(arrivalStateTxName);
deliveryTable = new QueueDeliveryTable(deliveryTableTxName);
messages = new Vector<Message>();
return;
} else {
arrivalState = QueueArrivalState.load(arrivalStateTxName);
......@@ -583,11 +591,13 @@ public class Queue extends Destination implements QueueMBean {
}
// Retrieving the persisted messages, if any.
List persistedMsgs = Message.loadAll(getMsgTxPrefix().toString());
List<Message> persistedMsgs = Message.loadAll(getMsgTxPrefix().toString());
long currentTime = System.currentTimeMillis();
if (logmsg.isLoggable(BasicLevel.DEBUG))
logmsg.log(BasicLevel.DEBUG, getName() + ", retrieves messages " + currentTime);
messages = new Vector<Message>(persistedMsgs.size());
if (persistedMsgs != null) {
Message persistedMsg;
QueueDelivery queueDelivery;
......@@ -643,9 +653,9 @@ public class Queue extends Destination implements QueueMBean {
/**
* Finalizes the destination before it is garbaged.
*
* @param lastime true if the destination is deleted
* @param last true if the destination is deleted
*/
protected void finalize(boolean lastTime) {
protected void finalize(boolean last) {
setSave();
// averageLoadTask.cancel();
// averageLoadTask = null;
......@@ -653,6 +663,7 @@ public class Queue extends Destination implements QueueMBean {
/**
* Returns a string representation of this destination.
* @return a string representation of this destination.
*/
public String toString() {
return "Queue:" + getId().toString();
......@@ -1148,6 +1159,8 @@ public class Queue extends Destination implements QueueMBean {
Message message = (Message) messages.get(i);
if (message.getId().equals(request.getMessageId())) {
messages.remove(i);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
message.delete();
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
......@@ -1176,6 +1189,7 @@ public class Queue extends Destination implements QueueMBean {
}
dmqManager.sendToDMQ();
messages.clear();
nbExpirations = 0;
}
replyToTopic(new AdminReply(true, null), replyTo, requestMsgId, replyMsgId);
}
......@@ -1583,6 +1597,7 @@ public class Queue extends Destination implements QueueMBean {
/**
* Adds a message in the list of messages to deliver.
* This method take care of the message priority if needed.
*
* @param message the message to add.
* @param throwsExceptionOnFullDest true, can throws an exception on sending message on full destination
......@@ -1592,7 +1607,6 @@ public class Queue extends Destination implements QueueMBean {
protected final boolean addMessage(Message message, boolean throwsExceptionOnFullDest) throws AccessException {
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size() + getDelayedMessageCount())) {
if (throwsExceptionOnFullDest && isSyncExceptionOnFullDest()) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "addMessage " + message.getId() + " throws Exception: The queue \"" + getName() + "\" is full (syncExceptionOnFullDest).");
......@@ -1611,31 +1625,33 @@ public class Queue extends Destination implements QueueMBean {
if (messages.isEmpty()) {
samePriorities = true;
priority = message.getPriority();
} else if (samePriorities && priority != message.getPriority()) {
if (message.hasExpiration())
nbExpirations = 1;
else
nbExpirations = 0;
} else {
if (samePriorities && priority != message.getPriority())
samePriorities = false;
if (message.hasExpiration())
nbExpirations += 1;
}
if (samePriorities) {
// Constant priorities: no need to insert the message according to
// its priority.
// Constant priorities: no need to insert the message according to its priority.
if (receiving) {
// Message being received: adding it at the end of the queue.
messages.add(message);
} else {
// Denying or recovery: adding the message according to its original
// arrival order.
long currentO;
// Denying or recovery: adding the message according to its original arrival order.
int i = 0;
for (Iterator ite = messages.iterator(); ite.hasNext();) {
currentO = ((Message) ite.next()).order;
if (currentO > message.order) break;
if (((Message) ite.next()).order > message.order) break;
i++;
}
messages.add(i, message);
}
} else {
// Non constant priorities: inserting the message according to its
// priority.
// Non constant priorities: inserting the message according to its priority.
Message currentMsg;
int currentP;
long currentO;
......@@ -1693,26 +1709,28 @@ public class Queue extends Destination implements QueueMBean {
*
* @param msgIds List of message id.
*/
protected void removeMessages(List msgIds) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.removeMessages(" + msgIds + ')');
String id = null;
Iterator itMessages = msgIds.iterator();
while (itMessages.hasNext()) {
id = (String) itMessages.next();
int i = 0;
Message message = null;
while (i < messages.size()) {
message = (Message) messages.get(i);
if (id.equals(message.getId())) {
messages.remove(i);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.removeMessages msgId = " + id);
break;
}
}
}
}
// protected void removeMessages(List msgIds) {
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "Queue.removeMessages(" + msgIds + ')');
// String id = null;
// Iterator itMessages = msgIds.iterator();
// while (itMessages.hasNext()) {
// id = (String) itMessages.next();
// int i = 0;
// Message message = null;
// while (i < messages.size()) {
// message = (Message) messages.get(i);
// if (id.equals(message.getId())) {
// messages.remove(i);
// if ((nbExpirations > 0 ) && message.hasExpiration())
// nbExpirations -= 1;
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "Queue.removeMessages msgId = " + id);
// break;
// }
// }
// }
// }
/**
* get messages, if it's possible.
......@@ -1735,8 +1753,7 @@ public class Queue extends Destination implements QueueMBean {
message = (Message) messages.get(j);
// If selector matches, sending the message:
if (Selector.matches(message.getHeaderMessage(), selector) &&
checkDelivery(message.getHeaderMessage())) {
if (Selector.matches(message.getHeaderMessage(), selector) && checkDelivery(message.getHeaderMessage())) {
message.incDeliveryCount();
nbMsgsDeliverSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
......@@ -1755,6 +1772,8 @@ public class Queue extends Destination implements QueueMBean {
if (remove) {
messages.remove(message);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
message.delete();
} else {
// message not remove: going on.
......@@ -1805,6 +1824,8 @@ public class Queue extends Destination implements QueueMBean {
if (remove) {
messages.remove(message);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
message.delete();
}
}
......@@ -1884,10 +1905,11 @@ public class Queue extends Destination implements QueueMBean {
notMsg.addMessage(message.getFullMessage());
if (!notRec.getAutoAck()) {
// putting the message in the delivered messages table:
QueueDelivery queueDelivery = new QueueDelivery(notRec.requester,
notRec.getClientContext(), message);
QueueDelivery queueDelivery = new QueueDelivery(notRec.requester, notRec.getClientContext(), message);
deliveryTable.put(message.getId(), queueDelivery);
messages.remove(message);
if ((nbExpirations > 0 ) && message.hasExpiration())
nbExpirations -= 1;
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2006 - 2018 ScalAgent Distributed Technologies
* Copyright (C) 2006 - 2020 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -27,6 +27,8 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
......@@ -52,7 +54,7 @@ import fr.dyade.aaa.util.Transaction;
* A message content is always wrapped as a bytes array, it is characterized
* by properties and "header" fields.
*/
public final class Message implements Serializable, Encodable {
public final class Message implements Comparable<Message>, Serializable, Encodable {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 2L;
......@@ -62,6 +64,12 @@ public final class Message implements Serializable, Encodable {
/** Arrival position of this message on its queue or proxy. */
transient public long order;
@Override
public int compareTo(Message msg) {
// This method allows to sort messages list in reverse order (see JORAM-358).
return -Long.compare(order, msg.order);
}
/**
* The number of acknowledgements a message still expects from its
* subscribers before having been fully consumed by them (field used
......@@ -386,6 +394,15 @@ public final class Message implements Serializable, Encodable {
public boolean isValid(long currentTime) {
return (msg.expiration <= 0) || (msg.expiration > currentTime);
}
/**
* Return true if the message has an expiration delay.
*
* @return true if the message has an expiration delay.
*/
public final boolean hasExpiration() {
return getMsg().expiration > 0;
}
/** Name used to store the message */
transient String txname = null;
......@@ -401,9 +418,9 @@ public final class Message implements Serializable, Encodable {
public static Message load(String txname) throws IOException, ClassNotFoundException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Message.load:" + txname);
Message msg = (Message) AgentServer.getTransaction().load(txname);
msg.txname = txname;
return msg;
Message m = (Message) AgentServer.getTransaction().load(txname);
m.txname = txname;
return m;
}
/**
......@@ -487,27 +504,56 @@ public final class Message implements Serializable, Encodable {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Message.loadAll() " + msgTxname);
Vector messages = new Vector();
Vector<Message> messages = new Vector<Message>();
// Retrieving the names of the persistence message previously saved.
Transaction tx = AgentServer.getTransaction();
long start = System.currentTimeMillis();
String[] names = tx.getList(msgTxname);
long end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: retrieves messages list (" + names.length + ") -> " + (end - start));
logger.log(BasicLevel.DEBUG,
"Message.loadAll: names = " + Arrays.toString(names));
// Retrieving the messages individually persisted.
int nb = 0;
Message msg;
start = System.currentTimeMillis();
for (int i = 0; i < names.length; i++) {
if (names[i].charAt(names[i].length() - 1) != 'B') {
if (names[i].charAt(names[i].length() - 1) == 'B')
continue;
try {
Message msg = (Message) tx.load(names[i]);
msg = (Message) tx.load(names[i]);
// Test the order get from txname
int idx = names[i].lastIndexOf('_');
long order = Long.parseLong(names[i].substring(idx+1));
if (order != msg.order)
logger.log(BasicLevel.WARN,
"Message.loadAll: Message " + names[i] + " -> " + msg.order + " != " + order);
msg.txname = names[i];
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "loadAll: names[" + i + "] = " + msg.txname);
logger.log(BasicLevel.DEBUG,
"Message.loadAll: names[" + i + "] = " + msg.txname);
messages.add(msg);
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "Message named [" + names[i] + "] could not be loaded", exc);
}
logger.log(BasicLevel.ERROR,
"Message.loadAll: Message named [" + names[i] + "] can not be loaded", exc);
}
}
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: all messages loaded -> " + (end - start));
start = System.currentTimeMillis();
// Sort messages list in reverse order to optimize queue restoration (JORAM-358)
Collections.sort(messages);
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: sort -> " + (end - start));
return messages;
}
......@@ -577,10 +623,10 @@ public final class Message implements Serializable, Encodable {
}
public Map getProperties() {
Map props = new HashMap();
if (msg.properties == null) {
if (getMsg().properties == null)
return null;
}
Map props = new HashMap();
Enumeration enu = msg.properties.keys();
while (enu.hasMoreElements()) {
String key = (String) enu.nextElement();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment