Commit ee0c1120 authored by Andre Freyssinet's avatar Andre Freyssinet

Adds a Bridge JMS stress test.

parent eea580e9
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2021 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s):
* Contributor(s):
*/
package joram.bridgejms;
import java.util.Properties;
import java.util.Random;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.JMSAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import framework.TestCase;
/**
* Test: Test the bridge behavior with flow-control.
* - Sends messages on foreign server.
* - Receives the messages through a JMS AcquisitionQueue.
*/
public class BridgeTest13x2 extends TestCase implements MessageListener {
public static void main(String[] args) {
new BridgeTest13x2().run();
}
public void run() {
try {
System.out.println("servers start");
// S0=bridge, S1=foreign
startAgentServer((short)0, new String[]{"-DTransaction.UseLockFile=false"});
startAgentServer((short)1, new String[]{"-DTransaction.UseLockFile=false"});
Thread.sleep(1000);
adminForeign();
adminBridge();
Thread.sleep(1000);
test(500000);
} catch (Throwable exc) {
exc.printStackTrace();
error(exc);
} finally {
System.out.println("Server stop ");
killAgentServer((short)0);
killAgentServer((short)1);
endTest();
}
}
public void adminForeign() throws Exception {
try {
System.out.println("adminForeign()");
javax.jms.ConnectionFactory foreignCF = TcpConnectionFactory.create("localhost", 16011);
AdminModule.connect(foreignCF, "root", "root");
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
User.create("anonymous", "anonymous");
// create The foreign destination and connectionFactory
Queue foreignQueue = Queue.create("foreignQueue");
foreignQueue.setFreeReading();
foreignQueue.setFreeWriting();
System.out.println("foreign queue = " + foreignQueue);
// bind foreign destination and connectionFactory
jndiCtx.rebind("foreignQueue", foreignQueue);
jndiCtx.rebind("foreignCF", foreignCF);
jndiCtx.close();
} finally {
AdminModule.disconnect();
}
System.out.println("Admin closed.");
}
public void adminBridge() throws Exception {
try {
System.out.println("adminBridge()");
javax.jms.ConnectionFactory joramCF = TcpConnectionFactory.create("localhost", 16010);
AdminModule.connect(joramCF, "root", "root");
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
User.create("anonymous", "anonymous");
// Setting the bridge properties
Properties prop = new Properties();
// Configure flow control
prop.setProperty("acquisition.max_msg", "50");
prop.setProperty("acquisition.min_msg", "25");
prop.setProperty("acquisition.max_pnd", "50");
prop.setProperty("acquisition.min_pnd", "25");
prop.setProperty("jms.ConnectionUpdatePeriod", "1000");
// Creating a Queue bridge on server 0:
Queue joramInQueue = JMSAcquisitionQueue.create(0, "joramInQueue", "foreignQueue", prop);
joramInQueue.setFreeReading();
joramInQueue.setFreeWriting();
System.out.println("joramInQueue = " + joramInQueue);
jndiCtx.rebind("joramInQueue", joramInQueue);
jndiCtx.rebind("joramCF", joramCF);
jndiCtx.close();
} finally {
AdminModule.disconnect();
}
System.out.println("Admin closed.");
}
final static long timeout = 1000L; // in nanoseconds
boolean test(final int msgs) throws Exception {
System.out.println("test start..");
long start = System.currentTimeMillis();
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
Destination joramInDest = (Destination) jndiCtx.lookup("joramInQueue");
ConnectionFactory joramCF = (ConnectionFactory) jndiCtx.lookup("joramCF");
ConnectionFactory foreignCF = (ConnectionFactory) jndiCtx.lookup("foreignCF");
Destination foreignDest = (Destination) jndiCtx.lookup("foreignQueue");
jndiCtx.close();
Connection joramCnx = joramCF.createConnection();
Session joramSess = joramCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer joramCons = joramSess.createConsumer(joramInDest);
joramCons.setMessageListener(this);
joramCnx.start();
Connection foreignCnx = foreignCF.createConnection();
Session foreignSess = foreignCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer foreignProd = foreignSess.createProducer(foreignDest);
foreignCnx.start();
Random rand = new Random();
nbmsg = 0;
TextMessage msgOut = foreignSess.createTextMessage();
for (int i = 0; i < msgs; i++) {
msgOut.setText("Message number " + i);
// System.out.println("send msg = " + msgOut.getText());
foreignProd.send(msgOut);
if (i == 0) {
// start
} else if ((i%1000000) == 0) {
Thread.sleep(1000 * rand.nextInt(60));
} else if ((i%100000) == 0) {
Thread.sleep(1000 * rand.nextInt(30));
} else if ((i%10000) == 0) {
Thread.sleep(500 * rand.nextInt(20));
} else if ((i - nbmsg) > 50000) {
Thread.sleep(1000);
}
}
foreignCnx.close();
while (nbmsg != msgs) {
if ((System.currentTimeMillis() - start) > (2*timeout*msgs)) break;
Thread.sleep(1000);
}
System.out.println("Receives " + nbmsg + " messages.");
assertEquals("Receives " + nbmsg + "messages, should be " + msgs, msgs, nbmsg);
joramCnx.close();
return (nbmsg == msgs);
}
int nbmsg = 0;
Random rand2 = new Random();
public void onMessage(Message msg) {
// System.out.println("receives: " + msg);
try {
if (nbmsg == 0) {
// start
} else if ((nbmsg%1000000) == 0) {
Thread.sleep(1000 * rand2.nextInt(60));
} else if ((nbmsg%100000) == 0) {
Thread.sleep(1000 * rand2.nextInt(30));
} else if((nbmsg%10000) == 0) {
Thread.sleep(500 * rand2.nextInt(20));
}
} catch (InterruptedException e1) {}
try {
String txt1 = "Message number " + nbmsg;
String txt2 = ((TextMessage) msg).getText();
if (! txt1.equals(txt2))
System.out.println("Expected <" + txt1 + "> but was <" + txt2 + "> ");
if ((nbmsg % 1000) == 0)
System.out.println("receives: " + txt2);
assertEquals("Message " + msg.getJMSMessageID(), txt1, txt2);
} catch (JMSException e) {
assertTrue("Exception: " + e, false);
e.printStackTrace();
}
nbmsg += 1;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment