Commit f3b08dae authored by Andre Freyssinet's avatar Andre Freyssinet
Browse files

Minor changes.

parent d37fd3c7
/* /*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging * JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2017 ScalAgent Distributed Technologies * Copyright (C) 2001 - 2021 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade * Copyright (C) 1996 - 2000 Dyade
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either * License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version. * version 2.1 of the License, or any later version.
* *
* This library is distributed in the hope that it will be useful, * This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of * but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details. * Lesser General Public License for more details.
* *
* You should have received a copy of the GNU Lesser General Public * You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software * License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA. * USA.
* *
* Initial developer(s): ScalAgent Distributed Technologies * Initial developer(s): ScalAgent Distributed Technologies
*/ */
package org.objectweb.joram.client.jms.connection; package org.objectweb.joram.client.jms.connection;
import javax.jms.InvalidDestinationException; import javax.jms.InvalidDestinationException;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.JMSSecurityException; import javax.jms.JMSSecurityException;
import org.objectweb.joram.shared.client.AbstractJmsReply; import org.objectweb.joram.shared.client.AbstractJmsReply;
import org.objectweb.joram.shared.client.AbstractJmsRequest; import org.objectweb.joram.shared.client.AbstractJmsRequest;
import org.objectweb.joram.shared.client.ConsumerMessages; import org.objectweb.joram.shared.client.ConsumerMessages;
import org.objectweb.joram.shared.client.ConsumerReceiveRequest; import org.objectweb.joram.shared.client.ConsumerReceiveRequest;
import org.objectweb.joram.shared.client.MomExceptionReply; import org.objectweb.joram.shared.client.MomExceptionReply;
import org.objectweb.joram.shared.client.ProducerMessages; import org.objectweb.joram.shared.client.ProducerMessages;
import org.objectweb.util.monolog.api.BasicLevel; import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger; import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug; import fr.dyade.aaa.common.Debug;
public class Requestor implements ReplyListener, ErrorListener { public class Requestor implements ReplyListener, ErrorListener {
private static class Status { private static class Status {
/** /**
* The requestor is free: it can be called by a client thread. * The requestor is free: it can be called by a client thread.
*/ */
public static final int INIT = 0; public static final int INIT = 0;
/** /**
* The requestor is busy: the client thread is waiting. * The requestor is busy: the client thread is waiting.
* Two threads can make a call: * Two threads can make a call:
* 1- the demultiplexer thread can call replyReceived and replyAborted. * 1- the demultiplexer thread can call replyReceived and replyAborted.
* 2- another client thread can abort the request. * 2- another client thread can abort the request.
*/ */
public static final int RUN = 1; public static final int RUN = 1;
/** /**
* The requestor is either completed (by the demultiplxer thread) or * The requestor is either completed (by the demultiplxer thread) or
* aborted (by another client thread or a timeout). * aborted (by another client thread or a timeout).
* This state is transitional. It enables the requesting client thread to * This state is transitional. It enables the requesting client thread to
* finalize its request. * finalize its request.
*/ */
public static final int DONE = 2; public static final int DONE = 2;
public static final int CLOSE = 3; public static final int CLOSE = 3;
public static final int STOP = 4; public static final int STOP = 4;
private static final String[] names = { private static final String[] names = {
"INIT", "RUN", "DONE", "CLOSE", "STOP"}; "INIT", "RUN", "DONE", "CLOSE", "STOP"};
public static String toString(int status) { public static String toString(int status) {
return names[status]; return names[status];
} }
} }
private static Logger logger = Debug.getLogger(Requestor.class.getName()); private static Logger logger = Debug.getLogger(Requestor.class.getName());
public static final String DEFAULT_REQUEST_TIMEOUT_PROPERTY = "org.objectweb.joram.client.jms.connection.Requestor.defaultRequestTimeout"; public static final String DEFAULT_REQUEST_TIMEOUT_PROPERTY = "org.objectweb.joram.client.jms.connection.Requestor.defaultRequestTimeout";
public static final long DEFAULT_REQUEST_TIMEOUT_VALUE = 0; public static final long DEFAULT_REQUEST_TIMEOUT_VALUE = 0;
private long defaultRequestTimeout; private long defaultRequestTimeout;
private RequestMultiplexer mtpx; private RequestMultiplexer mtpx;
private Object reply; private Object reply;
private int requestId; private int requestId;
private int status; private int status;
public Requestor(RequestMultiplexer mtpx) { public Requestor(RequestMultiplexer mtpx) {
this.mtpx = mtpx; this.mtpx = mtpx;
init(); init();
} }
private void setStatus(int status) { private void setStatus(int status) {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Requestor.setStatus(" + Status.toString(status) + ')'); logger.log(BasicLevel.DEBUG, "Requestor.setStatus(" + Status.toString(status) + ')');
this.status = status; this.status = status;
} }
public final synchronized int getRequestId() { public final synchronized int getRequestId() {
return requestId; return requestId;
} }
private void init() { private void init() {
// set the default request timeout // set the default request timeout
defaultRequestTimeout = Long.getLong(DEFAULT_REQUEST_TIMEOUT_PROPERTY, defaultRequestTimeout = Long.getLong(DEFAULT_REQUEST_TIMEOUT_PROPERTY,
DEFAULT_REQUEST_TIMEOUT_VALUE).longValue(); DEFAULT_REQUEST_TIMEOUT_VALUE).longValue();
if (status == Status.DONE) { if (status == Status.DONE) {
setStatus(Status.INIT); setStatus(Status.INIT);
reply = null; reply = null;
requestId = -1; requestId = -1;
} }
// Else the requestor can be closed. // Else the requestor can be closed.
// Nothing to do. // Nothing to do.
} }
public synchronized AbstractJmsReply request(AbstractJmsRequest request) throws JMSException { public synchronized AbstractJmsReply request(AbstractJmsRequest request) throws JMSException {
return request(request, defaultRequestTimeout, null); return request(request, defaultRequestTimeout, null);
} }
public synchronized AbstractJmsReply request(AbstractJmsRequest request, CompletionListener completionListener) throws JMSException { public synchronized AbstractJmsReply request(AbstractJmsRequest request, CompletionListener completionListener) throws JMSException {
return request(request, defaultRequestTimeout, completionListener); return request(request, defaultRequestTimeout, completionListener);
} }
/** /**
* Method sending a synchronous request to the server and waiting for an * Method sending a synchronous request to the server and waiting for an
* answer. * answer.
* *
* @exception IllegalStateException If the connection is closed or broken, * @exception IllegalStateException If the connection is closed or broken,
* if the server state does not allow to * if the server state does not allow to
* process the request. * process the request.
* @exception JMSSecurityException When sending a request to a destination * @exception JMSSecurityException When sending a request to a destination
* not accessible because of security. * not accessible because of security.
* @exception InvalidDestinationException When sending a request to a * @exception InvalidDestinationException When sending a request to a
* destination that no longer exists. * destination that no longer exists.
* @exception JMSException If the request failed for any other reason. * @exception JMSException If the request failed for any other reason.
*/ */
public synchronized AbstractJmsReply request(AbstractJmsRequest request, long timeout, CompletionListener completionListener) throws JMSException { public synchronized AbstractJmsReply request(AbstractJmsRequest request, long timeout, CompletionListener completionListener) throws JMSException {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Requestor.request(" + request + ',' + timeout + ',' + completionListener + "), status = " + Status.toString(status)); logger.log(BasicLevel.DEBUG, "Requestor.request(" + request + ',' + timeout + ',' + completionListener + "), status = " + Status.toString(status));
long sleep = timeout; long sleep = timeout;
if (status != Status.INIT) { if (status != Status.INIT) {
if (status == Status.CLOSE) return null; if (status == Status.CLOSE) return null;
// AF: It seems to be bad correction to the lack of reinitialisation of this // AF: It seems to be bad correction to the lack of reinitialisation of this
// requestor from a previous use with a completion listener. // requestor from a previous use with a completion listener.
if (completionListener == null && status != Status.STOP) if (completionListener == null && status != Status.STOP)
throw new javax.jms.IllegalStateException("Requestor already used"); throw new javax.jms.IllegalStateException("Requestor already used");
} }
if (status == Status.STOP) { if (status == Status.STOP) {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
try { try {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> request STOP wait"); logger.log(BasicLevel.DEBUG, " -> request STOP wait");
wait(sleep); wait(sleep);
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> request STOP awake"); logger.log(BasicLevel.DEBUG, " -> request STOP awake");
} catch (InterruptedException exc) { } catch (InterruptedException exc) {
if (logger.isLoggable(BasicLevel.WARN)) if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, exc); logger.log(BasicLevel.WARN, exc);
return null; return null;
} }
if (sleep > 0) { if (sleep > 0) {
time = System.currentTimeMillis() - time; time = System.currentTimeMillis() - time;
if (time >= sleep) if (time >= sleep)
return null; return null;
else else
sleep = sleep - time; sleep = sleep - time;
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "new timeout = " + sleep); logger.log(BasicLevel.DEBUG, "new timeout = " + sleep);
} }
} }
mtpx.sendRequest(request, this, completionListener); mtpx.sendRequest(request, this, completionListener);
setStatus(Status.RUN); setStatus(Status.RUN);
requestId = request.getRequestId(); requestId = request.getRequestId();
if (completionListener != null && request instanceof ProducerMessages) { if (completionListener != null && request instanceof ProducerMessages) {
//TODO: use request.getClassId() == AbstractJmsMessage.PRODUCER_MESSAGES //TODO: use request.getClassId() == AbstractJmsMessage.PRODUCER_MESSAGES
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> request #" + requestId + ", completionListener = " + completionListener); logger.log(BasicLevel.DEBUG, " -> request #" + requestId + ", completionListener = " + completionListener);
init(); // The requestor is no longer used. init(); // The requestor is no longer used.
return null; return null;
} }
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> request #" + requestId + " wait"); logger.log(BasicLevel.DEBUG, " -> request #" + requestId + " wait");
try { try {
wait(sleep); wait(sleep);
} catch (InterruptedException exc) { } catch (InterruptedException exc) {
if (logger.isLoggable(BasicLevel.WARN)) if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, exc); logger.log(BasicLevel.WARN, exc);
setStatus(Status.DONE); setStatus(Status.DONE);
} }
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> request #" + requestId + " awake"); logger.log(BasicLevel.DEBUG, " -> request #" + requestId + " awake");
try { try {
if (status == Status.RUN) { if (status == Status.RUN) {
// Means that the wait ended with a timeout. // Means that the wait ended with a timeout.
// Abort the request. // Abort the request.
mtpx.abortRequest(requestId); mtpx.abortRequest(requestId);
return null; return null;
} else if (status == Status.INIT) { } else if (status == Status.INIT) {
// Means that the wait ended with a notify from start method. // Means that the wait ended with a notify from start method.
// Abort the request. // Abort the request.
mtpx.abortRequest(requestId); mtpx.abortRequest(requestId);
// re-send a synchronous request // re-send a synchronous request
return request(request, sleep, completionListener); return request(request, sleep, completionListener);
} else if (status == Status.CLOSE) { } else if (status == Status.CLOSE) {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> deny " + reply); logger.log(BasicLevel.DEBUG, " -> deny " + reply);
if (reply instanceof ConsumerMessages) { if (reply instanceof ConsumerMessages) {
// The consumer is closed, denies the received messages // The consumer is closed, denies the received messages
mtpx.deny((ConsumerMessages)reply); mtpx.deny((ConsumerMessages)reply);
} else if ((reply == null) && } else if ((reply == null) &&
(request instanceof ConsumerReceiveRequest)) { (request instanceof ConsumerReceiveRequest)) {
// The request is aborted, we shall try to deny the receive request (JORAM-281). // The request is aborted, we shall try to deny the receive request (JORAM-281).
ConsumerReceiveRequest crr = (ConsumerReceiveRequest) request; ConsumerReceiveRequest crr = (ConsumerReceiveRequest) request;
if ((crr.getTimeToLive() <= 0) && (crr.getQueueMode())) { if ((crr.getTimeToLive() <= 0) && (crr.getQueueMode())) {
// If the connection is alive we should try to deny the request // If the connection is alive we should try to deny the request
logger.log(BasicLevel.DEBUG, " -> deny request " + request.getRequestId()); logger.log(BasicLevel.DEBUG, " -> deny request " + request.getRequestId());
mtpx.denyRequest(crr); mtpx.denyRequest(crr);
} }
} }
return null; return null;
} else if (status == Status.DONE) { } else if (status == Status.DONE) {
// Status // Status
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> request #" + requestId + " done : " + reply); logger.log(BasicLevel.DEBUG, " -> request #" + requestId + " done : " + reply);
if (reply instanceof MomExceptionReply) { if (reply instanceof MomExceptionReply) {
JMSException jmsExc = RequestMultiplexer.buildJmsException((MomExceptionReply) reply); JMSException jmsExc = RequestMultiplexer.buildJmsException((MomExceptionReply) reply);
throw jmsExc; throw jmsExc;
} else if (reply instanceof AbstractJmsReply) { } else if (reply instanceof AbstractJmsReply) {
return (AbstractJmsReply) reply; return (AbstractJmsReply) reply;
} else { } else {
// Reply aborted or thread interrupted. // Reply aborted or thread interrupted.
return null; return null;
} }
} else throw new Error(); } else throw new Error();
} finally { } finally {
init(); init();
} }
} }
public synchronized boolean replyReceived(AbstractJmsReply reply) throws AbortedRequestException { public synchronized boolean replyReceived(AbstractJmsReply reply) throws AbortedRequestException {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Requestor.replyReceived(" + reply + ')'); logger.log(BasicLevel.DEBUG, "Requestor.replyReceived(" + reply + ')');
if (status == Status.RUN && reply.getCorrelationId() == requestId) { if (status == Status.RUN && reply.getCorrelationId() == requestId) {
this.reply = reply; this.reply = reply;
setStatus(Status.DONE); setStatus(Status.DONE);
notify(); notify();
return true; return true;
} }
// The request has been aborted. // The request has been aborted.
throw new AbortedRequestException(); throw new AbortedRequestException();
} }
public synchronized void errorReceived(int replyId, MomExceptionReply exc) { public synchronized void errorReceived(int replyId, MomExceptionReply exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Requestor.errorReceived(" + replyId + ',' + exc + ')'); logger.log(BasicLevel.DEBUG, "Requestor.errorReceived(" + replyId + ',' + exc + ')');
if (status == Status.RUN && if (status == Status.RUN &&
replyId == requestId) { replyId == requestId) {
reply = exc; reply = exc;
setStatus(Status.DONE); setStatus(Status.DONE);
notify(); notify();
} }
// Else The request has been aborted. // Else The request has been aborted.
// Do nothing // Do nothing
} }
public synchronized void replyAborted(int replyId) { public synchronized void replyAborted(int replyId) {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Requestor.replyAborted(" + replyId + ')'); logger.log(BasicLevel.DEBUG, "Requestor.replyAborted(" + replyId + ')');
if (status == Status.RUN && if (status == Status.RUN &&
replyId == requestId) { replyId == requestId) {
reply = null; reply = null;
setStatus(Status.DONE); setStatus(Status.DONE);
notify(); notify();
} }
// Else the request has been aborted. // Else the request has been aborted.
// Do nothing // Do nothing
} }
public synchronized void abortRequest() { public synchronized void abortRequest() {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Requestor[" + Status.toString(status) + ',' + requestId logger.log(BasicLevel.DEBUG, "Requestor[" + Status.toString(status) + ',' + requestId + "].abortRequest()");
+ "].abortRequest()");
if (status == Status.RUN && requestId > 0) { if (status == Status.RUN && requestId > 0) {
mtpx.abortRequest(requestId); mtpx.abortRequest(requestId);
setStatus(Status.DONE); setStatus(Status.DONE);
if (logger.isLoggable(BasicLevel.DEBUG))