Commit 4f4d8c6d authored by monticelli's avatar monticelli
Browse files

jira: JORAM-34

add flow control on acquisition queue
parent 22b8a867
......@@ -135,6 +135,9 @@ public class AcquisitionModule implements ReliableTransmitter {
/** The task used to launch a new acquisition. */
private AcquisitionTask acquisitionTask;
/** The number of transmitted messages */
private static long transmitCounter = 0;
/**
* Tells if acquisition is done on-demand using the acquisition task or with a
* daemon.
......@@ -223,6 +226,15 @@ public class AcquisitionModule implements ReliableTransmitter {
this.expiration = expiration;
}
/**
* Returns the number of transmitted messages
*
* @return the number of transmitted messages
*/
public static long getCount(){
return transmitCounter;
}
/**
* Resets the acquisition properties.
*/
......@@ -457,6 +469,7 @@ public class AcquisitionModule implements ReliableTransmitter {
*/
public void transmit(Message message, String messageId) {
if (message != null) {
transmitCounter ++;
Channel.sendTo(destination.getId(),
new AcquisitionNot(new ClientMessages(-1, -1, message), message.persistent, messageId));
}
......@@ -479,6 +492,7 @@ public class AcquisitionModule implements ReliableTransmitter {
*/
public void transmit(List messages, boolean persistent, String messagesId) {
if (messages != null && messages.size() > 0) {
transmitCounter ++;
Channel.sendTo(destination.getId(),
new AcquisitionNot(new ClientMessages(-1, -1, messages), persistent, messagesId));
}
......
......@@ -56,6 +56,20 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
/** The number of produced messages. */
private long msgCount = 0;
/** The threshold of messages send by the handler in the engine */
private long diff_max = 10;
private long diff_min = 0;
private String ACQ_QUEUE_MAX_MSG = "acquisition.max_msg";
private String ACQ_QUEUE_MIN_MSG = "acquisition.min_msg";
/** The threshold of pending messages in the queue */
private long pending_max = 20;
private long pending_min = 10;
private String ACQ_QUEUE_MAX_PND = "acquisition.max_pnd";
private String ACQ_QUEUE_MIN_PND = "acquisition.min_pnd";
private boolean pause = false;
/** The acquisition class name. */
private String acquisitionClassName;
......@@ -81,6 +95,11 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
}
this.properties = properties;
diff_max = Long.parseLong(properties.getProperty(ACQ_QUEUE_MAX_MSG, String.valueOf(diff_max)));
diff_min = Long.parseLong(properties.getProperty(ACQ_QUEUE_MIN_MSG, String.valueOf(diff_min)));
pending_max = Long.parseLong(properties.getProperty(ACQ_QUEUE_MAX_PND, String.valueOf(pending_max)));
pending_min = Long.parseLong(properties.getProperty(ACQ_QUEUE_MIN_PND, String.valueOf(pending_min)));
// Acquisition class name can only be set the first time.
if (firstTime) {
if (properties != null) {
......@@ -110,6 +129,16 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
}
public void react(AgentId from, Notification not) throws Exception {
long diff = AcquisitionModule.getCount() - msgCount;
int pending = getPendingMessageCount();
if (!pause && ((diff >= diff_max) || (pending >= pending_max))){
stopHandler(properties);
pause = true;
}
else if (pause && (diff <= diff_min) && (pending <= pending_min)){
startHandler(properties);
pause = false;
}
if (not instanceof AcquisitionNot) {
acquisitionNot((AcquisitionNot) not);
} else {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment