Commit d1cd6467 authored by Ahmed El Rheddane's avatar Ahmed El Rheddane
Browse files

- Added ElasticTopic and related notifications.

parent 2e8c1cf6
......@@ -58,6 +58,7 @@ import org.objectweb.joram.shared.admin.GetRightsReply;
import org.objectweb.joram.shared.admin.GetRightsRequest;
import org.objectweb.joram.shared.admin.GetStatsReply;
import org.objectweb.joram.shared.admin.GetStatsRequest;
import org.objectweb.joram.shared.admin.ScaleRequest;
import org.objectweb.joram.shared.admin.SetDMQRequest;
import org.objectweb.joram.shared.admin.SetReader;
import org.objectweb.joram.shared.admin.SetWriter;
......@@ -1103,4 +1104,16 @@ public abstract class Destination extends AdministeredObject implements javax.jm
public AdminReply setProperties(Properties prop) throws ConnectException, AdminException {
return getWrapper().processAdmin(getName(), AdminCommandConstant.CMD_SET_PROPERTIES, prop);
}
/**
* Administration method for scaling operations.
*
* @param op scaling operation: scale up, scale down or balance.
* @param param parameter of the scaling operation.
* @throws ConnectException
* @throws AdminException
*/
public void scale(int op, String param) throws ConnectException, AdminException {
doRequest(new ScaleRequest(agentId, op, param));
}
}
package org.objectweb.joram.mom.dest;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.ClientSubscriptionNot;
import org.objectweb.joram.mom.notifications.FwdAdminRequestNot;
import org.objectweb.joram.mom.notifications.GetClientSubscriptions;
import org.objectweb.joram.mom.notifications.ReconnectSubscribersNot;
import org.objectweb.joram.mom.notifications.TopicForwardNot;
import org.objectweb.joram.mom.notifications.TopicMsgsReply;
import org.objectweb.joram.shared.admin.AdminReply;
import org.objectweb.joram.shared.admin.AdminRequest;
import org.objectweb.joram.shared.admin.GetNumberReply;
import org.objectweb.joram.shared.admin.GetSubscriptionsRequest;
import org.objectweb.joram.shared.admin.ScaleRequest;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Channel;
import fr.dyade.aaa.agent.Notification;
/**
* Class describing a node of a scalable topic tree.
* The difference with Topic is that it allows more than one father.
*
* @author Ahmed El Rheddane
*/
public class ElasticTopic extends Topic {
class TopicDesc implements Serializable {
private static final long serialVersionUID = 5983962604141303712L;
AgentId id;
String server;
int port;
}
private static final long serialVersionUID = 3074584772834111626L;
/**
* Pool of topics to forward msgs to.
*/
private List<TopicDesc> pool = new ArrayList<TopicDesc>();
/**
* True if topic is head of the elastic topic tree.
*/
private boolean isRoot = false;
/**
* Index of topic to forward next subscription to.
*/
private int subId = 0;
public void setProperties(Properties properties, boolean firstTime) throws Exception {
super.setProperties(properties, firstTime);
if (properties != null && properties.containsKey("root")) {
isRoot = true;
}
}
/**
* This method handles the scaling operations.
*/
public void handleAdminRequestNot(AgentId from, FwdAdminRequestNot not) {
AdminRequest adminRequest = not.getRequest();
if (adminRequest instanceof GetSubscriptionsRequest) {
handleGetSubscriptionsRequest(not);
} else if (adminRequest instanceof ScaleRequest) {
handleScaleRequest(not);
} else {
super.handleAdminRequestNot(from, not);
}
}
/**
*
*/
public void react(AgentId from, Notification not) throws Exception {
if (not instanceof ClientSubscriptionNot) {
handleClientSubscriptionNot(from, (ClientSubscriptionNot) not);
} else if (not instanceof ReconnectSubscribersNot) {
// Forward to local default user agent.
Channel.sendTo((AgentId) subscribers.get(0), not);
} else {
super.react(from, not);
}
}
/**
* Forward incoming publications to all fathers.
*
* @param messages
* @param fromCluster
*/
protected void doClientMessages(AgentId from, ClientMessages not, boolean throwsExceptionOnFullDest) {
ClientMessages clientMsgs = preProcess(from, not);
if (clientMsgs != null) {
for (TopicDesc td : pool) {
forward(td.id, new TopicForwardNot(clientMsgs, false));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Messages forwarded to topic " + td.id.toString());
}
/*processMessages(clientMsgs);
postProcess(clientMsgs);*/
}
}
private void handleGetSubscriptionsRequest(FwdAdminRequestNot not) {
if (subscribers.isEmpty()) {
replyToTopic(new GetNumberReply(getNumberOfSubscribers()),
not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
} else {
Channel.sendTo((AgentId) subscribers.get(0),
new GetClientSubscriptions(not));
}
}
private void handleScaleRequest(FwdAdminRequestNot not) {
ScaleRequest sr = (ScaleRequest) not.getRequest();
setSave(); // state change, so save.
String[] param;
int op = sr.getOperation();
switch(op) {
case ScaleRequest.SCALE_OUT:
/* Add a new topic to the pool.
param should be: "agent_id;server;port" */
param = sr.getParameter().split(";");
TopicDesc td = new TopicDesc();
td.id = AgentId.fromString(param[0]);
td.server = param[1];
td.port = Integer.parseInt(param[2]);
pool.add(td);
break;
case ScaleRequest.SCALE_IN:
/* remove last added topic.
param is not used. */
pool.remove(pool.size() - 1);
subId = subId % pool.size();
break;
case ScaleRequest.BALANCE:
/* Reconnect a given number of subscribers.
* param should be: "init_topic:topic_index1;number_of_subscribers1;topic_index1;..." */
param = sr.getParameter().split(":");
AgentId topic = pool.get(Integer.parseInt(param[0])).id;
String[] param1 = param[1].split(";");
ArrayList<Integer> subs = new ArrayList<Integer>();
ArrayList<Message> msgs = new ArrayList<Message>();
for (int i = 0; i < param1.length; i += 2) {
msgs.add(createReconnectionMessage(Integer.parseInt(param1[i])));
subs.add(Integer.parseInt(param1[i + 1]));
}
ReconnectSubscribersNot rsn =
new ReconnectSubscribersNot(subs, msgs);
Channel.sendTo(topic,rsn);
break;
default:
// Should never happen.
}
replyToTopic(new AdminReply(true, null),
not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
}
/**
* If root, redirects subscriptions to proper topic.
*
* @param from shoud be the local default user agent.
* @param not Notification of a new client subscriptions.
*/
private void handleClientSubscriptionNot(AgentId from, ClientSubscriptionNot not) {
if (!isRoot)
return;
Message msg = createReconnectionMessage(subId);
subId = (subId + 1) % pool.size();
ReconnectSubscribersNot rsn =
new ReconnectSubscribersNot(not.getSubName(), msg);
Channel.sendTo(from,rsn);
}
private Message createReconnectionMessage(int tid) {
TopicDesc td = pool.get(tid);
Message msg = new Message();
msg.id = "Reconnection Message";
msg.setProperty("reconnect", td.id.toString());
msg.setProperty("server", td.server);
msg.setProperty("port", td.port);
return msg;
}
}
......@@ -40,6 +40,7 @@ import org.objectweb.joram.mom.notifications.ClusterJoinNot;
import org.objectweb.joram.mom.notifications.ClusterRemoveNot;
import org.objectweb.joram.mom.notifications.ExceptionReply;
import org.objectweb.joram.mom.notifications.FwdAdminRequestNot;
import org.objectweb.joram.mom.notifications.ClientSubscriptionNot;
import org.objectweb.joram.mom.notifications.SubscribeReply;
import org.objectweb.joram.mom.notifications.SubscribeRequest;
import org.objectweb.joram.mom.notifications.TopicForwardNot;
......@@ -151,6 +152,8 @@ public class Topic extends Destination implements TopicMBean {
unsubscribeRequest(from);
else if (not instanceof TopicForwardNot)
topicForwardNot(from, (TopicForwardNot) not);
else if (not instanceof ClientSubscriptionNot)
{}
else
super.react(from, not);
} catch (MomException exc) {
......
package org.objectweb.joram.mom.notifications;
import fr.dyade.aaa.agent.Notification;
/**
* Used by ElasticTopic.
*
* @author Ahmed El Rheddane
*
*/
public class ClientSubscriptionNot extends Notification {
private static final long serialVersionUID = 1L;
private String subName;
public ClientSubscriptionNot(String subName) {
this.subName = subName;
}
public String getSubName() {
return subName;
}
}
package org.objectweb.joram.mom.notifications;
import fr.dyade.aaa.agent.Notification;
/**
* Used by ElasticTopic.
*
* @author Ahmed El Rheddane
*
*/
public class GetClientSubscriptions extends Notification {
private static final long serialVersionUID = 1L;
private FwdAdminRequestNot not;
public GetClientSubscriptions(FwdAdminRequestNot not) {
this.not = not;
}
public FwdAdminRequestNot getAdminNot() {
return not;
}
}
package org.objectweb.joram.mom.notifications;
import java.util.ArrayList;
import org.objectweb.joram.shared.messages.Message;
import fr.dyade.aaa.agent.Notification;
/**
* Used by ElasticTopic
*
* @author Ahmed El Rheddane
*
*/
public class ReconnectSubscribersNot extends Notification {
private static final long serialVersionUID = 1L;
private String subName;
ArrayList<Integer> subs;
ArrayList<Message> msgs;
public ReconnectSubscribersNot(String subName, Message msg) {
this.subName = subName;
msgs = new ArrayList<Message>();
msgs.add(msg);
}
public ReconnectSubscribersNot(ArrayList<Integer> subs, ArrayList<Message> msgs) {
this.subs = subs;
this.msgs = msgs;
}
public String getSubName() {
return subName;
}
public ArrayList<Integer> getSubs() {
return subs;
}
public ArrayList<Message> getMsgs() {
return msgs;
}
}
......@@ -100,4 +100,9 @@ class TopicSubscription {
Iterator getNames() {
return subs.keySet().iterator();
}
/** Returns the number of subscriptions. */
int size() {
return subs.keySet().size();
}
}
......@@ -66,8 +66,11 @@ import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.DenyRequest;
import org.objectweb.joram.mom.notifications.ExceptionReply;
import org.objectweb.joram.mom.notifications.FwdAdminRequestNot;
import org.objectweb.joram.mom.notifications.GetClientSubscriptions;
import org.objectweb.joram.mom.notifications.ClientSubscriptionNot;
import org.objectweb.joram.mom.notifications.QueueMsgReply;
import org.objectweb.joram.mom.notifications.ReceiveRequest;
import org.objectweb.joram.mom.notifications.ReconnectSubscribersNot;
import org.objectweb.joram.mom.notifications.SubscribeReply;
import org.objectweb.joram.mom.notifications.SubscribeRequest;
import org.objectweb.joram.mom.notifications.TopicDeliveryTimeNot;
......@@ -526,6 +529,10 @@ public final class UserAgent extends Agent implements UserAgentMBean, ProxyAgent
doReact((FwdAdminRequestNot) not);
} else if (not instanceof TopicDeliveryTimeNot) {
doReact((TopicDeliveryTimeNot) not);
} else if (not instanceof GetClientSubscriptions) {
doReact(from, (GetClientSubscriptions) not);
} else if (not instanceof ReconnectSubscribersNot) {
doReact(from, (ReconnectSubscribersNot) not);
} else {
super.react(from, not);
}
......@@ -537,6 +544,81 @@ public final class UserAgent extends Agent implements UserAgentMBean, ProxyAgent
saveModifiedClientContexts();
saveModifiedClientSubscriptions();
}
/**
* Used to get number of local subscribers to 'from'.
* This number is sent as an Admin reply.
*
* @param from should be a Topic agent ID.
* @param not contains the original Admin not sent to 'from'.
*/
private void doReact(AgentId from, GetClientSubscriptions not) {
FwdAdminRequestNot aNot = (FwdAdminRequestNot) not.getAdminNot();
int ls = ((TopicSubscription) topicsTable.get(from)).size();
replyToTopic(new GetNumberReply(ls),
aNot.getReplyTo(), aNot.getRequestMsgId(), aNot.getReplyMsgId());
}
/**
* Sends reconnection messages to one or more subscribers.
*
* @param from
* @param not
*/
private void doReact(AgentId from, ReconnectSubscribersNot not) {
ClientSubscription sub;
ConsumerMessages consM;
String subName = not.getSubName();
ArrayList msgs = not.getMsgs();
List message = new ArrayList();
message.add(
new Message((org.objectweb.joram.shared.messages.Message) msgs.get(0)));
if (subName != null) {
// Redirect a specific subscriber.
sub = subsTable.get(subName);
sub.browseNewMessages(message);
consM = sub.deliver();
try {
setCtx(sub.getContextId());
if (activeCtx.getActivated()) {
doReply(consM);
}
} catch (StateException e) {}
} else {
// Redirect many subscribers..
ArrayList<Integer> subs = not.getSubs();
TopicSubscription tSub = (TopicSubscription) topicsTable.get(from);
int i = 0;
int c = subs.get(i);
for (Iterator names = tSub.getNames(); names.hasNext();) {
subName = (String) names.next();
sub = (ClientSubscription) subsTable.get(subName);
if (sub != null && sub.getActive() > 0) {
sub.browseNewMessages(message);
consM = sub.deliver();
try {
setCtx(sub.getContextId());
if (activeCtx.getActivated()) {
doReply(consM);
}
} catch (StateException e) {}
}
c--;
if (c == 0)
i++;
if (i < subs.size()) {
c = subs.get(i);
message.set(0,new Message((org.objectweb.joram.shared.messages.Message) msgs.get(i)));
} else {
break;
}
}
}
}
private void doSetPeriod(long period) {
if (logger.isLoggable(BasicLevel.DEBUG))
......@@ -1778,6 +1860,8 @@ public final class UserAgent extends Agent implements UserAgentMBean, ProxyAgent
// Acknowledging the request, if needed.
if (!sent)
sendNot(getId(), new SyncReply(activeCtxId, new ServerReply(req)));
Channel.sendTo(topicId,new ClientSubscriptionNot(subName));
}
/**
......@@ -3469,7 +3553,7 @@ public final class UserAgent extends Agent implements UserAgentMBean, ProxyAgent
replyTo, requestMsgId, replyMsgId);
}
}
private void replyToTopic(AdminReply reply, AgentId replyTo, String requestMsgId, String replyMsgId) {
if (replyTo == null) // In some cases the request needs no response
return;
......@@ -4169,5 +4253,4 @@ class Xid implements Serializable, Encodable {
fi = decoder.decodeUnsignedInt();
gti = decoder.decodeByteArray();
}
}
......@@ -150,6 +150,7 @@ public abstract class AbstractAdminMessage implements Externalizable, Streamable
protected final static int MONITOR_GET_DELIVERED_MESSAGES = 99;
protected final static int SND_DEST_WEIGHTS = 100;
protected final static int SET_SYNC_EXCEPTION_ON_FULL_DEST = 101;
protected final static int SCALE_REQUEST = 102;
protected int classid;
......@@ -255,7 +256,8 @@ public abstract class AbstractAdminMessage implements Externalizable, Streamable
DelRemoteDestination.class.getName(),
GetDeliveredMessages.class.getName(),
SendDestinationsWeights.class.getName(),
SetSyncExceptionOnFullDestRequest.class.getName()
SetSyncExceptionOnFullDestRequest.class.getName(),
ScaleRequest.class.getName()
};
protected abstract int getClassId();
......
package org.objectweb.joram.shared.admin;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import fr.dyade.aaa.common.stream.StreamUtil;
/**
* Class to gather all scaling requests
*
* @author Ahmed El Rheddane
*
*/
public class ScaleRequest extends DestinationAdminRequest {
/**
* Operation corresponding to the addition of one resource.
*/
public static final int SCALE_IN = -1;
/**
* Operation corresponding to the removal of one resource.
*/
public static final int SCALE_OUT = 1;
/**
* Generic balancing operation.
*/
public static final int BALANCE = 0;
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
private int op;
private String param;
/**
* Adds a destination to a cluster.
* <p>
*
* @param clusteredDest Destination part of the cluster.
* @param addedDest Destination joining the cluster.
*/
public ScaleRequest(String destId, int op, String param) {
super(destId);
this.op = op;
this.param = param;
}
public ScaleRequest() { }
protected int getClassId() {
return SCALE_REQUEST;
}
public int getOperation() {
return op;
}
/**
* @return the resource to be managed.
*/
public String getParameter() {
return param;
}
public void readFrom(InputStream is) throws IOException {
super.readFrom(is);
op = StreamUtil.readIntFrom(is);
param = StreamUtil.readStringFrom(is);
}