Commit d8e2da49 authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Adapt agent stubs to server changes.

parent f91c86dd
......@@ -43,7 +43,6 @@ import org.ow2.joram.mom.amqp.marshalling.AbstractMarshallingMethod;
import org.ow2.joram.mom.amqp.structures.Ack;
import org.ow2.joram.mom.amqp.structures.AddBoundExchange;
import org.ow2.joram.mom.amqp.structures.Cancel;
import org.ow2.joram.mom.amqp.structures.ConsumeMessage;
import org.ow2.joram.mom.amqp.structures.Deliver;
import org.ow2.joram.mom.amqp.structures.PublishToQueue;
import org.ow2.joram.mom.amqp.structures.Recover;
......@@ -52,7 +51,6 @@ import org.ow2.joram.mom.amqp.structures.RemoveQueueBindings;
import org.ow2.joram.mom.amqp.structures.Returned;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.common.Debug;
......@@ -141,8 +139,6 @@ public class StubAgentIn {
removeQueueBindings((RemoveQueueBindings) request);
} else if (request instanceof RemoveBoundExchange) {
removeBoundExchange((RemoveBoundExchange) request, from.getFrom(), proxyId);
} else if (request instanceof ConsumeMessage) {
consumeMessage((ConsumeMessage) request, proxyId);
}
} catch (AMQPException exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
......@@ -484,19 +480,4 @@ public class StubAgentIn {
queueUnbind.arguments, serverId, proxyId);
}
private static void consumeMessage(ConsumeMessage consumeMessage, long proxyId) throws TransactionException {
Queue queue = Naming.lookupQueue(consumeMessage.queueName);
Message msg = queue.consumeMessage(consumeMessage.noAck, consumeMessage.consumerTag,
consumeMessage.channelNumber, consumeMessage.consumerServerId, proxyId);
if (msg != null) {
AMQP.Basic.Deliver deliver = new AMQP.Basic.Deliver(consumeMessage.consumerTag, msg.queueMsgId,
msg.redelivered, msg.exchange, msg.routingKey);
deliver.channelNumber = consumeMessage.channelNumber;
AMQPAgent.stubAgentOut.deliver(new Deliver(deliver, msg.properties, msg.body, msg.queueMsgId,
consumeMessage.consumerServerId, proxyId, consumeMessage.queueName), queue);
// Send to itself in order to finally get all messages on the queue
StubAgentOut.asyncSend(consumeMessage, AgentServer.getServerId(), proxyId);
}
}
}
......@@ -22,6 +22,9 @@
*/
package org.ow2.joram.mom.amqp;
import java.util.Iterator;
import java.util.List;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import org.ow2.joram.mom.amqp.exceptions.AMQPException;
......@@ -120,13 +123,18 @@ public class StubAgentOut implements DeliveryListener {
AMQPAgent.sendRequestTo(request, serverId, proxyId, null);
}
public void deliver(Deliver deliver, Queue queue) {
public boolean deliver(String consumerTag, int channelId, Queue queue, short serverId, long proxyId) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "StubAgentIn.deliver(" + deliver + ", " + queue + ')');
AMQPResponseNot not = new AMQPResponseNot();
not.obj = deliver;
not.keyLock = -1;
Channel.sendTo(AMQPAgent.getAMQPId(deliver.serverId), not);
logger.log(BasicLevel.DEBUG, "StubAgentOut.deliver(" + queue + ')');
List<Deliver> deliveries = queue.getDeliveries(consumerTag, channelId, 1, serverId, proxyId);
for (Iterator<Deliver> iterator = deliveries.iterator(); iterator.hasNext();) {
Deliver deliver = iterator.next();
AMQPResponseNot not = new AMQPResponseNot();
not.obj = deliver;
not.keyLock = -1;
Channel.sendTo(AMQPAgent.getAMQPId(deliver.serverId), not);
}
return true;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment