Commit 46ad91b5 authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Add Transaction methods handling: Select, Commit, Rollback

parent 53bc9ad7
......@@ -29,6 +29,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
......@@ -447,7 +448,8 @@ public class AMQPConnectionListener extends Daemon {
if (!isChannelOpen(channelNumber)) {
throw new ChannelErrorException("Channel not opened.");
}
throw new NotImplementedException("Transactions currently not implemented.");
sendToProxy(method);
break;
default:
// nothing
......@@ -672,10 +674,6 @@ public class AMQPConnectionListener extends Daemon {
private void closeChannel(int channel) {
openChannel.remove(Integer.valueOf(channel));
//TODO close cnx if openChannel isEmpty ?
// if (openChannel.isEmpty()) {
// closeConnection(close);
// }
}
private PublishRequest createPublishRequest(int channel) {
......@@ -832,6 +830,8 @@ public class AMQPConnectionListener extends Daemon {
if (!running)
break;
}
} catch (SocketException exc) {
this.logmon.log(BasicLevel.DEBUG, this.getName() + ", socket error", exc);
} catch (Exception exc) {
this.logmon.log(BasicLevel.FATAL, this.getName() + ", unrecoverable exception", exc);
} finally {
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.ow2.joram.mom.amqp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
......@@ -18,10 +41,18 @@ public class ChannelContext {
QueueShell queue;
boolean waitingCommit;
public Delivery(long deliverytag, long queueMsgId, QueueShell queue) {
this.deliveryTag = deliverytag;
this.queueMsgId = queueMsgId;
this.queue = queue;
waitingCommit = false;
}
public String toString() {
return "Delivery [queueMsgId=" + queueMsgId + ", deliveryTag=" + deliveryTag + ", queue=" + queue
+ ", waitingCommit=" + waitingCommit + "]";
}
}
......@@ -40,7 +71,14 @@ public class ChannelContext {
* Maps a consumer tag to a queue.
*/
Map<String, QueueShell> consumerQueues = new HashMap<String, QueueShell>();
/**
* Tells if the channel is in transacted mode.
*/
boolean transacted = false;
List<PublishRequest> pubToCommit = new ArrayList<PublishRequest>();
/**
* Id used to generate a channel-unique deliveryTag;
*/
......
......@@ -247,7 +247,7 @@ public class Proxy implements DeliveryListener {
AMQP.Basic.Ack ack = (AMQP.Basic.Ack) method;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "ACK = " + ack);
basicAck(ack.deliveryTag, ack.multiple, channelNumber);
basicAck(ack);
break;
case AMQP.Basic.Consume.INDEX:
......@@ -273,7 +273,7 @@ public class Proxy implements DeliveryListener {
case AMQP.Basic.Reject.INDEX:
AMQP.Basic.Reject reject = (AMQP.Basic.Reject) method;
basicReject(reject);
throw new NotImplementedException("Reject method currently not implemented.");
break;
case AMQP.Basic.RecoverAsync.INDEX:
// This method is deprecated in favor of the synchronous Recover/RecoverOk.
......@@ -333,19 +333,42 @@ public class Proxy implements DeliveryListener {
* Class Tx
******************************************************/
case AMQP.Tx.INDEX:
switch (method.getMethodId()) {
case AMQP.Tx.Select.INDEX:
getContext(channelNumber).transacted = true;
AMQP.Tx.SelectOk selectOk = new AMQP.Tx.SelectOk();
selectOk.channelNumber = channelNumber;
send(selectOk);
break;
case AMQP.Tx.Commit.INDEX:
txCommit(channelNumber);
AMQP.Tx.CommitOk commitOk = new AMQP.Tx.CommitOk();
commitOk.channelNumber = channelNumber;
send(commitOk);
break;
case AMQP.Tx.Rollback.INDEX:
txRollback(channelNumber);
AMQP.Tx.RollbackOk rollbackOk = new AMQP.Tx.RollbackOk();
rollbackOk.channelNumber = channelNumber;
send(rollbackOk);
break;
}
break;
default:
break;
}
}
/**
* Releases connection or channel resources and close it by sending a
* notification to the client.
*/
private void throwException(AMQPException amqe, int channelNumber, int classId, int methodId)
throws Exception {
throws AMQPException {
if (amqe instanceof ChannelException) {
channelClose(channelNumber);
AMQP.Channel.Close close = new AMQP.Channel.Close(amqe.getCode(), amqe.getMessage(), classId, methodId);
......@@ -391,7 +414,7 @@ public class Proxy implements DeliveryListener {
return channelContext;
}
public synchronized void cleanConsumers(short sid) throws Exception {
public synchronized void cleanConsumers(short sid) throws AMQPException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.cleanConsumers(" + sid + ')');
Set<Integer> channelsIds = channelContexts.keySet();
......@@ -417,17 +440,25 @@ public class Proxy implements DeliveryListener {
}
}
public void basicAck(long deliveryTag, boolean multiple, int channelNumber)
throws PreconditionFailedException {
public void basicAck(AMQP.Basic.Ack ack) throws PreconditionFailedException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.basicAck(" + deliveryTag + ", " + channelNumber + ')');
logger.log(BasicLevel.DEBUG, "Proxy.basicAck(" + ack.deliveryTag + ", " + ack.channelNumber + ')');
ChannelContext channelContext = getContext(ack.channelNumber);
ChannelContext channelContext = getContext(channelNumber);
Iterator<Delivery> iter = channelContext.deliveriesToAck.iterator();
if (!multiple) {
if (!ack.multiple) {
while (iter.hasNext()) {
Delivery delivery = iter.next();
if (delivery.deliveryTag == deliveryTag) {
if (delivery.deliveryTag == ack.deliveryTag) {
if (channelContext.transacted) {
if (delivery.waitingCommit) {
break;
} else {
delivery.waitingCommit = true;
return;
}
}
List<Long> ackList = new ArrayList<Long>(1);
ackList.add(new Long(delivery.queueMsgId));
iter.remove();
......@@ -440,25 +471,37 @@ public class Proxy implements DeliveryListener {
return;
}
}
throw new PreconditionFailedException("Acknowledgement error: invalid tag.");
throw new PreconditionFailedException("Acknowledgement error: invalid tag '" + ack.deliveryTag + "'.");
} else {
Map<QueueShell, List<Long>> deliveryMap = new HashMap<QueueShell, List<Long>>();
while (iter.hasNext()) {
Delivery delivery = iter.next();
if (delivery.deliveryTag <= deliveryTag) {
if (delivery.deliveryTag <= ack.deliveryTag || ack.deliveryTag == 0) {
if (channelContext.transacted) {
if (delivery.waitingCommit) {
continue;
} else {
delivery.waitingCommit = true;
}
}
List<Long> ackList = deliveryMap.get(delivery.queue);
if (ackList == null) {
ackList = new ArrayList<Long>();
deliveryMap.put(delivery.queue, ackList);
}
ackList.add(new Long(delivery.queueMsgId));
iter.remove();
} else if (delivery.deliveryTag > deliveryTag) {
if (!channelContext.transacted) {
iter.remove();
}
} else if (delivery.deliveryTag > ack.deliveryTag) {
break;
}
}
if (deliveryMap.size() == 0) {
throw new PreconditionFailedException("Acknowledgement error: invalid tag.");
throw new PreconditionFailedException("Acknowledgement error: invalid tag '" + ack.deliveryTag + "'.");
}
if (channelContext.transacted) {
return;
}
Iterator<QueueShell> iterQueues = deliveryMap.keySet().iterator();
while (iterQueues.hasNext()) {
......@@ -529,7 +572,7 @@ public class Proxy implements DeliveryListener {
}
if (channelContext.consumerQueues.get(tag) != null) {
throw new NotAllowedException("Consume request failed due to non-unique tag.");
throw new NotAllowedException("Consume request failed due to non-unique tag: '" + tag + "'.");
}
QueueShell queueShell;
......@@ -564,12 +607,10 @@ public class Proxy implements DeliveryListener {
ChannelContext channelContext = getContext(basicGet.channelNumber);
if (queueName.equals("")) {
queueName = channelContext.lastQueueCreated;
/*
* If the client did not declare a queue, and the method needs a queue
* name, this will result in a 502 (syntax error) channel exception.
*/
/* If the client did not declare a queue, and the method needs a queue
* name, this will result in a 502 (syntax error) channel exception. */
if (queueName == null) {
throw new SyntaxErrorException("No queue declared.");
throw new SyntaxErrorException("No queue previously declared on the channel.");
}
}
......@@ -597,9 +638,19 @@ public class Proxy implements DeliveryListener {
}
public void basicPublish(PublishRequest publishRequest) throws NotFoundException {
basicPublish(publishRequest, false);
}
public void basicPublish(PublishRequest publishRequest, boolean commiting) throws NotFoundException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.basicPublish(" + publishRequest + ')');
ChannelContext channelContext = getContext(publishRequest.channel);
if (channelContext.transacted && !commiting) {
channelContext.pubToCommit.add(publishRequest);
return;
}
if (Naming.isLocal(publishRequest.getPublish().exchange)) {
try {
StubLocal.basicPublish(publishRequest, name.serverId, name.proxyId);
......@@ -620,17 +671,20 @@ public class Proxy implements DeliveryListener {
// Recover non-acked messages on the channel
ChannelContext channelContext = getContext(channelNumber);
Map<QueueShell, List<Long>> recoverMap = new HashMap<QueueShell, List<Long>>();
Iterator<Delivery> iter = channelContext.deliveriesToAck.iterator();
Map<QueueShell, List<Long>> recoverMap = new HashMap<QueueShell, List<Long>>();
while (iter.hasNext()) {
Delivery delivery = iter.next();
if (delivery.waitingCommit) {
continue;
}
List<Long> ackList = recoverMap.get(delivery.queue);
if (ackList == null) {
ackList = new ArrayList<Long>();
recoverMap.put(delivery.queue, ackList);
}
ackList.add(new Long(delivery.queueMsgId));
ackList.add(Long.valueOf(delivery.queueMsgId));
iter.remove();
}
......@@ -646,7 +700,8 @@ public class Proxy implements DeliveryListener {
}
}
public void basicReject(AMQP.Basic.Reject basicReject) throws TransactionException {
public void basicReject(AMQP.Basic.Reject basicReject) throws TransactionException,
PreconditionFailedException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.basicReject(" + basicReject + ')');
ChannelContext channelContext = channelContexts.get(Integer.valueOf(basicReject.channelNumber));
......@@ -673,9 +728,12 @@ public class Proxy implements DeliveryListener {
}
}
iter.remove();
return;
} else if (delivery.deliveryTag > basicReject.deliveryTag) {
break;
}
}
throw new PreconditionFailedException("Reject error: invalid tag '" + basicReject.deliveryTag + "'.");
}
public void channelClose(int channelNumber) throws AMQPException {
......@@ -695,6 +753,10 @@ public class Proxy implements DeliveryListener {
// Can't happen.
}
}
if (channelContext.transacted) {
txRollback(channelNumber);
}
}
// Recover all non-acked messages
......@@ -768,22 +830,18 @@ public class Proxy implements DeliveryListener {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy.queueBind(" + queueBind + ')');
/*
* If the queue name is empty, the server uses the last queue declared on
/* If the queue name is empty, the server uses the last queue declared on
* the channel. If the routing key is also empty, the server uses this queue
* name for the routing key as well. If the queue name is provided but the
* routing key is empty, the server does the binding with that empty routing
* key.
*/
* key. */
String queueName = queueBind.queue;
if (queueName.equals("")) {
queueName = getContext(queueBind.channelNumber).lastQueueCreated;
/*
* If the client did not declare a queue, and the method needs a queue
* name, this will result in a 502 (syntax error) channel exception.
*/
/* If the client did not declare a queue, and the method needs a queue
* name, this will result in a 502 (syntax error) channel exception. */
if (queueName == null) {
throw new SyntaxErrorException("No queue declared.");
throw new SyntaxErrorException("No queue previously declared on the channel.");
}
if (queueBind.routingKey.equals("")) {
queueBind.routingKey = queueName;
......@@ -897,6 +955,54 @@ public class Proxy implements DeliveryListener {
}
}
public void txCommit(int channelNumber) throws PreconditionFailedException, NotFoundException {
ChannelContext channelContext = getContext(channelNumber);
if (!channelContext.transacted) {
throw new PreconditionFailedException("Can't commit a non-transacted channel.");
}
for (PublishRequest publish : channelContext.pubToCommit) {
basicPublish(publish, true);
}
Map<QueueShell, List<Long>> deliveryMap = new HashMap<QueueShell, List<Long>>();
Iterator<Delivery> iter = channelContext.deliveriesToAck.iterator();
while (iter.hasNext()) {
Delivery delivery = iter.next();
if (delivery.waitingCommit) {
List<Long> ackList = deliveryMap.get(delivery.queue);
if (ackList == null) {
ackList = new ArrayList<Long>();
deliveryMap.put(delivery.queue, ackList);
}
ackList.add(new Long(delivery.queueMsgId));
iter.remove();
}
}
Iterator<QueueShell> iterQueues = deliveryMap.keySet().iterator();
while (iterQueues.hasNext()) {
QueueShell queue = iterQueues.next();
if (queue.islocal()) {
queue.getReference().ackMessages(deliveryMap.get(queue));
} else {
StubAgentOut.asyncSend(new Ack(queue.getName(), deliveryMap.get(queue)),
Naming.resolveServerId(queue.getName()));
}
}
}
public void txRollback(int channelNumber) throws PreconditionFailedException {
ChannelContext channelContext = getContext(channelNumber);
if (!channelContext.transacted) {
throw new PreconditionFailedException("Can't rollback a non-transacted channel.");
}
channelContext.pubToCommit.clear();
for (Delivery delivery : channelContext.deliveriesToAck) {
delivery.waitingCommit = false;
}
}
public void consumeMessage(ConsumeMessage consumeMessage) throws TransactionException {
Queue queue = Naming.lookupQueue(consumeMessage.queueName);
Message msg = queue.consumeMessage(consumeMessage.noAck, consumeMessage.consumerTag,
......@@ -966,76 +1072,69 @@ public class Proxy implements DeliveryListener {
logger.log(BasicLevel.DEBUG, "Proxy.run()");
while (running) {
canStop = true;
while (true) {
try {
Object obj = queueIn.getAndPop();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy: object on queue : " + obj.getClass().getName());
if (obj instanceof AbstractMarshallingMethod) {
AbstractMarshallingMethod method = (AbstractMarshallingMethod) obj;
try {
doProcessMethod(method);
} catch (AMQPException amqe) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Proxy: AMQP error: " + amqe.getMessage());
}
throwException(amqe, method.channelNumber, method.getClassId(), method.getMethodId());
}
} else if (obj instanceof PublishRequest) {
PublishRequest publishRequest = (PublishRequest) obj;
try {
basicPublish(publishRequest);
} catch (AMQPException amqe) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Proxy: AMQP error: " + amqe.getMessage());
}
throwException(amqe, publishRequest.channel, publishRequest.getPublish().getClassId(),
publishRequest.getPublish().getMethodId());
try {
canStop = true;
Object obj = queueIn.getAndPop();
canStop = false;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Proxy: object on queue : " + obj.getClass().getName());
if (obj instanceof AbstractMarshallingMethod) {
AbstractMarshallingMethod method = (AbstractMarshallingMethod) obj;
try {
doProcessMethod(method);
} catch (AMQPException amqe) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Proxy: AMQP error: " + amqe.getMessage());
}
} else if (obj instanceof ConsumeMessage) {
ConsumeMessage consumeMessage = (ConsumeMessage) obj;
consumeMessage(consumeMessage);
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Proxy: UNEXPECTED OBJECT CLASS: " + obj.getClass().getName());
throwException(amqe, method.channelNumber, method.getClassId(), method.getMethodId());
}
} else if (obj instanceof PublishRequest) {
PublishRequest publishRequest = (PublishRequest) obj;
try {
commitTx();
} catch (TransactionException e) {
if (obj instanceof AbstractMarshallingMethod) {
AbstractMarshallingMethod method = (AbstractMarshallingMethod) obj;
throwException(e, method.channelNumber, method.getClassId(), method.getMethodId());
} else if (obj instanceof PublishRequest) {
PublishRequest publishRequest = (PublishRequest) obj;
throwException(e, publishRequest.channel, publishRequest.getPublish().getClassId(),
publishRequest.getPublish().getMethodId());
basicPublish(publishRequest);
} catch (AMQPException amqe) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Proxy: AMQP error: " + amqe.getMessage());
}
throwException(amqe, publishRequest.channel, publishRequest.getPublish().getClassId(),
publishRequest.getPublish().getMethodId());
}
} catch (InterruptedException e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Proxy: error ", e);
} catch (Exception exc) {
} else if (obj instanceof ConsumeMessage) {
ConsumeMessage consumeMessage = (ConsumeMessage) obj;
consumeMessage(consumeMessage);
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Proxy: error ", exc);
logger.log(BasicLevel.ERROR, "Proxy: UNEXPECTED OBJECT CLASS: " + obj.getClass().getName());
}
if (running) {
continue;
} else {
break;
try {
commitTx();
} catch (TransactionException e) {
if (obj instanceof AbstractMarshallingMethod) {
AbstractMarshallingMethod method = (AbstractMarshallingMethod) obj;
throwException(e, method.channelNumber, method.getClassId(), method.getMethodId());
} else if (obj instanceof PublishRequest) {
PublishRequest publishRequest = (PublishRequest) obj;
throwException(e, publishRequest.channel, publishRequest.getPublish().getClassId(),
publishRequest.getPublish().getMethodId());
}
}
} catch (InterruptedException e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Proxy: error ", e);
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Proxy: error ", exc);
}
}
}
// Daemon....
@Override
protected void close() {
// TODO Auto-generated method stub
}
@Override
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment