Commit e7b2f581 authored by afreyssin's avatar afreyssin

Bug fix (JORAM-281): if the consumer is aborted during a ReceiveRequest we...

Bug fix (JORAM-281): if the consumer is aborted during a ReceiveRequest we shall abort the request in the related queue.
parent b9c6f935
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2015 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2017 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
......@@ -42,6 +42,8 @@ import org.objectweb.joram.shared.client.AbstractJmsReply;
import org.objectweb.joram.shared.client.AbstractJmsRequest;
import org.objectweb.joram.shared.client.CommitRequest;
import org.objectweb.joram.shared.client.ConsumerMessages;
import org.objectweb.joram.shared.client.ConsumerReceiveRequest;
import org.objectweb.joram.shared.client.ConsumerUnsetListRequest;
import org.objectweb.joram.shared.client.MomExceptionReply;
import org.objectweb.joram.shared.client.PingRequest;
import org.objectweb.joram.shared.client.ProducerMessages;
......@@ -399,8 +401,7 @@ public class RequestMultiplexer {
logger.log(BasicLevel.DEBUG, "RequestMultiplexer.doAbortRequest(" + requestId + ')');
if (status == Status.CLOSE) return null;
return (ReplyListener)requestsTable.remove(
new Integer(requestId));
return (ReplyListener)requestsTable.remove(new Integer(requestId));
}
/**
......@@ -455,6 +456,7 @@ public class RequestMultiplexer {
// Else nothing to do.
}
// Denies received messages.
public void deny(ConsumerMessages messages) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RequestMultiplexer.deny(" + messages + ')');
......@@ -471,7 +473,26 @@ public class RequestMultiplexer {
sendRequest(deny);
} catch (JMSException exc) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "", exc);
logger.log(BasicLevel.DEBUG, "Connection is closed", exc);
// Connection failure
// Nothing to do
}
}
// Denies aborted request.
public void denyRequest(ConsumerReceiveRequest request) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RequestMultiplexer.denyRequest(" + request.getRequestId() + ')');
ConsumerUnsetListRequest unsetLR = new ConsumerUnsetListRequest(request.getQueueMode());
unsetLR.setTarget(request.getTarget());
unsetLR.setCancelledRequestId(request.getRequestId());
try {
sendRequest(unsetLR);
} catch (JMSException exc) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Connection is closed", exc);
// Connection failure
// Nothing to do
}
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2017 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
......@@ -29,6 +29,7 @@ import javax.jms.JMSSecurityException;
import org.objectweb.joram.shared.client.AbstractJmsReply;
import org.objectweb.joram.shared.client.AbstractJmsRequest;
import org.objectweb.joram.shared.client.ConsumerMessages;
import org.objectweb.joram.shared.client.ConsumerReceiveRequest;
import org.objectweb.joram.shared.client.MomExceptionReply;
import org.objectweb.joram.shared.client.ProducerMessages;
import org.objectweb.util.monolog.api.BasicLevel;
......@@ -212,7 +213,7 @@ public class Requestor implements ReplyListener, ErrorListener {
mtpx.abortRequest(requestId);
return null;
} else if (status == Status.INIT) {
// Means that the wait ended with a notify from start method .
// Means that the wait ended with a notify from start method.
// Abort the request.
mtpx.abortRequest(requestId);
// re-send a synchronous request
......@@ -221,7 +222,17 @@ public class Requestor implements ReplyListener, ErrorListener {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> deny " + reply);
if (reply instanceof ConsumerMessages) {
// The consumer is closed, denies the received messages
mtpx.deny((ConsumerMessages)reply);
} else if ((reply == null) &&
(request instanceof ConsumerReceiveRequest)) {
// The request is aborted, we shall try to deny the receive request (JORAM-281).
ConsumerReceiveRequest crr = (ConsumerReceiveRequest) request;
if ((crr.getTimeToLive() <= 0) && (crr.getQueueMode())) {
// If the connection is alive we should try to deny the request
logger.log(BasicLevel.DEBUG, " -> deny request " + request.getRequestId());
mtpx.denyRequest(crr);
}
}
return null;
} else if (status == Status.DONE) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment