Commit 6d999f42 authored by Andre Freyssinet's avatar Andre Freyssinet

Handling of DeliveryDelay and Pause.

parent cb22ac29
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2018 ScalAgent Distributed Technologies
* Copyright (C) 2012 Universite Joseph Fourier
* Copyright (C) 2004 Bull SA
* Copyright (C) 1996 - 2000 Dyade
......@@ -33,8 +33,10 @@ import javax.jms.JMSException;
import org.objectweb.joram.client.jms.admin.AdminException;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.admin.AddRemoteDestination;
import org.objectweb.joram.shared.admin.AdminCommandConstant;
import org.objectweb.joram.shared.admin.AdminReply;
import org.objectweb.joram.shared.admin.ClearQueue;
import org.objectweb.joram.shared.admin.ClusterAdd;
import org.objectweb.joram.shared.admin.ClusterLeave;
......@@ -71,8 +73,11 @@ import org.objectweb.joram.shared.admin.SetThresholdRequest;
public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
public static final String REDELIVERY_DELAY = "redeliveryDelay";
/** Property allowing to fix RedeliveryDelay */
public static final String REDELIVERY_DELAY = DestinationConstants.REDELIVERY_DELAY;
/** Property allowing to fix DeliveryDelay */
public static final String DELIVERY_DELAY = DestinationConstants.DELIVERY_DELAY;
public Queue() {
super(QUEUE_TYPE);
......@@ -150,7 +155,7 @@ public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
* @exception AdminException If the request fails.
*/
public static Queue create(int serverId) throws ConnectException, AdminException {
return create(serverId, null, "org.objectweb.joram.mom.dest.Queue", null);
return create(serverId, null, null, null);
}
/**
......@@ -169,10 +174,7 @@ public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
* @exception AdminException If the request fails.
*/
public static Queue create(String name) throws ConnectException, AdminException {
return create(AdminModule.getLocalServerId(),
name,
"org.objectweb.joram.mom.dest.Queue",
null);
return create(AdminModule.getLocalServerId(), name, null, null);
}
/**
......@@ -192,9 +194,8 @@ public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
* @exception ConnectException If the admin connection is closed or broken.
* @exception AdminException If the request fails.
*/
public static Queue create(int serverId,
String name) throws ConnectException, AdminException {
return create(serverId, name, "org.objectweb.joram.mom.dest.Queue", null);
public static Queue create(int serverId, String name) throws ConnectException, AdminException {
return create(serverId, name, null, null);
}
/**
......@@ -212,9 +213,8 @@ public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
* @exception ConnectException If the admin connection is closed or broken.
* @exception AdminException If the request fails.
*/
public static Queue create(int serverId,
Properties prop) throws ConnectException, AdminException {
return create(serverId, "org.objectweb.joram.mom.dest.Queue", prop);
public static Queue create(int serverId, Properties prop) throws ConnectException, AdminException {
return create(serverId, null, prop);
}
/**
......@@ -233,8 +233,8 @@ public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
* @exception AdminException If the request fails.
*/
public static Queue create(int serverId,
String className,
Properties prop) throws ConnectException, AdminException {
String className,
Properties prop) throws ConnectException, AdminException {
return create(serverId, null, className, prop);
}
......@@ -258,12 +258,13 @@ public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
* @exception AdminException If the request fails.
*/
public static Queue create(int serverId,
String name,
String className,
Properties prop) throws ConnectException, AdminException {
Queue queue = new Queue();
queue.doCreate(serverId, name, className, prop, queue, QUEUE_TYPE);
return queue;
String name,
String className,
Properties prop) throws ConnectException, AdminException {
Queue queue = new Queue();
if (className == null) className = "org.objectweb.joram.mom.dest.Queue";
queue.doCreate(serverId, name, className, prop, queue, QUEUE_TYPE);
return queue;
}
/**
......@@ -675,7 +676,7 @@ public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
/**
* Set redeliveryDelay attribute.
*
* @param redeliveryDelay The delay use to wait before re-delivering messages after a deny.
* @param redeliveryDelay The delay in seconds use to wait before re-delivering messages after a deny.
* @throws ConnectException
* @throws AdminException
*/
......@@ -685,7 +686,47 @@ public class Queue extends Destination implements javax.jms.Queue, QueueMBean {
// configure redeliveryDelay of UserAgent). In this case we have to overload the
// processAdminCommand in the MOM Queue class.
Properties properties = new Properties();
properties.setProperty(Queue.REDELIVERY_DELAY, "5");
properties.setProperty(REDELIVERY_DELAY, "" + redeliveryDelay);
setProperties(properties);
}
/**
* Set deliveryDelay attribute.
*
* @param deliveryDelay The minimum delay in milliseconds use to wait before delivering each message.
* @throws ConnectException
* @throws AdminException
*/
public void setDeliveryDelay(int deliveryDelay) throws ConnectException, AdminException {
Properties properties = new Properties();
properties.setProperty(DELIVERY_DELAY, "" + deliveryDelay);
setProperties(properties);
}
/**
* Stops / Resumes the message delivery.
*
* @param pause if true stops the message delivery, else resumes it.
* @exception ConnectException If the administration connection is closed or broken.
* @exception AdminException If the request fails.
*/
public void setPause(boolean pause) throws ConnectException, AdminException {
Properties properties = new Properties();
properties.setProperty(AdminCommandConstant.PAUSE, "" + pause);
getWrapper().processAdmin(getName(), AdminCommandConstant.CMD_PAUSE, properties);
}
/**
* Administration method to set properties.
*
* @param prop the properties to update.
* @return the admin reply
*
* @exception ConnectException If the administration connection is closed or broken.
* @exception AdminException If the request fails.
*/
public AdminReply setProperties(Properties prop) throws ConnectException, AdminException {
return getWrapper().processAdmin(getName(), AdminCommandConstant.CMD_SET_PROPERTIES, prop);
}
}
......@@ -1121,7 +1121,9 @@ public abstract class Destination extends Agent implements DestinationMBean {
case AdminCommandConstant.CMD_STOP_HANDLER:
replyProp = processStopHandler(request.getProp());
break;
case AdminCommandConstant.CMD_PAUSE:
replyProp = processPause(request.getProp());
break;
default:
throw new Exception("Bad command : \"" + request.getCommand() + "\"");
}
......@@ -1168,6 +1170,23 @@ public abstract class Destination extends Agent implements DestinationMBean {
}
}
/**
* Pause / Resume the message distribution.
*
* @param prop properties for start if needed (can be null)
* @return properties for the reply.
* @throws Exception
*/
protected Properties processPause(Properties prop) throws Exception {
boolean pause = Boolean.parseBoolean(prop.getProperty(AdminCommandConstant.PAUSE));
if (this instanceof Queue) {
((Queue) this).setPause(pause);
return prop;
} else {
throw new Exception("processPause :: not a Queue.");
}
}
/**
* Interceptors process
*
......
......@@ -35,10 +35,11 @@ public class AdminCommandConstant {
public static final int CMD_INVOKE_STATIC = 8;
public static final int CMD_ADD_CLIENTID = 9;
public static final int CMD_SET_REDELIVERY_DELAY = 10;
public static final int CMD_PAUSE = 11;
public static final String[] commandNames = { "CMD_NO", "CMD_ADD_INTERCEPTORS", "CMD_REMOVE_INTERCEPTORS",
"CMD_GET_INTERCEPTORS", "CMD_REPLACE_INTERCEPTORS", "CMD_SET_PROPERTIES", "CMD_START_HANDLER",
"CMD_STOP_HANDLER", "CMD_INVOKE_STATIC", "CMD_ADD_CLIENTID", "CMD_SET_REDELIVERY_DELAY" };
"CMD_STOP_HANDLER", "CMD_INVOKE_STATIC", "CMD_ADD_CLIENTID", "CMD_SET_REDELIVERY_DELAY", "CMD_PAUSE" };
/** use by destination */
public static final String INTERCEPTORS = "jms_joram_interceptors";
......@@ -87,4 +88,7 @@ public class AdminCommandConstant {
/** Used by UserAgent and Destination to set the redeliveryDelay attribute */
public static final String REDELIVERY_DELAY = "joram.jms.redeliveryDelay";
/** use by Queue to set the pause property */
public static final String PAUSE = "jms_joram_pause";
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment