Commit 9364166f authored by afreyssin's avatar afreyssin
Browse files

Bug fix about receiveNoWait and automaticRequest set to true.

parent efcc139e
......@@ -280,86 +280,93 @@ public class JMSBridgeModule implements javax.jms.ExceptionListener,
logger.log(BasicLevel.DEBUG, "receiveNoWait()");
Message momMessage = null;
synchronized (lock) {
try {
setConsumer();
consumerCnx.start();
Xid xid = null;
if (! automaticRequest) {
// Be careful, with automaticRequest set to true this code causes a dead-lock.
// In this case if there is no available message return null.
synchronized (lock) {
try {
if (isXA) {
xid = new XidImpl(new byte[0], 1, (agentId.toString() + System.currentTimeMillis()).getBytes());
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA xid=" + xid);
setConsumer();
consumerCnx.start();
Xid xid = null;
try {
if (isXA) {
xid = new XidImpl(new byte[0], 1, (agentId.toString() + System.currentTimeMillis()).getBytes());
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA xid=" + xid);
try {
consumerRes.start(xid, XAResource.TMNOFLAGS);
} catch (XAException e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "Exception:: XA can't start resource : " + consumerRes, e);
try {
consumerRes.start(xid, XAResource.TMNOFLAGS);
} catch (XAException e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "Exception:: XA can't start resource : " + consumerRes, e);
}
}
}
javax.jms.Message msg = consumer.receiveNoWait();
if (msg != null) {
org.objectweb.joram.client.jms.Message clientMessage =
org.objectweb.joram.client.jms.Message.convertJMSMessage(msg);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: clientMessage=" + clientMessage);
momMessage = clientMessage.getMomMsg();
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: no message available");
}
if (isXA) {
try {
consumerRes.end(xid, XAResource.TMSUCCESS);
javax.jms.Message msg = consumer.receiveNoWait();
if (msg != null) {
org.objectweb.joram.client.jms.Message clientMessage =
org.objectweb.joram.client.jms.Message.convertJMSMessage(msg);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA end " + consumerRes);
logger.log(BasicLevel.DEBUG, "receiveNoWait: clientMessage=" + clientMessage);
} catch (XAException e) {
throw new JMSException("XA resource end(...) failed: " + consumerRes + " :: " + e.getMessage());
momMessage = clientMessage.getMomMsg();
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: no message available");
}
try {
int ret = consumerRes.prepare(xid);
if (ret == XAResource.XA_OK)
consumerRes.commit(xid, false);
if (isXA) {
try {
consumerRes.end(xid, XAResource.TMSUCCESS);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA end " + consumerRes);
} catch (XAException e) {
throw new JMSException("XA resource end(...) failed: " + consumerRes + " :: " + e.getMessage());
}
try {
int ret = consumerRes.prepare(xid);
if (ret == XAResource.XA_OK)
consumerRes.commit(xid, false);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA commit " + consumerRes);
} catch (XAException e) {
try {
consumerRes.rollback(xid);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA rollback" + consumerRes);
} catch (XAException e1) { }
throw new JMSException("XA resource rollback(" + xid + ") failed: " +
consumerRes + " :: " + e.getMessage());
}
} else {
consumerSession.commit();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA commit " + consumerRes);
} catch (XAException e) {
logger.log(BasicLevel.DEBUG, "receiveNoWait: commit " + consumerSession);
}
} catch (MessageFormatException exc) {
// Conversion error: denying the message.
if (isXA) {
try {
consumerRes.rollback(xid);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA rollback" + consumerRes);
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA rollback " + consumerRes);
} catch (XAException e1) { }
throw new JMSException("XA resource rollback(" + xid + ") failed: " +
consumerRes + " :: " + e.getMessage());
}
} else {
consumerSession.commit();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: commit " + consumerSession);
}
} catch (MessageFormatException exc) {
// Conversion error: denying the message.
if (isXA) {
try {
consumerRes.rollback(xid);
} else {
consumerSession.rollback();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: XA rollback " + consumerRes);
} catch (XAException e1) { }
} else {
consumerSession.rollback();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: rollback " + consumerSession);
logger.log(BasicLevel.DEBUG, "receiveNoWait: rollback " + consumerSession);
}
}
} catch (JMSException commitExc) {
// Connection start, or session commit/rollback failed:
// setting the message to null.
momMessage = null;
}
} catch (JMSException commitExc) {
// Connection start, or session commit/rollback failed:
// setting the message to null.
momMessage = null;
}
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "receiveNoWait: momMessage=" + momMessage);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment