Commit 712e3bba authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Add MBeans for cluster destinations.

parent 8471c0d9
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest;
public interface ClusterDestinationMBean {
/** @return an array containing the ids of the cluster elements. */
String[] getClusterElements();
}
......@@ -64,7 +64,7 @@ import fr.dyade.aaa.agent.UnknownNotificationException;
/**
* The <code>ClusterQueue</code> class implements the cluster queue behavior.
*/
public class ClusterQueue extends Queue {
public class ClusterQueue extends Queue implements ClusterQueueMBean {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
......@@ -92,9 +92,6 @@ public class ClusterQueue extends Queue {
/** Number of message send to cluster */
private long clusterDeliveryCount = 0;
/** Waiting after a cluster request */
private long waitAfterClusterReq = -1;
/**
* Maximum period of time before forwarding a waiting message or request to
* other queues of the cluster. By default it is set to
......@@ -117,6 +114,7 @@ public class ClusterQueue extends Queue {
/** automatic eval threshold */
boolean autoEvalThreshold = false;
long waitAfterClusterReq = -1;
if (prop != null) {
try {
waitAfterClusterReq = Long.valueOf(prop.getProperty("waitAfterClusterReq")).longValue();
......@@ -279,6 +277,11 @@ public class ClusterQueue extends Queue {
return list;
}
public String[] getClusterElements() {
List list = clusterList();
return (String[]) list.toArray(new String[list.size()]);
}
/**
* Ask this queue to leave the cluster.
*/
......@@ -504,7 +507,7 @@ public class ClusterQueue extends Queue {
if (loadingFactor.getRateOfFlow() < 1) {
int possibleGive = getPendingMessageCount() - getWaitingRequestCount();
LBMessageGive msgGive =
new LBMessageGive(waitAfterClusterReq,loadingFactor.getRateOfFlow());
new LBMessageGive(loadingFactor.validityPeriod, loadingFactor.getRateOfFlow());
// get client messages, hope or possible give.
ClientMessages cm = null;
......@@ -659,7 +662,6 @@ public class ClusterQueue extends Queue {
* @param waitAfterClusterReq
*/
public void setWaitAfterClusterReq(long waitAfterClusterReq) {
this.waitAfterClusterReq = waitAfterClusterReq;
loadingFactor.validityPeriod = waitAfterClusterReq;
}
......@@ -686,4 +688,41 @@ public class ClusterQueue extends Queue {
public void setAutoEvalThreshold(boolean autoEvalThreshold) {
loadingFactor.autoEvalThreshold = autoEvalThreshold;
}
public int getProducThreshold() {
return loadingFactor.producThreshold;
}
public int getConsumThreshold() {
return loadingFactor.consumThreshold;
}
public boolean isAutoEvalThreshold() {
return loadingFactor.autoEvalThreshold;
}
public long getWaitAfterClusterReq() {
return loadingFactor.validityPeriod;
}
public float getRateOfFlow() {
return loadingFactor.getRateOfFlow();
}
public boolean isOverloaded() {
return loadingFactor.isOverloaded();
}
public String getStatus() {
return loadingFactor.getStatus();
}
public String getConsumerStatus() {
return loadingFactor.getConsumerStatus();
}
public String getProducerStatus() {
return loadingFactor.getProducerStatus();
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest;
import org.objectweb.joram.mom.dest.LoadingFactor.ConsumerStatus;
import org.objectweb.joram.mom.dest.LoadingFactor.ProducerStatus;
import org.objectweb.joram.mom.dest.LoadingFactor.Status;
public interface ClusterQueueMBean extends QueueMBean, ClusterDestinationMBean {
/**
* Gets the number of messages above which a queue is considered loaded.
*
* @return the produce threshold
*/
public int getProducThreshold();
/**
* Sets the number of messages above which a queue is considered loaded.
*
* @param producThreshold the new threshold
*/
public void setProducThreshold(int producThreshold);
/**
* Gets the number of pending "receive" requests above which a queue will
* request messages from the other queues of the cluster.
*
* @return the consume threshold
*/
public int getConsumThreshold();
/**
* Sets the number of pending "receive" requests above which a queue will
* request messages from the other queues of the cluster.
*
* @param consumThreshold the new threshold
*/
public void setConsumThreshold(int consumThreshold);
/**
* True if an automatic reevaluation of the queues' thresholds values is
* allowed according to their activity.
*
* @return true if auto evaluation of thresholds is allowed.
*/
public boolean isAutoEvalThreshold();
/**
* Automatic reevaluation of the queues' thresholds can be done according to
* their activity.
*
* @param autoEvalThreshold true to enable auto evaluation of thresholds
*/
public void setAutoEvalThreshold(boolean autoEvalThreshold);
/**
* Gets the time (in ms) during which a queue which requested something from
* the cluster is not authorized to do it again.
*
* @return the minimum time to wait before another cluster request.
*/
public long getWaitAfterClusterReq();
/**
* Sets the time (in ms) during which a queue which requested something from
* the cluster is not authorized to do it again.
*
* @param waitAfterClusterReq the minimum time to wait before another cluster
* request.
*/
public void setWaitAfterClusterReq(long waitAfterClusterReq);
/**
* Gets an evaluation of the flow of messages handled by the queue.
*
* @return the rate of flow
*/
public float getRateOfFlow();
/**
* Tells if the queue is overloaded.
*
* @return true if the queue is overloaded
*/
public boolean isOverloaded();
/**
* Gets the status of the queue (RUN, INIT or WAIT).
*
* @return the status of the queue
* @see Status
*/
public String getStatus();
/**
* Gets consumer status (NO, NORMAL, HIGH).
*
* @return consumer status
* @see ConsumerStatus
*/
public String getConsumerStatus();
/**
* Gets producer status (NO, NORMAL, HIGH).
*
* @return producer status
* @see ProducerStatus
*/
public String getProducerStatus();
}
......@@ -131,6 +131,18 @@ public class LoadingFactor implements Serializable {
return rateOfFlow;
}
public String getStatus() {
return Status.names[status];
}
public String getProducerStatus() {
return ProducerStatus.names[producerStatus];
}
public String getConsumerStatus() {
return ConsumerStatus.names[consumerStatus];
}
public void setWait() {
status = Status.WAIT;
statusTime = System.currentTimeMillis() + validityPeriod;
......
......@@ -242,6 +242,11 @@ public class Topic extends Destination implements TopicMBean {
return cluster;
}
public String[] getClusterElements() {
List list = clusterList();
return (String[]) list.toArray(new String[list.size()]);
}
/**
* Ask this topic to leave the cluster.
*/
......
......@@ -22,7 +22,7 @@
*/
package org.objectweb.joram.mom.dest;
public interface TopicMBean extends DestinationMBean {
public interface TopicMBean extends DestinationMBean, ClusterDestinationMBean {
/**
* Returns the number of subscribers.
* Each user appears once even if there is multiples subscriptions, the different
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment