Commit a5c0fffc authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Distribution queue now stores the messages which can't be distributed and...

Distribution queue now stores the messages which can't be distributed and tries to re-distribute them each queue period.
Updating properties is now done in setProperties method with a boolean 'firstTime' set to false.
parent 5c2daf92
......@@ -23,14 +23,8 @@
package org.objectweb.joram.mom.dest;
import java.io.Serializable;
import java.util.List;
import java.util.Properties;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
......@@ -47,104 +41,34 @@ public class DistributionModule implements Serializable {
/** The property name for the distribution handler class name. */
public static final String CLASS_NAME = "distribution.className";
/** Keep message property name: tells if distributed message is kept in destination. */
public static final String KEEP_MESSAGE_OPTION = "distribution.keep";
/** Tells if distributed message is kept in destination */
private boolean keep = false;
/** Holds the distribution logic. */
private DistributionHandler distributionHandler;
/** The distribution queue or topic using this module. */
private final Destination destination;
public DistributionModule(Destination destination, Properties properties) {
this.destination = destination;
public DistributionModule(String className, Properties properties) {
try {
Class clazz = Class.forName(className);
distributionHandler = (DistributionHandler) clazz.newInstance();
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: can't create distribution handler.", exc);
}
setProperties(properties);
}
/**
* Resets the distribution properties.
*/
private void setProperties(Properties props) {
if (props == null) {
return;
}
// reset default
keep = false;
if (props.containsKey(KEEP_MESSAGE_OPTION)) {
try {
keep = ConversionHelper.toBoolean(props.get(KEEP_MESSAGE_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: can't parse keep message option.", exc);
}
props.remove(KEEP_MESSAGE_OPTION);
}
if (props.containsKey(CLASS_NAME)) {
try {
String className = ConversionHelper.toString(props.get(CLASS_NAME));
props.remove(CLASS_NAME);
Class clazz = Class.forName(className);
distributionHandler = (DistributionHandler) clazz.newInstance();
distributionHandler.init(props);
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: can't create distribution handler.", exc);
}
}
}
/**
* Messages received on the distribution destination are distributed using the
* specified {@link DistributionHandler}.
*/
public ClientMessages processMessages(ClientMessages cm) {
List msgs = cm.getMessages();
DMQManager dmqManager = null;
for (int i = 0; i < msgs.size(); i++) {
Message msg = (Message) msgs.get(i);
try {
distributionHandler.distribute(msg);
destination.nbMsgsDeliverSinceCreation++;
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: distribution error.", exc);
if (dmqManager == null) {
dmqManager = new DMQManager(cm.getDMQId(), destination.getDMQAgentId(), destination.getId());
}
destination.nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(msg, MessageErrorConstants.UNDELIVERABLE);
}
}
if (dmqManager != null) {
dmqManager.sendToDMQ();
}
if (keep) {
return cm;
public void setProperties(Properties properties) {
if (distributionHandler != null) {
distributionHandler.init(properties);
}
return null;
}
/**
* Update the properties, resets the distribution properties.
*
* @param properties new properties
* @throws Exception
*/
public void updateProperties(Properties properties) throws Exception {
setProperties(properties);
}
public void close() {
distributionHandler.close();
}
public void processMessage(Message fullMessage) throws Exception {
distributionHandler.distribute(fullMessage);
}
}
......@@ -22,12 +22,18 @@
*/
package org.objectweb.joram.mom.dest;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.excepts.MessageValueException;
import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
......@@ -42,48 +48,93 @@ public class DistributionQueue extends Queue {
public static Logger logger = Debug.getLogger(DistributionQueue.class.getName());
/** Default period used to clean queue and re-distribute failing messages. */
public static final long DEFAULT_PERIOD = 1000;
public static final String BATCH_DISTRIBUTION_OPTION = "distribution.batch";
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
private transient DistributionModule distributionModule;
/** The acquisition class name. */
private String distributionClassName;
/**
* Tells if we try to distribute the each message each time (true) or if the
* distribution is stopped on first error (false). Batch mode can (and will
* probably) lose message ordering but will not stop deliverable messages in
* the queue waiting for previous ones to be sent.
*/
private boolean batchDistribution;
private Properties properties;
public DistributionQueue() {
super();
}
/**
* Configures a {@link DistributionQueue} instance.
*
* @param properties
* The initial set of properties.
*/
public void setProperties(Properties properties) throws RequestException {
super.setProperties(properties);
public void setProperties(Properties properties, boolean firstTime) throws Exception {
super.setProperties(properties, firstTime);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.<init> prop = " + properties);
}
this.properties = (Properties) properties.clone();
this.properties = properties;
// Check the existence of the distribution class and the presence of a no-arg constructor.
try {
String className = ConversionHelper.toString(properties.get(DistributionModule.CLASS_NAME));
Class.forName(className).getConstructor();
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "DistributionQueue: error with distribution class.", exc);
throw new RequestException(exc.getMessage());
batchDistribution = false;
if (properties != null && properties.containsKey(BATCH_DISTRIBUTION_OPTION)) {
try {
batchDistribution = ConversionHelper.toBoolean(properties.get(BATCH_DISTRIBUTION_OPTION));
} catch (MessageValueException exc) {
logger.log(BasicLevel.ERROR, "DistributionModule: can't parse batch option.", exc);
}
}
if (firstTime) {
if (properties != null) {
distributionClassName = properties.getProperty(DistributionModule.CLASS_NAME);
properties.remove(DistributionModule.CLASS_NAME);
}
if (distributionClassName == null) {
throw new RequestException("Distribution class name not found: " + DistributionModule.CLASS_NAME
+ " property must be set on queue creation.");
}
// Check the existence of the distribution class and the presence of a no-arg constructor.
try {
String className = distributionClassName;
Class.forName(className).getConstructor();
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "DistributionQueue: error with distribution class.", exc);
throw new RequestException(exc.getMessage());
}
} else {
distributionModule.setProperties(properties);
}
}
public void initialize(boolean firstTime) {
super.initialize(firstTime);
if (distributionModule == null) {
distributionModule = new DistributionModule(this, (Properties) properties.clone());
distributionModule = new DistributionModule(distributionClassName, properties);
}
}
public void agentFinalize(boolean lastTime) {
super.agentFinalize(lastTime);
close();
if (distributionModule != null) {
distributionModule.close();
}
}
/**
......@@ -94,36 +145,80 @@ public class DistributionQueue extends Queue {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.preProcess(" + from + ", " + cm + ')');
}
return distributionModule.processMessages(cm);
if (!batchDistribution && messages.size() > 0) {
return cm;
}
List msgs = cm.getMessages();
for (Iterator ite = msgs.iterator(); ite.hasNext();) {
Message msg = (Message) ite.next();
try {
distributionModule.processMessage(msg);
nbMsgsDeliverSinceCreation++;
ite.remove();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "DistributionModule: distribution error.", exc);
}
// if we don't do batch distribution, stop on first error
if (!batchDistribution) {
break;
}
}
}
if (msgs.size() > 0) {
return cm;
}
return null;
}
public String toString() {
return "DistributionQueue:" + getId().toString();
}
private void close() {
if (distributionModule != null) {
distributionModule.close();
/**
* wake up, and cleans the queue.
*/
public void wakeUpNot(WakeUpNot not) {
super.wakeUpNot(not);
for (Iterator ite = messages.iterator(); ite.hasNext();) {
org.objectweb.joram.mom.messages.Message msg = (org.objectweb.joram.mom.messages.Message) ite.next();
try {
distributionModule.processMessage(msg.getFullMessage());
nbMsgsDeliverSinceCreation++;
ite.remove();
msg.delete();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Distribution redelivery number " + msg.getDeliveryCount()
+ " failed.", exc);
}
msg.incDeliveryCount();
// If message considered as undeliverable, add it to the list of dead messages:
if (isUndeliverable(msg)) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Message can't be delivered, send to DMQ.");
}
ite.remove();
msg.delete();
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(msg.getFullMessage(), MessageErrorConstants.UNDELIVERABLE);
dmqManager.sendToDMQ();
continue;
}
if (!batchDistribution) {
break;
}
}
}
}
/**
* Update properties configuration, they are processed by the distribution module
* @param prop properties to update.
* @throws Exception
*/
public void updateProperties(Properties prop) throws Exception {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.updateProperties(" + prop + ')');
}
super.setProperties(prop);
// update this.properties
Enumeration e = prop.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
properties.put(key, prop.get(key));
}
// update the module
distributionModule.updateProperties(properties);
protected void processSetRight(AgentId user, int right) throws RequestException {
if (right == READ) {
throw new RequestException("A distribution queue can't be set readable.");
}
super.processSetRight(user, right);
}
}
......@@ -22,12 +22,14 @@
*/
package org.objectweb.joram.mom.dest;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.shared.MessageErrorConstants;
import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
......@@ -46,6 +48,9 @@ public class DistributionTopic extends Topic {
private static final long serialVersionUID = 1L;
private transient DistributionModule distributionModule;
/** The acquisition class name. */
private String distributionClassName;
private Properties properties;
......@@ -55,35 +60,50 @@ public class DistributionTopic extends Topic {
* @param properties
* The initial set of properties.
*/
public void setProperties(Properties properties) throws RequestException {
super.setProperties(properties);
public void setProperties(Properties properties, boolean firstTime) throws Exception {
super.setProperties(properties, firstTime);
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "DistributionTopic.<init> prop = " + properties);
}
this.properties = (Properties) properties.clone();
// Check the existence of the distribution class and the presence of a no-arg constructor.
try {
String className = ConversionHelper.toString(properties.get(DistributionModule.CLASS_NAME));
Class.forName(className).getConstructor();
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "DistributionTopic: error with distribution class.", exc);
throw new RequestException(exc.getMessage());
this.properties = properties;
if (firstTime) {
if (properties != null) {
distributionClassName = properties.getProperty(DistributionModule.CLASS_NAME);
properties.remove(DistributionModule.CLASS_NAME);
}
if (distributionClassName == null) {
throw new RequestException("Distribution class name not found: " + DistributionModule.CLASS_NAME
+ " property must be set on queue creation.");
}
// Check the existence of the distribution class and the presence of a no-arg constructor.
try {
String className = distributionClassName;
Class.forName(className).getConstructor();
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "DistributionQueue: error with distribution class.", exc);
throw new RequestException(exc.getMessage());
}
} else {
distributionModule.setProperties(properties);
}
}
public void initialize(boolean firstTime) {
super.initialize(firstTime);
if (distributionModule == null) {
distributionModule = new DistributionModule(this, (Properties) properties.clone());
distributionModule = new DistributionModule(distributionClassName, properties);
}
}
public void agentFinalize(boolean lastTime) {
super.agentFinalize(lastTime);
close();
if (distributionModule != null) {
distributionModule.close();
}
}
/**
......@@ -94,36 +114,32 @@ public class DistributionTopic extends Topic {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "DistributionTopic. preProcess(" + from + ", " + cm + ')');
}
return distributionModule.processMessages(cm);
List msgs = cm.getMessages();
DMQManager dmqManager = null;
for (int i = 0; i < msgs.size(); i++) {
Message msg = (Message) msgs.get(i);
try {
distributionModule.processMessage(msg);
nbMsgsDeliverSinceCreation++;
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "DistributionTopic: distribution error.", exc);
}
if (dmqManager == null) {
dmqManager = new DMQManager(cm.getDMQId(), getDMQAgentId(), getId());
}
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(msg, MessageErrorConstants.UNDELIVERABLE);
}
}
if (dmqManager != null) {
dmqManager.sendToDMQ();
}
return null;
}
public String toString() {
return "DistributionTopic:" + getId().toString();
}
private void close() {
if (distributionModule != null) {
distributionModule.close();
}
}
/**
* Update properties configuration, they are processed by the distribution module
* @param prop properties to update.
* @throws Exception
*/
public void updateProperties(Properties prop) throws Exception {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "DistributionTopic.updateProperties(" + prop + ')');
}
super.setProperties(prop);
// update this.properties
Enumeration e = prop.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
properties.put(key, prop.get(key));
}
// update the module
distributionModule.updateProperties(properties);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment