Commit c65bd1de authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Rework clustering: harmonize behaviors and notifications exchanges between...

Rework clustering: harmonize behaviors and notifications exchanges between queues and topics, allow transitive closure, fix various problems.
parent a52cbaf4
......@@ -24,38 +24,41 @@
package org.objectweb.joram.mom.dest;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.mom.notifications.AckJoinQueueCluster;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.ClusterJoinAck;
import org.objectweb.joram.mom.notifications.ClusterJoinNot;
import org.objectweb.joram.mom.notifications.ClusterRemoveNot;
import org.objectweb.joram.mom.notifications.FwdAdminRequestNot;
import org.objectweb.joram.mom.notifications.JoinQueueCluster;
import org.objectweb.joram.mom.notifications.LBCycleLife;
import org.objectweb.joram.mom.notifications.LBMessageGive;
import org.objectweb.joram.mom.notifications.LBMessageHope;
import org.objectweb.joram.mom.notifications.LeaveQueueCluster;
import org.objectweb.joram.mom.notifications.QueueClusterNot;
import org.objectweb.joram.mom.notifications.ReceiveRequest;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.shared.admin.AddQueueCluster;
import org.objectweb.joram.shared.admin.AdminReply;
import org.objectweb.joram.shared.admin.AdminRequest;
import org.objectweb.joram.shared.admin.ListClusterQueue;
import org.objectweb.joram.shared.admin.RemoveQueueCluster;
import org.objectweb.joram.shared.admin.ClusterAdd;
import org.objectweb.joram.shared.admin.ClusterLeave;
import org.objectweb.joram.shared.admin.ClusterList;
import org.objectweb.joram.shared.admin.ClusterListReply;
import org.objectweb.joram.shared.excepts.AccessException;
import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.util.monolog.api.BasicLevel;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.DeleteNot;
import fr.dyade.aaa.agent.Notification;
import fr.dyade.aaa.agent.UnknownAgent;
import fr.dyade.aaa.agent.UnknownNotificationException;
/**
......@@ -70,7 +73,7 @@ public class ClusterQueue extends Queue {
* key = agentId of ClusterQueue
* value = rateOfFlow (Float)
*/
protected Hashtable clusters;
protected Map clusters;
/** to evaluate the loading factor, overloading, ... */
protected LoadingFactor loadingFactor;
......@@ -79,12 +82,12 @@ public class ClusterQueue extends Queue {
* key = msgId
* value = date
*/
private LinkedHashMap timeTable = new LinkedHashMap();
private Map timeTable = new LinkedHashMap();
/**
* key = msgId value = List (alreadyVisit)
*/
private Hashtable visitTable = new Hashtable();
private Map visitTable = new Hashtable();
/** Number of message send to cluster */
private long clusterDeliveryCount = 0;
......@@ -167,16 +170,13 @@ public class ClusterQueue extends Queue {
String info = strbuf.append("Request [").append(not.getClass().getName())
.append("], sent to Destination [").append(getId()).append("], successful [true] ").toString();
if (adminRequest instanceof ListClusterQueue) {
List list = doList((ListClusterQueue) adminRequest);
replyToTopic(new AdminReply(true, info, list), not.getReplyTo(), not.getRequestMsgId(),
not.getReplyMsgId());
} else if (adminRequest instanceof AddQueueCluster) {
addQueueCluster(((AddQueueCluster) adminRequest).joiningQueue, loadingFactor.getRateOfFlow());
replyToTopic(new AdminReply(true, info), not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
} else if (adminRequest instanceof RemoveQueueCluster) {
broadcastLeave(((RemoveQueueCluster) adminRequest).removeQueue);
removeQueueCluster(((RemoveQueueCluster) adminRequest).removeQueue);
if (adminRequest instanceof ClusterList) {
List list = clusterList();
replyToTopic(new ClusterListReply(list), not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
} else if (adminRequest instanceof ClusterAdd) {
clusterAdd(not, ((ClusterAdd) adminRequest).getAddedDest());
} else if (adminRequest instanceof ClusterLeave) {
clusterLeave();
replyToTopic(new AdminReply(true, info), not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
} else {
super.handleAdminRequestNot(from, not);
......@@ -194,12 +194,12 @@ public class ClusterQueue extends Queue {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " react(" + from + "," + not + ")");
if (not instanceof AckJoinQueueCluster)
ackJoinQueueCluster((AckJoinQueueCluster) not);
else if (not instanceof JoinQueueCluster)
joinQueueCluster((JoinQueueCluster) not);
else if (not instanceof LeaveQueueCluster)
removeQueueCluster(((LeaveQueueCluster) not).removeQueue);
if (not instanceof ClusterJoinAck)
clusterJoinAck((ClusterJoinAck) not);
else if (not instanceof ClusterJoinNot)
clusterJoin((ClusterJoinNot) not);
else if (not instanceof ClusterRemoveNot)
clusterRemove(from);
else if (not instanceof LBMessageGive)
lBMessageGive(from, (LBMessageGive) not);
else if (not instanceof LBMessageHope)
......@@ -216,74 +216,96 @@ public class ClusterQueue extends Queue {
}
/**
* return the cluster list.
*
* @param req
* @return the cluster list.
* Reaction to the request of adding a new cluster element.
*/
protected List doList(ListClusterQueue req) {
List vect = new ArrayList();
for (Enumeration e = clusters.keys(); e.hasMoreElements(); )
vect.add(e.nextElement().toString());
return vect;
private void clusterAdd(FwdAdminRequestNot req, String joiningQueue) {
AgentId newFriendId = AgentId.fromString(joiningQueue);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.addQueueCluster: joiningQueue="
+ joiningQueue + ", clusters=" + clusters);
forward(newFriendId,
new ClusterJoinNot(new HashSet(clusters.keySet()), req.getReplyTo(), req.getRequestMsgId(), req.getReplyMsgId()));
}
/**
* send to joiningQueue a JoinQueueCluster not.
*
* @param joiningQueue
* @param rateOfFlow
*/
protected void addQueueCluster(String joiningQueue, float rateOfFlow) {
AgentId id = AgentId.fromString(joiningQueue);
if (clusters.containsKey(id)) return;
* Method implementing the reaction to a {@link ClusterJoinNot} notification,
* sent by a fellow queue for notifying this queue to join the cluster, doing
* a transitive closure of clusters, if any.
*/
private void clusterJoin(ClusterJoinNot not) {
for (Iterator e = not.getCluster().iterator(); e.hasNext();) {
AgentId id = (AgentId) e.next();
if (!clusters.containsKey(id))
clusters.put(id, new Float(1));
}
// clusters.put(id,new Float(rateOfFlow));
sendToCluster(new ClusterJoinAck(new HashSet(clusters.keySet())));
replyToTopic(new AdminReply(true, null), not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.addQueueCluster in " + getId()
+ "\njoiningQueue=" + joiningQueue + "\nclusters=" + clusters);
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.joinQueueCluster(" + not + "), clusters="
+ clusters);
}
/**
* Method implementing the reaction to a {@link ClusterJoinAck} notification,
* doing a transitive closure with the current cluster and the one of the new
* cluster element.
*/
private void clusterJoinAck(ClusterJoinAck not) {
for (Iterator e = not.getCluster().iterator(); e.hasNext();) {
AgentId id = (AgentId) e.next();
if (!clusters.containsKey(id)) {
clusters.put(id, new Float(1));
}
}
forward(id,
new JoinQueueCluster(loadingFactor.getRateOfFlow(),
clusters,
clients,
freeReading,
freeWriting));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.ackJoinQueueCluster(" + not
+ "), clusters=" + clusters);
}
/**
* broadcast to cluster the removeQueue.
* Returns the cluster list.
*
* @param removeQueue
* @return the cluster list.
*/
protected void broadcastLeave(String removeQueue) {
sendToCluster(new LeaveQueueCluster(removeQueue));
private List clusterList() {
List list = new ArrayList();
for (Iterator e = clusters.keySet().iterator(); e.hasNext();)
list.add(e.next().toString());
return list;
}
/**
* removeQueue leave the cluster.
* Ask this queue to leave the cluster.
*/
private void clusterLeave() {
sendToCluster(new ClusterRemoveNot());
clusters.clear();
clusters.put(getId(), new Float(1));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.leaveCluster: " + getId());
}
/**
* Remove the specified queue from current cluster.
*
* @param removeQueue
*/
private void removeQueueCluster(String removeQueue) {
AgentId id = AgentId.fromString(removeQueue);
if (getId().equals(id)) {
clusters.clear();
} else
clusters.remove(id);
for (Enumeration e = visitTable.elements(); e.hasMoreElements(); ) {
List visit = (List) e.nextElement();
if (visit.contains(id))
visit.remove(id);
* @param queue The queue which left the cluster
*/
private void clusterRemove(AgentId queue) {
clusters.remove(queue);
for (Iterator e = visitTable.values().iterator(); e.hasNext();) {
((List) e.next()).remove(queue);
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.removeQueueCluster in " + getId()
+ "\nremoveQueue=" + removeQueue + "\nclusters=" + clusters);
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.removeQueueFromCluster: removedQueue="
+ queue + ", clusters=" + clusters);
}
/**
* overload preProcess(AgentId, ClientMessages)
* store all msgId in timeTable and visitTable.
......@@ -293,8 +315,7 @@ public class ClusterQueue extends Queue {
*/
public ClientMessages preProcess(AgentId from, ClientMessages not) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"--- " + this + " " + not);
logger.log(BasicLevel.DEBUG, "--- " + this + " " + not);
receiving = true;
long date = System.currentTimeMillis();
......@@ -357,13 +378,13 @@ public class ClusterQueue extends Queue {
if (toGive.isEmpty()) return;
Hashtable table = new Hashtable();
Map table = new Hashtable();
for (int i = 0; i < toGive.size(); i++) {
String msgId = (String) toGive.get(i);
List visit = (List) visitTable.get(msgId);
boolean transmitted = false;
for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) {
AgentId id = (AgentId) e.nextElement();
for (Iterator e = clusters.keySet().iterator(); e.hasNext();) {
AgentId id = (AgentId) e.next();
if (! visit.contains(id)) {
Message message = getQueueMessage(msgId, true);
if (message != null) {
......@@ -388,8 +409,8 @@ public class ClusterQueue extends Queue {
}
}
for (Enumeration e = table.keys(); e.hasMoreElements(); ) {
AgentId id = (AgentId) e.nextElement();
for (Iterator e = table.keySet().iterator(); e.hasNext();) {
AgentId id = (AgentId) e.next();
forward(id,(LBCycleLife) table.get(id));
}
}
......@@ -406,97 +427,20 @@ public class ClusterQueue extends Queue {
clusters.put(from,new Float(not.getRateOfFlow()));
Hashtable vT = not.getVisitTable();
for (Enumeration e = vT.keys(); e.hasMoreElements(); ) {
String msgId = (String) e.nextElement();
Map vT = not.getVisitTable();
for (Iterator e = vT.keySet().iterator(); e.hasNext();) {
String msgId = (String) e.next();
visitTable.put(msgId,vT.get(msgId));
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.lBCycleLife(" + not + ")" + "\nvisitTable="
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.lBCycleLife(" + not + "), visitTable="
+ clusters);
ClientMessages cm = not.getClientMessages();
if (cm != null)
doClientMessages(from, cm);
}
/**
* new queue come in cluster, update clusters.
* and spread to clusters the AckjoiningQueue.
*
* @param not JoinQueueCluster
*/
private void joinQueueCluster(JoinQueueCluster not) {
for (Enumeration e = not.clusters.keys(); e.hasMoreElements(); ) {
AgentId id = (AgentId) e.nextElement();
if (! clusters.containsKey(id))
clusters.put(id,not.clusters.get(id));
}
for (Enumeration e = not.clients.keys(); e.hasMoreElements(); ) {
AgentId user = (AgentId) e.nextElement();
if (clients.containsKey(user)) {
Integer right = (Integer) not.clients.get(user);
if (right.compareTo((Integer) clients.get(user)) > 0)
clients.put(user,right);
} else
clients.put(user,not.clients.get(user));
}
freeReading = freeReading | not.freeReading;
freeWriting = freeWriting | not.freeWriting;
sendToCluster(
new AckJoinQueueCluster(loadingFactor.getRateOfFlow(),
clusters,
clients,
freeReading,
freeWriting));
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.joinQueueCluster(" + not + ")"
+ "\nclusters=" + clusters + "\nclients=" + clients);
}
/**
*
* @param not AckJoinQueueCluster
*/
private void ackJoinQueueCluster(AckJoinQueueCluster not) {
boolean update = false;
for (Enumeration e = not.clusters.keys(); e.hasMoreElements(); ) {
AgentId id = (AgentId) e.nextElement();
if (! clusters.containsKey(id)) {
clusters.put(id,not.clusters.get(id));
update = true;
}
}
for (Enumeration e = not.clients.keys(); e.hasMoreElements(); ) {
AgentId user = (AgentId) e.nextElement();
if (clients.containsKey(user)) {
Integer right = (Integer) not.clients.get(user);
if (right.compareTo((Integer) clients.get(user)) > 0)
clients.put(user,right);
} else
clients.put(user,not.clients.get(user));
}
freeReading = freeReading | not.freeReading;
freeWriting = freeWriting | not.freeWriting;
if (update) {
sendToCluster(
new AckJoinQueueCluster(loadingFactor.getRateOfFlow(),
clusters,
clients,
freeReading,
freeWriting));
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.ackJoinQueueCluster(" + not + ")"
+ "\nclusters=" + clusters + "\nclients=" + clients);
}
/**
*
* @param not ReceiveRequest
......@@ -619,25 +563,48 @@ public class ClusterQueue extends Queue {
}
return msg;
}
/**
* send to all queue in cluster.
* Sends a notification to all queue in cluster.
*
* @param not
* @param not The notification to send.
*/
protected void sendToCluster(QueueClusterNot not) {
protected void sendToCluster(Notification not) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "--- " + this + " ClusterQueue.sendToCluster(" + not + ")");
if (clusters.size() < 2) return;
for (Enumeration e = clusters.keys(); e.hasMoreElements(); ) {
AgentId id = (AgentId) e.nextElement();
for (Iterator e = clusters.keySet().iterator(); e.hasNext();) {
AgentId id = (AgentId) e.next();
if (! id.equals(getId()))
forward(id,not);
}
}
protected void doDeleteNot(DeleteNot not) {
clusterLeave();
super.doDeleteNot(not);
}
protected void doUnknownAgent(UnknownAgent uA) {
super.doUnknownAgent(uA);
AgentId agId = uA.agent;
Notification not = uA.not;
if (not instanceof ClusterJoinNot) {
ClusterJoinNot cT = (ClusterJoinNot) not;
logger.log(BasicLevel.ERROR, "Cluster join failed: " + uA.agent + " unknown.");
String info = "Cluster join failed: Unknown destination.";
replyToTopic(new AdminReply(AdminReply.BAD_CLUSTER_REQUEST, info), cT.getReplyTo(),
cT.getRequestMsgId(), cT.getReplyMsgId());
} else if (not instanceof ClusterJoinAck || not instanceof ClusterRemoveNot) {
logger.log(BasicLevel.ERROR, "Cluster error: " + uA.agent + " unknown. "
+ "The topic has probably been removed in the meantime.");
clusterRemove(agId);
}
}
/**
* return the number of Message send to cluster.
*/
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment