Commit 925f095d authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Improve Joram administration: regroup all requests forwarded to a destination...

Improve Joram administration: regroup all requests forwarded to a destination under DestinationAdminRequest class, remove special admin requests.
parent cbc3342b
......@@ -43,7 +43,6 @@ import org.objectweb.joram.mom.notifications.GetProxyIdNot;
import org.objectweb.joram.mom.notifications.GetRightsReplyNot;
import org.objectweb.joram.mom.notifications.GetRightsRequestNot;
import org.objectweb.joram.mom.notifications.RequestGroupNot;
import org.objectweb.joram.mom.notifications.SpecialAdminRequest;
import org.objectweb.joram.mom.proxies.AdminNotification;
import org.objectweb.joram.mom.proxies.SendReplyNot;
import org.objectweb.joram.mom.proxies.UserAgent;
......@@ -58,7 +57,7 @@ import org.objectweb.joram.shared.admin.CreateUserReply;
import org.objectweb.joram.shared.admin.CreateUserRequest;
import org.objectweb.joram.shared.admin.DeleteDestination;
import org.objectweb.joram.shared.admin.DeleteUser;
import org.objectweb.joram.shared.admin.GetClusterRequest;
import org.objectweb.joram.shared.admin.DestinationAdminRequest;
import org.objectweb.joram.shared.admin.GetConfigRequest;
import org.objectweb.joram.shared.admin.GetDMQSettingsReply;
import org.objectweb.joram.shared.admin.GetDMQSettingsRequest;
......@@ -66,32 +65,20 @@ import org.objectweb.joram.shared.admin.GetDestinationsReply;
import org.objectweb.joram.shared.admin.GetDestinationsRequest;
import org.objectweb.joram.shared.admin.GetDomainNames;
import org.objectweb.joram.shared.admin.GetDomainNamesRep;
import org.objectweb.joram.shared.admin.GetFatherRequest;
import org.objectweb.joram.shared.admin.GetLocalServer;
import org.objectweb.joram.shared.admin.GetLocalServerRep;
import org.objectweb.joram.shared.admin.GetNbMaxMsgRequest;
import org.objectweb.joram.shared.admin.GetPendingMessages;
import org.objectweb.joram.shared.admin.GetPendingRequests;
import org.objectweb.joram.shared.admin.GetRightsReply;
import org.objectweb.joram.shared.admin.GetRightsRequest;
import org.objectweb.joram.shared.admin.GetServersIdsReply;
import org.objectweb.joram.shared.admin.GetServersIdsRequest;
import org.objectweb.joram.shared.admin.GetStatsReply;
import org.objectweb.joram.shared.admin.GetStatsRequest;
import org.objectweb.joram.shared.admin.GetSubscriberIds;
import org.objectweb.joram.shared.admin.GetSubscriptionsRequest;
import org.objectweb.joram.shared.admin.GetUsersReply;
import org.objectweb.joram.shared.admin.GetUsersRequest;
import org.objectweb.joram.shared.admin.QueueAdminRequest;
import org.objectweb.joram.shared.admin.RemoveDomainRequest;
import org.objectweb.joram.shared.admin.RemoveServerRequest;
import org.objectweb.joram.shared.admin.SetCluster;
import org.objectweb.joram.shared.admin.SetDMQRequest;
import org.objectweb.joram.shared.admin.SetFather;
import org.objectweb.joram.shared.admin.SetNbMaxMsgRequest;
import org.objectweb.joram.shared.admin.SetRight;
import org.objectweb.joram.shared.admin.SetThresholdRequest;
import org.objectweb.joram.shared.admin.SpecialAdmin;
import org.objectweb.joram.shared.admin.StopServerRequest;
import org.objectweb.joram.shared.admin.UpdateUser;
import org.objectweb.joram.shared.admin.UserAdminRequest;
......@@ -583,22 +570,14 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
doProcess((CreateDestinationRequest) request, replyTo, msgId);
else if (request instanceof DeleteDestination)
doProcess((DeleteDestination) request, replyTo, msgId);
else if (request instanceof SetCluster)
doProcess((SetCluster) request, replyTo, msgId);
else if (request instanceof SetFather)
doProcess((SetFather) request, replyTo, msgId);
else if (request instanceof CreateUserRequest)
doProcess((CreateUserRequest) request, replyTo, msgId);
else if (request instanceof UpdateUser)
doProcess((UpdateUser) request, replyTo, msgId);
else if (request instanceof DeleteUser)
doProcess((DeleteUser) request, replyTo, msgId);
else if (request instanceof SetRight)
doProcess((SetRight) request, replyTo, msgId);
else if (request instanceof SetDMQRequest)
doProcess((SetDMQRequest) request, replyTo, msgId);
else if (request instanceof SetNbMaxMsgRequest)
doProcess((SetNbMaxMsgRequest) request, replyTo, msgId);
else if (request instanceof SetThresholdRequest)
doProcess((SetThresholdRequest) request, replyTo, msgId);
else if (request instanceof GetServersIdsRequest)
......@@ -615,22 +594,10 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
doProcess((GetRightsRequest) request, replyTo, msgId);
else if (request instanceof GetDMQSettingsRequest)
doProcess((GetDMQSettingsRequest) request, replyTo, msgId);
else if (request instanceof GetFatherRequest)
doProcess((GetFatherRequest) request, replyTo, msgId);
else if (request instanceof GetClusterRequest)
doProcess((GetClusterRequest) request, replyTo, msgId);
else if (request instanceof GetPendingMessages)
doProcess((GetPendingMessages) request, replyTo, msgId);
else if (request instanceof GetPendingRequests)
doProcess((GetPendingRequests) request, replyTo, msgId);
else if (request instanceof GetStatsRequest)
doProcess((GetStatsRequest) request, replyTo, msgId);
else if (request instanceof GetNbMaxMsgRequest)
doProcess((GetNbMaxMsgRequest) request, replyTo, msgId);
else if (request instanceof GetSubscriptionsRequest)
doProcess((GetSubscriptionsRequest) request, replyTo, msgId);
else if (request instanceof SpecialAdmin)
doProcess((SpecialAdmin) request, replyTo, msgId);
else if (request instanceof DestinationAdminRequest)
doProcess((DestinationAdminRequest) request, replyTo, msgId);
else if (request instanceof AddServerRequest)
doProcess((AddServerRequest) request, replyTo, msgId, from);
else if (request instanceof AddDomainRequest)
......@@ -643,10 +610,6 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
doProcess((GetConfigRequest) request, replyTo, msgId);
else if (request instanceof UserAdminRequest)
doProcess((UserAdminRequest) request, replyTo, msgId);
else if (request instanceof GetSubscriberIds)
doProcess((GetSubscriberIds) request, replyTo, msgId);
else if (request instanceof QueueAdminRequest)
doProcess((QueueAdminRequest) request, replyTo, msgId);
} catch (UnknownServerException exc) {
// Caught when a target server is invalid.
info = strbuf.append("Request [").append(request.getClass().getName())
......@@ -850,42 +813,6 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
}
}
/**
* Processes a <code>Monitor_GetCluster</code> request by forwarding it to
* its target topic, if local.
*/
private void doProcess(GetClusterRequest request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getTopic());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>SetCluster<code> instance requesting to link two topics
* in a cluster relationship.
*/
private void doProcess(SetCluster request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getInitId());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>Monitor_GetFather</code> request by forwarding it to
* its target topic, if local.
*/
private void doProcess(GetFatherRequest request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getTopic());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>SetFather<code> instance requesting to link two topics
* in a hierarchical relationship.
*/
private void doProcess(SetFather request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getSon());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>CreateUserRequest</code> instance requesting the
* creation of a <code>UserAgent</code> for a given user.
......@@ -1055,15 +982,6 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
}
}
/**
* Processes a <code>SetRight</code> instance requesting to grant a user
* a given right on a given destination.
*/
private void doProcess(SetRight request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getDestId());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>SetDMQ</code> request requesting a given queue to be set as
* the DMQ of a given destination or user.
......@@ -1129,15 +1047,6 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
}
}
/**
* Processes a <code>SetNbMaxMsg</code> request requesting
* a given nbMaxMsg value to be set in queue or subscription.
*/
private void doProcess(SetNbMaxMsgRequest request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getId());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>Monitor_GetServersIds</code> request by sending
* the list of the platform servers' ids.
......@@ -1341,24 +1250,6 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
}
}
/**
* Processes a <code>Monitor_GetPendingMessages</code> request by
* forwarding it to its target queue, if local.
*/
private void doProcess(GetPendingMessages request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getDest());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>Monitor_GetPendingRequests</code> request by
* forwarding it to its target queue, if local.
*/
private void doProcess(GetPendingRequests request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getDest());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>Monitor_GetStat</code> request by
* forwarding it to its target destination, if local.
......@@ -1380,47 +1271,12 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
}
/**
* Processes an <code>Monitor_GetNbMaxMsg</code> request requesting
* to get the maximum number of messages.
*/
private void doProcess(GetNbMaxMsgRequest request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getId());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/**
* Processes a <code>Monitor_GetSubscriptions</code> request by
* forwarding it to its target queue, if local.
* Processes a <code>DestinationRequest</code> request by
* forwarding it to its target destination, if local.
*/
private void doProcess(GetSubscriptionsRequest request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getDest());
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
private void doProcess(SpecialAdmin request,
AgentId replyTo,
String msgId) throws UnknownServerException {
private void doProcess(DestinationAdminRequest request, AgentId replyTo, String msgId) {
AgentId destId = AgentId.fromString(request.getDestId());
if (checkServerId(destId.getTo())) {
// The destination is local, process the request.
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "AdminTopic.doProcess " + "SpecialAdminRequest destId=" + destId);
if (getId().equals(destId)) {
// If this destination is the target destination, doing nothing:
distributeReply(replyTo, msgId,
new AdminReply(false, "destId mustn't be TopicAdmin."));
return;
}
forward(destId, new SpecialAdminRequest(msgId,request));
if (replyTo != null) requestsTable.put(msgId, replyTo);
} else {
// Forward the request to the right AdminTopic agent.
forward(getDefault(destId.getTo()),
new FwdAdminRequestNot(request, replyTo, msgId));
}
forward(destId, new FwdAdminRequestNot(request, replyTo, msgId, createMessageId()));
}
/* ***** ***** ***** ***** *****
......@@ -1676,16 +1532,6 @@ public final class AdminTopic extends Topic implements AdminTopicMBean {
}
}
private void doProcess(GetSubscriberIds request, AgentId replyTo, String requestMsgId) {
forward(AgentId.fromString(request.getTopicId()),
new FwdAdminRequestNot(request, replyTo, requestMsgId, createMessageId()));
}
private void doProcess(QueueAdminRequest request, AgentId replyTo, String requestMsgId) {
forward(AgentId.fromString(request.getQueueId()),
new FwdAdminRequestNot(request, replyTo, requestMsgId, createMessageId()));
}
/**
* Returns <code>true</code> if a given server identification corresponds
* to the local server's.
......
......@@ -35,6 +35,7 @@ import java.util.Set;
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.mom.notifications.AckJoinQueueCluster;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.FwdAdminRequestNot;
import org.objectweb.joram.mom.notifications.JoinQueueCluster;
import org.objectweb.joram.mom.notifications.LBCycleLife;
import org.objectweb.joram.mom.notifications.LBMessageGive;
......@@ -42,13 +43,13 @@ import org.objectweb.joram.mom.notifications.LBMessageHope;
import org.objectweb.joram.mom.notifications.LeaveQueueCluster;
import org.objectweb.joram.mom.notifications.QueueClusterNot;
import org.objectweb.joram.mom.notifications.ReceiveRequest;
import org.objectweb.joram.mom.notifications.SpecialAdminRequest;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.mom.util.DMQManager;
import org.objectweb.joram.shared.admin.AddQueueCluster;
import org.objectweb.joram.shared.admin.AdminReply;
import org.objectweb.joram.shared.admin.AdminRequest;
import org.objectweb.joram.shared.admin.ListClusterQueue;
import org.objectweb.joram.shared.admin.RemoveQueueCluster;
import org.objectweb.joram.shared.admin.SpecialAdmin;
import org.objectweb.joram.shared.excepts.AccessException;
import org.objectweb.joram.shared.excepts.RequestException;
import org.objectweb.util.monolog.api.BasicLevel;
......@@ -158,6 +159,31 @@ public class ClusterQueue extends Queue {
}
}
public void handleAdminRequestNot(AgentId from, FwdAdminRequestNot not) {
setSave(); // state change, so save.
AdminRequest adminRequest = not.getRequest();
String info = strbuf.append("Request [").append(not.getClass().getName())
.append("], sent to Destination [").append(getId()).append("], successful [true] ").toString();
if (adminRequest instanceof ListClusterQueue) {
List list = doList((ListClusterQueue) adminRequest);
replyToTopic(new AdminReply(true, info, list), not.getReplyTo(), not.getRequestMsgId(),
not.getReplyMsgId());
} else if (adminRequest instanceof AddQueueCluster) {
addQueueCluster(((AddQueueCluster) adminRequest).joiningQueue, loadingFactor.getRateOfFlow());
replyToTopic(new AdminReply(true, info), not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
} else if (adminRequest instanceof RemoveQueueCluster) {
broadcastLeave(((RemoveQueueCluster) adminRequest).removeQueue);
removeQueueCluster(((RemoveQueueCluster) adminRequest).removeQueue);
replyToTopic(new AdminReply(true, info), not.getReplyTo(), not.getRequestMsgId(), not.getReplyMsgId());
} else {
super.handleAdminRequestNot(from, not);
}
strbuf.setLength(0);
}
/**
* Distributes the received notifications to the appropriate reactions.
*
......@@ -188,40 +214,6 @@ public class ClusterQueue extends Queue {
public String toString() {
return "ClusterQueue:" + getId().toString();
}
/**
* use to add or remove ClusterQueue to cluster.
*
* @param not
*/
public Object specialAdminProcess(SpecialAdminRequest not)
throws RequestException {
Object ret = null;
try {
SpecialAdmin req = not.getRequest();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG,
"--- " + this + " specialAdminProcess : " + req);
if (req instanceof AddQueueCluster) {
addQueueCluster(((AddQueueCluster) req).joiningQueue,
loadingFactor.getRateOfFlow());
} else if (req instanceof RemoveQueueCluster) {
broadcastLeave(((RemoveQueueCluster) req).removeQueue);
removeQueueCluster(((RemoveQueueCluster) req).removeQueue);
} else if(req instanceof ListClusterQueue) {
ret = doList((ListClusterQueue) req);
}
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"--- " + this + " specialAdminProcess", exc);
throw new RequestException(exc.getMessage());
}
return ret;
}
/**
* return the cluster list.
......@@ -229,7 +221,7 @@ public class ClusterQueue extends Queue {
* @param req
* @return the cluster list.
*/
protected Object doList(ListClusterQueue req) {
protected List doList(ListClusterQueue req) {
List vect = new ArrayList();
for (Enumeration e = clusters.keys(); e.hasMoreElements(); )
vect.add(e.nextElement().toString());
......
......@@ -36,14 +36,12 @@ import javax.management.MBeanAttributeInfo;
import javax.management.ObjectName;
import org.objectweb.joram.mom.notifications.AbstractRequestNot;
import org.objectweb.joram.mom.notifications.AdminReplyNot;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.ExceptionReply;
import org.objectweb.joram.mom.notifications.FwdAdminRequestNot;
import org.objectweb.joram.mom.notifications.GetRightsReplyNot;
import org.objectweb.joram.mom.notifications.GetRightsRequestNot;
import org.objectweb.joram.mom.notifications.RequestGroupNot;
import org.objectweb.joram.mom.notifications.SpecialAdminRequest;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.mom.proxies.SendRepliesNot;
import org.objectweb.joram.mom.proxies.SendReplyNot;
......@@ -198,8 +196,6 @@ public abstract class Destination extends Agent implements DestinationMBean {
try {
if (not instanceof GetRightsRequestNot)
getRights(from, (GetRightsRequestNot) not);
else if (not instanceof SpecialAdminRequest)
specialAdminRequest(from, (SpecialAdminRequest) not);
else if (not instanceof ClientMessages)
clientMessages(from, (ClientMessages) not);
else if (not instanceof UnknownAgent)
......@@ -655,37 +651,6 @@ public abstract class Destination extends Agent implements DestinationMBean {
deletable = true;
}
}
/**
* Method implementing the reaction to a <code>SpecialAdminRequest</code>
* notification requesting the special administration of the destination.
* <p>
*/
protected void specialAdminRequest(AgentId from, SpecialAdminRequest not) {
Object obj = null;
setSave(); // state change, so save.
try {
if (!isAdministrator(from)) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN,
"Unauthorized SpecialAdminRequest request from " + from);
throw new RequestException("ADMIN right not granted");
}
obj = specialAdminProcess(not);
strbuf.append("Request [").append(not.getClass().getName()).append("], sent to Destination [").append(getId()).append("], successful [true] ").toString();
forward(from, new AdminReplyNot(not, true, strbuf.toString(), obj));
} catch (RequestException exc) {
strbuf.append("Request [").append(not.getClass().getName()).append("], sent to Destination [").append(getId()).append("], successful [false]: ").append(exc.getMessage());
forward(from, new AdminReplyNot(not, false, strbuf.toString(), obj));
} finally {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, strbuf.toString());
strbuf.setLength(0);
}
}
protected void requestGroupNot(AgentId from, RequestGroupNot not) {
Enumeration en = not.getClientMessages();
......@@ -716,10 +681,6 @@ public abstract class Destination extends Agent implements DestinationMBean {
forward(from, new SendRepliesNot(replies));
}
}
protected Object specialAdminProcess(SpecialAdminRequest not) throws RequestException {
return null;
}
/**
* Checks the reading permission of a given client agent.
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2005 - 2006 ScalAgent Distributed Technologies
* Copyright (C) 2003 - ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.notifications;
import org.objectweb.joram.shared.admin.SpecialAdmin;
/**
* A <code>SpecialAdminRequest</code> instance is used by a destination agent
* to do special administration.
*/
public class SpecialAdminRequest extends AdminRequestNot {
/**
*
*/
private static final long serialVersionUID = 1L;
private SpecialAdmin request;
/**
* Constructs a <code>SpecialAdminRequest</code> instance.
*
* @param id Identifier of the request, may be null.
* @param request SpecialAdmin
*/
public SpecialAdminRequest(String id, SpecialAdmin request) {
super(id);
this.request = request;
}
/** Returns the SpecialAdmin request */
public SpecialAdmin getRequest() {
return request;
}
/**
* Appends a string image for this object to the StringBuffer parameter.
*
* @param output
* buffer to fill in
* @return
<code>output</code> buffer is returned
*/
public StringBuffer toString(StringBuffer output) {
output.append('(');
super.toString(output);
output.append(", request=").append(request);
output.append(')');
return output;
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2007 ScalAgent Distributed Technologies
* Copyright (C) 2007 - 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -22,25 +22,23 @@
*/
package org.objectweb.joram.shared.admin;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Hashtable;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.common.stream.StreamUtil;
import fr.dyade.aaa.common.stream.Streamable;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
/**
* An <code>AbstractAdminMessage</code> is a message exchanged between a
* Admin Joram client and its proxy.
......@@ -111,9 +109,9 @@ public abstract class AbstractAdminMessage implements Externalizable, Streamable
protected final static int GET_USERS_REQUEST = 58;
protected final static int GET_USERS_REPLY = 59;