Commit 1a46f4d9 authored by Nicolas Tachker's avatar Nicolas Tachker
Browse files

Move interceptors before preProcess call in doClientMessages and addClientMessage.

If interceptors, set JMSDestinationName property on the message.
parent 90535b76
......@@ -865,7 +865,44 @@ public class Queue extends Destination implements QueueMBean, BagSerializer {
*/
protected void doClientMessages(AgentId from, ClientMessages not) {
receiving = true;
ClientMessages clientMsgs = preProcess(from, not);
ClientMessages cm = null;
// interceptors
if (interceptorsAvailable()) {
// new client message
cm = new ClientMessages(not.getClientContext(), not.getRequestId());
cm.setAsyncSend(not.getAsyncSend());
cm.setDMQId(not.getDMQId());
for (Iterator msgs = not.getMessages().iterator(); msgs.hasNext();) {
org.objectweb.joram.shared.messages.Message message = (org.objectweb.joram.shared.messages.Message) msgs.next();
// set the destination name
message.setProperty("JMSDestinationName", getName());
// interceptors process
org.objectweb.joram.shared.messages.Message m = processInterceptors(message);
if (m == null) {
// send message to the DMQ
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(message, MessageErrorConstants.INTERCEPTORS);
dmqManager.sendToDMQ();
new Message(message).releaseFullMessage();
} else {
// add message to the client message
cm.addMessage(m);
}
}
// test client message size.
if (cm.getMessageCount() == 0) {
receiving = false;
return;
}
} else {
cm = not;
}
// pre process the client message
ClientMessages clientMsgs = preProcess(from, cm);
if (clientMsgs != null) {
Message msg;
......@@ -1016,32 +1053,13 @@ public class Queue extends Destination implements QueueMBean, BagSerializer {
* @param msg The message to store.
*/
protected final synchronized void storeMessage(Message msg) {
Message message = msg;
if (interceptorsAvailable()) {
// interceptors process
org.objectweb.joram.shared.messages.Message m = processInterceptors(msg.getFullMessage());
if (m == null) {
// send message to the DMQ
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(msg.getFullMessage(), MessageErrorConstants.INTERCEPTORS);
dmqManager.sendToDMQ();
msg.releaseFullMessage();
return;
} else {
message = new org.objectweb.joram.mom.messages.Message(m);
}
}
if (addMessage(message)) {
if (addMessage(msg)) {
// Persisting the message.
setMsgTxName(message);
message.save();
message.releaseFullMessage();
if (interceptorsAvailable())
msg.releaseFullMessage();
setMsgTxName(msg);
msg.save();
msg.releaseFullMessage();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Message " + message.getIdentifier() + " stored.");
logger.log(BasicLevel.DEBUG, "Message " + msg.getIdentifier() + " stored.");
}
}
......@@ -1425,6 +1443,28 @@ public class Queue extends Destination implements QueueMBean, BagSerializer {
for (Iterator msgs = clientMsgs.getMessages().iterator(); msgs.hasNext();) {
msg = new Message((org.objectweb.joram.shared.messages.Message) msgs.next());
msg.order = arrivalsCounter++;
if (interceptorsAvailable()) {
// get the shared message
org.objectweb.joram.shared.messages.Message message = msg.getFullMessage();
// set the destination name
message.setProperty("JMSDestinationName", getName());
// interceptors process
org.objectweb.joram.shared.messages.Message m = processInterceptors(message);
if (m == null) {
// send message to the DMQ
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(msg.getFullMessage(), MessageErrorConstants.INTERCEPTORS);
dmqManager.sendToDMQ();
msg.releaseFullMessage();
continue;
} else {
msg = new org.objectweb.joram.mom.messages.Message(m);
}
}
// store message
storeMessage(msg);
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment