Commit 94f2f6b0 authored by Andre Freyssinet's avatar Andre Freyssinet

Enhancements to better fit SEIP sample.

parent e9f050d5
......@@ -27,6 +27,7 @@ import java.util.Properties;
import javax.jms.ConnectionFactory;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.Topic;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.JMSAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.JMSDistributionQueue;
......@@ -35,25 +36,53 @@ import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
public class Admin {
public static void main(String[] args) throws Exception {
centralAdmin();
bridgeAdmin();
}
static void centralAdmin() throws Exception {
System.out.println();
System.out.println("Central administration...");
ConnectionFactory cf = TcpConnectionFactory.create("localhost", 16010);
AdminModule.connect(cf, "root", "root");
User.create("anonymous", "anonymous");
Queue queue = Queue.create("mqs_dest");
queue.setFreeReading();
queue.setFreeWriting();
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
jndiCtx.bind("mqs_cf", cf);
jndiCtx.bind("mqs_dest", queue);
jndiCtx.close();
AdminModule.disconnect();
System.out.println("Admin closed.");
}
static void bridgeAdmin() throws Exception {
System.out.println();
System.out.println("Bridge administration...");
ConnectionFactory bridgeCF = TcpConnectionFactory.create("localhost", 16011);
AdminModule.connect(bridgeCF, "root", "root");
User.create("anonymous", "anonymous");
// Creating a Queue Distribution bridge on bridge server
Queue distq = JMSDistributionQueue.create(0, "distQ", "mqs_dest");
Properties queueProps = new Properties();
queueProps.setProperty("distribution.async", "true");
Queue distq = JMSDistributionQueue.create(1, "distQ", "mqs_dest", queueProps);
distq.setFreeWriting();
System.out.println("joram distribution queue = " + distq);
// Creating a Queue Acquisition bridge on bridge server
Queue acqq = JMSAcquisitionQueue.create(0, "acqQ", "mqs_dest");
acqq.setFreeReading();
System.out.println("joram acquisition queue = " + acqq);
// bind foreign destination and connectionFactory
Properties jndiProps = new Properties();
jndiProps.setProperty("java.naming.factory.initial", "fr.dyade.aaa.jndi2.client.NamingContextFactory");
......@@ -62,7 +91,6 @@ public class Admin {
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
jndiCtx.rebind("distq", distq);
jndiCtx.rebind("acqq", acqq);
jndiCtx.rebind("bridgeCF", bridgeCF);
jndiCtx.close();
......
......@@ -43,14 +43,14 @@ public class Consumer {
jndiProps.setProperty("java.naming.factory.port", "16401");
javax.naming.Context jndiCtx = new javax.naming.InitialContext(jndiProps);
Destination bridgeDest = (Destination) jndiCtx.lookup("acqq");
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
Destination mqdest = (Destination) jndiCtx.lookup("mqs_dest");
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("mqs_cf");
jndiCtx.close();
Connection bridgeCnx = bridgeCF.createConnection();
Session bridgeSess = bridgeCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer bridgeCons = bridgeSess.createConsumer(bridgeDest);
bridgeCons.setMessageListener(new MsgListener("bridge"));
MessageConsumer bridgeCons = bridgeSess.createConsumer(mqdest);
bridgeCons.setMessageListener(new MsgListener("MQS"));
bridgeCnx.start();
System.in.read();
......
......@@ -26,6 +26,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.BytesMessage;
/**
* Implements the <code>javax.jms.MessageListener</code> interface.
......@@ -40,7 +41,14 @@ public class MsgListener implements MessageListener {
public void onMessage(Message msg) {
try {
if (msg instanceof TextMessage)
System.out.println(who + " receive on acquisition queue: " + ((TextMessage) msg).getText());
System.out.println(who + " receive on mqs queue: " + ((TextMessage) msg).getText());
else if (msg instanceof BytesMessage) {
BytesMessage m = (BytesMessage) msg;
int len = (int) m.getBodyLength();
byte[] payload = new byte[len];
len = m.readBytes(payload);
System.out.println(who + " receive on mqs queue [" + len + ", " + payload.length + "]: " + new String(payload));
}
}
catch (JMSException exc) {
System.err.println("Exception in listener: " + exc);
......
......@@ -29,7 +29,7 @@ import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.BytesMessage;
/**
* Produces messages on the foreign destination.
......@@ -48,18 +48,18 @@ public class Producer {
jndiCtx.close();
Connection bridgeCnx = bridgeCF.createConnection();
Session bridgeSess = bridgeCnx.createSession(true, 0);
Session bridgeSess = bridgeCnx.createSession();
MessageProducer bridgeProducer = bridgeSess.createProducer(bridgeDest);
TextMessage msg = bridgeSess.createTextMessage();
BytesMessage msg = bridgeSess.createBytesMessage();
for (int i = 1; i < 11; i++) {
msg.setText("Joram message number " + i + " sent through distribution bridge queue.");
System.out.println("send msg = " + msg.getText());
String str = "Joram message number " + i + " sent through distribution bridge queue.";
msg.writeBytes(str.getBytes());
System.out.println("send msg = " + str);
bridgeProducer.send(msg);
}
bridgeSess.commit();
bridgeCnx.close();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment