Commit 3e6acd37 authored by Nicolas Tachker's avatar Nicolas Tachker

Refactoring of the resource adapter.

Add new functionalities (bind the resource adapter in jndi, deploy many resource adapter in the application server, ...)
Fix bugs.
parent f0250c93
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2004 - 2011 Bull SA
* Copyright (C) 2008 - ScalAgent Distributed Technologies
* Copyright (C) 2004 - 208 Bull SA
* Copyright (C) 2008 - 2012 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -23,6 +23,8 @@
*/
package org.objectweb.joram.client.connector;
import java.util.Properties;
import javax.resource.ResourceException;
import javax.resource.spi.IllegalStateException;
import javax.resource.spi.InvalidPropertyException;
......@@ -30,6 +32,9 @@ import javax.resource.spi.ResourceAdapter;
import org.objectweb.joram.shared.security.SimpleIdentity;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug;
/**
* An <code>ActivationSpecImpl</code> instance holds configuration information
......@@ -38,13 +43,14 @@ import org.objectweb.util.monolog.api.BasicLevel;
public class ActivationSpecImpl
implements javax.resource.spi.ActivationSpec,
javax.resource.spi.ResourceAdapterAssociation,
java.io.Serializable
{
java.io.Serializable {
/**
*
*/
private static final long serialVersionUID = 1L;
public static Logger logger = Debug.getLogger(ActivationSpecImpl.class.getName());
/**
* Value for the property <code>acknowledgeMode</code>
* defined in the MDB deployment descriptor.
......@@ -67,7 +73,7 @@ public class ActivationSpecImpl
/** User password. */
private String password = "anonymous";
/** identity class name. */
String identityClass = SimpleIdentity.class.getName();
private String identityClass = SimpleIdentity.class.getName();
/** Message selector. */
private String messageSelector = null;
......@@ -88,17 +94,216 @@ public class ActivationSpecImpl
* Default is 10.
*/
private String maxMessages = "10";
/**
* Determine whether durable subscription must be deleted or not
* at close time of the InboundConsumer.
* <p>
* Default is false.
*/
private Boolean deleteDurableSubscription = false;
/** Resource adapter central authority. */
private transient ResourceAdapter ra = null;
private transient JoramAdapter ra = null;
/** <code>true</code> if the underlying JORAM server is collocated. */
private Boolean collocated = false;
/**
* Constructs an <code>ActivationSpecImpl</code> instance.
public void setCollocated(String collocatedServer) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "ActivationSpecImpl.setCollocated(" + collocatedServer + ')');
collocated = new Boolean(collocatedServer);
}
public Boolean getCollocated() {
return new Boolean(collocated);
}
/** Host name or IP of the underlying JORAM server. */
private String hostName = null;
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "ActivationSpecImpl.setHostName(" + hostName + ')');
this.hostName = hostName;
}
/** Port number of the underlying JORAM server. */
private int serverPort = -2;
public Integer getServerPort() {
return new Integer(serverPort);
}
public void setServerPort(String serverPort) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "ActivationSpecImpl.setServerPort(" + serverPort + ')');
this.serverPort = new Integer(serverPort).intValue();
}
/** URL hajoram (for collocated mode). */
private String haURL = null;
public String getHAURL() {
if (haURL == null)
if (ra != null)
haURL = ra.getHAURL();
return haURL;
}
public void setHAURL(String haURL) {
this.haURL = haURL;
}
/** <code>true</code> if the underlying a JORAM HA server is defined */
private Boolean isHA = false;
public void setIsHA(String isHA) {
this.isHA = new Boolean(isHA);
}
public Boolean isHA() {
return new Boolean(isHA);
}
/**
* Duration in seconds during which connecting is attempted (connecting
* might take time if the server is temporarily not reachable); the 0 value
* is set for connecting only once and aborting if connecting failed.
*/
private int connectingTimer = 0;
public Integer getConnectingTimer() {
return new Integer(connectingTimer);
}
public void setConnectingTimer(String connectingTimer) {
this.connectingTimer = new Integer(connectingTimer).intValue();
}
/**
* Duration in seconds during which a JMS transacted (non XA) session might
* be pending; above that duration the session is rolled back and closed;
* the 0 value means "no timer".
*/
public ActivationSpecImpl()
{}
private int txPendingTimer = 0;
public Integer getTxPendingTimer() {
return new Integer(txPendingTimer);
}
public void setTxPendingTimer(String txPendingTimer) {
this.txPendingTimer = new Integer(txPendingTimer).intValue();
}
/**
* Period in milliseconds between two ping requests sent by the client
* connection to the server; if the server does not receive any ping
* request during more than 2 * cnxPendingTimer, the connection is
* considered as dead and processed as required.
*/
private int cnxPendingTimer = 0;
public Integer getCnxPendingTimer() {
return new Integer(cnxPendingTimer);
}
public void setCnxPendingTimer(String cnxPendingTimer) {
this.cnxPendingTimer = new Integer(cnxPendingTimer).intValue();
}
/**
* The maximum number of messages that can be
* read at once from a queue.
*
* Default value is 2 in order to compensate
* the former subscription mechanism.
*/
private int queueMessageReadMax = 2;
public Integer getQueueMessageReadMax() {
return new Integer(queueMessageReadMax);
}
public void setQueueMessageReadMax(String queueMessageReadMax) {
this.queueMessageReadMax = new Integer(queueMessageReadMax).intValue();
}
/**
* The maximum number of acknowledgements
* that can be buffered in
* Session.DUPS_OK_ACKNOWLEDGE mode when listening to a topic.
* Default is 0.
*/
private int topicAckBufferMax = 0;
public Integer getTopicAckBufferMax() {
return new Integer(topicAckBufferMax);
}
public void setTopicAckBufferMax(String topicAckBufferMax) {
this.topicAckBufferMax = new Integer(topicAckBufferMax).intValue();
}
/**
* This threshold is the maximum messages
* number over
* which the subscription is passivated.
* Default is Integer.MAX_VALUE.
*/
private int topicPassivationThreshold = Integer.MAX_VALUE;
public Integer getTopicPassivationThreshold() {
return new Integer(topicPassivationThreshold);
}
public void setTopicPassivationThreshold(String topicPassivationThreshold) {
this.topicPassivationThreshold = new Integer(topicPassivationThreshold).intValue();
}
/**
* This threshold is the minimum
* messages number below which
* the subscription is activated.
* Default is 0.
*/
private int topicActivationThreshold = 0;
public Integer getTopicActivationThreshold() {
return new Integer(topicActivationThreshold);
}
public void setTopicActivationThreshold(String topicActivationThreshold) {
this.topicActivationThreshold = new Integer(topicActivationThreshold).intValue();
}
private String name;
/**
* @return the name
*/
public String getName() {
return name;
}
/**
* @param name the name to set
*/
public void setName(String name) {
this.name = name;
}
/**
* Constructs an <code>ActivationSpecImpl</code> instance.
*/
public ActivationSpecImpl() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "ActivationSpecImpl<>");
}
/**
* Checks if the configuration information is valid.
......@@ -107,8 +312,7 @@ public class ActivationSpecImpl
* or not consistent with other
* parameters.
*/
public void validate() throws InvalidPropertyException
{
public void validate() throws InvalidPropertyException {
if (destinationType != null
&& ! destinationType.equals("javax.jms.Queue")
&& ! destinationType.equals("javax.jms.Topic"))
......@@ -139,10 +343,9 @@ public class ActivationSpecImpl
/** Sets the resource adapter central authority. */
public void setResourceAdapter(ResourceAdapter ra) throws ResourceException
{
if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " setResourceAdapter(" + ra + ")");
public void setResourceAdapter(ResourceAdapter ra) throws ResourceException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, this + " setResourceAdapter(" + ra + ")");
if (this.ra != null)
throw new IllegalStateException("Can not change resource adapter"
......@@ -153,14 +356,26 @@ public class ActivationSpecImpl
+ "resource adapter: "
+ ra.getClass().getName());
this.ra = ra;
this.ra = (JoramAdapter) ra;
if (hostName == null && serverPort == -2) {
// set the ra value for this ActivationSpec
// this value can be override by the MDB descriptor
hostName = this.ra.getHostName();
serverPort = this.ra.getServerPort();
haURL = this.ra.getHAURL();
// userName = the default value is anonymous
// password = the default value is anonymous
identityClass = this.ra.getIdentityClass();
collocated = this.ra.getCollocated();
isHA = this.ra.getIsHa();
}
}
/** Returns the resource adapter central authority instance. */
public ResourceAdapter getResourceAdapter()
{
if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG, this + " getResourceAdapter = " + ra);
public ResourceAdapter getResourceAdapter() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, this + " getResourceAdapter = " + ra);
return ra;
}
......@@ -219,7 +434,7 @@ public class ActivationSpecImpl
this.subscriptionName = subscriptionName;
}
/** Sets the acknowledgement mode. */
/** Sets the acknowledgment mode. */
public void setAcknowledgeMode(String acknowledgeMode)
{
this.acknowledgeMode = acknowledgeMode;
......@@ -234,6 +449,16 @@ public class ActivationSpecImpl
public void setMaxMessages(String maxMessages) {
this.maxMessages = maxMessages;
}
/**
* Set the deleteDurableSubscription attribute.
*
* @param deleteDurableSubscription to set deleteDurableSubscription
* @see #deleteDurableSubscription
*/
public void setDeleteDurableSubscription(String deleteDurableSubscription) {
this.deleteDurableSubscription = new Boolean(deleteDurableSubscription);
}
/** Returns the destination type. */
public String getDestinationType()
......@@ -282,7 +507,7 @@ public class ActivationSpecImpl
return subscriptionName;
}
/** Returns the acknowledgement mode. */
/** Returns the acknowledgment mode. */
public String getAcknowledgeMode()
{
return acknowledgeMode;
......@@ -297,4 +522,74 @@ public class ActivationSpecImpl
public String getMaxMessages() {
return maxMessages;
}
/**
* Returns the deleteDurableSubscription attribute.
*
* @return the DeleteDurableSubscription
* @see #deleteDurableSubscription
*/
public Boolean getDeleteDurableSubscription() {
return new Boolean(deleteDurableSubscription);
}
public void setActivationSpecConfig(Properties props) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "ActivationSpecImpl.setActivationSpecConfig(" + props + ')');
if (props == null)
return;
name = props.getProperty("name");
hostName = props.getProperty("HostName", "localhost");
serverPort = new Integer(props.getProperty("ServerPort", "16010")).intValue();
haURL = props.getProperty("HAURL", "hajoram://localhost:16010,localhost:16011");
userName = props.getProperty("UserName", "anonymous");
password = props.getProperty("Password", "anonymous");
identityClass = props.getProperty("IdentityClass", "org.objectweb.joram.shared.security.SimpleIdentity");
collocated = new Boolean(props.getProperty("Collocated", "false")).booleanValue();
isHA = new Boolean(props.getProperty("IsHA", "false")).booleanValue();
connectingTimer = new Integer(props.getProperty("ConnectingTimer", "0")).intValue();
cnxPendingTimer = new Integer(props.getProperty("CnxPendingTimer", "0")).intValue();
txPendingTimer = new Integer(props.getProperty("TxPendingTimer", "0")).intValue();
topicPassivationThreshold = new Integer(props.getProperty("TopicPassivationThreshold", ""+Integer.MAX_VALUE)).intValue();
topicActivationThreshold = new Integer(props.getProperty("TopicActivationThreshold", "0")).intValue();
topicAckBufferMax = new Integer(props.getProperty("TopicAckBufferMax", "0")).intValue();
subscriptionName = props.getProperty("SubscriptionName");
subscriptionDurability = props.getProperty("SubscriptionDurability");
queueMessageReadMax = new Integer(props.getProperty("QueueMessageReadMax", "2")).intValue();
messageSelector = props.getProperty("MessageSelector");
maxNumberOfWorks = props.getProperty("MaxNumberOfWorks", "0");
maxMessages = props.getProperty("MaxMessages", "10");
destinationType = props.getProperty("DestinationType");
destination = props.getProperty("Destination");
deleteDurableSubscription = new Boolean(props.getProperty("DeleteDurableSubscription", "false")).booleanValue();
acknowledgeMode = props.getProperty("AcknowledgeMode", AUTO_ACKNOWLEDGE);
}
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "ActivationSpecImpl@" + hashCode() + " [destinationType=" + destinationType
+ ", destination=" + destination + ", userName=" + userName
+ ", password=" + password + ", identityClass=" + identityClass
+ ", messageSelector=" + messageSelector + ", subscriptionDurability="
+ subscriptionDurability + ", subscriptionName=" + subscriptionName
+ ", acknowledgeMode=" + acknowledgeMode + ", maxNumberOfWorks="
+ maxNumberOfWorks + ", maxMessages=" + maxMessages
+ ", deleteDurableSubscription=" + deleteDurableSubscription + ", ra="
+ ra + ", collocated=" + collocated + ", hostName=" + hostName
+ ", serverPort=" + serverPort + ", haURL=" + haURL + ", isHA=" + isHA
+ ", connectingTimer=" + connectingTimer + ", txPendingTimer="
+ txPendingTimer + ", cnxPendingTimer=" + cnxPendingTimer
+ ", queueMessageReadMax=" + queueMessageReadMax
+ ", topicAckBufferMax=" + topicAckBufferMax
+ ", topicPassivationThreshold=" + topicPassivationThreshold
+ ", topicActivationThreshold=" + topicActivationThreshold + ", name="
+ name + "]";
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2004 - 2008 ScalAgent Distributed Technologies
* Copyright (C) 2004 - 2012 ScalAgent Distributed Technologies
* Copyright (C) 2004 - Bull SA
*
* This library is free software; you can redistribute it and/or
......@@ -131,16 +131,9 @@ public class DefaultConnectionManager
private void setFactoryParameters(AbstractConnectionFactory factory , ManagedConnectionFactoryImpl mcf) {
if (AdapterTracing.dbgAdapter.isLoggable(BasicLevel.DEBUG))
AdapterTracing.dbgAdapter.log(BasicLevel.DEBUG,
this + " setFactoryParameters(" + factory + "," + mcf + ")");
factory.getParameters().connectingTimer = mcf.getConnectingTimer();
factory.getParameters().cnxPendingTimer = mcf.getCnxPendingTimer();
factory.getParameters().txPendingTimer = mcf.getTxPendingTimer();
factory.getParameters().asyncSend = mcf.isAsyncSend();
factory.getParameters().multiThreadSync = mcf.isMultiThreadSync();
factory.getParameters().multiThreadSyncDelay = mcf.getMultiThreadSyncDelay();
factory.getParameters().outLocalAddress = mcf.getOutLocalAddress();
factory.getParameters().outLocalPort = mcf.getOutLocalPort().intValue();
this + " setFactoryParameters(" + factory + "," + mcf + ")");
mcf.setParameters(factory);
}
/**
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2004 - 2012 ScalAgent Distributed Technologies
* Copyright (C) 2004 - Bull SA
*
* This library is free software; you can redistribute it and/or
......@@ -126,6 +127,7 @@ class InboundConsumer implements javax.jms.ServerSessionPool
this.cnx = cnx;
this.transacted = transacted;
this.ackMode = ackMode;
this.closeDurSub = closeDurSub;
if (maxWorks < 0) maxWorks = 0;
this.maxWorks = maxWorks;
......
......@@ -25,57 +25,42 @@
package org.objectweb.joram.client.connector;
import java.io.FileNotFoundException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.util.Date;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.resource.NotSupportedException;
import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.CommException;
import javax.resource.spi.IllegalStateException;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.ResourceAdapterInternalException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import org.objectweb.joram.client.jms.ConnectionFactory;
import org.objectweb.joram.client.jms.ConnectionMetaData;
import org.objectweb.joram.client.jms.Destination;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.Topic;
import org.objectweb.joram.client.jms.admin.AdminException;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.AdminWrapper;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.ha.local.HALocalConnectionFactory;
import org.objectweb.joram.client.jms.ha.tcp.HATcpConnectionFactory;
import org.objectweb.joram.client.jms.local.LocalConnectionFactory;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import org.objectweb.joram.mom.proxies.tcp.TcpProxyService;
import org.objectweb.joram.shared.security.SimpleIdentity;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import com.scalagent.jmx.JMXServer;
import org.osgi.framework.ServiceRegistration;
import org.osgi.util.tracker.ServiceTracker;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.agent.ServerDesc;
import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.common.osgi.Activator;
import fr.dyade.aaa.util.management.MXWrapper;
/**
......@@ -84,7 +69,7 @@ import fr.dyade.aaa.util.management.MXWrapper;
* inbound connectivity (asynchronous message delivery as specified by the JCA
* message inflow contract).
*/
public final class JoramAdapter implements ResourceAdapter, JoramAdapterMBean, ExceptionListener {
public final class JoramAdapter extends JoramResourceAdapter implements JoramAdapterMBean, ExceptionListener {
/** Define serialVersionUID for interoperability. */
private static final long serialVersionUID = 1L;
......@@ -99,6 +84,10 @@ public final class JoramAdapter implements ResourceAdapter, JoramAdapterMBean, E
private boolean isActive = false;
/** The duration of admin connection before change state.*/
private long adminDurationState = 0;
private AdminWrapper wrapper = null;
private ServerDesc serverDesc = null;
private ServiceRegistration registration;
// ------------------------------------------