Commit 2269a905 authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Add tests.

parent e2f3c4db
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" output="target/classes" path="src/main/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/J2SE-1.5"/>
<classpathentry kind="src" path="src/test/java"/>
<classpathentry kind="src" path="src/test/resources"/>
<classpathentry kind="con" path="org.maven.ide.eclipse.MAVEN2_CLASSPATH_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>
......@@ -20,6 +20,7 @@
<properties>
<joram.version>5.6.0-SNAPSHOT</joram.version>
<paxexamversion>2.1.0</paxexamversion>
</properties>
<build>
......@@ -53,6 +54,70 @@
<groupId>org.objectweb.joram</groupId>
<artifactId>a3-rt</artifactId>
<version>${joram.version}</version>
<exclusions>
<exclusion>
<artifactId>org.osgi</artifactId>
<groupId>org.osgi</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.objectweb.joram</groupId>
<artifactId>joram-mom-extensions-amqp</artifactId>
<version>${joram.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>2.4.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<!-- Pax Exam Dependencies -->
<dependency>
<groupId>org.ops4j.pax.exam</groupId>
<artifactId>pax-exam-container-paxrunner</artifactId>
<version>${paxexamversion}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ops4j.pax.exam</groupId>
<artifactId>pax-exam-link-mvn</artifactId>
<version>${paxexamversion}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ops4j.pax.exam</groupId>
<artifactId>pax-exam-testforge</artifactId>
<version>${paxexamversion}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.6.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ops4j.pax.runner</groupId>
<artifactId>pax-runner-no-jcl</artifactId>
<version>1.7.2</version>
<scope>test</scope>
<type>bundle</type>
</dependency>
</dependencies>
......
package org.ow2.joram.mom.amqp.tests;
import org.junit.After;
import org.junit.Before;
import org.ow2.joram.mom.amqp.tests.classic.SCAdminClassic;
import org.ow2.joram.mom.amqp.tests.osgi.SCAdminOSGi;
public class BaseTst {
protected SCAdmin admin;
public BaseTst() {
String className = System.getProperty("TestFramework", SCAdminOSGi.class.getName());
try {
admin = (SCAdmin) Class.forName(className).newInstance();
} catch (Exception exc) {
System.out.println("Error instantiating framework SCAdmin class, use default.");
admin = new SCAdminClassic();
}
}
@Before
public void start() throws Exception {
admin.cleanRunDir();
admin.startAgentServer((short) 0);
}
@After
public void stop() throws Exception {
admin.stopAgentServer((short) 0);
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.ow2.joram.mom.amqp.tests;
import java.io.IOException;
import org.junit.Assert;
import org.junit.Test;
import org.objectweb.joram.mom.dest.amqp.LiveServerConnection;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
public class PersistenceKillTest extends BaseTst {
private LiveServerConnection senderConnection;
private LiveServerConnection receiverConnection;
private Object lock = new Object();
long nbRounds = 200;
long nbMsgRound = 100;
@Test
public void killingTest() throws Exception {
senderConnection = new LiveServerConnection("sender", "localhost", 5672, null, null);
receiverConnection = new LiveServerConnection("receiver", "localhost", 5672, null, null);
senderConnection.startLiveConnection();
receiverConnection.startLiveConnection();
Channel senderChannel = senderConnection.getConnection().createChannel();
senderChannel.txSelect();
DeclareOk declareOk = senderChannel.queueDeclare("testQueue", true, false, false, null);
new Thread(new Runnable() {
int received;
long totalReceived;
Channel consumerChannel;
QueueingConsumer consumer;
// Consumer thread
public void run() {
try {
consumerChannel = receiverConnection.getConnection().createChannel();
consumerChannel.txSelect();
consumer = new QueueingConsumer(consumerChannel);
consumerChannel.basicConsume("testQueue", false, consumer);
} catch (Exception exc) {
exc.printStackTrace();
}
while (true) {
QueueingConsumer.Delivery delivery;
try {
delivery = consumer.nextDelivery();
long receivedNb = Long.parseLong(new String(delivery.getBody()));
consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
try {
Thread.sleep(1);
} catch (InterruptedException exc1) {
}
if (receivedNb < totalReceived) {
System.out.println("Duplicate received: " + receivedNb);
continue;
}
// We can receive duplicates but can't miss one message
// One duplicate if the channel is transacted, multiple if it is not
Assert.assertEquals(totalReceived, receivedNb);
totalReceived++;
received++;
consumerChannel.txCommit();
if (received == nbMsgRound) {
received = 0;
synchronized (lock) {
lock.notify();
}
}
if (totalReceived == nbMsgRound * nbRounds) {
consumer.getChannel().close();
return;
}
} catch (Exception ie) {
if (totalReceived == nbRounds * nbMsgRound) {
return;
}
System.out.println("Consumer connection broken. Reconnect.");
while (!receiverConnection.isConnectionOpen()) {
try {
Thread.sleep(100);
} catch (InterruptedException exc) {
exc.printStackTrace();
}
}
try {
consumerChannel = receiverConnection.getConnection().createChannel();
consumerChannel.txSelect();
consumer = new QueueingConsumer(consumerChannel);
consumerChannel.basicConsume("testQueue", false, consumer);
} catch (IOException exc) {
exc.printStackTrace();
}
System.out.println("Consumer Reconnected --- totalReceived = " + totalReceived);
continue;
}
}
}
}).start();
long start = System.nanoTime();
// Killer thread
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(5000);
System.out.println("Kill server");
admin.killAgentServer((short) 0);
admin.startAgentServer((short) 0);
} catch (Exception exc) {
exc.printStackTrace();
}
}
}).start();
// Sender
for (int i = 0; i < nbRounds; i++) {
if (i % 20 == 0) {
long delta = System.nanoTime() - start;
System.out.println("Round " + i + " " + ((i * nbMsgRound * 1000000000L) / delta) + " msg/s");
}
try {
for (int j = 0; j < nbMsgRound; j++) {
senderChannel.basicPublish("", declareOk.getQueue(), MessageProperties.PERSISTENT_BASIC, new Long(i
* nbMsgRound + j).toString().getBytes());
}
synchronized (lock) {
senderChannel.txCommit();
lock.wait();
}
} catch (Exception exc) {
i--;
System.out.println("Sender connection broken. Reconnect.");
while (!senderConnection.isConnectionOpen()) {
Thread.sleep(100);
}
senderChannel = senderConnection.getConnection().createChannel();
senderChannel.txSelect();
System.out.println("Sender Reconnected");
Thread.sleep(1000);
System.out.println("Restart Sender");
}
}
long delta = System.nanoTime() - start;
System.out.println(delta / 1000000L + " ms");
System.out.println(((nbRounds * nbMsgRound * 1000000000L) / delta) + " msg/s");
senderChannel.queueDelete(declareOk.getQueue());
senderChannel.close();
senderConnection.stopLiveConnection();
receiverConnection.stopLiveConnection();
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.ow2.joram.mom.amqp.tests;
import org.junit.Assert;
import org.junit.Test;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
public class PersistenceSimpleTest extends BaseTst {
@Test
public void recover1() throws Exception {
ConnectionFactory cnxFactory = new ConnectionFactory();
Connection connection = cnxFactory.newConnection();
Channel channel = connection.createChannel();
DeclareOk declareOk = channel.queueDeclare("testqueue", true, false, false, null);
channel.txSelect();
for (int i = 0; i < 5; i++) {
channel.basicPublish("", declareOk.getQueue(), MessageProperties.PERSISTENT_BASIC,
"this is a test message !!!".getBytes());
}
channel.txCommit();
admin.killAgentServer((short) 0);
admin.startAgentServer((short) 0);
connection = cnxFactory.newConnection();
channel = connection.createChannel();
declareOk = channel.queueDeclarePassive("testqueue");
for (int i = 0; i < 5; i++) {
GetResponse msg = channel.basicGet("testqueue", true);
Assert.assertNotNull(msg);
Assert.assertEquals("this is a test message !!!", new String(msg.getBody()));
}
GetResponse msg = channel.basicGet("testqueue", true);
Assert.assertNull(msg);
channel.queueDelete(declareOk.getQueue());
}
@Test
public void recover2() throws Exception {
ConnectionFactory cnxFactory = new ConnectionFactory();
Connection connection = cnxFactory.newConnection();
Channel channel = connection.createChannel();
DeclareOk declareOk = channel.queueDeclare("testqueue", true, false, false, null);
channel.txSelect();
for (int i = 0; i < 5; i++) {
channel.basicPublish("", declareOk.getQueue(), MessageProperties.PERSISTENT_BASIC,
"this is a test message !!!".getBytes());
}
channel.txCommit();
admin.killAgentServer((short) 0);
admin.startAgentServer((short) 0);
Thread.sleep(500);
connection = cnxFactory.newConnection();
channel = connection.createChannel();
declareOk = channel.queueDeclarePassive("testqueue");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(declareOk.getQueue(), true, consumer);
for (int i = 0; i < 5; i++) {
Delivery msg = consumer.nextDelivery(1000);
Assert.assertNotNull(msg);
Assert.assertEquals("this is a test message !!!", new String(msg.getBody()));
}
Delivery msg = consumer.nextDelivery(1000);
Assert.assertNull(msg);
channel.queueDelete(declareOk.getQueue());
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.ow2.joram.mom.amqp.tests;
import org.junit.Assert;
import org.junit.Test;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class QueueTest extends BaseTst {
@Test
public void queueDeclareName() throws Exception {
ConnectionFactory cnxFactory = new ConnectionFactory();
Connection connection = cnxFactory.newConnection();
Channel channel = connection.createChannel();
DeclareOk declareOk = channel.queueDeclare("testqueue", true, false, false, null);
Assert.assertEquals("testqueue", declareOk.getQueue());
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2009 - 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.ow2.joram.mom.amqp.tests;
import java.io.File;
/**
* Simplified Admin interface used to manage the start and the stop of non
* collocated AgentServers.
*/
public interface SCAdmin {
public static final String TEST_RESOURCES_DIR = System.getProperty("user.dir") + "/target/test-classes/";
public static final String A3DEBUG_LOCATION = new File(TEST_RESOURCES_DIR + "a3debug.cfg").getAbsolutePath();
public static final String A3SERVERS_LOCATION = new File(TEST_RESOURCES_DIR + "a3servers.xml").getAbsolutePath();
public static final String RUNNING_DIR = System.getProperty("user.dir") + "/target/run";
public static final String JORAM_VERSION = "5.6.0-SNAPSHOT";
/**
* Starts a new AgentServer.
*
* @param sid
* id of the server to start
*/
public void startAgentServer(short sid) throws Exception;
/**
* Kills a given AgentServer. The server must have been started with this
* Admin interface.
*
* @param sid
* id of the server to kill
*/
public void killAgentServer(short sid) throws Exception;
/**
* Cleanly stops an AgentServer. The server must have been started with this
* Admin interface.
*
* @param sid
* id of the server to stop
*/
public void stopAgentServer(short sid) throws Exception;
/**
* Cleans run directory
*/
public void cleanRunDir() throws Exception;
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2009 - 2011 ScalAgent Distributed Technologies