Commit fa64cac7 authored by Andre Freyssinet's avatar Andre Freyssinet
Browse files

Fixes Joram##314368.

Inserts messages in right order after a stamp reset.
parent 5fe33012
......@@ -45,6 +45,7 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ow2.joram</groupId>
<artifactId>a3-common</artifactId>
......
......@@ -795,7 +795,7 @@ class Engine implements Runnable, AgentEngine, EngineMBean {
* @param msg the message
*/
public void insert(Message msg) {
qin.insert(msg);
qin.insert(msg, this);
}
/**
......@@ -920,15 +920,20 @@ class Engine implements Runnable, AgentEngine, EngineMBean {
throw new IllegalStateException();
}
protected final int getStamp() {
/**
* Gets the current value of stamp counter.
*
* @return the current value of stamp counter.
*/
private final int getStamp() {
return stamp;
}
protected final void setStamp(int stamp) {
modified = true;
this.stamp = stamp;
}
/**
* Sets the stamp of the given message.
*
* @param msg The message to stamp.
*/
protected final void stamp(Message msg) {
if (msg.isPersistent())
// If the message is transient there is no need to save the stamp counter.
......@@ -940,6 +945,20 @@ class Engine implements Runnable, AgentEngine, EngineMBean {
stamp = 0;
}
@Override
public boolean isPrior(Message m1, Message m2) {
if (((m1.getStamp() <= stamp) && (m2.getStamp() <= stamp)) ||
((m1.getStamp() > stamp) && (m2.getStamp() > stamp))) {
// The 2 messages were created in the same stamp generation, check the relative order.
return (m1.getStamp() < m2.getStamp());
} else if ((m1.getStamp() > stamp) && (m2.getStamp() <= stamp)) {
// The first message was created after the buffer was reset.
// The second to insert was created before the buffer was reset, it is older then insert it.
return true;
}
return false;
}
/**
* Adds a message in "ready to deliver" list. This method allocates a
* new time stamp to the message ; be Careful, changing the stamp imply
......
/*
* Copyright (C) 2001 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2022 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 BULL
* Copyright (C) 1996 - 2000 INRIA
*
......@@ -24,7 +24,7 @@ import java.io.IOException;
/**
* The parent interface for all messages consumers (see {@link Engine}, {@link Network}, etc.).
*/
public interface MessageConsumer {
public interface MessageConsumer extends MessageComparator {
/**
* Returns this <code>MessageConsumer</code>'s name.
*
......
/*
* Copyright (C) 2001 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2022 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 BULL
* Copyright (C) 1996 - 2000 INRIA
*
......@@ -34,10 +34,14 @@ public interface MessageQueue {
/**
* Insert a message in the queue, it should only be used during
* initialization for restoring the queue state.
*
* This algorithm sould take into account the reinitialization of the stamp and makes
* it possible to suitably sort messages having been created before this reset.
*
* @param item the message to be pushed onto this queue.
* @param item the message to be pushed onto this queue.
* @param comparator the MessageComparator interface of MessageConsumer.
*/
public void insert(Message item);
public void insert(Message item, MessageComparator comparator);
/**
* Pushes a message onto the bottom of this queue. It should only
......
......@@ -81,26 +81,30 @@ final class MessageVector implements MessageQueue {
count = 0;
validated = 0;
}
/**
* Insert a message in the queue, it should only be used during
* initialization for restoring the queue state.
* Insert a message in the queue, it should only be used during initialization for
* restoring the queue state.
*
* This algorithm takes into account the reinitialization of the stamp and makes it
* possible to suitably sort messages having been created before this reset.
*
* @param item the message to be pushed onto this queue.
* @param item the message to be pushed onto this queue.
* @param comparator the MessageComparator interface of MessageConsumer.
*/
public synchronized void insert(Message item) {
public synchronized void insert(Message item, MessageComparator comparator) {
if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
logmon.log(BasicLevel.DEBUG, logmsg + "insert(" + item + ")");
int i = 0;
for (; i<validated; i++) {
Message msg = getMessageAt(i);
if (item.getStamp() < msg.getStamp()) break;
if (comparator.isPrior(item, msg)) break;
}
insertMessageAt(item, i);
validated += 1;
}
/**
* Pushes a message onto the bottom of this queue. It should only
* be used during a transaction. The item will be really available
......@@ -523,7 +527,7 @@ final class MessageVector implements MessageQueue {
/**
* True if the tracking of the distribution of messages type is allowed.
*/
private static boolean msgTypesTracking = AgentServer.getBoolean(MSG_TYPES_TRACKING);
private static final boolean msgTypesTracking = AgentServer.getBoolean(MSG_TYPES_TRACKING);
static class Counter {
int total = 1;
......
/*
* Copyright (C) 2001 - 2021 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2022 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 BULL
* Copyright (C) 1996 - 2000 INRIA
*
......@@ -353,7 +353,10 @@ public abstract class Network implements MessageConsumer, NetworkMBean {
* @param msg the message
*/
public void insert(Message msg) {
qout.insert(msg); nbMessageOut += 1;
// TODO (AF): /!\ Be careful, the insertion of the message is not done in the right order
// if there is a reset of the buffer!! Needs a specific MessageComparator.
qout.insert(msg, this);
nbMessageOut += 1;
}
/**
......@@ -795,10 +798,19 @@ public abstract class Network implements MessageConsumer, NetworkMBean {
*/
private synchronized int getSendUpdate(short to) throws IOException {
int update = stamp[idxLS] +1;
if (stamp[idxLS] == Integer.MAX_VALUE)
stamp[idxLS] = 0;
updateStamp(idxLS, update);
return update;
}
@Override
public boolean isPrior(Message m1, Message m2) {
// TODO (AF): This simple implementation could not work if there is a stamp reset (see Joram#314368).
return (m1.getStamp() < m2.getStamp());
}
final int getBootTS() {
return bootTS[idxLS];
}
......
/*
* Copyright (C) 2001 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2022 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 BULL
* Copyright (C) 1996 - 2000 INRIA
*
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2022 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package fr.dyade.aaa.agent;
import org.junit.Assert;
import org.junit.Test;
import fr.dyade.aaa.common.EmptyQueueException;
/**
* Tests the initialization of the Engine.
* More specifically tests the order of messages in the queue during initialization.
*/
public class EngineInitTest {
/**
* Engine simulation.
* Be careful, insert, validate and isPrior methods needs to be identical to real Engine.
*/
class EngineComparator implements MessageComparator {
int stamp;
MessageQueue qin;
public MessageQueue getQueue() {
return qin;
}
EngineComparator(int stamp) {
this.stamp = stamp;
qin = new MessageVector("test", false);
}
public void insert(Message msg) {
qin.insert(msg, this);
}
public void validate() {
qin.validate();
}
public boolean isPrior(Message m1, Message m2) {
if (((m1.getStamp() <= stamp) && (m2.getStamp() <= stamp)) ||
((m1.getStamp() > stamp) && (m2.getStamp() > stamp))) {
// The 2 messages were created in the same stamp generation, check the relative order.
return (m1.getStamp() < m2.getStamp());
} else if ((m1.getStamp() > stamp) && (m2.getStamp() <= stamp)) {
// The first message was created after the buffer was reset.
// The second to insert was created before the buffer was reset, it is older then insert it.
return true;
}
return false;
}
}
/**
* Simple test with message insertion in bad order.
*/
@Test
public void test1() throws Exception {
EngineComparator engine = new EngineComparator(10);
Message msg = Message.alloc();
msg.stamp = 6;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 4;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 5;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 10;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 9;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 8;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 7;
engine.insert(msg);
engine.validate();
MessageQueue qin = engine .getQueue();
int expected = 4;
while (expected < 11) {
msg = qin.pop();
Assert.assertTrue("Expected=" + expected + ", Received=" + msg.stamp, msg.stamp == expected);
expected += 1;
}
try {
msg = qin.pop();
Assert.assertTrue("Queue should be empty: " + msg, false);
} catch (Throwable exc) {
Assert.assertTrue("Queue should be empty: " + msg, exc instanceof EmptyQueueException);
}
}
/**
* Test with stamp reinitialization.
*/
@Test
public void test2() throws Exception {
EngineComparator engine = new EngineComparator(10);
Message msg = Message.alloc();
msg.stamp = 3;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 1;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = Integer.MAX_VALUE-2;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 5;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = Integer.MAX_VALUE-3;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 4;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = Integer.MAX_VALUE;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 2;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = Integer.MAX_VALUE-1;
engine.insert(msg);
engine.validate();
MessageQueue qin = engine .getQueue();
int[] expected = new int[] { Integer.MAX_VALUE-3, Integer.MAX_VALUE-2, Integer.MAX_VALUE-1, Integer.MAX_VALUE, 1, 2, 3, 4, 5 };
int nb = 0;
while (nb < 9) {
msg = qin.pop();
Assert.assertTrue("Expected=" + expected[nb] + ", Received=" + msg.stamp, msg.stamp == expected[nb]);
nb += 1;
}
try {
msg = qin.pop();
Assert.assertTrue("Queue should be empty: " + msg, false);
} catch (Throwable exc) {
Assert.assertTrue("Queue should be empty: " + msg, exc instanceof EmptyQueueException);
}
}
/**
* Test with stamp reinitialization.
*/
@Test
public void test3() throws Exception {
EngineComparator engine = new EngineComparator(10);
Message msg = Message.alloc();
msg.stamp = 3;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = Integer.MAX_VALUE-2;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 0;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = 4;
engine.insert(msg);
msg = Message.alloc();
msg.stamp = Integer.MAX_VALUE-1;
engine.insert(msg);
engine.validate();
MessageQueue qin = engine .getQueue();
int[] expected = new int[] { Integer.MAX_VALUE-2, Integer.MAX_VALUE-1, 0, 3, 4 };
int nb = 0;
while (nb < 5) {
msg = qin.pop();
Assert.assertTrue("Expected=" + expected[nb] + ", Received=" + msg.stamp, msg.stamp == expected[nb]);
nb += 1;
}
try {
msg = qin.pop();
Assert.assertTrue("Queue should be empty: " + msg, false);
} catch (Throwable exc) {
Assert.assertTrue("Queue should be empty: " + msg, exc instanceof EmptyQueueException);
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment