Commit 2d2d142b authored by David Feliot's avatar David Feliot
Browse files

JORAM-172, JORAM-193: flow control; network extension enabled.

parent da962d3a
......@@ -666,6 +666,13 @@ public abstract class Network implements MessageConsumer, NetworkMBean {
// Push it in "ready to deliver" queue.
qout.push(msg);
nbMessageOut += 1;
// Participate in potential flow control
if (msg.getNot() instanceof CallbackNotification) {
CallbackNotification callbackNotification = (CallbackNotification) msg
.getNot();
callbackNotification.done();
}
}
/**
......@@ -685,6 +692,13 @@ public abstract class Network implements MessageConsumer, NetworkMBean {
// Push it in "ready to deliver" queue.
qout.pushAndValidate(msg);
nbMessageOut += 1;
// Participate in potential flow control
if (msg.getNot() instanceof CallbackNotification) {
CallbackNotification callbackNotification = (CallbackNotification) msg
.getNot();
callbackNotification.done();
}
}
/**
......@@ -723,14 +737,14 @@ public abstract class Network implements MessageConsumer, NetworkMBean {
}
/** The message can be delivered. */
static final int DELIVER = 0;
protected static final int DELIVER = 0;
// /**
// * There is other message in the causal ordering before this one.
// * This cannot happened with a FIFO ordering.
// */
// static final int WAIT_TO_DELIVER = 1;
/** The message has already been delivered. */
static final int ALREADY_DELIVERED = 2;
protected static final int ALREADY_DELIVERED = 2;
/**
* Test if a received message with the specified clock must be
......@@ -745,7 +759,7 @@ public abstract class Network implements MessageConsumer, NetworkMBean {
* @return <code>DELIVER</code>, <code>ALREADY_DELIVERED</code>,
* or <code>WAIT_TO_DELIVER</code> code.
*/
private synchronized int testRecvUpdate(short source, int update) throws IOException {
protected synchronized int testRecvUpdate(short source, int update) throws IOException {
int fromIdx = index(source);
if (update > stamp[fromIdx]) {
......@@ -940,6 +954,59 @@ public abstract class Network implements MessageConsumer, NetworkMBean {
return averageLoadTask.getAverageLoad15();
}
protected void deleteMessage(Message msg) {
msg.delete();
msg.free();
}
protected short getMessageSource(Message msg) {
return msg.getSource();
}
protected void setMessageSource(Message msg, short source) {
msg.source = source;
}
protected short getMessageDest(Message msg) {
return msg.getDest();
}
protected void checkActive(ServerDesc desc) {
if (! desc.active) {
desc.active = true;
desc.retry = 0;
}
}
protected void postMessage(Message msg) throws Exception {
Channel.post(msg);
}
protected void channelPostAndValidate(Message msg) throws Exception {
Channel.postAndValidate(msg);
}
protected void saveChannel() throws Exception {
Channel.save();
}
protected void validateChannel() {
Channel.validate();
}
protected void prepareMessage(Message msg) throws Exception {
short to = AgentServer.getServerDesc(msg.getTo().getTo()).getGateway();
// Allocates a new timestamp. Be careful, if the message needs to be
// routed we have to use the next destination in timestamp generation.
msg.source = AgentServer.getServerId();
msg.dest = to;
msg.stamp = getSendUpdate(to);
// Saves the message.
msg.save();
}
class NetworkAverageLoadTask extends AverageLoadTask {
public NetworkAverageLoadTask(Timer timer) {
start(timer);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment