Commit 4af5ae3a authored by David Feliot's avatar David Feliot
Browse files

JORAM-242: interface added to send messages to a client.

parent 539bd5ae
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2013 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
*/
package org.objectweb.joram.mom.proxies;
/**
* A <code>ProxyMessageSender</code> sends messages (<code>ProxyMessage</code>)
* to a client. Messages can be sent synchronously by directly calling
* <code>send</code> or asynchronously by executing a task (if allowed by the
* <code>ProxyMessageSender</code>).
*/
public interface ProxyMessageSender {
/**
* Sends a message to the client.
* @param msg the message to be sent
* @throws Exception if an error occurs
*/
void send(ProxyMessage msg) throws Exception;
/**
* States whether executing a task is allowed.
* @return <code>true</code> if executing a task is allowed;
* <code>false</code> otherwise.
*/
boolean isExecutor();
/**
* Executes a task.
* @param task task to be executed
*/
void execute(Runnable task);
/**
* Closes the connection used by this
* <code>ProxyMessageSender</code> to
* send messages to the client.
*/
void close();
}
......@@ -39,8 +39,8 @@ public class QueueWorker implements Runnable {
public ConcurrentLinkedQueue<ProxyMessage> queue = new ConcurrentLinkedQueue<ProxyMessage>();
public boolean running;
public TcpConnection tcpConnection;
public IOControl ioctrl;
public ProxyMessageSender sender;
//public IOControl ioctrl;
private void handleMessage(ProxyMessage msg) throws Exception {
if ((msg.getObject() instanceof MomExceptionReply) &&
......@@ -50,11 +50,11 @@ public class QueueWorker implements Runnable {
// (see UserAgent)
new Thread(new Runnable() {
public void run() {
tcpConnection.close();
sender.close();
}
}).start();
} else {
ioctrl.send(msg);
sender.send(msg);
}
}
......@@ -78,4 +78,23 @@ public class QueueWorker implements Runnable {
logger.log(BasicLevel.DEBUG, e);
}
}
public void send(ProxyMessage msg) {
synchronized (queue) {
queue.offer(msg);
if (! running) {
running = true;
try {
if (! sender.isExecutor()) {
sender.send(msg);
running = false;
} else {
sender.execute(this);
}
} catch (Exception e) {
logger.log(BasicLevel.ERROR, "", e);
}
}
}
}
}
......@@ -24,7 +24,6 @@ package org.objectweb.joram.mom.proxies;
import java.io.Serializable;
import org.objectweb.joram.mom.proxies.tcp.TcpProxyService;
import org.objectweb.joram.shared.client.AbstractJmsReply;
import org.objectweb.joram.shared.client.AbstractJmsRequest;
import org.objectweb.joram.shared.client.CnxCloseRequest;
......@@ -99,6 +98,8 @@ public class ReliableConnectionContext implements ConnectionContext, Serializabl
}
private void add(ProxyMessage msg) {
queueWorker.send(msg);
/*
synchronized (queueWorker.queue) {
queueWorker.queue.offer(msg);
if (! queueWorker.running) {
......@@ -114,7 +115,7 @@ public class ReliableConnectionContext implements ConnectionContext, Serializabl
logger.log(BasicLevel.ERROR, e);
}
}
}
}*/
}
public QueueWorker getQueueWorker() {
......
......@@ -22,6 +22,8 @@ package org.objectweb.joram.mom.proxies.tcp;
import java.util.Date;
import org.objectweb.joram.mom.proxies.ProxyMessage;
import org.objectweb.joram.mom.proxies.ProxyMessageSender;
import org.objectweb.joram.mom.proxies.ReliableConnectionContext;
import org.objectweb.joram.shared.security.Identity;
import org.objectweb.util.monolog.api.BasicLevel;
......@@ -41,7 +43,7 @@ import fr.dyade.aaa.util.management.MXWrapper;
* @see TcpProxyService
* @see TcpConnectionListener
*/
public class TcpConnection implements TcpConnectionMBean {
public class TcpConnection implements TcpConnectionMBean, ProxyMessageSender {
/** logger */
public static Logger logger = Debug.getLogger(TcpConnection.class.getName());
......@@ -120,8 +122,9 @@ public class TcpConnection implements TcpConnectionMBean {
try {
if (ctx.isNoAckedQueue()) {
ioctrl.setNoAckedQueue(ctx.isNoAckedQueue());
ctx.getQueueWorker().ioctrl = ioctrl;
ctx.getQueueWorker().tcpConnection= this;
//ctx.getQueueWorker().ioctrl = ioctrl;
// ctx.getQueueWorker().tcpConnection= this;
ctx.getQueueWorker().sender = this;
} else {
tcpWriter = new TcpWriter(ioctrl, ctx.getQueue(), this);
tcpWriter.start();
......@@ -187,4 +190,16 @@ public class TcpConnection implements TcpConnectionMBean {
public int getReaderQueueSize() {
return ioctrl.getreceiveQueueSize();
}
public void send(ProxyMessage msg) throws Exception {
ioctrl.send(msg);
}
public boolean isExecutor() {
return TcpProxyService.executorService != null;
}
public void execute(Runnable runnable) {
TcpProxyService.executorService.execute(runnable);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment