Commit f91c86dd authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Reworked message deliveries.

Various bug fixes.
parent 6c70b167
......@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.objectweb.util.monolog.api.BasicLevel;
......@@ -47,6 +48,7 @@ import org.ow2.joram.mom.amqp.marshalling.AMQP;
import org.ow2.joram.mom.amqp.structures.Deliver;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.util.Transaction;
/**
......@@ -57,8 +59,7 @@ public class Queue implements QueueMBean, Externalizable {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
public final static Logger logger =
fr.dyade.aaa.common.Debug.getLogger(Queue.class.getName());
public final static Logger logger = Debug.getLogger(Queue.class.getName());
public static final long FIRST_DELIVERY = -1;
......@@ -79,14 +80,15 @@ public class Queue implements QueueMBean, Externalizable {
private long msgCounter;
private TreeSet<Message> toDeliver = new TreeSet<Message>();
private SortedSet<Message> toDeliver = new TreeSet<Message>();
private TreeSet<Message> toAck = new TreeSet<Message>();
private SortedSet<Message> toAck = new TreeSet<Message>();
// Use LinkedHashMap because ordering is necessary for round robin consumers
private Map<SubscriptionKey, Subscription> consumers = new LinkedHashMap<SubscriptionKey, Subscription>();
private Map<SubscriptionKey, Subscription> consumers = new LinkedHashMap<SubscriptionKey, Subscription>(16,
0.75f, true);
public static String PREFIX_QUEUE = "Queue_";
public static final String PREFIX_QUEUE = "Queue_";
private static final String PREFIX_MSG = "M.";
private static final String PREFIX_BOUND_EXCHANGE = "BE_";
......@@ -117,20 +119,20 @@ public class Queue implements QueueMBean, Externalizable {
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't get message on the non-owned exclusive queue '" + name + "'.");
}
Message msg = null;
if (toDeliver.size() > 0) {
Message msg = toDeliver.pollFirst();
if (!noAck) {
toAck.add(msg);
} else {
if (durable)
msg = toDeliver.first();
toDeliver.remove(msg);
if (noAck) {
if (durable) {
deleteMessage(msg.queueMsgId);
}
} else {
toAck.add(msg);
}
msg.queueSize = toDeliver.size();
return msg;
} else {
// Get empty
return null;
}
return msg;
}
public synchronized void consume(DeliveryListener proxy, int channelId, String consumerTag,
......@@ -153,27 +155,46 @@ public class Queue implements QueueMBean, Externalizable {
consumers.put(new SubscriptionKey(serverId, proxyId, channelId, consumerTag), new Subscription(proxy,
channelId, consumerTag, exclusiveConsumer, noAck, noLocal, serverId, proxyId));
}
public synchronized Message consumeMessage(boolean noAck, String consumerTag, int channelId, short serverId, long proxyId) throws TransactionException {
// Check if subscription is still present
public synchronized List<Deliver> getDeliveries(String consumerTag, int channelId, int maxMessage,
short serverId, long proxyId) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.getDeliveries(" + consumerTag + ',' + maxMessage + ')');
SubscriptionKey subKey = new SubscriptionKey(serverId, proxyId, channelId, consumerTag);
if (consumers.get(subKey) == null) {
Subscription sub = consumers.get(subKey);
if (sub == null || maxMessage == 0) {
return null;
}
Message msg = null;
List<Deliver> deliveries = new ArrayList<Deliver>();
if (toDeliver.size() > 0) {
msg = toDeliver.pollFirst();
if (!noAck) {
toAck.add(msg);
} else {
if (durable)
deleteMessage(msg.queueMsgId);
Iterator<Message> messagesIter = toDeliver.iterator();
while (messagesIter.hasNext() && (maxMessage < 0 || deliveries.size() < maxMessage)) {
Message msg = messagesIter.next();
if (!sub.noAck) {
toAck.add(msg);
} else {
if (durable) {
deleteMessage(msg.queueMsgId);
}
}
messagesIter.remove();
AMQP.Basic.Deliver deliver = new AMQP.Basic.Deliver(consumerTag, msg.queueMsgId, msg.redelivered,
msg.exchange, msg.routingKey);
deliver.channelNumber = channelId;
Deliver msgDelivery = new Deliver(deliver, msg.properties, msg.body, msg.queueMsgId, serverId,
proxyId, name, sub.noAck);
deliveries.add(msgDelivery);
}
}
return msg;
return deliveries;
}
public synchronized void publish(Message msg, boolean immediate, short serverId, long proxyId)
throws NoConsumersException, TransactionException {
if (logger.isLoggable(BasicLevel.DEBUG))
......@@ -192,47 +213,33 @@ public class Queue implements QueueMBean, Externalizable {
recover = true;
}
if (consumers.size() > 0) {
if (toDeliver.size() > 0) {
toDeliver.add(msg);
if (durable) {
if (recover) {
toAck.remove(msg);
deleteMessage(msg.queueMsgId);
}
saveMessage(msg);
}
return;
if (consumers.size() == 0 && immediate) {
throw new NoConsumersException("No consumer available for immediate publication on queue '" + name + "'.");
}
if (durable) {
if (recover) {
toAck.remove(msg);
} else {
saveMessage(msg);
}
}
toDeliver.add(msg);
// If a consumer is present, try to deliver right now
if (consumers.size() > 0) {
Iterator<Entry<SubscriptionKey, Subscription>> iterEntries = consumers.entrySet().iterator();
Entry<SubscriptionKey, Subscription> entry = iterEntries.next();
iterEntries.remove();
Subscription subscription = entry.getValue();
if (!subscription.noAck) {
toAck.add(msg);
} else {
if (durable)
deleteMessage(msg.queueMsgId);
}
AMQP.Basic.Deliver deliver = new AMQP.Basic.Deliver(subscription.consumerTag, msg.queueMsgId,
msg.redelivered, msg.exchange, msg.routingKey);
deliver.channelNumber = subscription.channelId;
Deliver msgDelivery = new Deliver(deliver, msg.properties, msg.body, msg.queueMsgId, subscription.serverId,
subscription.proxyId, name);
subscription.deliveryListener.deliver(msgDelivery, this);
consumers.put(entry.getKey(), entry.getValue());
} else {
if (immediate) {
throw new NoConsumersException("No consumer available for immediate publication on queue '" + name + "'.");
} else {
toDeliver.add(msg);
if (durable) {
if (recover) {
toAck.remove(msg);
deleteMessage(msg.queueMsgId);
}
saveMessage(msg);
// Deliver to the first available (ie QoS buffer not full) consumer
// this will achieve round-robin if no QoS is set.
while (iterEntries.hasNext()) {
Entry<SubscriptionKey, Subscription> entry = iterEntries.next();
Subscription subscription = entry.getValue();
if (subscription.deliveryListener.deliver(subscription.consumerTag, subscription.channelId, this,
subscription.serverId, subscription.proxyId)) {
consumers.get(entry.getKey());
return;
}
}
}
......@@ -344,7 +351,7 @@ public class Queue implements QueueMBean, Externalizable {
return queueInfo;
}
static class Subscription {
private static class Subscription {
long proxyId;
short serverId;
......@@ -455,7 +462,7 @@ public class Queue implements QueueMBean, Externalizable {
}
}
static class SubscriptionKey {
private static class SubscriptionKey {
short serverId;
......@@ -511,10 +518,10 @@ public class Queue implements QueueMBean, Externalizable {
}
}
//**********************************************************
//* Persistence
//**********************************************************
public static Queue loadQueue(String name) throws IOException, ClassNotFoundException, TransactionException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.loadQueue(" + name + ')');
......@@ -587,7 +594,7 @@ public class Queue implements QueueMBean, Externalizable {
AgentServer.getTransaction().delete(prefixBE + exchangeName.replace('/', '.'));
}
private void deleteAllMessage(TreeSet<Message> messages) {
private void deleteAllMessage(Set<Message> messages) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.deleteAllMessage(" + messages + ')');
if (!durable)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment