Commit c1b56e7f authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Add distribution tests.

parent 9b975e4f
This diff is collapsed.
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package joram.distribution;
import java.util.Properties;
import org.objectweb.joram.mom.dest.AcquisitionHandler;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
public class AcquisitionHandlerTest implements AcquisitionHandler {
public void close() {
}
public void retrieve(ReliableTransmitter trsmiter) throws Exception {
trsmiter.transmit(DistributionHandlerTest.getAllMessages(), null);
}
public void setProperties(Properties arg0) {
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package joram.distribution;
import java.util.Properties;
import org.objectweb.joram.mom.dest.AcquisitionHandler;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
public class AcquisitionHandlerTest2 implements AcquisitionHandler {
public void close() {
}
public void retrieve(ReliableTransmitter trsmiter) throws Exception {
trsmiter.transmit(DistributionHandlerTest2.getAllMessages(), null);
}
public void setProperties(Properties arg0) {
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package joram.distribution;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.objectweb.joram.mom.dest.DistributionHandler;
import org.objectweb.joram.shared.messages.Message;
public class DistributionHandlerTest implements DistributionHandler {
private static List messages = new ArrayList();
public void close() {
}
// distribute the message correctly the fifth time.
public void distribute(Message msg) throws Exception {
if (msg.deliveryCount < 5) {
throw new Exception("Message could not be delivered.");
}
msg.setText("test distribution - MODIFIED");
addMessage(msg);
}
public void init(Properties props) {
}
private static synchronized void addMessage(Message msg) {
messages.add(msg);
}
public static synchronized List getAllMessages() {
List result = messages;
messages = new ArrayList();
return result;
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package joram.distribution;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.objectweb.joram.mom.dest.DistributionHandler;
import org.objectweb.joram.shared.messages.Message;
public class DistributionHandlerTest2 implements DistributionHandler {
private static List messages = new ArrayList();
public void close() {
}
public void distribute(Message msg) throws Exception {
// Block odd messages during some retries, to see
// differences with batch mode: even messages will
// quickly arrive before odd messages
int number = Integer.parseInt(msg.getText());
if (number % 2 == 1) {
if (msg.deliveryCount < 3) {
throw new Exception("Message could not be delivered.");
}
}
addMessage(msg);
}
public void init(Properties props) {
}
private static synchronized void addMessage(Message msg) {
messages.add(msg);
}
public static synchronized List getAllMessages() {
List result = messages;
messages = new ArrayList();
return result;
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package joram.distribution;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.objectweb.joram.client.jms.Destination;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import framework.TestCase;
public class DistributionTest extends TestCase {
public static void main(String[] args) throws Exception {
new DistributionTest().run();
}
public void run() {
try {
System.out.println("Administration...");
startAgentServer((short) 0);
ConnectionFactory cf = TcpConnectionFactory.create("localhost", 2560);
AdminModule.connect(cf);
Properties prop = new Properties();
prop.put("distribution.className", DistributionHandlerTest.class.getName());
prop.put("period", Long.toString(1000));
Queue distributionQueue = Queue.create(0, null, Destination.DISTRIBUTION_QUEUE, prop);
distributionQueue.setThreshold(10);
prop = new Properties();
prop.put("acquisition.className", AcquisitionHandlerTest.class.getName());
prop.put("acquisition.period", Integer.toString(200));
Queue acquisitionQueue = Queue.create(0, null, Destination.ACQUISITION_QUEUE, prop);
Queue dmq = Queue.create();
dmq.setFreeReading();
dmq.setFreeWriting();
distributionQueue.setDMQ(dmq);
User.create("anonymous", "anonymous", 0);
distributionQueue.setFreeWriting();
try {
distributionQueue.setFreeReading();
assertTrue(false);
} catch (Exception exc) {
// OK
}
acquisitionQueue.setFreeReading();
AdminModule.disconnect();
System.out.println("Admin done.");
// ------- Admin done
Connection connection = cf.createConnection("anonymous", "anonymous");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(distributionQueue);
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session1.createConsumer(acquisitionQueue);
// send message
Message msg = session.createTextMessage("test distribution");
producer.send(msg);
msg = consumer.receive(1000);
assertNull(msg);
// Redelivery attempt every second - should work the fifth time (see Handler)
msg = consumer.receive(10000);
assertNotNull(msg);
assertEquals("test distribution - MODIFIED", ((TextMessage) msg).getText());
AdminModule.connect(cf);
// Change threshold, next message will go to DMQ
distributionQueue.setThreshold(3);
// send message
msg = session.createTextMessage("test distribution 2");
producer.send(msg);
Thread.sleep(5000);
assertEquals(1, dmq.getPendingMessages());
AdminModule.disconnect();
} catch (Throwable exc) {
exc.printStackTrace();
error(exc);
} finally {
stopAgentServer((short) 0);
endTest();
}
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package joram.distribution;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.objectweb.joram.client.jms.Destination;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import framework.TestCase;
public class DistributionTest2 extends TestCase {
// /!\ Must be odd to work properly.
static final int MSG_COUNT = 9;
public static void main(String[] args) throws Exception {
new DistributionTest2().run();
}
public void run() {
try {
System.out.println("Administration...");
startAgentServer((short) 0);
ConnectionFactory cf = TcpConnectionFactory.create("localhost", 2560);
AdminModule.connect(cf);
Properties prop = new Properties();
prop.put("distribution.className", DistributionHandlerTest2.class.getName());
prop.put("period", Long.toString(1000));
Queue distributionQueue = Queue.create(0, null, Destination.DISTRIBUTION_QUEUE, prop);
prop = new Properties();
prop.put("acquisition.className", AcquisitionHandlerTest2.class.getName());
prop.put("acquisition.period", Integer.toString(200));
Queue acquisitionQueue = Queue.create(0, null, Destination.ACQUISITION_QUEUE, prop);
User.create("anonymous", "anonymous", 0);
distributionQueue.setFreeWriting();
try {
distributionQueue.setFreeReading();
assertTrue(false);
} catch (Exception exc) {
// OK
}
acquisitionQueue.setFreeReading();
AdminModule.disconnect();
System.out.println("Admin done.");
// ------- Admin done
Connection connection = cf.createConnection("anonymous", "anonymous");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(distributionQueue);
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session1.createConsumer(acquisitionQueue);
MessageListener listener = new MessageListener1();
consumer.setMessageListener(listener);
// send message
for (int i = 0; i < MSG_COUNT; i++) {
Message msg = session.createTextMessage(Integer.toString(i));
producer.send(msg);
}
Thread.sleep(15000);
assertEquals(MSG_COUNT, ((MessageListener1) listener).count);
session1.close();
// *********** Test batch mode
AdminModule.connect(cf);
prop = new Properties();
prop.put("distribution.batch", "true");
prop.put("period", Long.toString(1000));
distributionQueue.setProperties(prop);
AdminModule.disconnect();
session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session1.createConsumer(acquisitionQueue);
listener = new MessageListener2();
consumer.setMessageListener(listener);
// send message
for (int i = 0; i < MSG_COUNT; i++) {
Message msg = session.createTextMessage(Integer.toString(i));
producer.send(msg);
}
Thread.sleep(5000);
assertEquals(MSG_COUNT, ((MessageListener2) listener).count);
connection.close();
} catch (Throwable exc) {
exc.printStackTrace();
error(exc);
} finally {
stopAgentServer((short) 0);
endTest();
}
}
class MessageListener1 implements MessageListener {
int lastId = -1;
long lastTime = -1;
int count = 0;
public void onMessage(Message msg) {
try {
// Check ordering
int newId = Integer.parseInt(((TextMessage) msg).getText());
System.out.println("MessageListener: " + newId);
long time = System.currentTimeMillis();
assertEquals(lastId + 1, newId);
if (newId % 2 == 0) {
// This one is even (so quick, see Handler)
if (lastTime != -1) {
assertTrue(time - lastTime < 300);
}
} else {
// This one is odd (so lenghty, see Handler)
if (lastTime != -1) {
assertTrue(time - lastTime >= 2000);
}
}
lastId = newId;
lastTime = time;
count++;
} catch (Exception exc) {
error(exc);
}
}
}
class MessageListener2 implements MessageListener {
int lastId = -2;
long lastTime = -1;
int count = 0;
public void onMessage(Message msg) {
try {
// Ordering is lost
int newId = Integer.parseInt(((TextMessage) msg).getText());
System.out.println("MessageListener2: " + newId);
long time = System.currentTimeMillis();
// All even messages first
if (newId % 2 == 0) {
assertEquals(lastId + 2, newId);
} else {
// last even message
if (lastId != MSG_COUNT - 1) {
assertEquals(lastId + 2, newId);
}
}
// the messages make 2 groups: even messages then odd
if (lastTime != -1 && lastId != MSG_COUNT - 1) {
assertTrue(" result: " + (time - lastTime), time - lastTime < 300);
}
lastId = newId;
lastTime = time;
count++;
} catch (Exception exc) {
error(exc);
}
}
}
}
<?xml version="1.0"?>
<config>
<property name="Engine" value="@engine@"/>
<property name="Transaction" value="@transaction@"/>
<domain name="D0" network="@network@"/>
<server id="0" name="s0" hostname="localhost">
<network domain="D0" port="27300"/>
<service class="fr.dyade.aaa.agent.AdminProxy" args="7890"/>
<service class="org.objectweb.joram.mom.proxies.ConnectionManager" args="root root"/>
<service class="org.objectweb.joram.mom.proxies.tcp.TcpProxyService" args="2560"/>
</server>
</config>
Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: JORAM :: Test distribution classes
Bundle-SymbolicName: distribution