Commit 867e6fcb authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

New jms bridge destination using the new destination extension mechanism.

parent dfcf4008
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest.jms;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.objectweb.joram.client.jms.XidImpl;
import org.objectweb.joram.mom.dest.AcquisitionDaemon;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug;
public class JMSAcquisition extends JMSModule implements AcquisitionDaemon, MessageListener {
private static final Logger logger = Debug.getLogger(JMSDistribution.class.getName());
/** Consumer object. */
protected MessageConsumer consumer;
/** Selector for filtering messages. */
protected String selector;
private ReliableTransmitter transmitter;
public void start(Properties properties, ReliableTransmitter transmitter) {
super.init(properties);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "<init>(" + properties + ')');
}
this.transmitter = transmitter;
selector = properties.getProperty("selector");
}
public void stop() {
super.close();
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "close()");
}
unsetMessageListener();
}
/**
* Sets a message listener on the foreign JMS destination.
*
* @exception javax.jms.IllegalStateException
* If the module state does not allow to set a listener.
*/
protected void connectionDone() {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "setMessageListener()");
}
try {
try {
if (dest instanceof Queue) {
consumer = session.createConsumer(dest, selector);
} else {
consumer = session.createDurableSubscriber((Topic) dest, "JMSAcquisition", selector, false);
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "setConsumer: consumer=" + consumer);
}
} catch (JMSException exc) {
throw exc;
} catch (Exception exc) {
throw new JMSException("JMS resources do not allow to create consumer: " + exc);
}
consumer.setMessageListener(this);
cnx.start();
} catch (JMSException exc) {
}
}
/**
* Unsets the set message listener on the foreign JMS destination.
*/
private void unsetMessageListener() {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "unsetMessageListener()");
}
try {
cnx.stop();
consumer.setMessageListener(null);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "unsetConsumer()");
}
try {
if (dest instanceof Topic) {
session.unsubscribe("JMSAcquisition");
}
consumer.close();
} catch (Exception exc) {
}
consumer = null;
} catch (JMSException exc) {
}
}
/**
* Implements the {@link MessageListener} interface for processing the
* asynchronous deliveries coming from the foreign JMS server.
*/
public void onMessage(javax.jms.Message jmsMessage) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "onMessage(" + jmsMessage + ')');
}
try {
Xid xid = null;
synchronized (lock) {
try {
if (isXA) {
xid = new XidImpl(new byte[0], 1, Long.toString(System.currentTimeMillis()).getBytes());
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "onMessage: xid=" + xid);
}
try {
xaRes.start(xid, XAResource.TMNOFLAGS);
} catch (XAException e) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "Exception onMessage:: XA can't start resource : " + xaRes,
e);
}
}
}
org.objectweb.joram.client.jms.Message clientMessage = org.objectweb.joram.client.jms.Message
.convertJMSMessage(jmsMessage);
Message momMessage = clientMessage.getMomMsg();
transmitter.transmit(momMessage, jmsMessage.getJMSMessageID());
if (isXA) {
try {
xaRes.end(xid, XAResource.TMSUCCESS);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "onMessage: XA end " + xaRes);
}
} catch (XAException e) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Exception onMessage:: XA resource end(...) failed: "
+ xaRes, e);
}
throw new JMSException("onMessage: XA resource end(...) failed: " + xaRes + " :: "
+ e.getMessage());
}
try {
int ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.commit(xid, false);
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "onMessage: XA commit " + xaRes);
}
} catch (XAException e) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Exception onMessage:: XA resource rollback(" + xid + ")", e);
}
try {
xaRes.rollback(xid);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "onMessage: XA rollback " + xaRes);
}
} catch (XAException e1) {
}
throw new JMSException("onMessage: XA resource rollback(" + xid + ") failed: " + xaRes
+ " :: " + e.getMessage());
}
} else {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "onMessage: commit.");
}
session.commit();
}
} catch (MessageFormatException conversionExc) {
// Conversion error: denying the message.
if (isXA) {
try {
xaRes.rollback(xid);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "run: XA rollback " + xaRes);
}
} catch (XAException e1) {
}
} else {
session.rollback();
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Exception:: onMessage: rollback.");
}
}
}
}
} catch (JMSException exc) {
// Commit or rollback failed: nothing to do.
}
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest.jms;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.objectweb.joram.client.jms.XidImpl;
import org.objectweb.joram.mom.dest.DistributionHandler;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug;
public class JMSDistribution extends JMSModule implements DistributionHandler {
private static final Logger logger = Debug.getLogger(JMSDistribution.class.getName());
/** Producer object. */
protected MessageProducer producer;
public void distribute(Message message) throws Exception {
if (!usable) {
throw new IllegalStateException(notUsableMessage);
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "send(" + message + ')');
}
synchronized (lock) {
Xid xid = null;
if (isXA) {
xid = new XidImpl(new byte[0], 1, message.id.getBytes());
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "send: xid=" + xid);
}
try {
xaRes.start(xid, XAResource.TMNOFLAGS);
} catch (XAException e) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "Exception:: XA can't start resource : " + xaRes, e);
}
}
}
producer.send(org.objectweb.joram.client.jms.Message.wrapMomMessage(null, message));
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "send: " + producer + " send.");
}
if (isXA) {
try {
xaRes.end(xid, XAResource.TMSUCCESS);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "send: XA end " + xaRes);
}
} catch (XAException e) {
throw new JMSException("resource end(...) failed: " + xaRes + " :: " + e.getMessage());
}
try {
int ret = xaRes.prepare(xid);
if (ret == XAResource.XA_OK) {
xaRes.commit(xid, false);
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "send: XA commit " + xaRes);
}
} catch (XAException e) {
try {
xaRes.rollback(xid);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "send: XA rollback " + xaRes);
}
} catch (XAException e1) {
}
throw new JMSException("XA resource rollback(" + xid + ") failed: " + xaRes + " :: "
+ e.getMessage());
}
}
}
}
/**
* Opens a connection with the foreign JMS server and creates the JMS
* resources for interacting with the foreign JMS destination.
*
* @exception JMSException
* If the needed JMS resources could not be created.
*/
protected void doConnect() throws JMSException {
super.doConnect();
producer = session.createProducer(dest);
}
/**
* Opens a XA connection with the foreign JMS server and creates the XA JMS
* resources for interacting with the foreign JMS destination.
*
* @exception JMSException
* If the needed JMS resources could not be created.
*/
protected void doXAConnect() throws JMSException {
super.doXAConnect();
producer = session.createProducer(dest);
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest.jms;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.objectweb.joram.client.jms.XidImpl;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug;
public class JMSModule implements ExceptionListener {
private static final Logger logger = Debug.getLogger(JMSDistribution.class.getName());
/** <code>true</code> if the module is fully usable. */
protected boolean usable = true;
/** Message explaining why the module is not usable. */
protected String notUsableMessage;
/** Daemon used for the reconnection process. */
protected ReconnectionDaemon reconnectionDaemon;
/** serializable object for synchronization */
protected Object lock = new String();
/** Indicates to use an XAConnection. Default is false. */
protected boolean isXA = false;
/** Connection to the foreign JMS server. */
protected Connection cnx;
/** Session with the foreign JMS destination. */
protected Session session;
/** XAResource */
protected XAResource xaRes = null;
/** User identification for connecting to the foreign JMS server. */
protected String userName = null;
/** User password for connecting to the foreign JMS server. */
protected String password = null;
/** Name of the JNDI factory class to use. */
protected String jndiFactory = null;
/** JNDI URL. */
protected String jndiUrl = null;
/** ConnectionFactory JNDI name. */
protected String cnxFactName;
/** Destination JNDI name. */
protected String destName;
/** Foreign JMS destination object. */
protected Destination dest = null;
/** JMS clientID field. */
protected String clientID = null;
/** Connection factory object for connecting to the foreign JMS server. */
protected ConnectionFactory cnxFact = null;
public void init(Properties properties) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "<init>(" + properties + ')');
}
jndiFactory = properties.getProperty("jndiFactory");
jndiUrl = properties.getProperty("jndiUrl");
cnxFactName = properties.getProperty("connectionFactoryName");
if (cnxFactName == null) {
throw new IllegalArgumentException("Missing ConnectionFactory JNDI name.");
}
destName = properties.getProperty("destinationName");
if (destName == null) {
throw new IllegalArgumentException("Missing Destination JNDI name.");
}
String userName = properties.getProperty("userName");
String password = properties.getProperty("password");
if (userName != null && password != null) {
this.userName = userName;
this.password = password;
}
clientID = properties.getProperty("clientId");
isXA = Boolean.valueOf(properties.getProperty("useXAConnection", "false")).booleanValue();
try {
connect();
} catch (JMSException exc) {
if (logger.isLoggable(BasicLevel.ERROR)) {
logger.log(BasicLevel.ERROR, "Not usable: ", exc);
}
}
}
public void close() {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "close()");
}
try {
cnx.setExceptionListener(null);
} catch (JMSException exc1) {
logger.log(BasicLevel.ERROR, "", exc1);
}
try {
cnx.stop();
} catch (JMSException exc) {
}
try {
reconnectionDaemon.stop();
} catch (Exception exc) {
}
try {
cnx.close();
} catch (JMSException exc) {
}
}
/**
* Launches the connection process to the foreign JMS server.
*
* @exception javax.jms.IllegalStateException
* If the module can't access the foreign JMS server.
* @exception javax.jms.JMSException
* If the needed JMS resources can't be created.
*/
private void connect() throws JMSException {
if (!usable) {
throw new IllegalStateException(notUsableMessage);
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "connect()");
}
// Creating the module's daemons.
reconnectionDaemon = new ReconnectionDaemon();
StartupDaemon startup = new StartupDaemon();
startup.start();
}
/**
* Opens a connection with the foreign JMS server and creates the JMS
* resources for interacting with the foreign JMS destination.
*
* @exception JMSException
* If the needed JMS resources could not be created.
*/
protected void doConnect() throws JMSException {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "doConnect()");
}
if (userName != null && password != null) {
cnx = cnxFact.createConnection(userName, password);
} else {
cnx = cnxFact.createConnection();
}
cnx.setExceptionListener(this);
if (clientID != null) {
cnx.setClientID(clientID);
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "doConnect: cnx=" + cnx);