Commit 2eeb5662 authored by Ahmed El Rheddane's avatar Ahmed El Rheddane

- Added 2 Admin requests (GetDeliveredMessages & SendDestinationsWeights)...

- Added 2 Admin requests (GetDeliveredMessages & SendDestinationsWeights) along with appropriate client methods and handlers,
- Added the 'elastic' directory, with elasticity manager's code and test scripts.
parent 4bd28ac4
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2003 - 2004 Bull SA
* Copyright (C) 2001 - 2004 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): Frederic Maistre (INRIA)
* Contributor(s): Nicolas Tachker (ScalAgent), Ahmed El Rheddane (INRIA)
*/
package alias;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import java.util.Properties;
/**
* Manages a distributed architecture.
* Creates a producer and 3 consumers on 3 JORAM servers.
*/
public class Admin {
public static void main(String args[]) throws Exception {
System.out.println("[Admin]\tStarted...");
// Connecting the administrator:
AdminModule.connect("root", "root", 60);
// Creating access for user anonymous on servers 0 and 2:
User.create("anonymous", "anonymous", 0);
User.create("anonymous", "anonymous", 1);
User.create("anonymous", "anonymous", 2);
User.create("anonymous", "anonymous", 3);
User.create("anonymous", "anonymous", 4);
// Creating the clustered remote queues, and alias queue:
/*
Properties propCQ = new Properties();
propCQ.setProperty("period","1000");
propCQ.setProperty("producThreshold","2000");
propCQ.setProperty("consumThreshold","1000");
propCQ.setProperty("autoEvalThreshold","false");
propCQ.setProperty("waitAfterClusterReq","5000");
Queue rq1 = Queue.create(1, null, Queue.CLUSTER_QUEUE, propCQ);
Queue rq2 = Queue.create(2, null, Queue.CLUSTER_QUEUE, propCQ);
rq1.addClusteredQueue(rq2);
*/
Queue rq1 = Queue.create(1);
Queue rq2 = Queue.create(2);
Queue rq3 = Queue.create(3);
Queue rq4 = Queue.create(4);
Properties propAQ = new Properties();
propAQ.setProperty("remoteAgentID",rq1.getName());// + ";" + rq2.getName()); //+ ";" + rq3.getName());
//propAQ.setProperty("period",String.valueOf(Constants.QUEUE_PERIOD));
Queue aq = Queue.create(0,"org.objectweb.joram.mom.dest.AliasInQueue",propAQ);
// Setting free access to the destinations:
rq1.setFreeReading();
rq1.setFreeWriting();
rq2.setFreeReading();
rq2.setFreeWriting();
rq3.setFreeReading();
rq3.setFreeWriting();
rq4.setFreeReading();
rq4.setFreeWriting();
aq.setFreeWriting();
// Creating the connection factories for connecting to the servers:
javax.jms.ConnectionFactory cf0 =
TcpConnectionFactory.create("10.0.0.2", 16010);
javax.jms.ConnectionFactory cf1 =
TcpConnectionFactory.create("10.0.0.3", 16011);
javax.jms.ConnectionFactory cf2 =
TcpConnectionFactory.create("10.0.0.3", 16012);
javax.jms.ConnectionFactory cf3 =
TcpConnectionFactory.create("10.0.0.4", 16013);
javax.jms.ConnectionFactory cf4 =
TcpConnectionFactory.create("10.0.0.4", 16014);
// Binding the objects in JNDI:
javax.naming.Context jndiCtx = new javax.naming.InitialContext();
jndiCtx.bind("alias", aq);
jndiCtx.bind("remote1", rq1);
jndiCtx.bind("remote2", rq2);
jndiCtx.bind("remote3", rq3);
jndiCtx.bind("remote4", rq4);
jndiCtx.bind("cf0", cf0);
jndiCtx.bind("cf1", cf1);
jndiCtx.bind("cf2", cf2);
jndiCtx.bind("cf3", cf3);
jndiCtx.bind("cf4", cf4);
jndiCtx.close();
AdminModule.disconnect();
System.out.println("[Admin]\tDone.");
}
}
package alias;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
* Changes the remote destination of an AliasInQueue.
*/
public class Command {
public static void main(String args[]) throws Exception {
String cmd = args[0];
System.out.println("[Command] Executing " + args[0] + "...");
AdminModule.connect("root", "root", 60);
Context ictx = new InitialContext();
Queue aq = (Queue) ictx.lookup("alias");
ConnectionFactory cnxF = (ConnectionFactory) ictx.lookup("cf0");
if (cmd.equals("add")) {
aq.addRemoteDestination(((Queue) ictx.lookup("remote" + args[1])).getName());
} else if (cmd.equals("del")) {
aq.delRemoteDestination(((Queue) ictx.lookup("remote" + args[1])).getName());
} else if (cmd.equals("send")) {
Connection cnx = cnxF.createConnection();
Session session = cnx.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = session.createProducer(aq);
TextMessage message = session.createTextMessage();
cnx.start();
for(int i = 0; i < Integer.parseInt(args[1]); i++) {
message.setText("Message number " + i);
sender.send(message);
}
cnx.close();
} else if (cmd.equals("stop")) {
try {
AdminModule.stopServer(Integer.parseInt(args[1]));
} catch (Exception e) {
System.out.println("Puroburemu -_-'");
}
}
ictx.close();
AdminModule.disconnect();
System.out.println("[Command] Done.");
}
}
\ No newline at end of file
package alias;
public class Constants {
static final int QUEUE_PERIOD = 10000;
static final int MSG_SIZE = 1000;
static final int NB_OF_ROUNDS = 1000;
static final int MSG_PER_ROUND = 1000;
static final int RG_ROUNDS = 300;
static final int RG_SND_LOAD = 100;
static final int RG_REC_LOAD = 100;
static final int TIMEOUT = 1000000;
static final int TIME_UNIT = 1000;
}
\ No newline at end of file
package alias;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.FileHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import javax.naming.*;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
/**
* A class to add elasticity to the Alias Queue behavior.
*
* @author Ahmed El Rheddane
*
*/
public class ElasticityLoop {
/** Logger object. */
private final static Logger logger = Logger.getLogger(ElasticityLoop.class.getName());
/** Log file. */
private final static String loggerFile = ElasticityLoop.class.getName() + ".log";
/** Limit over which we add new workers. */
public static final int maxCapThreshold = 500;
/** Limit under which we remove unnecessary workers. */
public static final int minCapThreshold = 10;
/** Period of our elasticity loop in milliseconds. */
public static final Integer loopPeriod = 2000;
/** Rate at which reception rates are decreased (a percentage) */
private static int downRate = 20;
/** The list of producers */
private static List<Queue> producers = null;
/** The list of workers connected to producers, cannot be empty. */
private static List<Queue> activeWorkers = null;
/** The list of available workers not connected to producers. */
private static List<Queue> idleWorkers = null;
/** Number of delivered messages per active worker, since their creation. */
private static List<Integer> delivered;
/** Number of delivered messages per active worker, during the last period. */
private static List<Integer> rates;
/** Number of pending messages per worker. */
private static List<Integer> loads;
/** Number of monitored underloaded workers. */
private static int underloaded;
/** Number of monitored overloaded workers. */
private static int overloaded;
/** Index of the candidate to remove in an on-going scaling down process. */
private static int toRemove;
/** Reception rate of the candidate to remove when elected. */
private static int toRemoveRate;
/** Number of iterations since toRemove has been elected. */
private static int toRemoveAge;
/** Minimum value of current rates. */
private static int minRate;
/** Maximum value of current rates. */
private static int maxRate;
/** Loop continues while stop is false. */
private static boolean stopLoop = false;
/** initializes the logger. */
public static void initLogger() {
try {
FileHandler fh = new FileHandler(loggerFile, false);
logger.addHandler(fh);
logger.setLevel(Level.ALL);
SimpleFormatter formatter = new SimpleFormatter();
fh.setFormatter(formatter);
} catch (Exception e) {
e.printStackTrace();
System.out.println("Logger couldn't be initialized.");
}
}
/** Sets up the producers and workers. */
public static void initLoop() throws Exception {
producers = new ArrayList<Queue>();
activeWorkers = new ArrayList<Queue>();
idleWorkers = new ArrayList<Queue>();
delivered = new ArrayList<Integer>();
rates = new ArrayList<Integer>();
loads = new ArrayList<Integer>();
toRemove = -1;
InitialContext ictx = new InitialContext();
producers.add((Queue) ictx.lookup("alias"));
activeWorkers.add((Queue) ictx.lookup("remote1"));
delivered.add(0);
rates.add(0);
loads.add(0);
for(int i = 2; i <= 4; i++)
idleWorkers.add((Queue) ictx.lookup("remote" + i));
ictx.close();
}
/**
* Updates the values of monitoring variables.
*/
private static void monitorWorkers() {
underloaded = 0;
overloaded = 0;
minRate = Integer.MAX_VALUE;
maxRate = Integer.MIN_VALUE;
try {
for(int i = 0; i < activeWorkers.size(); i++) {
Queue worker = activeWorkers.get(i);
loads.set(i, worker.getPendingMessages());
if (loads.get(i) < minCapThreshold)
underloaded++;
else if (loads.get(i) > maxCapThreshold)
overloaded++;
int newDelivered = worker.getDeliveredMessages();
rates.set(i, newDelivered - delivered.get(i));
delivered.set(i,newDelivered);
if (rates.get(i) < minRate)
minRate = rates.get(i);
if (rates.get(i) > maxRate)
maxRate = rates.get(i);
}
} catch (Exception e) {
e.printStackTrace();
logger.log(Level.SEVERE, "Couldn't monitor the workers.");
}
}
/**
* @return the index of a worker from idleWorkers to be added to activeWorkers.
*/
private static int electWorkerToAdd() {
return 0;
}
/**
* @return the index of a worker from activeWorkers to be potentially removed.
*/
private static int electWorkerToRemove() {
return activeWorkers.size()-1;
}
/**
* Starts, continues or aborts scaling down process based
* on the monitored values.
* @return true if scaling down is on-going.
*/
private static boolean testScaleDown() {
//Plan should be cancelled.
if (overloaded > 0) {
if (toRemove >= 0) {
rates.set(activeWorkers.indexOf(toRemove), toRemoveRate);
toRemove = -1;
}
return false;
}
//Should initiate plan.
if (activeWorkers.size() > 1 && toRemove >= 0 && underloaded == activeWorkers.size()) {
toRemove = electWorkerToRemove();
logger.log(Level.INFO, "Scaling down elected worker: " + toRemove);
toRemoveAge = 0;
toRemoveRate = rates.get(toRemove);
}
//Plan can continue.
if (toRemove >= 0) {
if (++toRemoveAge > (100/downRate)) {
rates.set(toRemove, toRemoveRate*(100-downRate*toRemoveAge)/100);
logger.log(Level.INFO,"Trying to remove extra worker, " + toRemoveAge);
} else {
Queue worker = activeWorkers.remove(toRemove);
idleWorkers.add(worker);
rates.remove(toRemove);
loads.remove(toRemove);
delivered.remove(toRemove);
//Notify producers.
for(int i = 0; i < producers.size(); i++)
try {
producers.get(i).delRemoteDestination(worker.getName());
} catch (Exception e) {
e.printStackTrace();
logger.log(Level.SEVERE, "Couldn't delete a remote destination from producers.");
}
logger.log(Level.INFO, "Removed extra worker successfully.");
toRemove = -1;
}
return true;
}
return false;
}
/**
* Adds a new worker to the active workers list, if needed.
* @return true if scaling up is achieved.
*/
private static boolean testScaleUp() {
if (overloaded != activeWorkers.size())
return false;
if (idleWorkers.size() == 0)
return false;
int toAdd = electWorkerToAdd();
Queue worker = idleWorkers.remove(toAdd);
activeWorkers.add(worker);
rates.add(maxRate);
loads.add(0);
delivered.add(0);
//Notify producers.
for(int i = 0; i < producers.size(); i++)
try {
producers.get(i).addRemoteDestination(worker.getName());
} catch (Exception e) {
e.printStackTrace();
logger.log(Level.SEVERE, "Couldn't add a new remote destination to producer.");
}
logger.log(Level.INFO,"Added new worker successfully.");
return true;
}
/**
* Decreases the monitored rates of overloaded workers.
*/
private static void regulateRates() {
if (overloaded == 0)
return;
for(int i = 0; i < activeWorkers.size(); i++) {
if (loads.get(i) > maxCapThreshold)
rates.set(i,rates.get(i)*(100-downRate)/100);
}
}
/**
* Computes weights and send them to producers.
*/
private static void sendWeights() {
int[] weights = new int[activeWorkers.size()];
if (minRate <= 0)
minRate = 1;
int base = (int)Math.pow(10.0,Math.floor(Math.log10(minRate)));
logger.log(Level.INFO,"Weights computation, base: " + base);
for (int i = 0; i < activeWorkers.size(); i++) {
int weight = (int)Math.round((double)rates.get(i)/(double)base);
if (weight <= 0)
weight = 1;
weights[i] = weight;
logger.log(Level.INFO,"Weights computation, computed: " + weights[i] + " for: " + i);
}
//Notify producers.
for(int i = 0; i < producers.size(); i++)
try {
producers.get(i).sendDestinationsWeights(weights);
} catch (Exception e) {
e.printStackTrace();
logger.log(Level.SEVERE, "Couldn't add a new remote destination to producer.");
}
}
public static void main(String args[]) throws Exception {
initLogger();
initLoop();
//Elasticity loop
long correction = 0l;
do {
Thread.sleep(loopPeriod - correction);
if (stopLoop)
break;
long startTime = System.currentTimeMillis();
AdminModule.connect("root", "root", 60);
monitorWorkers();
if(!testScaleDown())
if(!testScaleUp())
regulateRates();
sendWeights();
AdminModule.disconnect();
correction = System.currentTimeMillis() - startTime;
} while (!stopLoop);
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - ScalAgent Distributed Technologies
* Copyright (C) 1996 - Dyade
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): Frederic Maistre (INRIA)
* Contributor(s): Ahmed El Rheddane (INRIA)
*/
package alias;
import javax.jms.*;
import javax.naming.*;
/**
* A receiver client.
* Periodically reads messages from the corresponding queue.
*/
public class Receiver
{
static Context ictx = null;
public static boolean loop = true;
public static void main(String argv[]) throws Exception
{
int number = Integer.parseInt(argv[0]);
System.out.println("\n[Receiver " + number + "] Started...");
ictx = new InitialContext();
ConnectionFactory cnxF = (ConnectionFactory) ictx.lookup("cf" + number);
Queue dest = (Queue) ictx.lookup("remote" + number);
ictx.close();
Connection cnx = cnxF.createConnection();
Session session = cnx.createSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer rec = session.createConsumer(dest);
TextMessage msg;
cnx.start();
while(loop) {
msg = (TextMessage) rec.receive();
System.out.println("[Receiver " + number + "] Received: " + msg.getText());
}
cnx.close();
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - ScalAgent Distributed Technologies
* Copyright (C) 1996 - Dyade
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): Frederic Maistre (INRIA)
* Contributor(s): Ahmed El Rheddane (INRIA)
*/
package alias;
import javax.jms.*;
import javax.naming.*;
/**
* A receiver client.
* Receives rounds of messages from a given queue.
*/
public class RegulatedReceiver {
static class ReceiveRound extends Thread {
public void run() {
done = false;
for(int j = 0; j < Constants.RG_REC_LOAD; j++) {
try {
receiver.receive();
} catch (Exception e) {}
}
done = true;
}
}
static ReceiveRound rr;
static boolean done = false;
static int number;
static MessageConsumer receiver = null;
static Context ictx = null;