Commit c3549cd9 authored by afreyssin's avatar afreyssin
Browse files

Implements flow control mechanism between AcquisitionModule and topic (JORAM-223).

parent 81bd4b8a
......@@ -56,6 +56,40 @@ public class AcquisitionTopic extends Topic implements AcquisitionTopicMBean {
/** The number of produced messages. */
private long msgCount = 0;
/** The threshold of messages send by the handler in the engine */
private long diff_max = 20;
private long diff_min = 10;
private final String ACQ_TOPIC_MAX_MSG = "acquisition.max_msg";
private final String ACQ_TOPIC_MIN_MSG = "acquisition.min_msg";
/**
* Returns the maximum number of acquired messages waiting to be handled by
* the destination. When the number of messages waiting to be handled is greater
* the acquisition handler is temporarily stopped.
* <p>
* A value lesser or equal to 0 disables the mechanism.
*
* @return the maximum number of acquired messages waiting to be handled by
* the destination.
*/
public final long getDiffMax() {
return diff_max;
}
/**
* Returns the minimum threshold of acquired messages waiting to be handled by
* the destination for restarting the acquisition handler.
*
* @return the minimum threshold of acquired messages waiting to be handled by
* the destination.
*/
public final long getDiffMin() {
return diff_min;
}
private boolean pause = false;
/** The acquisition class name. */
private String acquisitionClassName;
......@@ -81,7 +115,16 @@ public class AcquisitionTopic extends Topic implements AcquisitionTopicMBean {
logger.log(BasicLevel.DEBUG, "AcquisitionTopic.setProperties prop = " + properties);
}
this.properties = properties;
diff_max = Long.parseLong(properties.getProperty(ACQ_TOPIC_MAX_MSG, String.valueOf(diff_max)));
diff_min = Long.parseLong(properties.getProperty(ACQ_TOPIC_MIN_MSG, String.valueOf(diff_min)));
if (diff_max < 2) diff_max = 2;
if (diff_min >= diff_max) diff_min = diff_max -2;
if (diff_min < 0) diff_min = 0;
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "AcquisitionTopic.setProperties -> " + diff_min + '/' + diff_max);
// Acquisition class name can only be set the first time.
if (firstTime) {
if (properties != null) {
......@@ -111,9 +154,49 @@ public class AcquisitionTopic extends Topic implements AcquisitionTopicMBean {
acquisitionModule = new AcquisitionModule(this, acquisitionClassName, properties);
}
}
/**
* Returns the number of messages acquired by the acquisition handler.
* Be careful this counter is reseted at each time the server starts.
*
* @return the number of messages acquired by the acquisition handler.
*/
public final long getAcquiredMsgCount() {
if (acquisitionModule != null)
return acquisitionModule.getCount();
return 0L;
}
private transient long acquisitionNotNb = 0;
public void react(AgentId from, Notification not) throws Exception {
try {
long diff = getAcquiredMsgCount() - acquisitionNotNb;
if (not instanceof AcquisitionNot) diff -= 1;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
getId() + " - AcquisitionTopic.react(" + not + ") -> " + pause + ", " + diff);
if (!pause && ((diff_max > 0) && (diff >= diff_max))) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "AcquisitionTopic.react: stopHandler " + diff);
stopHandler(properties);
pause = true;
} else if (pause && (diff <= diff_min)) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "AcquisitionTopic.react: startHandler " + diff);
startHandler(properties);
pause = false;
}
} catch (Throwable t) {
logger.log(BasicLevel.ERROR, "AcquisitionTopic: error in react.", t);
}
if (not instanceof AcquisitionNot) {
acquisitionNotNb += 1;
acquisitionNot(from, (AcquisitionNot) not);
} else {
super.react(from, not);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment