Commit d4979351 authored by David Feliot's avatar David Feliot
Browse files

Producer message flow control

parent 36590941
...@@ -71,9 +71,7 @@ public class MemoryController { ...@@ -71,9 +71,7 @@ public class MemoryController {
long currentSize = memorySize.addAndGet(delta); long currentSize = memorySize.addAndGet(delta);
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "memorySize=" + memorySize.get()); logger.log(BasicLevel.DEBUG, "memorySize=" + memorySize.get());
if (delta < 0) { checkMemory(currentSize);
checkMemory(currentSize);
}
} }
private void checkMemory(long currentSize) { private void checkMemory(long currentSize) {
......
...@@ -390,6 +390,8 @@ public final class Message implements Serializable, MessageView, TransactionObje ...@@ -390,6 +390,8 @@ public final class Message implements Serializable, MessageView, TransactionObje
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Message.save:" + txid); logger.log(BasicLevel.DEBUG, "Message.save:" + txid);
if (!isPersistent()) return;
// JORAM_PERF_BRANCH // JORAM_PERF_BRANCH
int bodySize; int bodySize;
if (msg.body != null) { if (msg.body != null) {
...@@ -399,8 +401,6 @@ public final class Message implements Serializable, MessageView, TransactionObje ...@@ -399,8 +401,6 @@ public final class Message implements Serializable, MessageView, TransactionObje
} }
MemoryController.getMemoryController().add(bodySize); MemoryController.getMemoryController().add(bodySize);
if (!isPersistent()) return;
if (soft) { if (soft) {
byte[] body = msg.body; byte[] body = msg.body;
// sets the body to null to save it in an other file // sets the body to null to save it in an other file
...@@ -464,6 +464,8 @@ public final class Message implements Serializable, MessageView, TransactionObje ...@@ -464,6 +464,8 @@ public final class Message implements Serializable, MessageView, TransactionObje
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Message.delete:" + txid); logger.log(BasicLevel.DEBUG, "Message.delete:" + txid);
if (!isPersistent()) return;
// JORAM_PERF_BRANCH // JORAM_PERF_BRANCH
int bodySize; int bodySize;
if (msg.body != null) { if (msg.body != null) {
...@@ -472,8 +474,6 @@ public final class Message implements Serializable, MessageView, TransactionObje ...@@ -472,8 +474,6 @@ public final class Message implements Serializable, MessageView, TransactionObje
bodySize = 0; bodySize = 0;
} }
MemoryController.getMemoryController().add(-bodySize); MemoryController.getMemoryController().add(-bodySize);
if (!isPersistent()) return;
AgentServer.getTransaction().delete(txid); AgentServer.getTransaction().delete(txid);
if (soft) { if (soft) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment