Commit 72368413 authored by afreyssin's avatar afreyssin

Adds BridgeTest18x allowing to test flow control between AcquisitionHandler...

Adds BridgeTest18x allowing to test flow control between AcquisitionHandler and AcquisitionQueue (JORAM-220).
parent ffbbcfa0
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2015 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s):Badolle Fabien (ScalAgent D.T.)
* Contributor(s):
*/
package joram.bridgejms;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.JMSAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import framework.TestCase;
/**
* Test: Test issue with flow regulation using AcquisitionQueue.
*/
public class BridgeTest18x extends TestCase implements MessageListener {
public static void main(String[] args) {
new BridgeTest18x().run();
}
public void run() {
try {
System.out.println("servers start");
startAgentServer((short)0, new String[]{"-DTransaction.UseLockFile=false"});
startAgentServer((short)1, new String[]{"-DTransaction.UseLockFile=false"});
Thread.sleep(1000);
try{
ConnectionFactory bridgeCF = TcpConnectionFactory.create("localhost", 16010);
ConnectionFactory foreignCF = TcpConnectionFactory.create("localhost", 16011);
AdminModule.connect(bridgeCF, "root", "root");
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
User.create("anonymous", "anonymous", 0);
User.create("anonymous", "anonymous", 1);
// create The foreign destination and connectionFactory
Queue foreignQueue1 = Queue.create(1, "foreignQueue1");
foreignQueue1.setFreeReading();
foreignQueue1.setFreeWriting();
System.out.println("foreign queue#1 = " + foreignQueue1);
// create The foreign destination and connectionFactory
Queue foreignQueue2 = Queue.create(1, "foreignQueue2");
foreignQueue2.setFreeReading();
foreignQueue2.setFreeWriting();
System.out.println("foreign queue#2 = " + foreignQueue2);
// bind foreign destination and connectionFactory
jndiCtx.rebind("foreignQueue1", foreignQueue1);
jndiCtx.rebind("foreignQueue2", foreignQueue2);
jndiCtx.rebind("foreignCF", foreignCF);
// Setting the bridge properties
Properties prop1 = new Properties();
prop1.setProperty("jms.ConnectionUpdatePeriod", "1000");
prop1.setProperty("period", "1000");
prop1.setProperty("acquisition.max_msg", "20");
prop1.setProperty("acquisition.min_msg", "10");
// Creating a Queue bridge on server 0:
Queue joramInQueue1 = JMSAcquisitionQueue.create(0, "joramInQueue1", "foreignQueue1", prop1);
joramInQueue1.setFreeReading();
joramInQueue1.setFreeWriting();
System.out.println("joramInQueue1 = " + joramInQueue1);
// Setting the bridge properties
Properties prop2 = new Properties();
prop2.setProperty("jms.ConnectionUpdatePeriod", "1000");
prop2.setProperty("period", "1000");
prop2.setProperty("acquisition.max_msg", "20");
prop2.setProperty("acquisition.min_msg", "10");
// Creating a Queue bridge on server 0:
Queue joramInQueue2 = JMSAcquisitionQueue.create(0, "joramInQueue2", "foreignQueue2", prop2);
joramInQueue2.setFreeReading();
joramInQueue2.setFreeWriting();
System.out.println("joramInQueue2 = " + joramInQueue2);
jndiCtx.rebind("joramInQueue1", joramInQueue1);
jndiCtx.rebind("joramInQueue2", joramInQueue2);
jndiCtx.rebind("joramCF", bridgeCF);
jndiCtx.close();
AdminModule.disconnect();
System.out.println("Admin closed.");
}catch(Exception exc){
exc.printStackTrace();
}
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
Destination joramInDest1 = (Destination) jndiCtx.lookup("joramInQueue1");
System.out.println("joramInDest1 = " + joramInDest1);
Destination joramInDest2 = (Destination) jndiCtx.lookup("joramInQueue2");
System.out.println("joramInDest2 = " + joramInDest2);
ConnectionFactory joramCF = (ConnectionFactory) jndiCtx.lookup("joramCF");
ConnectionFactory foreignCF = (ConnectionFactory) jndiCtx.lookup("foreignCF");
Destination foreignDest1 = (Destination) jndiCtx.lookup("foreignQueue1");
System.out.println("foreignDest1 = " + foreignDest1);
Destination foreignDest2 = (Destination) jndiCtx.lookup("foreignQueue2");
System.out.println("foreignDest2 = " + foreignDest2);
jndiCtx.close();
Connection foreignCnx = foreignCF.createConnection();
Session foreignSess = foreignCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer foreignProd1 = foreignSess.createProducer(foreignDest1);
MessageProducer foreignProd2 = foreignSess.createProducer(foreignDest2);
foreignCnx.start();
Connection joramCnx = joramCF.createConnection();
Session joramSess = joramCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer joramCons1 = joramSess.createConsumer(joramInDest1);
joramCons1.setMessageListener(this);
MessageConsumer joramCons2 = joramSess.createConsumer(joramInDest2);
joramCons2.setMessageListener(this);
joramCnx.start();
for (int i=0; i<10000; i++) {
TextMessage msg1 = foreignSess.createTextMessage("msg1#" + i);
foreignProd1.send(msg1);
TextMessage msg2 = foreignSess.createTextMessage("msg2#" + i);
foreignProd2.send(msg2);
}
Thread.sleep(120000);
assertTrue(nbmsg == 20000);
foreignCnx.close();
joramCnx.close();
} catch (Throwable exc) {
exc.printStackTrace();
error(exc);
} finally {
System.out.println("Server stop ");
killAgentServer((short)0);
killAgentServer((short)1);
endTest();
}
}
volatile int nbmsg = 0;
@Override
public void onMessage(Message m) {
try {
TextMessage msg = (TextMessage) m;
nbmsg += 1;
} catch (Exception exc) {
exc.printStackTrace();
}
}
}
......@@ -5084,6 +5084,16 @@
</antcall>
</target>
<target name="bridge.jms.test18x" depends="init.a3props,compile"
description=" --> Test issue with flow regulation using AcquisitionQueue">
<antcall target="test.run" inheritAll="true">
<param name="testid" value="bridge.jms.test18x"/>
<param name="className" value="joram.bridgejms.BridgeTest18x"/>
<param name="jndiconf" value="bridgejms/jndi.properties"/>
<param name="a3conf" value="bridgejms/a3servers.xml"/>
</antcall>
</target>
<target name="bridge.jms.testX" depends="init.a3props,compile"
description=" --> ">
<antcall target="test.run" inheritAll="true">
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment