Commit ffbbcfa0 authored by afreyssin's avatar afreyssin
Browse files

Bug fix in flow control between AcquisitionHandler and Acquisition queue (JORAM-220).

Logging enhancements.
parent 89431977
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2010 - 2015 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -134,9 +134,6 @@ public class AcquisitionModule implements ReliableTransmitter {
/** The task used to launch a new acquisition. */
private AcquisitionTask acquisitionTask;
/** The number of transmitted messages */
private static volatile long transmitCounter = 0;
/**
* Tells if acquisition is done on-demand using the acquisition task or with a
......@@ -226,13 +223,16 @@ public class AcquisitionModule implements ReliableTransmitter {
this.expiration = expiration;
}
/** The number of transmitted messages */
private volatile long transmitCounter = 0;
/**
* Returns the number of transmitted messages
* Be careful this counter is reseted at each time the server starts.
*
* @return the number of transmitted messages
*/
public static long getCount(){
public long getCount(){
return transmitCounter;
}
......
......@@ -194,31 +194,37 @@ public class AcquisitionQueue extends Queue implements AcquisitionQueueMBean {
* @return the number of messages acquired by the acquisition handler.
*/
public final long getAcquiredMsgCount() {
return AcquisitionModule.getCount();
if (acquisitionModule != null)
return acquisitionModule.getCount();
return 0L;
}
private transient long acquisitionNotNb = 0;
public void react(AgentId from, Notification not) throws Exception {
try {
long diff = AcquisitionModule.getCount() - acquisitionNotNb;
long diff = getAcquiredMsgCount() - acquisitionNotNb;
int pending = getPendingMessageCount();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "AcquisitionQueue.react: " + pause + ", " + diff + ", " + pending);
logger.log(BasicLevel.DEBUG,
"AcquisitionQueue.react: " + pause + ", " + diff + ", " + pending);
if (!pause &&
(((diff_max > 0) && (diff >= diff_max)) ||
((pending_max > 0) && (pending >= pending_max)))) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "AcquisitionQueue.react: stopHandler " + diff + '/' + pending);
stopHandler(properties);
pause = true;
} else if (pause && (diff <= diff_min) && (pending <= pending_min)){
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "AcquisitionQueue.react: startHandler " + diff + '/' + pending);
startHandler(properties);
pause = false;
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "AcquisitionQueue.react: " + pause + ", " + diff + ", " + pending);
} catch (Throwable t) {
logger.log(BasicLevel.ERROR, "AcquisitionQueue: error in react.", t);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment