Commit 24f87ec8 authored by afreyssin's avatar afreyssin
Browse files

Adds flow control when the engine queue is overloaded.

parent 3bff0fab
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2004 - 2012 ScalAgent Distributed Technologies
* Copyright (C) 2004 - 2013 ScalAgent Distributed Technologies
* Copyright (C) 2004 France-Telecom R&D
*
* This library is free software; you can redistribute it and/or
......@@ -32,6 +32,7 @@ import org.objectweb.joram.mom.dest.AdminTopic;
import org.objectweb.joram.mom.notifications.GetProxyIdNot;
import org.objectweb.joram.shared.client.AbstractJmsRequest;
import org.objectweb.joram.shared.client.JmsRequestGroup;
import org.objectweb.joram.shared.client.CommitRequest;
import org.objectweb.joram.shared.client.ProducerMessages;
import org.objectweb.joram.shared.security.Identity;
import org.objectweb.util.monolog.api.BasicLevel;
......@@ -51,16 +52,72 @@ public class ConnectionManager implements ConnectionManagerMBean {
/** logger */
public static Logger logger = Debug.getLogger(ConnectionManager.class.getName());
/**
* Name of property allowing to activate the synchronization mode of multiples
* connections.
* <p>
* This mode allows to pack commands that occurs in a same time in order to minimize
* the number of transactions.
* <p>
* This property can be fixed either from <code>java</code> launching command or
* a3servers.xml configuration file.
*/
public static final String MULTI_CNX_SYNC =
"org.objectweb.joram.mom.proxies.ConnectionManager.multiCnxSync";
/**
* True if the synchronization mode is activated.
*/
private static boolean multiCnxSync = AgentServer.getBoolean(MULTI_CNX_SYNC);
/**
* Name of property allowing to configure the synchronization mode of multiples
* connections.
* <p>
* This property allows to define the duration of instant to pack the commands.
* <p>
* This property can be fixed either from <code>java</code> launching command or
* a3servers.xml configuration file.
*/
public static final String MULTI_CNX_SYNC_DELAY =
"org.objectweb.joram.mom.proxies.ConnectionManager.multiCnxSyncDelay";
private static boolean multiCnxSync = AgentServer.getBoolean(MULTI_CNX_SYNC);
/**
* Duration in ms of instant to pack the commands.
*/
private static long multiThreadSyncDelay =
AgentServer.getLong(MULTI_CNX_SYNC_DELAY, 1).longValue();
/**
* Name of property allowing to define the threshold beyond which the flow-control
* of incoming messages is activated. Default value is 25.
* <p>
* This property can be fixed either from <code>java</code> launching command or
* a3servers.xml configuration file.
*/
public static final String CTRLFLOW_THRESHOLD =
"org.objectweb.joram.mom.proxies.ConnectionManager.CtrlFlowThreshold";
/**
* Threshold beyond which the flow-control of incoming messages is activated.
*/
private static int ctrlFlowThreshold = AgentServer.getInteger(CTRLFLOW_THRESHOLD, 25).intValue();
/**
* Name of property allowing to define the average throughput of the server for the
* calculation of flow control. Default value is 100000.
* <p>
* This property can be fixed either from <code>java</code> launching command or
* a3servers.xml configuration file.
*/
public static final String CTRLFLOW_THROUGHPUT =
"org.objectweb.joram.mom.proxies.ConnectionManager.CtrlFlowThroughput";
/**
* Definition of average delay for the implementation of flow control (it is computed
* from the CTRLFLOW_THROUGHPUT parameter.
*/
private static long ctrlFlowDelay = 1000000000L/AgentServer.getLong(CTRLFLOW_THROUGHPUT, 100000L).longValue();
private static final String MBEAN_NAME = "type=Connection";
......@@ -73,8 +130,7 @@ public class ConnectionManager implements ConnectionManagerMBean {
/** Unique ConnectionManager instance. */
private static ConnectionManager currentInstance;
public static final void sendToProxy(AgentId proxyId, int cnxKey,
AbstractJmsRequest req, Object msg) {
public static final void sendToProxy(AgentId proxyId, int cnxKey, AbstractJmsRequest req, Object msg) {
RequestNot rn = new RequestNot(cnxKey, msg);
if (multiCnxSync
&& (req instanceof ProducerMessages ||
......@@ -82,9 +138,22 @@ public class ConnectionManager implements ConnectionManagerMBean {
MultiCnxSync mcs = ConnectionManager.getMultiCnxSync(proxyId);
mcs.send(rn);
} else {
if ((req instanceof CommitRequest) || (req instanceof ProducerMessages)) {
int load = AgentServer.getEngineLoad();
if (load > ctrlFlowThreshold) {
try {
long delay = load * ctrlFlowDelay;
Thread.sleep(delay/1000000, (int) (delay%1000000));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Channel.sendTo(proxyId, rn);
}
if (req instanceof ProducerMessages) {
if ((inFlow != -1) && (req instanceof ProducerMessages)) {
FlowControl.flowControl();
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment