Commit 56d1327e authored by Andre Freyssinet's avatar Andre Freyssinet

Adds tests for Rest/JMS bridge (JMORAM-377).

parent 3f575a48
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2020 - 2021 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent D.T.
* Contributor(s):
*/
package joram.bridgerest;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.*;
import org.glassfish.jersey.client.ClientConfig;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.RestAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import framework.TestCase;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import joram.rest.Helper;
/**
* Test: Test the behavior of BridgeAcquisitionQueue during a long run using Rest/JMS API to
* produces messages (one message producer per message). Uses sleep in consumer to activate the
* flow control of the acquisition queue.
*/
public class BridgeRestTest10 extends TestCase implements MessageListener {
public static void main(String[] args) {
new BridgeRestTest10().run();
}
public void startAgentServer0() throws Exception {
System.setProperty("felix.config.properties", "file:config0.properties");
startAgentServer((short)0, new String[]{"-DTransaction.UseLockFile=false"});
}
public void startAgentServer1() throws Exception {
System.setProperty("felix.config.properties", "file:config1.properties");
startAgentServer((short)1, new String[]{"-DTransaction.UseLockFile=false"});
}
public void run() {
try {
System.out.println("servers start");
startAgentServer0();
startAgentServer1();
Thread.sleep(1000);
admin();
test();
} catch (Throwable exc) {
exc.printStackTrace();
error(exc);
} finally {
System.out.println("Server stop ");
killAgentServer((short)0);
killAgentServer((short)1);
endTest();
}
}
private final static String foreignQueueName = "foreignQueue";
private final static String acqQueueName = "acqQueue";
private void admin() throws Exception {
javax.jms.ConnectionFactory bridgeCF = TcpConnectionFactory.create("localhost", 16010);
AdminModule.connect(bridgeCF, "root", "root");
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
User.create("anonymous", "anonymous", 0);
User.create("anonymous", "anonymous", 1);
// create The foreign destination and connectionFactory on server 1
Queue foreignQueue = Queue.create(1, foreignQueueName);
foreignQueue.setFreeReading();
foreignQueue.setFreeWriting();
System.out.println("foreign queue = " + foreignQueue);
javax.jms.ConnectionFactory foreignCF = TcpConnectionFactory.create("localhost", 16011);
// bind foreign destination and connectionFactory
jndiCtx.rebind(foreignQueueName, foreignQueue);
jndiCtx.rebind("foreignCF", foreignCF);
// Create a REST acquisition queue on server.
Queue acqQueue = new RestAcquisitionQueue()
.setMediaTypeJson(true)
.setTimeout(5000)
.setIdleTimeout(10)
.create(0, acqQueueName, foreignQueueName);
acqQueue.setFreeReading();
System.out.println("joram acquisition queue = " + acqQueue);
jndiCtx.bind(acqQueueName, acqQueue);
jndiCtx.rebind("bridgeCF", bridgeCF);
jndiCtx.close();
AdminModule.disconnect();
System.out.println("admin config ok");
Thread.sleep(1000);
}
int nbmsg = 20000;
Object lock = new Object();
public void test() throws Exception {
Context jndiCtx = new InitialContext();
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
Destination acqQueue = (Destination) jndiCtx.lookup(acqQueueName);
jndiCtx.close();
Connection bridgeCnx = bridgeCF.createConnection();
Session bridgeSess = bridgeCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer bridgeCons = bridgeSess.createConsumer(acqQueue);
bridgeCons.setMessageListener(this);
bridgeCnx.start();
createProducer("producer", "clientid");
for (int i = 0; i < nbmsg; i++) {
if ((i %1000) == 0)
System.out.println("Send msg #" + i);
try {
send("Message number #" + i, i);
} catch (Exception exc) {
System.err.println("Cannot send Message number #" + i);
}
synchronized (lock) {
if ((i - counter) > 500)
lock.wait((1L*(i - counter)));
}
}
synchronized (lock) {
if (counter != nbmsg)
lock.wait((1000L*nbmsg) + 10000L);
}
assertTrue("Assertion#1", (counter == nbmsg));
if (counter != nbmsg)
throw new Exception("Bad message count");
}
int counter = 0;
public void onMessage(Message msg) {
try {
String txt1 = "Message number #" + counter;
String txt2 = ((TextMessage) msg).getText();
assertEquals("Message " + msg.getJMSMessageID(), txt1, txt2);
if (! txt1.equals(txt2)) {
System.out.println("Message " + msg.getJMSMessageID() + ": Expected <" + txt1 + "> but was <" + txt2 + "> ");
counter = Integer.parseInt(txt2.substring(16));
}
if ((counter %1000) == 0)
System.out.println("Receives " + msg.getJMSMessageID() + " -> " + txt2);
try {
// Sleep to activate control-flow of acquisition queue
if ((counter %100) == 0)
Thread.sleep(1000);
} catch (Exception exc) {}
counter += 1;
if (counter == nbmsg) {
synchronized (lock) {
System.out.println("notify");
lock.notify();
}
}
} catch (JMSException exc) {
error(exc);
exc.printStackTrace();
}
}
ClientConfig config = new ClientConfig();
Client client = ClientBuilder.newClient(config);
URI uriCreateProd = null;
URI uriCloseProd = null;
URI uriSendNext = null;
private void createProducer(String name, String clientId) throws Exception {
WebTarget target = client.target(Helper.getBaseJmsURI());
Builder builder = target.path("jndi").path(foreignQueueName).request();
Response response = builder.accept(MediaType.TEXT_PLAIN).head();
if (response.getStatus() != Response.Status.CREATED.getStatusCode())
throw new Exception("lookup \"" + foreignQueueName + "\" = " + response.getStatus());
// URI to create producer
uriCreateProd = response.getLink("create-producer").getUri();
// TODO (AF): to remove (use FormParam alternate URI)
uriCreateProd = new URI(uriCreateProd.toString() + "-fp");
}
private void send(String msg, int i) throws Exception {
String name = "prod-" +i;
String clientId = "clientid-" +i;
// create producer
WebTarget target = client.target(uriCreateProd);
if (clientId != null)
target = target.queryParam("client-id", clientId);
if (name != null)
target = target.queryParam("name", name);
Response response = target.request()
.accept(MediaType.TEXT_PLAIN)
.post(Entity.entity(null, MediaType.APPLICATION_FORM_URLENCODED));
if (response.getStatus() != Response.Status.CREATED.getStatusCode())
throw new Exception("createProducer = " + response.getStatus() + ", " + target);
// URI to close producer and send next message
uriCloseProd = response.getLink("close-context").getUri();
uriSendNext = response.getLink("send-next-message").getUri();
response = client.target(uriSendNext)
.request()
.accept(MediaType.TEXT_PLAIN)
.post(Entity.entity(msg, MediaType.TEXT_PLAIN));
if (response.getStatus() != Response.Status.OK.getStatusCode())
throw new Exception("send-next-message = " + response.getStatus());
uriSendNext = response.getLink("send-next-message").getUri();
response = client.target(uriCloseProd).request().accept(MediaType.TEXT_PLAIN).delete();
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2020 - 2021 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent D.T.
* Contributor(s):
*/
package joram.bridgerest;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.*;
import org.glassfish.jersey.client.ClientConfig;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.RestAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import framework.TestCase;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation.Builder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Base64;
import joram.rest.Helper;
/**
* Test: Test the behavior of BridgeAcquisitionQueue using the mechanism restarting
* the daemon if there is no activity.
*/
public class BridgeRestTest11 extends TestCase implements MessageListener {
public static void main(String[] args) {
new BridgeRestTest11().run();
}
public void startAgentServer0() throws Exception {
System.setProperty("felix.config.properties", "file:config0.properties");
startAgentServer((short)0, new String[]{"-DTransaction.UseLockFile=false"});
}
public void startAgentServer1() throws Exception {
System.setProperty("felix.config.properties", "file:config1.properties");
startAgentServer((short)1, new String[]{"-DTransaction.UseLockFile=false"});
}
public void run() {
try {
System.out.println("servers start");
startAgentServer0();
startAgentServer1();
Thread.sleep(1000);
admin();
test();
} catch (Throwable exc) {
exc.printStackTrace();
error(exc);
} finally {
System.out.println("Server stop ");
killAgentServer((short)0);
killAgentServer((short)1);
endTest();
}
}
private final static String foreignQueueName = "foreignQueue";
private final static String acqQueueName = "acqQueue";
private void admin() throws Exception {
javax.jms.ConnectionFactory bridgeCF = TcpConnectionFactory.create("localhost", 16010);
AdminModule.connect(bridgeCF, "root", "root");
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
User.create("anonymous", "anonymous", 0);
User.create("anonymous", "anonymous", 1);
// create The foreign destination and connectionFactory on server 1
Queue foreignQueue = Queue.create(1, foreignQueueName);
foreignQueue.setFreeReading();
foreignQueue.setFreeWriting();
System.out.println("foreign queue = " + foreignQueue);
javax.jms.ConnectionFactory foreignCF = TcpConnectionFactory.create("localhost", 16011);
// bind foreign destination and connectionFactory
jndiCtx.rebind(foreignQueueName, foreignQueue);
jndiCtx.rebind("foreignCF", foreignCF);
// Create a REST acquisition queue on server setting dameon restart if idle during more than 10s.
Queue acqQueue = new RestAcquisitionQueue()
.setMediaTypeJson(true)
.setTimeout(5000)
.setIdleTimeout(10)
.setCheckPeriod(2)
.create(0, acqQueueName, foreignQueueName);
acqQueue.setFreeReading();
System.out.println("joram acquisition queue = " + acqQueue);
jndiCtx.bind(acqQueueName, acqQueue);
jndiCtx.rebind("bridgeCF", bridgeCF);
jndiCtx.close();
AdminModule.disconnect();
System.out.println("admin config ok");
Thread.sleep(1000);
}
int nbmsg = 20000;
Object lock = new Object();
public void test() throws Exception {
Context jndiCtx = new InitialContext();
ConnectionFactory bridgeCF = (ConnectionFactory) jndiCtx.lookup("bridgeCF");
Destination acqQueue = (Destination) jndiCtx.lookup(acqQueueName);
jndiCtx.close();
Connection bridgeCnx = bridgeCF.createConnection();
Session bridgeSess = bridgeCnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer bridgeCons = bridgeSess.createConsumer(acqQueue);
bridgeCons.setMessageListener(this);
bridgeCnx.start();
createProducer("producer", "clientid");
for (int i = 0; i < nbmsg; i++) {
if ((i %1000) == 0)
System.out.println("Send msg #" + i);
try {
send("Message number #" + i, i);
} catch (Exception exc) {
System.err.println("Cannot send Message number #" + i);
}
// Pause implying the daemon restart
if ((i % 1000) == 0) {
System.out.println("Pause begin");
Thread.sleep(5000);
System.out.println("Pause end");
}
synchronized (lock) {
if ((i - counter) > 500)
lock.wait((1L*(i - counter)));
}
}
synchronized (lock) {
if (counter != nbmsg)
lock.wait((1000L*nbmsg) + 10000L);
}
assertTrue("Assertion#1", (counter == nbmsg));
if (counter != nbmsg)
throw new Exception("Bad message count");
}
int counter = 0;
public void onMessage(Message msg) {
try {
String txt1 = "Message number #" + counter;
String txt2 = ((TextMessage) msg).getText();
assertEquals("Message " + msg.getJMSMessageID(), txt1, txt2);
if (! txt1.equals(txt2)) {
System.out.println("Message " + msg.getJMSMessageID() + ": Expected <" + txt1 + "> but was <" + txt2 + "> ");
counter = Integer.parseInt(txt2.substring(16));
}
if ((counter %1000) == 0)
System.out.println("Receives " + msg.getJMSMessageID() + " -> " + txt2);
counter += 1;
if (counter == nbmsg) {
synchronized (lock) {
System.out.println("notify");
lock.notify();
}
}
} catch (JMSException exc) {
error(exc);
exc.printStackTrace();
}
}
ClientConfig config = new ClientConfig();
Client client = ClientBuilder.newClient(config);
URI uriCreateProd = null;
URI uriCloseProd = null;
URI uriSendNext = null;
private void createProducer(String name, String clientId) throws Exception {
WebTarget target = client.target(Helper.getBaseJmsURI());
Builder builder = target.path("jndi").path(foreignQueueName).request();
Response response = builder.accept(MediaType.TEXT_PLAIN).head();
if (response.getStatus() != Response.Status.CREATED.getStatusCode())
throw new Exception("lookup \"" + foreignQueueName + "\" = " + response.getStatus());
// URI to create producer
uriCreateProd = response.getLink("create-producer").getUri();
// TODO (AF): to remove (use FormParam alternate URI)
uriCreateProd = new URI(uriCreateProd.toString() + "-fp");
}
private void send(String msg, int i) throws Exception {
String name = "prod-" +i;
String clientId = "clientid-" +i;
// create producer
WebTarget target = client.target(uriCreateProd);
if (clientId != null)
target = target.queryParam("client-id", clientId);
if (name != null)
target = target.queryParam("name", name);
Response response = target.request()
.accept(MediaType.TEXT_PLAIN)
.post(Entity.entity(null, MediaType.APPLICATION_FORM_URLENCODED));
if (response.getStatus() != Response.Status.CREATED.getStatusCode())
throw new Exception("createProducer = " + response.getStatus() + ", " + target);
// URI to close producer and send next message
uriCloseProd = response.getLink("close-context").getUri();
uriSendNext = response.getLink("send-next-message").getUri();
response = client.target(uriSendNext)
.request()
.accept(MediaType.TEXT_PLAIN)
.post(Entity.entity(msg, MediaType.TEXT_PLAIN));
if (response.getStatus() != Response.Status.OK.getStatusCode())
throw new Exception("send-next-message = " + response.getStatus());
uriSendNext = response.getLink("send-next-message").getUri();
response = client.target(uriCloseProd).request().accept(MediaType.TEXT_PLAIN).delete();
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2020 - 2021 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent D.T.
* Contributor(s):
*/
package joram.bridgerest;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.*;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.RestAcquisitionQueue;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import framework.TestCase;
/**
* Test: Test the behavior of BridgeAcquisitionQueue during a long run
*/
public class BridgeRestTest7 extends TestCase implements MessageListener {
public static void main(String[] args) {
new BridgeRestTest7().run();
}
public void startAgentServer0() throws Exception {
System.setProperty("felix.config.properties", "file:config0.properties");
startAgentServer((short)0, new String[]{"-DTransaction.UseLockFile=false"});
}
public void startAgentServer1() throws Exception {
System.setProperty("felix.config.properties", "file:config1.properties");
startAgentServer((short)1, new String[]{"-DTransaction.UseLockFile=false"});
}
public void run() {
try {
System.out.println("servers start");
startAgentServer0();
startAgentServer1();
Thread.sleep(1000);
admin();
test();
} catch (Throwable exc) {
exc.printStackTrace();
error(exc);
} finally {
System.out.println("Server stop ");
killAgentServer((short)0);
killAgentServer((short)1);