Commit 4af47fb7 authored by Ahmed El Rheddane's avatar Ahmed El Rheddane
Browse files

- Qq changement dans la partie topic elastique.

- Traitement nul dans Destination pour ScaleRequest.
parent 4940abc8
......@@ -62,6 +62,7 @@ import org.objectweb.joram.shared.admin.AdminRequest;
import org.objectweb.joram.shared.admin.DeleteDestination;
import org.objectweb.joram.shared.admin.GetStatsReply;
import org.objectweb.joram.shared.admin.GetStatsRequest;
import org.objectweb.joram.shared.admin.ScaleRequest;
import org.objectweb.joram.shared.admin.SetDMQRequest;
import org.objectweb.joram.shared.admin.SetReader;
import org.objectweb.joram.shared.admin.SetRight;
......@@ -1065,6 +1066,8 @@ public abstract class Destination extends Agent implements DestinationMBean, TxD
not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
} else if (adminRequest instanceof AdminCommandRequest) {
processAdminCommand((AdminCommandRequest) adminRequest, not.getReplyTo(), not.getRequestMsgId());
} else if (adminRequest instanceof ScaleRequest) {
// Ignored, should be handled either by Queues or Topics.
} else {
logger.log(BasicLevel.ERROR, "Unknown administration request for destination " + getId());
replyToTopic(new AdminReply(AdminReply.UNKNOWN_REQUEST, null),
......
......@@ -21,6 +21,7 @@
* Initial developer(s): Université Joseph Fourier
* Contributor(s): ScalAgent Distributed Technologies
*/
package org.objectweb.joram.mom.dest;
import java.io.Serializable;
......@@ -48,7 +49,6 @@ import fr.dyade.aaa.agent.Notification;
/**
* Class describing a node of a scalable topic tree.
* The difference with Topic is that it allows more than one father.
*
* @author Ahmed El Rheddane
*/
......@@ -81,7 +81,6 @@ public class ElasticTopic extends Topic {
public void setProperties(Properties properties, boolean firstTime) throws Exception {
super.setProperties(properties, firstTime);
if (properties != null && properties.containsKey("root")) {
isRoot = true;
}
......@@ -109,7 +108,8 @@ public class ElasticTopic extends Topic {
if (not instanceof ClientSubscriptionNot) {
handleClientSubscriptionNot(from, (ClientSubscriptionNot) not);
} else if (not instanceof ReconnectSubscribersNot) {
// Forward to local default user agent.
/* Forward to local default user agent.
Should reply to initial Admin request. */
Channel.sendTo((AgentId) subscribers.get(0), not);
} else {
super.react(from, not);
......@@ -137,8 +137,9 @@ public class ElasticTopic extends Topic {
}
private void handleGetSubscriptionsRequest(FwdAdminRequestNot not) {
logger.log(BasicLevel.ERROR, "Number of subsribers: " + subscribers.size());
if (subscribers.isEmpty()) {
replyToTopic(new GetNumberReply(getNumberOfSubscribers()),
replyToTopic(new GetNumberReply(0),
not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
} else {
Channel.sendTo((AgentId) subscribers.get(0),
......@@ -170,8 +171,8 @@ public class ElasticTopic extends Topic {
subId = subId % pool.size();
break;
case ScaleRequest.BALANCE:
/* Reconnect a given number of subscribers.
* param should be: "init_topic:topic_index1;number_of_subscribers1;topic_index1;..." */
/* Reconnect a given number of subscribers ON THE SAME TOPIC.
param should be: "init_topic:topic_index1;number_of_subscribers1;topic_index1;..." */
param = sr.getParameter().split(":");
AgentId topic = pool.get(Integer.parseInt(param[0])).id;
String[] param1 = param[1].split(";");
......@@ -182,10 +183,10 @@ public class ElasticTopic extends Topic {
subs.add(Integer.parseInt(param1[i + 1]));
}
ReconnectSubscribersNot rsn =
new ReconnectSubscribersNot(subs, msgs);
new ReconnectSubscribersNot(subs, msgs,not);
//Destination should do the Admin reply.
Channel.sendTo(topic,rsn);
break;
return;
default:
// Should never happen.
}
......@@ -197,14 +198,18 @@ public class ElasticTopic extends Topic {
/**
* If root, redirects subscriptions to proper topic.
*
* @param from shoud be the local default user agent.
* @param from should be the local default user agent.
* @param not Notification of a new client subscriptions.
*/
private void handleClientSubscriptionNot(AgentId from, ClientSubscriptionNot not) {
if (!isRoot)
if (!isRoot) {
logger.log(BasicLevel.ERROR,"Received subscription!");
return;
}
Message msg = createReconnectionMessage(subId);
logger.log(BasicLevel.ERROR,"Redirecting sub to: " + subId + ";" + pool.get(subId).server);
subId = (subId + 1) % pool.size();
ReconnectSubscribersNot rsn =
......
......@@ -40,8 +40,10 @@ public class ReconnectSubscribersNot extends Notification {
private String subName;
ArrayList<Integer> subs;
ArrayList<Message> msgs;
private ArrayList<Integer> subs;
private ArrayList<Message> msgs;
private FwdAdminRequestNot not;
public ReconnectSubscribersNot(String subName, Message msg) {
this.subName = subName;
......@@ -49,9 +51,10 @@ public class ReconnectSubscribersNot extends Notification {
msgs.add(msg);
}
public ReconnectSubscribersNot(ArrayList<Integer> subs, ArrayList<Message> msgs) {
public ReconnectSubscribersNot(ArrayList<Integer> subs, ArrayList<Message> msgs,FwdAdminRequestNot not) {
this.subs = subs;
this.msgs = msgs;
this.not = not;
}
public String getSubName() {
......@@ -65,4 +68,8 @@ public class ReconnectSubscribersNot extends Notification {
public ArrayList<Message> getMsgs() {
return msgs;
}
public FwdAdminRequestNot getNot() {
return not;
}
}
......@@ -618,6 +618,13 @@ public final class UserAgent extends Agent implements UserAgentMBean, ProxyAgent
}
}
}
// If there is an Admin request to reply to..
FwdAdminRequestNot adr = not.getNot();
if (adr != null) {
replyToTopic(new AdminReply(true, null),
adr.getReplyTo(), adr.getRequestMsgId(), adr.getReplyMsgId());
}
}
private void doSetPeriod(long period) {
......@@ -967,7 +974,7 @@ public final class UserAgent extends Agent implements UserAgentMBean, ProxyAgent
// reactToClientRequest(key.intValue(), new CnxCloseRequest());
//
// if (ctx != null) {
// MomException exc = new MomException(MomExceptionReply.HBCloseConnection, "Connection " + getId()
// MomException exc = new MomException(MomExceptionReply.HBCloseConnection, "Connection " + getId()
// + ':' + key + " closed");
// ctx.pushError(exc);
// }
......@@ -1861,6 +1868,7 @@ public final class UserAgent extends Agent implements UserAgentMBean, ProxyAgent
if (!sent)
sendNot(getId(), new SyncReply(activeCtxId, new ServerReply(req)));
// Forward client subscription (should be ignored if topicId isn't an ElasticTopic)
Channel.sendTo(topicId,new ClientSubscriptionNot(subName));
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment