From 606835a5aa14f0cd9ae2f5d2d13bdaefb0e4c5ea Mon Sep 17 00:00:00 2001 From: freyssin Date: Thu, 10 Dec 2020 17:48:48 +0100 Subject: [PATCH] Fix JORAM-372 and adds test for JORAM-372. --- .../org/objectweb/joram/mom/dest/Queue.java | 26 ++- tests/src/joram/noreg/Test64.java | 178 ++++++++++++++++++ 2 files changed, 195 insertions(+), 9 deletions(-) create mode 100644 tests/src/joram/noreg/Test64.java diff --git a/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java b/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java index ff78ae834..0788f2b64 100644 --- a/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java +++ b/joram/joram/mom/core/src/main/java/org/objectweb/joram/mom/dest/Queue.java @@ -339,9 +339,13 @@ public class Queue extends Destination implements QueueMBean { /** * Removes all request that the expiration time is expired. + * + * Be careful,this method is part of the Queue MBean interface, it can be called outside the + * reactions of the engine thread. In order to avoid synchronization issues we should avoid + * direct manipulation of requests list (see JORAM-372, JORAM-373). */ - public void cleanWaitingRequest() { - cleanWaitingRequest(System.currentTimeMillis()); + public final void cleanWaitingRequest() { +// cleanWaitingRequest(System.currentTimeMillis()); } /** @@ -372,7 +376,7 @@ public class Queue extends Destination implements QueueMBean { */ public final int getWaitingRequestCount() { if (requests != null) { - cleanWaitingRequest(System.currentTimeMillis()); +// cleanWaitingRequest(); return requests.size(); } return 0; @@ -384,15 +388,19 @@ public class Queue extends Destination implements QueueMBean { /** List holding the messages before delivery. */ protected transient List messages; + public byte getType() { + return DestinationConstants.QUEUE_TYPE; + } + /** * Removes all messages that the time-to-live is expired. + * + * Be careful,this method is part of the Queue MBean interface, it can be called outside the + * reactions of the engine thread. In order to avoid synchronization issues we should avoid + * direct manipulation of messages list (see JORAM-372, JORAM-373). */ - public void cleanPendingMessage() { - cleanPendingMessage(System.currentTimeMillis()); - } - - public byte getType() { - return DestinationConstants.QUEUE_TYPE; + public final void cleanPendingMessage() { +// cleanPendingMessage(System.currentTimeMillis()); } /** diff --git a/tests/src/joram/noreg/Test64.java b/tests/src/joram/noreg/Test64.java new file mode 100644 index 000000000..601438d22 --- /dev/null +++ b/tests/src/joram/noreg/Test64.java @@ -0,0 +1,178 @@ +/* + * JORAM: Java(TM) Open Reliable Asynchronous Messaging + * Copyright (C) 2020 ScalAgent Distributed Technologies + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Initial developer(s): ScalAgent Distributed Technologies + * Contributor(s): + */ +package joram.noreg; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import org.objectweb.joram.client.jms.Queue; +import org.objectweb.joram.client.jms.admin.AdminModule; +import org.objectweb.joram.client.jms.admin.User; +import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory; + +import framework.TestCase; + +/** + * Test bad side effect between Queue reaction and JMX requests (JORAM-372). + * If the test fails, the effective error is in the server's log. This error stops the server, the test + * fails after the server stops: + +SEVERE Engine#0 2020-12-09 16:54:49,064, Engine#0: Uncaught exception during react, Queue:#0.0.1026.react(#0.0.1027, (((org.objectweb.joram.mom.notifications.ReceiveRequest@337b1e3e,messageId=null,persistent=false,detachable=false,detached=false,context=null,expiration=0,priority=4,deadNotificationAgentId=null), clientContext=0),requestId=47)) +java.lang.ArrayIndexOutOfBoundsException: Array index out of range: 0 + at java.util.Vector.remove(Vector.java:831) + at org.objectweb.joram.mom.dest.Queue.cleanWaitingRequest(Queue.java:363) + at org.objectweb.joram.mom.dest.Queue.receiveRequest(Queue.java:843) + at org.objectweb.joram.mom.dest.Queue.react(Queue.java:310) + at fr.dyade.aaa.agent.Engine.run(Engine.java:1166) + at java.lang.Thread.run(Thread.java:745) +INFO Thread-14 2020-12-09 16:54:49,065, AgentServer#0, stop() + + */ +public class Test64 extends TestCase implements ExceptionListener { + + public static void main(String[] args) { + new Test64().run(); + } + + JMSException exc = null; + + final static int NbSessions = 100; + + volatile boolean ok = true; + + synchronized void nok() { + ok = false; + } + + synchronized boolean isOk() { + return ok; + } + + public void run() { + try { + int jmxport = 18090; + startAgentServer((short) 0, new String[] { + "-Dcom.sun.management.jmxremote.port=" + jmxport, + "-Dcom.sun.management.jmxremote.authenticate=false", + "-Dcom.sun.management.jmxremote.ssl=false" + }); + Thread.sleep(1000); + + // Create an administration connection on server #0 + ConnectionFactory cf = TcpConnectionFactory.create("localhost", 16010); + ((TcpConnectionFactory) cf).getParameters().cnxPendingTimer = 5000; + ((TcpConnectionFactory) cf).getParameters().connectingTimer = 30; + + AdminModule.connect(cf, "root", "root"); + Queue queue = Queue.create(0, "queue"); + queue.setFreeReading(); + queue.setFreeWriting(); + + // Create the anonymous user needed for test + User.create("anonymous", "anonymous"); + AdminModule.disconnect(); + + Thread.sleep(1000); + + JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://:" + jmxport + "/jmxrmi"); + JMXConnector jmxc = JMXConnectorFactory.connect(url, null); + final MBeanServerConnection mxserver = jmxc.getMBeanServerConnection(); + + Connection cnx = cf.createConnection("anonymous", "anonymous"); + cnx.setClientID("ITSME"); + cnx.setExceptionListener(this); + + // Use several sessions in order to avoid javax.jms.IllegalStateException (Illegal control thread). + // In effect the next receive starts before the previous is completely done. + Session[] sessions = new Session[NbSessions]; + MessageConsumer[] cons = new MessageConsumer[NbSessions]; + for (int i=0; i " + exc); + error(exc); + nok(); + } + } + }.start(); + + for (int i=0; i<100000; i++) { + final int idx = i; + new Thread() { + public void run() { + try { + cons[idx%NbSessions].receive(1L); + } catch (JMSException exc) { + System.out.println("Consumer#" + idx + " -> " + exc); + error(exc); + } + } + }.start(); + try { + Thread.sleep(1); + } catch (InterruptedException exc) {} + if (! isOk()) break; + } + if (isOk()) + System.out.println("JMS OK"); + } catch (Throwable exc) { + exc.printStackTrace(); + error(exc); + } finally { + killAgentServer((short) 0); + endTest(); + } + } + + @Override + public void onException(JMSException e) { + System.out.println("JMS NOK -> " + e.getMessage()); + this.exc = e; +// e.printStackTrace(); + } +} -- GitLab