Commit 65f1d706 authored by afreyssin's avatar afreyssin
Browse files

Use DestinationConstants shared definition.

Adds methods to start and stop the handler.
parent 29b90643
...@@ -25,6 +25,7 @@ package org.objectweb.joram.mom.dest; ...@@ -25,6 +25,7 @@ package org.objectweb.joram.mom.dest;
import java.util.Properties; import java.util.Properties;
import org.objectweb.joram.mom.notifications.ClientMessages; import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.shared.DestinationConstants;
import org.objectweb.joram.shared.excepts.AccessException; import org.objectweb.joram.shared.excepts.AccessException;
import org.objectweb.joram.shared.excepts.RequestException; import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.util.monolog.api.BasicLevel; import org.objectweb.util.monolog.api.BasicLevel;
...@@ -70,9 +71,6 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean { ...@@ -70,9 +71,6 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
private long diff_max = 20; private long diff_max = 20;
private long diff_min = 10; private long diff_min = 10;
private String ACQ_QUEUE_MAX_MSG = "acquisition.max_msg";
private String ACQ_QUEUE_MIN_MSG = "acquisition.min_msg";
/** /**
* Returns the maximum number of acquired messages waiting to be handled by * Returns the maximum number of acquired messages waiting to be handled by
* the destination. When the number of messages waiting to be handled is greater * the destination. When the number of messages waiting to be handled is greater
...@@ -102,9 +100,6 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean { ...@@ -102,9 +100,6 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
private long pending_max = 20; private long pending_max = 20;
private long pending_min = 10; private long pending_min = 10;
private String ACQ_QUEUE_MAX_PND = "acquisition.max_pnd";
private String ACQ_QUEUE_MIN_PND = "acquisition.min_pnd";
/** /**
* Returns the maximum number of waiting messages in the destination. When the number * Returns the maximum number of waiting messages in the destination. When the number
* of waiting messages is greater the acquisition handler is temporarily stopped. * of waiting messages is greater the acquisition handler is temporarily stopped.
...@@ -154,14 +149,14 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean { ...@@ -154,14 +149,14 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
this.properties = properties; this.properties = properties;
diff_max = Long.parseLong(properties.getProperty(ACQ_QUEUE_MAX_MSG, String.valueOf(diff_max))); diff_max = Long.parseLong(properties.getProperty(DestinationConstants.ACQ_QUEUE_MAX_MSG, String.valueOf(diff_max)));
diff_min = Long.parseLong(properties.getProperty(ACQ_QUEUE_MIN_MSG, String.valueOf(diff_min))); diff_min = Long.parseLong(properties.getProperty(DestinationConstants.ACQ_QUEUE_MIN_MSG, String.valueOf(diff_min)));
if (diff_max < 2) diff_max = 2; if (diff_max < 2) diff_max = 2;
if (diff_min >= diff_max) diff_min = diff_max -2; if (diff_min >= diff_max) diff_min = diff_max -2;
if (diff_min < 0) diff_min = 0; if (diff_min < 0) diff_min = 0;
pending_max = Long.parseLong(properties.getProperty(ACQ_QUEUE_MAX_PND, String.valueOf(pending_max))); pending_max = Long.parseLong(properties.getProperty(DestinationConstants.ACQ_QUEUE_MAX_PND, String.valueOf(pending_max)));
pending_min = Long.parseLong(properties.getProperty(ACQ_QUEUE_MIN_PND, String.valueOf(pending_min))); pending_min = Long.parseLong(properties.getProperty(DestinationConstants.ACQ_QUEUE_MIN_PND, String.valueOf(pending_min)));
if (pending_max < 2) pending_max = 2; if (pending_max < 2) pending_max = 2;
if (pending_min >= pending_max) pending_min = pending_max -2; if (pending_min >= pending_max) pending_min = pending_max -2;
if (pending_min < 0) pending_min = 0; if (pending_min < 0) pending_min = 0;
...@@ -173,11 +168,11 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean { ...@@ -173,11 +168,11 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
// Acquisition class name can only be set the first time. // Acquisition class name can only be set the first time.
if (firstTime) { if (firstTime) {
if (properties != null) { if (properties != null) {
acquisitionClassName = properties.getProperty(AcquisitionModule.CLASS_NAME); acquisitionClassName = properties.getProperty(DestinationConstants.ACQUISITION_CLASS_NAME);
properties.remove(AcquisitionModule.CLASS_NAME); properties.remove(DestinationConstants.ACQUISITION_CLASS_NAME);
} }
if (acquisitionClassName == null) { if (acquisitionClassName == null) {
throw new RequestException("Acquisition class name not found: " + AcquisitionModule.CLASS_NAME throw new RequestException("Acquisition class name not found: " + DestinationConstants.ACQUISITION_CLASS_NAME
+ " property must be set on queue creation."); + " property must be set on queue creation.");
} }
try { try {
...@@ -287,6 +282,19 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean { ...@@ -287,6 +282,19 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
return acquisitionModule.startHandler(p); return acquisitionModule.startHandler(p);
} }
/**
* Start the handler.
* To be use by MBean interface
*/
@Override
public void start() {
try {
startHandler(properties);
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "AcquisitionQueue.start(" + properties + ')');
}
}
/** /**
* Stop the handler. * Stop the handler.
* *
...@@ -304,6 +312,19 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean { ...@@ -304,6 +312,19 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
return acquisitionModule.stopHandler(p); return acquisitionModule.stopHandler(p);
} }
/**
* Stop the handler.
* To be use by MBean interface
*/
@Override
public void stop() {
try {
stopHandler(properties);
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "AcquisitionQueue.start(" + properties + ')');
}
}
/** /**
* This method process messages from the acquisition module. * This method process messages from the acquisition module.
* The method addClientMessages of base implementation is used to handle * The method addClientMessages of base implementation is used to handle
...@@ -375,5 +396,4 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean { ...@@ -375,5 +396,4 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
// Not defined: still not encodable // Not defined: still not encodable
return -1; return -1;
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment