Commit 734096fa authored by afreyssin's avatar afreyssin

Updates code to fit the sample description in documentation.

parent 12c5362e
......@@ -45,43 +45,20 @@ public class Admin {
AdminModule.connect(bridgeCF, "root", "root");
// Create a topic forwarding its messages to the configured rest queue.
// Properties prop = new Properties();
// prop.put("distribution.className", "com.scalagent.joram.mom.dest.rest.RESTDistribution");
// prop.put("rest.hostName", "localhost");
// prop.put("rest.port", "8989");
// prop.put("rest.userName", "anonymous");
// prop.put("rest.password", "anonymous");
// prop.put("jms.destination", "queue");
// prop.put("distribution.batch", "true");
// prop.put("period", "1000");
// Queue queueDist = Queue.create(1, "queueDist", Destination.DISTRIBUTION_QUEUE, prop);
// Create a queue forwarding its messages to the configured rest queue.
Queue queueDist = new RestDistributionQueue()
.setBatch(true)
.setHostName("localhost")
.setHost("localhost")
.setPort(8989)
.setPeriod(10)
.setBatch(true)
.setIdleTimeout(10000)
.create(1, "queueDist", "queue");
queueDist.setFreeWriting();
System.out.println("joram distribution queue = " + queueDist);
// Create a queue getting its messages from the configured rest queue.
// prop = new Properties();
// prop.put("acquisition.className", "com.scalagent.joram.mom.dest.rest.RESTAcquisitionDaemon");
// prop.put("rest.hostName", "localhost");
// prop.put("rest.port", "8989");
// prop.put("rest.userName", "anonymous");
// prop.put("rest.password", "anonymous");
// prop.put("rest.nbMaxMsgByPeriode", "1000");
// //prop.put("rest.mediaTypeJson", "false");
// prop.put("rest.timeout", "5000");
// prop.put("jms.destination", "queue");
// //prop.put("acquisition.period", "1000");
// Queue queueAcq = Queue.create(1, "queueAcq", Destination.ACQUISITION_QUEUE, prop);
Queue queueAcq = new RestAcquisitionQueue()
.setTimeout(1000)
.setNbMaxMsgByPeriode(1000)
.setHost("localhost")
.setPort(8989)
.setTimeout(5000)
.setIdleTimeout(10000)
.create(1, "queueAcq", "queue");
queueAcq.setFreeReading();
System.out.println("joram acquisition queue = " + queueAcq);
......
......@@ -37,13 +37,15 @@ public class Consumer {
public static void main(String[] args) throws Exception {
String queueName = System.getProperty("queue");
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
Destination bridgeDest = (Destination) jndiCtx.lookup("queueAcq");
Destination bridgeDest = (Destination) jndiCtx.lookup(queueName);
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package restbridge;
import java.util.Properties;
import javax.jms.ConnectionFactory;
import org.objectweb.joram.client.jms.Destination;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import org.objectweb.joram.client.jms.admin.JMSAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.JMSDistributionQueue;
import org.objectweb.joram.client.jms.admin.RestAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.RestDistributionQueue;
public class PerfAdmin {
public static void main(String[] args) throws Exception {
System.out.println();
System.out.println("Rest Bridge administration...");
ConnectionFactory bridgeCF = TcpConnectionFactory.create("localhost", 16011);
AdminModule.connect(bridgeCF, "root", "root");
// Creates queues forwarding their messages to the configured rest queue.
Queue queueDist1 = new RestDistributionQueue()
.setHost("localhost")
.setPort(8989)
.setIdleTimeout(10000)
.create(1, "queueDist1", "queue");
queueDist1.setFreeWriting();
System.out.println("joram distribution queue = " + queueDist1);
Queue queueDist2 = new RestDistributionQueue()
.setHost("localhost")
.setPort(8989)
.setIdleTimeout(10000)
.create(1, "queueDist2", "queue");
queueDist2.setFreeWriting();
System.out.println("joram distribution queue = " + queueDist2);
// Creates queues getting its messages from the configured rest queue.
Queue queueAcq1 = new RestAcquisitionQueue()
.setHost("localhost")
.setPort(8989)
.setTimeout(5000)
.setIdleTimeout(10000)
.create(1, "queueAcq1", "queue");
queueAcq1.setFreeReading();
System.out.println("joram acquisition queue = " + queueAcq1);
Queue queueAcq2 = new RestAcquisitionQueue()
.setHost("localhost")
.setPort(8989)
.setTimeout(5000)
.setIdleTimeout(10000)
.create(1, "queueAcq2", "queue");
queueAcq2.setFreeReading();
System.out.println("joram acquisition queue = " + queueAcq2);
User.create("anonymous", "anonymous");
// bind foreign destination and connectionFactory
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
jndiCtx.bind("bridgeCF", bridgeCF);
jndiCtx.bind("queueDist1", queueDist1);
jndiCtx.bind("queueDist2", queueDist2);
jndiCtx.bind("queueAcq1", queueAcq1);
jndiCtx.bind("queueAcq2", queueAcq2);
jndiCtx.close();
AdminModule.disconnect();
System.out.println("Admin closed.");
}
}
......@@ -73,14 +73,16 @@ public class PerfConsumer implements MessageListener {
NbMsgPerRound = Integer.getInteger("NbMsgPerRound", NbMsgPerRound).intValue();
NbMaxMessage = Integer.getInteger("NbMaxMessage", NbMaxMessage).intValue();
String queueName = System.getProperty("queue");
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
Destination dest = (Destination) jndiCtx.lookup("queueAcq");
Destination dest = (Destination) jndiCtx.lookup(queueName);
ConnectionFactory cf = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
......@@ -92,7 +94,7 @@ public class PerfConsumer implements MessageListener {
System.out.println("Subscriber: implicitAck=" + implicitAck);
Connection cnx = cf.createConnection();
cnx.setClientID("cnx_dursub");
cnx.setClientID("cnx_dursub" + System.currentTimeMillis());
int mode;
if (dupsOk) {
mode = Session.DUPS_OK_ACKNOWLEDGE;
......@@ -151,12 +153,15 @@ public class PerfConsumer implements MessageListener {
int index = msg.getIntProperty("index");
if (index == 0) start = t1 = last;
travel += (last - msg.getLongProperty("time"));
long dt = (last - msg.getLongProperty("time"));
travel += dt;
counter += 1;
if (transacted && (((counter%10) == 9) || (index == 0)))
session.commit();
if ((counter %100) == 0)
System.out.println("--> " + dt);
if ((counter%NbMsgPerRound) == (NbMsgPerRound -1)) {
long x = (NbMsgPerRound * 1000L) / (last - t1);
t1 = last;
......
......@@ -69,13 +69,15 @@ public class PerfProducer implements Runnable {
transacted = getBoolean("Transacted", transacted);
asyncSend = getBoolean("asyncSend", asyncSend);
String queueName = System.getProperty("queue");
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
dest = (Destination) jndiCtx.lookup("queueDist");
dest = (Destination) jndiCtx.lookup(queueName);
cf = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
......@@ -102,9 +104,10 @@ public class PerfProducer implements Runnable {
producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
}
byte[] content = new byte[MsgSize];
StringBuffer strbuf = new StringBuffer();
for (int i = 0; i< MsgSize; i++)
content[i] = (byte) (i & 0xFF);
strbuf.append('0');
String content = strbuf.toString();
long dtx = 0;
long start = System.currentTimeMillis();
......@@ -114,7 +117,7 @@ public class PerfProducer implements Runnable {
if (SwapAllowed) {
msg.setBooleanProperty("JMS_JORAM_SWAPALLOWED", true);
}
msg.setText(new String(content));
msg.setText(content);
//msg.writeBytes(content);
msg.setLongProperty("time", System.currentTimeMillis());
msg.setIntProperty("index", i);
......@@ -128,7 +131,9 @@ public class PerfProducer implements Runnable {
long dtx2 = System.currentTimeMillis() - start;
if (dtx1 > (dtx2 + 20)) {
dtx += (dtx1 - dtx2);
Thread.sleep(dtx1 - dtx2);
try {
Thread.sleep(dtx1 - dtx2);
} catch (InterruptedException exc) { }
}
if (dtx2 > 0)
System.out.println("sent=" + i + ", mps=" + ((((long) i) * 1000L)/dtx2));
......
......@@ -39,13 +39,15 @@ public class Producer {
public static void main(String[] args) throws Exception {
String queueName = System.getProperty("queue");
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
Destination bridgeDest = (Destination) jndiCtx.lookup("queueDist");
Destination bridgeDest = (Destination) jndiCtx.lookup(queueName);
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
jndiCtx.close();
......
<?xml version="1.0"?>
<!--
- Copyright (C) 2017 ScalAgent Distributed Technologies
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Lesser General Public
- License as published by the Free Software Foundation; either
- version 2.1 of the License, or any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details.
-
- You should have received a copy of the GNU Lesser General Public
- License along with this library; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
- USA.
-->
<JoramAdmin>
<TcpAdminModule host="localhost" port="16011" name="root" password="root">
<property name="connectingTimer" value="60"/>
</TcpAdminModule>
<InitialContext>
<property name="java.naming.factory.initial"
value="fr.dyade.aaa.jndi2.client.NamingContextFactory"/>
<property name="java.naming.factory.host" value="localhost"/>
<property name="java.naming.factory.port" value="16401"/>
</InitialContext>
<TcpConnectionFactory name="bridgeCF" host="localhost" port="16011">
<jndi name="bridgeCF"/>
</TcpConnectionFactory>
<User name="anonymous" password="anonymous"/>
<RestDistributionQueue name="queueDist" foreign="queue" serverId="1">
<property name="rest.host" value="localhost" />
<property name="rest.port" value="8989" />
<property name="rest.idletimeout" value="10000" />
<freeWriter />
<jndi name="queueDist" />
</RestDistributionQueue>
<RestAcquisitionQueue name="queueAcq" foreign="queue" serverId="1">
<property name="rest.host" value="localhost" />
<property name="rest.port" value="8989" />
<property name="rest.timeout" value="5000" />
<property name="rest.idletimeout" value="10000" />
<freeReader />
<jndi name="queueAcq" />
</RestAcquisitionQueue>
</JoramAdmin>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment