Commit 9f2c82a1 authored by afreyssin's avatar afreyssin
Browse files

Adds heavy client for clustered queues sample.

parent ca5102da
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2013 - 2014 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package cluster.queue;
import java.util.Enumeration;
import java.util.Hashtable;
import javax.jms.*;
import javax.naming.*;
/**
* Consumes messages from the cluster queue.
*/
public class XConsumer {
static Context ictx = null;
public static void main(String[] args) throws Exception {
ConnectionFactory cf = null;
Queue dest = null;
if (args.length != 1)
throw new Exception("Bad number of argument");
ictx = new InitialContext();
try {
if (args[0].equals("-")) {
// Choose a connection factory and the associated topic depending of
// the location property.
cf = (ConnectionFactory) ictx.lookup("clusterCF");
dest = (Queue) ictx.lookup("clusterQueue");
} else {
cf = (ConnectionFactory) ictx.lookup("cf" + args[0]);
dest = (Queue) ictx.lookup("queue" + args[0]);
System.setProperty("location", "server" + args[0]);
}
} finally {
ictx.close();
}
Connection cnx = cf.createConnection("anonymous", "anonymous");
Session session = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer sub = session.createConsumer(dest);
String location = System.getProperty("location");
if (location != null)
System.out.println("Subscribes and listens to queue on " + location);
sub.setMessageListener(new MsgListener("location" + location + " listener"));
cnx.start();
System.out.println("Press a key to exit..");
System.in.read();
cnx.close();
}
static class Counter {
int counter = 0;
Counter() {}
void inc() {
counter += 1;
}
int get() {
return counter;
}
}
/**
* Implements the <code>javax.jms.MessageListener</code> interface.
*/
static class MsgListener implements MessageListener {
String ident = null;
int nbMsg = 0;
long startTime = -1;
int mps;
Hashtable<String, Counter> stats;
public MsgListener() {}
public MsgListener(String ident) {
this.ident = ident;
mps = Integer.getInteger("mps", mps).intValue();
System.out.println("mps = " + mps);
stats = new Hashtable<String, Counter>();
}
public void onMessage(Message msg) {
try {
nbMsg++;
long time = System.currentTimeMillis();
if (nbMsg == 1) {
startTime = time;
} else {
long delta = ((nbMsg *1000L)/mps) - (time - startTime);
if (delta > 0)
try {
Thread.sleep(delta);
} catch (InterruptedException e) {}
}
time = time - startTime;
String location = (String) msg.getStringProperty("location");
Counter counter = stats.get(location);
if (counter == null) {
counter = new Counter();
counter.inc();
stats.put(location, counter);
} else {
counter.inc();
}
if ((nbMsg % 100) == 99) {
StringBuffer strbuf = new StringBuffer();
strbuf.append(ident).append(": time=").append(time).append(", nbMsg=").append(nbMsg).append(", mps=").append((nbMsg*1000L)/time);
for (Enumeration<String> e = stats.keys(); e.hasMoreElements();) {
String key = e.nextElement();
strbuf.append(key).append("->").append(stats.get(key).get()).append(',');
}
System.out.println(strbuf.toString());
}
} catch (Throwable jE) {
jE.printStackTrace();
}
}
}
}
\ No newline at end of file
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2013 - 2014 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package cluster.queue;
import javax.jms.*;
import javax.naming.*;
/**
* Produces messages on the cluster queue.
*/
public class XProducer implements Runnable {
static Context ictx = null;
static int NbClient = 1;
static int Round = 3000;
static int NbMsgPerRound = 10;
static int MsgSize = 500;
static int mps = 100;
static Queue dest = null;
static ConnectionFactory cf = null;
static boolean MsgTransient = false;
static boolean SwapAllowed = false;
static boolean transacted = false;
static boolean asyncSend = false;
static String location;
public static boolean getBoolean(String key, boolean def) {
String value = System.getProperty(key, Boolean.toString(def));
return Boolean.parseBoolean(value);
}
public static void main(String[] args) throws Exception {
if (args.length != 1)
throw new Exception("Bad number of argument");
ictx = new InitialContext();
try {
if (args[0].equals("-")) {
// Choose a connection factory and the associated topic depending of
// the location property.
cf = (ConnectionFactory) ictx.lookup("clusterCF");
dest = (Queue) ictx.lookup("clusterQueue");
} else {
cf = (ConnectionFactory) ictx.lookup("cf" + args[0]);
dest = (Queue) ictx.lookup("queue" + args[0]);
System.setProperty("location", "server" + args[0]);
}
} finally {
ictx.close();
}
NbClient = Integer.getInteger("NbClient", NbClient).intValue();
Round = Integer.getInteger("Round", Round).intValue();
NbMsgPerRound = Integer.getInteger("NbMsgPerRound", NbMsgPerRound).intValue();
MsgSize = Integer.getInteger("MsgSize", MsgSize).intValue();
mps = Integer.getInteger("mps", mps).intValue();
MsgTransient = getBoolean("MsgTransient", MsgTransient);
SwapAllowed = getBoolean("SwapAllowed", SwapAllowed);
transacted = getBoolean("Transacted", transacted);
asyncSend = getBoolean("asyncSend", asyncSend);
System.out.println("Message: MsgTransient=" + MsgTransient);
System.out.println("Message: SwapAllowed=" + SwapAllowed);
System.out.println("Transacted=" + transacted);
System.out.println("asyncSend=" + asyncSend);
System.out.println("NbMsg=" + (Round*NbMsgPerRound) + ", MsgSize=" + MsgSize);
((org.objectweb.joram.client.jms.ConnectionFactory) cf).getParameters().asyncSend = asyncSend;
location = System.getProperty("location");
if (location != null)
System.out.println("Sends messages on queue on " + location);
new XProducer().run();
}
public void run() {
try {
Connection cnx = cf.createConnection();
Session session = cnx.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(dest);
if (MsgTransient) {
producer.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);
}
byte[] content = new byte[MsgSize];
for (int i = 0; i< MsgSize; i++)
content[i] = (byte) (i & 0xFF);
long dtx = 0;
long start = System.currentTimeMillis();
for (int i=0; i<(Round*NbMsgPerRound); i++) {
TextMessage msg = session.createTextMessage();
msg.setStringProperty("location", location);
if (SwapAllowed) {
msg.setBooleanProperty("JMS_JORAM_SWAPALLOWED", true);
}
msg.setText("location " + location +" : Test number " + i);
msg.setLongProperty("time", System.currentTimeMillis());
msg.setIntProperty("index", i);
producer.send(msg);
if (transacted && ((i%10) == 9)) session.commit();
if ((i%mps) == (mps-1)) {
long dtx1 = (i * 1000L) / mps;
long dtx2 = System.currentTimeMillis() - start;
if (dtx1 > (dtx2 + 20)) {
dtx += (dtx1 - dtx2);
Thread.sleep(dtx1 - dtx2);
}
System.out.println("sent=" + i + ", mps=" + ((((long) i) * 1000L)/dtx2));
}
}
long end = System.currentTimeMillis();
long dt = end - start;
System.out.println("----------------------------------------------------");
System.out.println("| sender dt=" + ((dt *1000L)/(Round*NbMsgPerRound)) + "us -> " +
((1000L * (Round*NbMsgPerRound)) / (dt)) + "msg/s");
System.out.println("| sender wait=" + dtx + "ms");
cnx.close();
} catch (Exception exc) {
exc.printStackTrace();
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment