Commit f32c99f3 authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Implement Basic.Qos method.

Reworked message deliveries.
Various bug fixes.
parent 4f99eef4
......@@ -28,6 +28,7 @@ import java.rmi.AlreadyBoundException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -49,11 +50,12 @@ import org.ow2.joram.mom.amqp.exceptions.ResourceLockedException;
import org.ow2.joram.mom.amqp.exceptions.SyntaxErrorException;
import org.ow2.joram.mom.amqp.exceptions.TransactionException;
import org.ow2.joram.mom.amqp.marshalling.AMQP;
import org.ow2.joram.mom.amqp.marshalling.AMQP.Basic.Qos;
import org.ow2.joram.mom.amqp.marshalling.AbstractMarshallingMethod;
import org.ow2.joram.mom.amqp.structures.Ack;
import org.ow2.joram.mom.amqp.structures.Cancel;
import org.ow2.joram.mom.amqp.structures.ConsumeMessage;
import org.ow2.joram.mom.amqp.structures.Deliver;
import org.ow2.joram.mom.amqp.structures.GetDeliveries;
import org.ow2.joram.mom.amqp.structures.GetResponse;
import org.ow2.joram.mom.amqp.structures.Recover;
import org.ow2.joram.mom.amqp.structures.Returned;
......@@ -83,12 +85,12 @@ public class Proxy implements DeliveryListener, ProxyMBean {
private ProxyName name;
private fr.dyade.aaa.common.Queue queueIn = null;
private fr.dyade.aaa.common.Queue queueOut = null;
private java.util.Queue queueOut = null;
private NetServerIn netServerIn;
private Transaction transaction = null;
public Proxy(fr.dyade.aaa.common.Queue queueIn, fr.dyade.aaa.common.Queue queueOut) throws IOException {
public Proxy(fr.dyade.aaa.common.Queue queueIn, java.util.Queue queueOut) throws IOException {
if (AgentServer.getTransaction().isPersistent())
loadProxyId();
this.name = new ProxyName(AgentServer.getServerId(), getNextProxyId());
......@@ -291,8 +293,12 @@ public class Proxy implements DeliveryListener, ProxyMBean {
break;
case AMQP.Basic.Qos.INDEX:
//TODO
throw new NotImplementedException("Qos method currently not implemented.");
AMQP.Basic.Qos qos = (AMQP.Basic.Qos) method;
basicQoS(qos);
AMQP.Basic.QosOk qosOk = new AMQP.Basic.QosOk();
qosOk.channelNumber = channelNumber;
send(qosOk);
break;
default:
break;
......@@ -401,7 +407,7 @@ public class Proxy implements DeliveryListener, ProxyMBean {
/* ******************************************* */
/* ******************************************* */
private List<QueueShell> exclusiveQueues = new ArrayList<QueueShell>();
private Set<QueueShell> exclusiveQueues = new HashSet<QueueShell>();
// Maps channel id to its context
private Map<Integer, ChannelContext> channelContexts = new HashMap<Integer, ChannelContext>();
......@@ -469,6 +475,10 @@ public class Proxy implements DeliveryListener, ProxyMBean {
StubAgentOut.asyncSend(new Ack(delivery.queue.getName(), ackList),
Naming.resolveServerId(delivery.queue.getName()));
}
if (channelContext.prefetchCount != 0) {
queueIn.push(new GetDeliveries(null, ack.channelNumber));
}
return;
}
}
......@@ -514,6 +524,10 @@ public class Proxy implements DeliveryListener, ProxyMBean {
Naming.resolveServerId(queue.getName()));
}
}
if (channelContext.prefetchCount != 0) {
queueIn.push(new GetDeliveries(null, ack.channelNumber));
}
}
}
......@@ -553,6 +567,60 @@ public class Proxy implements DeliveryListener, ProxyMBean {
}
}
private void getDeliveries(GetDeliveries getDeliveries) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.getDeliveries(" + getDeliveries.consumerTag + ')');
ChannelContext channelContext = channelContexts.get(Integer.valueOf(getDeliveries.channelId));
if (channelContext == null) {
// channel has been closed in the meantime
return;
}
int maxMessage = -1;
if (channelContext.prefetchCount > 0) {
maxMessage = channelContext.prefetchCount - channelContext.deliveriesToAck.size();
}
if (maxMessage == 0) {
return;
}
if (getDeliveries.consumerTag != null) {
QueueShell queueShell = channelContext.consumerQueues.get(getDeliveries.consumerTag);
if (queueShell != null) {
Queue queue = queueShell.getReference();
doGetDeliveries(getDeliveries.consumerTag, getDeliveries.channelId, maxMessage, queue);
}
} else {
String[] tags = channelContext.consumerQueues.keySet().toArray(
new String[channelContext.consumerQueues.size()]);
int i = 0;
while (i < tags.length && maxMessage != 0) {
String consumerTag = tags[i];
Queue queue = channelContext.consumerQueues.get(consumerTag).getReference();
int deliveredCount = doGetDeliveries(consumerTag, getDeliveries.channelId, maxMessage, queue);
maxMessage -= deliveredCount;
i++;
}
}
}
private int doGetDeliveries(String consumerTag, int channelId, int maxMessage, Queue queue) {
List<Deliver> deliveries = queue.getDeliveries(consumerTag, channelId, maxMessage, name.serverId,
name.proxyId);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.doGetDeliveries(" + deliveries + ')');
if (deliveries != null) {
for (Iterator iterator = deliveries.iterator(); iterator.hasNext();) {
Deliver deliver = (Deliver) iterator.next();
send(deliver, new QueueShell(queue));
}
return deliveries.size();
}
return 0;
}
public void basicConsume(AMQP.Basic.Consume basicConsume) throws NotFoundException, NotAllowedException,
AMQPException, AccessRefusedException, ResourceLockedException {
if (logger.isLoggable(BasicLevel.DEBUG))
......@@ -578,16 +646,13 @@ public class Proxy implements DeliveryListener, ProxyMBean {
QueueShell queueShell;
if (Naming.isLocal(queueName)) {
StubLocal.basicConsume(this, basicConsume.queue, tag, basicConsume.exclusive,
basicConsume.noAck, basicConsume.noLocal, basicConsume.channelNumber, name.serverId, name.proxyId);
StubLocal.basicConsume(this, basicConsume.queue, tag, basicConsume.exclusive, basicConsume.noAck,
basicConsume.noLocal, basicConsume.channelNumber, name.serverId, name.proxyId);
queueShell = new QueueShell(Naming.lookupQueue(queueName));
queueIn.push(new ConsumeMessage(queueName, tag, basicConsume.channelNumber, basicConsume.noAck));
} else {
queueShell = new QueueShell(basicConsume.queue);
basicConsume.consumerTag = tag;
StubAgentOut.asyncSend(basicConsume, Naming.resolveServerId(basicConsume.queue), name.proxyId);
StubAgentOut.asyncSend(new ConsumeMessage(queueName, tag, basicConsume.channelNumber,
basicConsume.noAck, name.serverId), Naming.resolveServerId(queueName), name.proxyId);
}
// Send the ok response
......@@ -597,6 +662,9 @@ public class Proxy implements DeliveryListener, ProxyMBean {
send(consumeOk);
}
channelContext.consumerQueues.put(tag, queueShell);
queueIn.push(new GetDeliveries(tag, basicConsume.channelNumber));
}
public GetResponse basicGet(AMQP.Basic.Get basicGet) throws NotFoundException, AMQPException,
......@@ -638,21 +706,29 @@ public class Proxy implements DeliveryListener, ProxyMBean {
return new GetResponse(getOk, msg.properties, msg.body);
}
public void basicPublish(PublishRequest publishRequest) throws NotFoundException {
basicPublish(publishRequest, false);
}
public void basicPublish(PublishRequest publishRequest, boolean commiting) throws NotFoundException {
public void basicPublish(PublishRequest publishRequest) throws NotFoundException, TransactionException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.basicPublish(" + publishRequest + ')');
ChannelContext channelContext = getContext(publishRequest.channel);
if (channelContext.transacted && !commiting) {
if (channelContext.transacted) {
IExchange exchange = Naming.lookupExchange(publishRequest.getPublish().exchange);
if (exchange == null) {
throw new NotFoundException("Can't publish on an unknwon exchange: '"
+ publishRequest.getPublish().exchange + "'.");
}
channelContext.pubToCommit.add(publishRequest);
return;
}
if (Naming.isLocal(publishRequest.getPublish().exchange)) {
try {
AgentServer.getTransaction().begin();
} catch (IOException exc) {
throw new TransactionException(exc.getMessage());
}
try {
StubLocal.basicPublish(publishRequest, name.serverId, name.proxyId);
} catch (AMQPException exc) {
......@@ -660,6 +736,12 @@ public class Proxy implements DeliveryListener, ProxyMBean {
returned.channelNumber = publishRequest.channel;
send(new Returned(returned, publishRequest.getHeader(), publishRequest.getBody()));
}
try {
AgentServer.getTransaction().commit(true);
} catch (IOException exc) {
throw new TransactionException(exc.getMessage());
}
} else {
StubAgentOut.asyncSend(publishRequest, Naming.resolveServerId(publishRequest.getPublish().exchange),
name.proxyId);
......@@ -689,6 +771,10 @@ public class Proxy implements DeliveryListener, ProxyMBean {
iter.remove();
}
if (recoverMap.size() > 0 && channelContext.prefetchCount != 0) {
queueIn.push(new GetDeliveries(null, channelNumber));
}
Iterator<QueueShell> iterQueues = recoverMap.keySet().iterator();
while (iterQueues.hasNext()) {
QueueShell queue = iterQueues.next();
......@@ -705,7 +791,7 @@ public class Proxy implements DeliveryListener, ProxyMBean {
PreconditionFailedException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.basicReject(" + basicReject + ')');
ChannelContext channelContext = channelContexts.get(Integer.valueOf(basicReject.channelNumber));
ChannelContext channelContext = getContext(basicReject.channelNumber);
Iterator<Delivery> iter = channelContext.deliveriesToAck.iterator();
while (iter.hasNext()) {
Delivery delivery = iter.next();
......@@ -737,6 +823,20 @@ public class Proxy implements DeliveryListener, ProxyMBean {
throw new PreconditionFailedException("Reject error: invalid tag '" + basicReject.deliveryTag + "'.");
}
public void basicQoS(Qos qos) throws NotImplementedException {
if (qos.global) {
throw new NotImplementedException("Global Qos prefetch not implemented.");
}
if (qos.prefetchSize != 0) {
throw new NotImplementedException("Qos prefetch size currently not implemented.");
}
ChannelContext channelContext = getContext(qos.channelNumber);
if (qos.prefetchCount > channelContext.prefetchCount || qos.prefetchCount == 0) {
queueIn.push(new GetDeliveries(null, qos.channelNumber));
}
channelContext.prefetchCount = qos.prefetchCount;
}
public void channelClose(int channelNumber) throws AMQPException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.channelClose(" + channelNumber + ")");
......@@ -875,7 +975,7 @@ public class Proxy implements DeliveryListener, ProxyMBean {
declareOk = (AMQP.Queue.DeclareOk) StubAgentOut.syncSend(queueDeclare, Naming.resolveServerId(queueDeclare.queue));
queueShell = new QueueShell(declareOk.queue);
}
if (queueDeclare.exclusive) {
if (!queueDeclare.passive && queueDeclare.exclusive) {
exclusiveQueues.add(queueShell);
}
ChannelContext context = getContext(queueDeclare.channelNumber);
......@@ -956,16 +1056,36 @@ public class Proxy implements DeliveryListener, ProxyMBean {
}
}
public void txCommit(int channelNumber) throws PreconditionFailedException, NotFoundException {
public void txCommit(int channelNumber) throws PreconditionFailedException, TransactionException {
ChannelContext channelContext = getContext(channelNumber);
if (!channelContext.transacted) {
throw new PreconditionFailedException("Can't commit a non-transacted channel.");
}
try {
AgentServer.getTransaction().begin();
} catch (IOException exc) {
throw new TransactionException(exc.getMessage());
}
for (PublishRequest publish : channelContext.pubToCommit) {
basicPublish(publish, true);
if (Naming.isLocal(publish.getPublish().exchange)) {
try {
StubLocal.basicPublish(publish, name.serverId, name.proxyId);
} catch (AMQPException exc) {
// the behaviour of transactions with respect to the immediate and mandatory
// flags on Basic.Publish methods is not defined
AMQP.Basic.Return returned = new AMQP.Basic.Return(exc.getCode(), exc.getMessage(),
publish.getPublish().exchange, publish.getPublish().routingKey);
returned.channelNumber = publish.channel;
send(new Returned(returned, publish.getHeader(), publish.getBody()));
}
} else {
StubAgentOut.asyncSend(publish, Naming.resolveServerId(publish.getPublish().exchange), name.proxyId);
}
}
channelContext.pubToCommit.clear();
Map<QueueShell, List<Long>> deliveryMap = new HashMap<QueueShell, List<Long>>();
Iterator<Delivery> iter = channelContext.deliveriesToAck.iterator();
while (iter.hasNext()) {
......@@ -990,6 +1110,16 @@ public class Proxy implements DeliveryListener, ProxyMBean {
Naming.resolveServerId(queue.getName()));
}
}
try {
AgentServer.getTransaction().commit(true);
} catch (IOException exc) {
throw new TransactionException(exc.getMessage());
}
if (channelContext.prefetchCount != 0) {
queueIn.push(new GetDeliveries(null, channelNumber));
}
}
public void txRollback(int channelNumber) throws PreconditionFailedException {
......@@ -1003,32 +1133,29 @@ public class Proxy implements DeliveryListener, ProxyMBean {
delivery.waitingCommit = false;
}
}
public void consumeMessage(ConsumeMessage consumeMessage) throws TransactionException {
Queue queue = Naming.lookupQueue(consumeMessage.queueName);
Message msg = queue.consumeMessage(consumeMessage.noAck, consumeMessage.consumerTag,
consumeMessage.channelNumber, name.serverId, name.proxyId);
if (msg != null) {
AMQP.Basic.Deliver deliver = new AMQP.Basic.Deliver(consumeMessage.consumerTag, msg.queueMsgId,
msg.redelivered, msg.exchange, msg.routingKey);
deliver.channelNumber = consumeMessage.channelNumber;
send(new Deliver(deliver, msg.properties, msg.body, msg.queueMsgId, (short) -1, -1, null), new QueueShell(queue));
queueIn.push(consumeMessage);
public boolean deliver(String consumerTag, int channelNumber, Queue queue, short serverId, long proxyId) {
ChannelContext channelContext = getContext(channelNumber);
if (channelContext.prefetchCount > 0
&& channelContext.deliveriesToAck.size() == channelContext.prefetchCount) {
return false;
}
}
public void deliver(Deliver deliver, Queue queue) {
send(deliver, new QueueShell(queue));
// Push a getDeliveries method instead of sending deliveries by the current thread
// which leads to synchronization issues and therefore synchronizing proxy structures.
queueIn.push(new GetDeliveries(consumerTag, channelNumber));
return true;
}
public void send(AbstractMarshallingMethod method) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.send(" + method + ")");
queueOut.push(method);
queueOut.add(method);
}
public void send(GetResponse response) {
queueOut.push(response);
queueOut.add(response);
}
public void send(Deliver deliver, QueueShell queue) {
......@@ -1039,9 +1166,11 @@ public class Proxy implements DeliveryListener, ProxyMBean {
long deliveryTag = channelContext.nextDeliveryTag();
long queueMsgId = deliver.deliver.deliveryTag;
deliver.deliver.deliveryTag = deliveryTag;
channelContext.deliveriesToAck.add(new Delivery(deliveryTag, queueMsgId, queue));
if (!deliver.noAck) {
channelContext.deliveriesToAck.add(new Delivery(deliveryTag, queueMsgId, queue));
}
try {
queueOut.push(deliver);
queueOut.add(deliver);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Proxy.send ERROR", e);
......@@ -1049,7 +1178,7 @@ public class Proxy implements DeliveryListener, ProxyMBean {
}
public void send(Returned response) {
queueOut.push(response);
queueOut.add(response);
}
// Daemon....
......@@ -1082,12 +1211,17 @@ public class Proxy implements DeliveryListener, ProxyMBean {
return queueOut.size();
}
public List<QueueShell> getExclusiveQueues() {
return exclusiveQueues;
public Set<String> getExclusiveQueues() {
Set<String> queues = new HashSet<String>(exclusiveQueues.size());
for (Iterator<QueueShell> iterator = exclusiveQueues.iterator(); iterator.hasNext();) {
QueueShell queue = iterator.next();
queues.add(queue.getName());
}
return queues;
}
public Set<Integer> getOpenedChannels() {
return channelContexts.keySet();
public Integer[] getOpenedChannels() {
return channelContexts.keySet().toArray(new Integer[channelContexts.size()]);
}
final class NetServerIn extends Daemon {
......@@ -1129,9 +1263,8 @@ public class Proxy implements DeliveryListener, ProxyMBean {
throwException(amqe, publishRequest.channel, publishRequest.getPublish().getClassId(),
publishRequest.getPublish().getMethodId());
}
} else if (obj instanceof ConsumeMessage) {
ConsumeMessage consumeMessage = (ConsumeMessage) obj;
consumeMessage(consumeMessage);
} else if (obj instanceof GetDeliveries) {
getDeliveries((GetDeliveries) obj);
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Proxy: UNEXPECTED OBJECT CLASS: " + obj.getClass().getName());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment