Commit 8e974976 authored by David Feliot's avatar David Feliot
Browse files

JORAM-184, JORAM-194, JORAM-195: topic efficiency; parent topic filtered; Encodable notifications.

parent b7c73e0f
......@@ -114,7 +114,7 @@ public class AcquisitionTopic extends Topic implements AcquisitionTopicMBean {
public void react(AgentId from, Notification not) throws Exception {
if (not instanceof AcquisitionNot) {
acquisitionNot((AcquisitionNot) not);
acquisitionNot(from, (AcquisitionNot) not);
} else {
super.react(from, not);
}
......@@ -182,7 +182,7 @@ public class AcquisitionTopic extends Topic implements AcquisitionTopicMBean {
*
* @param not
*/
private void acquisitionNot(AcquisitionNot not) {
private void acquisitionNot(AgentId from, AcquisitionNot not) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "acquisitionNot(" + not + ")");
}
......@@ -197,7 +197,7 @@ public class AcquisitionTopic extends Topic implements AcquisitionTopicMBean {
ClientMessages clientMessages = acquisitionModule.acquisitionNot(not, msgCount);
if (clientMessages != null) {
msgCount += clientMessages.getMessageCount();
forwardMessages(clientMessages);
forwardMessages(from, clientMessages);
processMessages(clientMessages);
postProcess(clientMessages);
}
......
......@@ -500,7 +500,7 @@ public class Topic extends Destination implements TopicMBean {
ClientMessages clientMsgs = preProcess(from, not);
if (clientMsgs != null) {
// Forwarding the messages to the father or the cluster fellows, if any:
forwardMessages(clientMsgs, fromCluster);
forwardMessages(from, clientMsgs, fromCluster);
// Processing the messages:
processMessages(clientMsgs);
......@@ -567,17 +567,60 @@ public class Topic extends Destination implements TopicMBean {
* Actually forwards a list of messages to the father or the cluster
* fellows, if any.
*/
protected void forwardMessages(ClientMessages messages) {
forwardMessages(messages, false);
protected void forwardMessages(AgentId from, ClientMessages messages) {
forwardMessages(from, messages, false);
}
private TopicForwardNot createTopicForward(AgentId destId, ClientMessages messages, boolean fromCluster) {
TopicForwardNot topicForwardNot = new TopicForwardNot(messages, fromCluster);
if (destId.getTo() == AgentServer.getServerId()) {
// Local destination
// The initial notification may be persistent (e.g. if remote).
// In that case the forward needs to be persistent too.
topicForwardNot.setPersistent(messages.isPersistent());
// Pass the callback in any cases (transient or persistent)
// in order to provide flow control.
messages.passCallback(topicForwardNot);
} else {
// Remote destination
// The initial notification may be persistent.
// In that case the forward needs to be persistent too.
boolean persistent = messages.isPersistent();
if (!persistent) {
// Check if there is a persistent message to transmit.
// In that case, the forward needs to be persistent.
List<Message> msgList = messages.getMessages();
for (Message msg : msgList) {
if (msg.persistent) {
persistent = true;
break;
}
}
}
topicForwardNot.setPersistent(persistent);
// Pass the callback in any cases (transient or persistent)
// in order to provide flow control with the network.
messages.passCallback(topicForwardNot);
}
return topicForwardNot;
}
private void forwardMessages(ClientMessages messages, boolean fromCluster) {
private void forwardMessages(AgentId from, ClientMessages messages, boolean fromCluster) {
if (!fromCluster) {
sendToCluster(new TopicForwardNot(messages, true));
if (friends != null && friends.size() > 1) {
for (Iterator e = friends.iterator(); e.hasNext();) {
AgentId id = (AgentId) e.next();
if (!id.equals(getId()))
forward(id, createTopicForward(id, messages, true));
}
}
}
if (fatherId != null) {
forward(fatherId, new TopicForwardNot(messages, false));
if (fatherId != null && !fatherId.equals(from)) {
forward(fatherId, createTopicForward(fatherId, messages, false));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Messages forwarded to father " + fatherId.toString());
}
......@@ -684,10 +727,11 @@ public class Topic extends Destination implements TopicMBean {
TopicMsgsReply topicMsgsReply = new TopicMsgsReply(deliverables);
topicMsgsReply.setPersistent(persistent);
// If local, set callback
if (subscriber.getTo() == getId().getTo()) {
not.passCallback(topicMsgsReply);
}
// Set the callback in any cases: local and remote
// Remotely, the callback provides flow control.
// Locally, the callback provides flow control and
// reliability for persistent messages.
not.passCallback(topicMsgsReply);
setDmq(topicMsgsReply);
forward(subscriber, topicMsgsReply);
......
......@@ -27,9 +27,14 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.objectweb.joram.mom.util.JoramHelper;
import org.objectweb.joram.shared.messages.Message;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.common.encoding.Decoder;
import fr.dyade.aaa.common.encoding.Encodable;
import fr.dyade.aaa.common.encoding.EncodableFactory;
import fr.dyade.aaa.common.encoding.Encoder;
/**
* A <code>ClientMessages</code> instance is used by a client agent for
......@@ -42,11 +47,100 @@ public class ClientMessages extends AbstractRequestNot {
/** Message sent by the client. */
private Message message = null;
/** Messages sent by the client. */
private List messages = null;
private List<Message> messages = null;
private boolean asyncSend;
private AgentId proxyId;
@Override
public int getEncodableClassId() {
return JoramHelper.CLIENT_MESSAGES_CLASS_ID;
}
public int getEncodedSize() throws Exception {
int res = super.getEncodedSize() ;
res += BOOLEAN_ENCODED_SIZE;
if (message != null) {
res += message.getEncodedSize();
}
res += BOOLEAN_ENCODED_SIZE;
if (messages != null) {
res += INT_ENCODED_SIZE;
for (Message msg : messages) {
res += msg.getEncodedSize();
}
}
res += BOOLEAN_ENCODED_SIZE;
res += BOOLEAN_ENCODED_SIZE;
if (proxyId != null) {
res += proxyId.getEncodedSize();
}
return res;
}
public void encode(Encoder encoder) throws Exception {
super.encode(encoder);
if (message != null) {
encoder.encodeBoolean(true);
message.encode(encoder);
} else {
encoder.encodeBoolean(false);
}
if (messages != null) {
encoder.encodeBoolean(true);
encoder.encodeUnsignedInt(messages.size());
for (Message msg : messages) {
msg.encode(encoder);
}
} else {
encoder.encodeBoolean(false);
}
encoder.encodeBoolean(asyncSend);
if (proxyId == null) {
encoder.encodeBoolean(false);
} else {
encoder.encodeBoolean(true);
proxyId.encode(encoder);
}
}
public void decode(Decoder decoder) throws Exception {
super.decode(decoder);
boolean flag = decoder.decodeBoolean();
if (flag) {
message = new Message();
message.decode(decoder);
}
flag = decoder.decodeBoolean();
if (flag) {
int size = decoder.decodeUnsignedInt();
messages = new ArrayList<Message>(size);
for (int i = 0; i < size; i++) {
Message msg = new Message();
msg.decode(decoder);
messages.add(msg);
}
}
asyncSend = decoder.decodeBoolean();
flag = decoder.decodeBoolean();
if (flag) {
proxyId = new AgentId((short) 0, (short) 0, 0);
proxyId.decode(decoder);
}
}
/**
* Constructs a <code>ClientMessages</code> instance.
......@@ -196,8 +290,18 @@ public class ClientMessages extends AbstractRequestNot {
output.append(",message=").append(message);
output.append(",messages=").append(messages);
output.append(",asyncSend=").append(asyncSend);
output.append(",proxyId=").append(proxyId);
output.append(')');
return output;
}
public static class Factory implements EncodableFactory {
public Encodable createEncodable() {
return new ClientMessages();
}
}
}
......@@ -23,13 +23,22 @@
*/
package org.objectweb.joram.mom.notifications;
import org.objectweb.joram.mom.util.JoramHelper;
import fr.dyade.aaa.agent.CallbackNotification;
import fr.dyade.aaa.common.encoding.Decoder;
import fr.dyade.aaa.common.encoding.Encodable;
import fr.dyade.aaa.common.encoding.EncodableFactory;
import fr.dyade.aaa.common.encoding.EncodableFactoryRepository;
import fr.dyade.aaa.common.encoding.Encoder;
/**
* A <code>TopicForwardNot</code> is a notification sent by a topic to
* another topic part of the same cluster, or to its hierarchical father,
* and holding a forwarded <code>ClientMessages</code> notification.
*/
public class TopicForwardNot extends fr.dyade.aaa.agent.Notification {
public class TopicForwardNot extends CallbackNotification {
/**
*
*/
......@@ -42,6 +51,8 @@ public class TopicForwardNot extends fr.dyade.aaa.agent.Notification {
/** The forwarded messages. */
public ClientMessages messages;
public TopicForwardNot() {}
/**
* Constructs a <code>TopicForwardNot</code> instance.
......@@ -54,4 +65,64 @@ public class TopicForwardNot extends fr.dyade.aaa.agent.Notification {
this.messages = messages;
this.fromCluster = fromCluster;
}
public void setPersistent(boolean persistent) {
this.persistent = persistent;
}
@Override
public int getEncodableClassId() {
return JoramHelper.TOPIC_FWD_NOT_CLASS_ID;
}
public int getEncodedSize() throws Exception {
int res = super.getEncodedSize() ;
res += BOOLEAN_ENCODED_SIZE + INT_ENCODED_SIZE;
res += messages.getEncodedSize();
return res;
}
public void encode(Encoder encoder) throws Exception {
super.encode(encoder);
encoder.encodeBoolean(fromCluster);
// Polymorphism may be used
encoder.encode32(messages.getEncodableClassId());
messages.encode(encoder);
}
public void decode(Decoder decoder) throws Exception {
super.decode(decoder);
fromCluster = decoder.decodeBoolean();
int factoryId = decoder.decode32();
if (factoryId == JoramHelper.CLIENT_MESSAGES_CLASS_ID) {
messages = new ClientMessages();
} else {
// Polymorphism
// TODO: a cache could be used
EncodableFactory factory = EncodableFactoryRepository.getFactory(factoryId);
messages = (ClientMessages) factory.createEncodable();
}
messages.decode(decoder);
}
public StringBuffer toString(StringBuffer output) {
output.append('(');
super.toString(output);
output.append(",fromCluster=").append(fromCluster);
output.append(",messages=").append(messages);
output.append(')');
return output;
}
public static class Factory implements EncodableFactory {
public Encodable createEncodable() {
return new TopicForwardNot();
}
}
}
......@@ -35,6 +35,7 @@ import org.objectweb.joram.mom.dest.QueueDeliveryTable;
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.GetProxyIdNot;
import org.objectweb.joram.mom.notifications.TopicForwardNot;
import org.objectweb.joram.mom.util.JoramHelper;
import org.objectweb.joram.mom.util.MessageIdListFactory;
import org.objectweb.joram.mom.util.MessageIdListImpl;
......@@ -161,6 +162,8 @@ public class ConnectionManager implements ConnectionManagerMBean {
EncodableFactoryRepository.putFactory(JoramHelper.USER_AGENT_ARRIVAL_STATE_CLASS_ID, new UserAgentArrivalState.UserAgentArrivalStateFactory());
EncodableFactoryRepository.putFactory(JoramHelper.QUEUE_DELIVERY_TABLE_CLASS_ID, new QueueDeliveryTable.Factory());
EncodableFactoryRepository.putFactory(JoramHelper.QUEUE_ARRIVAL_STATE_CLASS_ID, new QueueArrivalState.Factory());
EncodableFactoryRepository.putFactory(JoramHelper.TOPIC_FWD_NOT_CLASS_ID, new TopicForwardNot.Factory());
EncodableFactoryRepository.putFactory(JoramHelper.CLIENT_MESSAGES_CLASS_ID, new ClientMessages.Factory());
}
private static CountDownCallback createCallback(final AbstractJmsRequest req, final ConnectionContext ctx) {
......@@ -252,6 +255,9 @@ public class ConnectionManager implements ConnectionManagerMBean {
Channel.sendTo(destId, not);
} else {
// Remote destination
// Set a callback too in order to provide flow control
// with the network.
rn.setCountDownCallback(createCallback(req, ctx));
Channel.sendTo(proxyId, rn);
}
}
......
......@@ -73,6 +73,8 @@ public class JoramHelper {
public static final int USER_AGENT_ARRIVAL_STATE_CLASS_ID = ENCODABLE_CLASS_ID_AREA + 6;
public static final int QUEUE_DELIVERY_TABLE_CLASS_ID = ENCODABLE_CLASS_ID_AREA + 7;
public static final int QUEUE_ARRIVAL_STATE_CLASS_ID = ENCODABLE_CLASS_ID_AREA + 8;
public static final int TOPIC_FWD_NOT_CLASS_ID = ENCODABLE_CLASS_ID_AREA + 9;
public static final int CLIENT_MESSAGES_CLASS_ID = ENCODABLE_CLASS_ID_AREA + 10;
/**
* Create user.
......
......@@ -266,7 +266,7 @@ public class JMSBridgeTopic extends Topic {
if (getId().equals(from)) return not;
// Forwarding the messages to the father or the cluster fellows, if any:
forwardMessages(not);
forwardMessages(from, not);
// Sending the received messages to the foreign JMS destination:
Message message;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment