Commit 90535b76 authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Improve error messages.

parent ba092504
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2009 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2011 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2009 CNES
*
* This library is free software; you can redistribute it and/or
......@@ -95,14 +95,19 @@ public class DirectExchange extends IExchange {
throws NotFoundException {
Set<String> boundQueues = bindings.get(routingKey);
if (boundQueues != null) {
boundQueues.remove(queueName);
boolean removed = boundQueues.remove(queueName);
if (!removed) {
throw new NotFoundException("Unknown routing key '" + routingKey + "' between direct exchange '"
+ name + "' and queue '" + queueName + "'.");
}
if (boundQueues.size() == 0) {
bindings.remove(routingKey);
}
if (durable)
saveExchange();
} else {
throw new NotFoundException("Unknown routing key: " + routingKey);
throw new NotFoundException("Unknown routing key '" + routingKey + "' between direct exchange '" + name
+ "' and queue '" + queueName + "'.");
}
}
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2009 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2011 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2009 CNES
*
* This library is free software; you can redistribute it and/or
......@@ -75,7 +75,7 @@ public class FanoutExchange extends IExchange {
throws NotFoundException {
boolean removed = boundQueues.remove(queueName);
if (!removed) {
throw new NotFoundException("Queue not bound: " + queueName);
throw new NotFoundException("Fanout exchange '" + name + "' not bound with queue '" + queueName + "'.");
}
if (durable) {
saveExchange();
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2009 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2011 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2009 CNES
*
* This library is free software; you can redistribute it and/or
......@@ -33,9 +33,9 @@ import java.util.Set;
import org.ow2.joram.mom.amqp.exceptions.NoConsumersException;
import org.ow2.joram.mom.amqp.exceptions.NotFoundException;
import org.ow2.joram.mom.amqp.exceptions.TransactionException;
import org.ow2.joram.mom.amqp.marshalling.AMQP.Basic.BasicProperties;
import org.ow2.joram.mom.amqp.marshalling.LongString;
import org.ow2.joram.mom.amqp.marshalling.LongStringHelper;
import org.ow2.joram.mom.amqp.marshalling.AMQP.Basic.BasicProperties;
/**
* The headers exchange type works as follows:
......@@ -104,7 +104,11 @@ public class HeadersExchange extends IExchange {
throws NotFoundException {
Set<String> boundQueues = bindings.get(arguments);
if (boundQueues != null) {
boundQueues.remove(queueName);
boolean removed = boundQueues.remove(queueName);
if (!removed) {
throw new NotFoundException("Unknown headers '" + arguments + "' between headers exchange '" + name
+ "' and queue '" + queueName + "'.");
}
if (boundQueues.size() == 0) {
bindings.remove(arguments);
}
......@@ -112,7 +116,8 @@ public class HeadersExchange extends IExchange {
saveExchange();
}
} else {
throw new NotFoundException("Unknown headers: " + arguments);
throw new NotFoundException("Unknown headers '" + arguments + "' between headers exchange '" + name
+ "' and queue '" + queueName + "'.");
}
}
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2009 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2011 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2009 CNES
*
* This library is free software; you can redistribute it and/or
......@@ -114,7 +114,7 @@ public class Queue implements Serializable {
logger.log(BasicLevel.DEBUG, "Queue.receive()");
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't get a message on a non-owned exclusive queue");
throw new ResourceLockedException("Can't get message on the non-owned exclusive queue '" + name + "'.");
}
if (toDeliver.size() > 0) {
Message msg = toDeliver.pollFirst();
......@@ -139,13 +139,15 @@ public class Queue implements Serializable {
logger.log(BasicLevel.DEBUG, "Queue.consume()");
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't consume on a non-owned exclusive queue.");
throw new ResourceLockedException("Can't consume on the non-owned exclusive queue '" + name + "'.");
}
if (exclusiveConsumer && consumers.size() != 0) {
throw new AccessRefusedException("Exclusive consume request failed due to previous consumer.");
throw new AccessRefusedException("Exclusive consume request failed due to previous consumer on queue '"
+ name + "'.");
}
if (consumers.size() == 1 && consumers.values().iterator().next().exclusive) {
throw new AccessRefusedException("Consume request failed due to previous exclusive consumer.");
throw new AccessRefusedException("Consume request failed due to previous exclusive consumer on queue '"
+ name + "'.");
}
consumers.put(new SubscriptionKey(serverId, proxyId, channelId, consumerTag), new Subscription(proxy,
......@@ -178,7 +180,7 @@ public class Queue implements Serializable {
if (exclusive && serverId != -1 && this.serverId != serverId && this.proxyId != proxyId) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "Publishing to a non-owned exclusive queue.");
logger.log(BasicLevel.WARN, "Publishing to a non-owned exclusive queue '" + name + "'.");
// TODO ?
return;
}
......@@ -221,7 +223,7 @@ public class Queue implements Serializable {
consumers.put(entry.getKey(), entry.getValue());
} else {
if (immediate) {
throw new NoConsumersException("No consumer available for immediate publication.");
throw new NoConsumersException("No consumer available for immediate publication on queue '" + name + "'.");
} else {
toDeliver.add(msg);
if (durable) {
......@@ -241,7 +243,7 @@ public class Queue implements Serializable {
logger.log(BasicLevel.DEBUG, "Queue.cancel()");
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't cancel a consumer on a non-owned exclusive queue");
throw new ResourceLockedException("Can't cancel a consumer on the non-owned exclusive queue '" + name + "'.");
}
SubscriptionKey subKey = new SubscriptionKey(serverId, proxyId, channelNumber, consumerTag);
......@@ -268,7 +270,7 @@ public class Queue implements Serializable {
logger.log(BasicLevel.DEBUG, "Queue.purge() " + toDeliver.size());
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't clear a non-owned exclusive queue");
throw new ResourceLockedException("Can't clear the non-owned exclusive queue '" + name + "'.");
}
int msgCount = toDeliver.size();
if (durable && msgCount > 0)
......@@ -335,7 +337,7 @@ public class Queue implements Serializable {
logger.log(BasicLevel.DEBUG, "Queue.getInfo()");
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't declare a non-owned existing exclusive queue");
throw new ResourceLockedException("Can't redeclare the non-owned exclusive queue '" + name + "'.");
}
AMQP.Queue.DeclareOk queueInfo = new AMQP.Queue.DeclareOk(name, toDeliver.size(), consumers.size());
return queueInfo;
......@@ -396,7 +398,7 @@ public class Queue implements Serializable {
public synchronized void addBoundExchange(String exchange, short serverId, long proxyId)
throws TransactionException, ResourceLockedException {
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't bind a non-owned exclusive queue");
throw new ResourceLockedException("Can't bind the non-owned exclusive queue '" + name + "'.");
}
boundExchanges.add(exchange);
if (durable) {
......@@ -415,7 +417,7 @@ public class Queue implements Serializable {
public synchronized void removeBoundExchange(String exchangeName, short serverId, long proxyId)
throws ResourceLockedException {
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't unbind a non-owned exclusive queue");
throw new ResourceLockedException("Can't unbind the non-owned exclusive queue '" + name + "'.");
}
boundExchanges.remove(exchangeName);
if (durable) {
......@@ -429,7 +431,7 @@ public class Queue implements Serializable {
logger.log(BasicLevel.DEBUG, "Queue.deleteQueue(" + queueName + ')');
if (exclusive && (this.serverId != serverId || this.proxyId != proxyId)) {
throw new ResourceLockedException("Can't delete a non-owned exclusive queue");
throw new ResourceLockedException("Can't delete the non-owned exclusive queue '" + name + "'.");
}
if (durable) {
......
......@@ -68,7 +68,7 @@ public class StubLocal {
if (queue == null) {
if (passive) {
throw new NotFoundException("Passive declaration of an unknown queue.");
throw new NotFoundException("Passive declaration of an unknown queue: '" + queueName + "'.");
}
queue = new Queue(queueName, durable, autoDelete, exclusive, serverId, proxyId);
try {
......@@ -85,13 +85,16 @@ public class StubLocal {
} else {
if (!passive) {
if (durable != queue.isDurable()) {
throw new PreconditionFailedException("Queue durable property do not match existing queue one.");
throw new PreconditionFailedException("Queue durable property do not match existing queue '"
+ queueName + "'.");
}
if (exclusive != queue.isExclusive()) {
throw new ResourceLockedException("Queue exclusive property do not match existing queue one.");
throw new ResourceLockedException("Queue exclusive property do not match existing queue '"
+ queueName + "'.");
}
if (autoDelete != queue.isAutodelete()) {
throw new PreconditionFailedException("Queue autodelete property do not match existing queue one.");
throw new PreconditionFailedException("Queue autodelete property do not match existing queue one '"
+ queueName + "'.");
}
}
return queue.getInfo(serverId, proxyId);
......@@ -103,14 +106,14 @@ public class StubLocal {
TransactionException {
Queue queue = Naming.lookupQueue(queueName);
if (queue == null) {
throw new NotFoundException("Unknown queue for deletion: " + queueName);
throw new NotFoundException("Unknown queue for deletion: '" + queueName + "'.");
}
if (ifEmpty && queue.getMessageCount() > 0) {
throw new PreconditionFailedException("Queue not empty.");
throw new PreconditionFailedException("Deletion error: queue '" + queueName + "' is not empty.");
}
if (ifUnused && queue.getConsumerCount() > 0) {
throw new PreconditionFailedException("Queue not unused.");
throw new PreconditionFailedException("Deletion error: queue '" + queueName + "' is not unused.");
}
queue.deleteQueue(queueName, serverId, proxyId);
Naming.unbindQueue(queueName);
......@@ -138,12 +141,12 @@ public class StubLocal {
ResourceLockedException, TransactionException {
IExchange exchange = Naming.lookupExchange(exchangeName);
if (exchange == null) {
throw new NotFoundException("Binding to a non-existent exchange.");
throw new NotFoundException("Binding to a non-existent exchange: '" + exchangeName + "'.");
}
if (Naming.isLocal(queueName)) {
Queue queue = Naming.lookupQueue(queueName);
if (queue == null) {
throw new NotFoundException("Binding to a non-existent queue.");
throw new NotFoundException("Binding to a non-existent queue: '" + queueName + "'.");
}
queue.addBoundExchange(exchangeName, serverId, proxyId);
} else {
......@@ -163,13 +166,13 @@ public class StubLocal {
queue.removeBoundExchange(exchangeName, serverId, proxyId);
exchange.unbind(queueName, routingKey, arguments);
} else {
throw new NotFoundException("Queue not found.");
throw new NotFoundException("Queue not found for unbinding: '" + queueName + "'.");
}
} else {
StubAgentOut.asyncSend(new RemoveBoundExchange(queueName, exchangeName), Naming.resolveServerId(queueName));
}
} else {
throw new NotFoundException("Exchange not found.");
throw new NotFoundException("Exchange not found for unbinding: '" + exchangeName + "'.");
}
}
......@@ -177,7 +180,7 @@ public class StubLocal {
NotFoundException, ResourceLockedException, TransactionException {
Queue queue = Naming.lookupQueue(queueName);
if (queue == null) {
throw new NotFoundException("Purging non-existent queue");
throw new NotFoundException("Purging non-existent queue: '" + queueName + "'.");
}
return queue.clear(serverId, proxyId);
}
......@@ -192,7 +195,7 @@ public class StubLocal {
// letters, digits, hyphen, underscore, period, or colon.
Matcher m = exchangeNamePattern.matcher(name);
if (!m.matches()) {
throw new PreconditionFailedException("Exchange name contains an invalid character.");
throw new PreconditionFailedException("Exchange name contains an invalid character: '" + name + "'.");
}
}
......@@ -202,7 +205,7 @@ public class StubLocal {
IExchange exchange = Naming.lookupExchange(exchangeName);
if (exchange == null) {
if (passive) {
throw new NotFoundException("Passive declaration of an unknown exchange.");
throw new NotFoundException("Passive declaration of an unknown exchange: '" + exchangeName + "'.");
}
checkExchangeName(exchangeName);
if (type.equalsIgnoreCase(DirectExchange.TYPE)) {
......@@ -236,28 +239,29 @@ public class StubLocal {
// Check if exchange type corresponds with existing exchange
if (type.equalsIgnoreCase(DirectExchange.TYPE)) {
if (!(exchange instanceof DirectExchange)) {
throw new NotAllowedException("Exchange type do not match existing exchange.");
throw new NotAllowedException("Exchange type do not match existing exchange '" + exchangeName + "'.");
}
} else if (type.equalsIgnoreCase(TopicExchange.TYPE)) {
if (!(exchange instanceof TopicExchange)) {
throw new NotAllowedException("Exchange type do not match existing exchange.");
throw new NotAllowedException("Exchange type do not match existing exchange '" + exchangeName + "'.");
}
} else if (type.equalsIgnoreCase(FanoutExchange.TYPE)) {
if (!(exchange instanceof FanoutExchange)) {
throw new NotAllowedException("Exchange type do not match existing exchange.");
throw new NotAllowedException("Exchange type do not match existing exchange '" + exchangeName + "'.");
}
} else if (type.equalsIgnoreCase(HeadersExchange.TYPE)) {
if (!(exchange instanceof HeadersExchange)) {
throw new NotAllowedException("Exchange type do not match existing exchange.");
throw new NotAllowedException("Exchange type do not match existing exchange '" + exchangeName + "'.");
}
} else {
if (!exchange.getClass().getName().equals(type)) {
throw new NotAllowedException("Exchange type do not match existing exchange.");
throw new NotAllowedException("Exchange type do not match existing exchange '" + exchangeName + "'.");
}
}
if (durable != exchange.isDurable()) {
throw new PreconditionFailedException("Exchange durable property do not match existing exchange one.");
throw new PreconditionFailedException("Exchange durable property do not match existing exchange '"
+ exchangeName + "'.");
}
}
}
......@@ -266,10 +270,10 @@ public class StubLocal {
PreconditionFailedException, AccessRefusedException {
IExchange exchange = Naming.lookupExchange(exchangeName);
if (exchange == null) {
throw new NotFoundException("Exchange not found for deletion.");
throw new NotFoundException("Exchange not found for deletion: '" + exchangeName + "'.");
}
if (ifUnused && !exchange.isUnused()) {
throw new PreconditionFailedException("Exchange not unused.");
throw new PreconditionFailedException("Deletion error: Exchange '" + exchangeName + "' is not unused.");
}
if (exchange.durable) {
......@@ -295,7 +299,7 @@ public class StubLocal {
throws NotFoundException, ResourceLockedException, TransactionException {
Queue queue = Naming.lookupQueue(queueName);
if (queue == null) {
throw new NotFoundException("Can't get message on an unknown queue.");
throw new NotFoundException("Can't get message on an unknown queue: '" + queueName + "'.");
}
Message msg = queue.receive(noAck, serverId, proxyId);
return msg;
......@@ -306,7 +310,7 @@ public class StubLocal {
short serverId, long proxyId) throws NotFoundException, ResourceLockedException, AccessRefusedException {
Queue queue = Naming.lookupQueue(queueName);
if (queue == null) {
throw new NotFoundException("Consuming from non-existent queue.");
throw new NotFoundException("Consuming from non-existent queue: '" + queueName + "'.");
}
queue.consume(deliveryListener, channelNumber, consumerTag, exclusive, noAck, noLocal, serverId, proxyId);
}
......@@ -315,7 +319,8 @@ public class StubLocal {
throws NotFoundException, NoConsumersException, TransactionException {
IExchange exchange = Naming.lookupExchange(publishRequest.getPublish().exchange);
if (exchange == null) {
throw new NotFoundException("Exchange " + publishRequest.getPublish().exchange + " not found.");
throw new NotFoundException("Can't publish on an unknwon exchange: '"
+ publishRequest.getPublish().exchange + "'.");
}
exchange.publish(publishRequest.getPublish().routingKey, publishRequest.getPublish().mandatory,
publishRequest.getPublish().immediate, publishRequest.getHeader(), publishRequest.getBody(),
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2009 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2011 ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2009 CNES
*
* This library is free software; you can redistribute it and/or
......@@ -114,7 +114,11 @@ public class TopicExchange extends IExchange {
KeyAndPattern keyAndPattern = new KeyAndPattern(routingKey, routingPattern);
Set<String> boundQueues = bindings.get(keyAndPattern);
if (boundQueues != null) {
boundQueues.remove(queueName);
boolean removed = boundQueues.remove(queueName);
if (!removed) {
throw new NotFoundException("Unknown binding '" + routingKey + "' between topic exchange '" + name
+ "' and queue '" + queueName + "'.");
}
if (boundQueues.size() == 0) {
bindings.remove(keyAndPattern);
}
......@@ -122,7 +126,8 @@ public class TopicExchange extends IExchange {
saveExchange();
}
} else {
throw new NotFoundException("Unknown topic: " + routingKey);
throw new NotFoundException("Unknown binding '" + routingKey + "' between topic exchange '" + name
+ "' and queue '" + queueName + "'.");
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment