Commit 74616df3 authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

New destination extension mechanism with Acquisition and Distribution queues and topics.

parent 388898d3
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest;
import java.util.Properties;
/**
* {@link AcquisitionDaemon} interface is made to work with an acquisition queue
* or topic via an {@link AcquisitionModule}. Its purpose is to collect messages
* from non-JMS sources in order to inject them into the JMS world. When the
* daemon is started, it starts listening to incoming messages. Once received,
* these messages are transmitted to the MOM using the given transmitter.
* <p>
* This interface is made for implicit acquisition. For on-demand or periodic
* acquisition, you need to extend {@link AcquisitionHandler} instead.
*/
public interface AcquisitionDaemon {
/**
* Tells the daemon to start with the given properties.
*
* @param properties
* The initial set of properties.
* @param transmitter
* a transmitter used to transmit retrieved messages to the MOM
* reliably.
*/
public void start(Properties properties, ReliableTransmitter transmitter);
/**
* Tells the daemon to stop. Any system resources previously allocated must be
* released.
*/
public void stop();
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest;
import java.util.Properties;
/**
* {@link AcquisitionHandler} interface is made to work with an acquisition
* queue or topic via an {@link AcquisitionModule}. Its purpose is to retrieve
* messages from non-JMS sources (e-mail, ftp, JMX, ...) in order to inject them
* into the JMS world. The {@link #retrieve(ReliableTransmitter)} method is
* called regularly depending on how the {@link AcquisitionModule} is
* configured.
* <p>
* This interface is made for explicit acquisition. For implicit acquisition
* such as message listeners, you need to extend {@link AcquisitionDaemon}
* instead.
*/
public interface AcquisitionHandler {
/**
* Retrieves one or more message from an external source (e-mail, ftp, ...).
* Message properties such as priority, expiration or persistence will be set
* afterwards by the {@link AcquisitionModule}.<br>
* <br>
* If the external source is reliable, acknowledgment can be done safely after
* transmitting the message using the transmitter.
*
* @param transmitter
* a transmitter used to transmit retrieved messages to the MOM
* reliably.
* @throws Exception
*/
public void retrieve(ReliableTransmitter transmitter) throws Exception;
/**
* Configures the handler with the given properties. This method is called one
* time before the first call to retrieve and then when the acquisition
* destination receives a configuration message.
*
* @param properties
* The new set of properties.
*/
public void setProperties(Properties properties);
/**
* Closes this handler and releases any system resources associated to it.
* There will be no subsequent call to this handler after it has been closed.
*/
public void close();
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest;
/**
* JMX interface for the acquisition monitoring.
*/
public interface AcquisitionMBean {
/**
* Returns true if the messages produced are persistent.
*
* @return true if the messages produced are persistent.
*/
public boolean isMessagePersistent();
/**
* Sets the DeliveryMode value for the produced messages. If the parameter is
* true the messages produced are persistent.
*
* @param isPersistent
* if true the messages produced are persistent.
*/
public void setMessagePersistent(boolean isPersistent);
/**
* Returns the priority of produced messages.
*
* @return the priority of produced messages.
*/
public int getPriority();
/**
* Sets the priority of produced messages.
*
* @param priority
* the priority to set.
*/
public void setPriority(int priority);
/**
* Returns the expiration value for produced messages.
*
* @return the expiration value for produced messages.
*/
public long getExpiration();
/**
* Sets the expiration value for produced messages.
*
* @param expiration
* the expiration to set.
*/
public void setExpiration(long expiration);
/**
* Returns the acquisition handler class name.
*
* @return the acquisition handler class name.
*/
public String getAcquisitionClassName();
/**
* Returns the acquisition period.
*
* @return the acquisition period.
*/
public long getAcquisitionPeriod();
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.TimerTask;
import javax.jms.DeliveryMode;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.common.Debug;
/**
* The {@link AcquisitionModule} interfaces between the acquisition destinations
* and the specified {@link AcquisitionHandler}.
*/
public class AcquisitionModule implements ReliableTransmitter {
public static Logger logger = Debug.getLogger(AcquisitionModule.class.getName());
/** The property name for the acquisition period. */
public static final String PERIOD = "acquisition.period";
/** The property name for the acquisition handler class name. */
public static final String CLASS_NAME = "acquisition.className";
/** Persistent property name: tells if produced messages will be persistent. */
public static final String PERSISTENT_PROPERTY = "persistent";
/** Expiration property name: tells the life expectancy of produced messages. */
public static final String EXPIRATION_PROPERTY = "expiration";
/** Priority property name: tells the JMS priority of produced messages. */
public static final String PRIORITY_PROPERTY = "priority";
public static void checkAcquisitionClass(String className) throws Exception {
if (className == null) {
throw new Exception("AcquisitionHandler class not defined: use " + CLASS_NAME
+ " property to chose acquisition class.");
}
Class clazz = Class.forName(className);
boolean isDaemon = false;
boolean isHandler = false;
while (clazz != null) {
Class[] interfaces = clazz.getInterfaces();
for (int i = 0; i < interfaces.length; i++) {
if (interfaces[i].equals(AcquisitionDaemon.class)) {
isDaemon = true;
} else if (interfaces[i].equals(AcquisitionHandler.class)) {
isHandler = true;
}
}
clazz = clazz.getSuperclass();
}
if (isDaemon && isHandler) {
throw new Exception("Acquisition class " + className
+ " can't implement both AcquisitionHandler and AcquisitionDaemon interfaces.");
} else if (!isDaemon && !isHandler) {
throw new Exception("Acquisition class " + className
+ " must implement either AcquisitionHandler or AcquisitionDaemon interface.");
}
}
private static Properties transform(fr.dyade.aaa.common.stream.Properties properties) {
if (properties == null)
return null;
Properties prop = new Properties();
Enumeration e = properties.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
prop.put(key, properties.get(key));
}
return prop;
}
/** The acquisition logic. */
protected Object acquisitionHandler;
/** The priority of produced messages, default is 4. */
private int priority = javax.jms.Message.DEFAULT_PRIORITY;
/** Tells if the messages produced are persistent. */
private boolean isPersistent = (javax.jms.Message.DEFAULT_DELIVERY_MODE == DeliveryMode.PERSISTENT);
/** The duration of produced messages. */
private long expiration = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
/** The acquisition queue or topic using this module. */
private final DestinationImpl destination;
/** The destination type set in acquired messages/ */
private final byte destType;
/** The period before subsequent acquisition if positive. */
private long period = 0;
/** The task used to launch a new acquisition. */
private AcquisitionTask acquisitionTask;
/**
* Tells if acquisition is done on-demand using the acquisition task or with a
* daemon.
*/
private boolean isDaemon = false;
public AcquisitionModule(DestinationImpl destination, String className, Properties properties, byte destType) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "AcquisitionModule.<init> prop = " + properties);
}
this.destination = destination;
this.destType = destType;
try {
Class clazz = Class.forName(className);
acquisitionHandler = clazz.newInstance();
// Verify that one and only one correct interface is implemented.
if (acquisitionHandler instanceof AcquisitionDaemon) {
isDaemon = true;
setProperties(properties);
} else {
acquisitionTask = new AcquisitionTask();
setProperties(properties);
}
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "AcquisitionModule: can't create acquisition handler.", exc);
}
}
/**
* Returns true if the messages produced are persistent.
*
* @return true if the messages produced are persistent.
*/
public boolean isMessagePersistent() {
return isPersistent;
}
/**
* Sets the DeliveryMode value for the produced messages. If the parameter is
* true the messages produced are persistent.
*
* @param isPersistent
* if true the messages produced are persistent.
*/
public void setMessagePersistent(boolean isPersistent) {
this.isPersistent = isPersistent;
}
/**
* Returns the priority of produced messages.
*
* @return the priority of produced messages.
*/
public int getPriority() {
return priority;
}
/**
* Sets the priority of produced messages.
*
* @param priority
* the priority to set.
*/
public void setPriority(int priority) {
this.priority = priority;
}
/**
* Returns the expiration value for produced messages.
*
* @return the expiration value for produced messages.
*/
public long getExpiration() {
return expiration;
}
/**
* Sets the expiration value for produced messages.
*
* @param expiration
* the expiration to set.
*/
public void setExpiration(long expiration) {
this.expiration = expiration;
}
/**
* Resets the acquisition properties.
*/
private void setProperties(Properties properties) {
// Clone properties as it is modified before setting handler properties
// and we want to keep all properties in destinations to persist them
Properties props = (Properties) properties.clone();
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "AcquisitionModule.setProperties = " + props + " daemon = " + isDaemon);
}
// Restore defaults
priority = javax.jms.Message.DEFAULT_PRIORITY;
isPersistent = (javax.jms.Message.DEFAULT_DELIVERY_MODE == DeliveryMode.PERSISTENT);
expiration = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
period = 0;
if (acquisitionTask != null) {
acquisitionTask.cancel();
}
if (props.containsKey(PERIOD)) {
try {
period = ConversionHelper.toLong(props.getProperty(PERIOD));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "AcquisitionModule: can't parse defined period property.");
}
props.remove(PERIOD);
}
if (!isDaemon && period > 0) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "schedule acquisition every " + period + "ms.");
}
acquisitionTask = new AcquisitionTask();
AgentServer.getTimer().schedule(acquisitionTask, period, period);
}
if (props.containsKey(PERSISTENT_PROPERTY)) {
try {
isPersistent = ConversionHelper.toBoolean(props.getProperty(PERSISTENT_PROPERTY));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "AcquisitionModule: can't parse defined message persistence property.");
}
props.remove(PERSISTENT_PROPERTY);
}
if (props.containsKey(PRIORITY_PROPERTY)) {
try {
priority = ConversionHelper.toInt(props.getProperty(PRIORITY_PROPERTY));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "AcquisitionModule: can't parse defined message priority property.");
}
props.remove(PRIORITY_PROPERTY);
}
if (props.containsKey(EXPIRATION_PROPERTY)) {
try {
expiration = ConversionHelper.toLong(props.getProperty(EXPIRATION_PROPERTY));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "AcquisitionModule: can't parse defined message expiration property.");
}
props.remove(EXPIRATION_PROPERTY);
}
if (props.containsKey(CLASS_NAME)
&& !props.getProperty(CLASS_NAME).equals(acquisitionHandler.getClass().getName())) {
logger.log(BasicLevel.ERROR,
"AcquisitionModule: Changing dynamically the acquisition class is not allowed.");
props.remove(CLASS_NAME);
}
if (isDaemon) {
((AcquisitionDaemon) acquisitionHandler).start(props, this);
} else {
((AcquisitionHandler) acquisitionHandler).setProperties(props);
}
}
/**
* In <b>periodic mode</b> (period > 0), a message with non-null properties
* will be treated as a new configuration for the destination, and ignored
* otherwise.<br>
* <br>
* In <b>request mode</b>, a message received will launch an acquisition
* process with the given message properties or use the last known properties
* if empty.<br>
* <br>
* Destination mode can be changed using the "period" property.
*/
public Properties processMessages(ClientMessages cm) {
Iterator msgs = cm.getMessages().iterator();
Properties lastProperties = null;
while (msgs.hasNext()) {
Message msg = (Message) msgs.next();
// If non-empty, sets the new properties
if (msg.properties != null) {
lastProperties = AcquisitionModule.transform(msg.properties);
if (isDaemon) {
((AcquisitionDaemon) acquisitionHandler).stop();
setProperties(lastProperties);
} else {
setProperties(lastProperties);
}
}
if (!isDaemon && period <= 0) {
acquisitionTask = new AcquisitionTask();
AgentServer.getTimer().schedule(acquisitionTask, 0);
}
}
return lastProperties;
}
public ClientMessages acquisitionNot(AcquisitionNot not, long msgCount) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "AcquisitionModule.acquisitionNot(" + not + ")");
}
ClientMessages acquiredCM = not.getAcquiredMessages();
List messages = acquiredCM.getMessages();
if (messages.size() == 0) {
return null;
}
setMessagesInfo(messages, msgCount);
return acquiredCM;
}
private void setMessagesInfo(List messages, long msgCount) {
long currentTime = System.currentTimeMillis();
for (Iterator iterator = messages.iterator(); iterator.hasNext();) {
Message message = (Message) iterator.next();
message.id = "ID:" + destination.getDestinationId() + '_' + msgCount;
message.timestamp = currentTime;
message.persistent = isPersistent;
message.setDestination(destination.getId().toString(), destType);
message.priority = priority;
if (expiration > 0) {
message.expiration = currentTime + expiration;
} else {
message.expiration = 0;
}
msgCount++;
}
}
/**
* Closes the handler.
*/
public void close() {
if (isDaemon) {
((AcquisitionDaemon) acquisitionHandler).stop();
} else {