Commit 4653f2d2 authored by Ahmed El Rheddane's avatar Ahmed El Rheddane
Browse files

-Added Elastic Topics files.

parent d1cd6467
<project default="compile" basedir="." xmlns:m2="urn:maven-artifact-ant">
<project name ="elastic-joram" default="compile" basedir="." xmlns:m2="urn:maven-artifact-ant">
<!-- Initializes the environment paths and properties -->
<target name="init">
......
......@@ -3,7 +3,6 @@ package elasticity.loop;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import java.util.Scanner;
import elasticity.services.ElasticityService;
......
......@@ -6,10 +6,12 @@ import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import javax.jms.ConnectionFactory;
import javax.naming.InitialContext;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import elasticity.interfaces.Service;
......@@ -61,7 +63,8 @@ public class ElasticityService extends Service {
public void initService(Properties props) throws Exception {
logger.log(Level.FINE, "Started Initialization..");
//Setting the admin connection once and for all.
AdminModule.connect("localhost",16101,"root","root", 60);
ConnectionFactory cfa = TcpConnectionFactory.create("localhost",16101);
AdminModule.connect(cfa,"root","root");
//Initializes the service beneath.
js = new JoramService();
......
......@@ -12,7 +12,6 @@ import javax.jms.ConnectionFactory;
import javax.naming.InitialContext;
import org.objectweb.joram.client.jms.Queue;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.AdminWrapper;
import org.objectweb.joram.client.jms.admin.Server;
import org.objectweb.joram.client.jms.admin.User;
......@@ -68,7 +67,7 @@ public class JoramService extends Service {
ConnectionFactory cf = (ConnectionFactory) jndiCtx.lookup("cfp2");
jndiCtx.close();
Connection cn = cf.createConnection("root", "root");
Connection cn = cf.createConnection("root","root");
cn.start();
aw = new AdminWrapper(cn);
......
package elasticity.topics.client;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.objectweb.joram.client.jms.Topic;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
public class ListenerWrapper implements MessageListener {
/** Related Subscriber Wrapper. */
private SubscriberWrapper sw;
public ListenerWrapper(SubscriberWrapper sw) {
this.sw = sw;
}
@Override
public void onMessage(Message m) {
try {
if (m.propertyExists("reconnect")) {
System.out.println("HEY HEY HEY!!");
String tid = m.getStringProperty("reconnect");
String server = m.getStringProperty("server");
int port = m.getIntProperty("port");
Topic topic = Topic.createTopic(tid,null);
ConnectionFactory cf = TcpConnectionFactory.create(server, port);
sw.reconnect(topic,cf);
} else if (sw.reconnecting) {
// Should make sure that each message is
// processed only once.
} else {
sw.listener.onMessage(m);
}
} catch(Exception e) {
e.printStackTrace();
}
}
}
package elasticity.topics.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
public class SubscriberThread extends Thread {
Topic topic;
ConnectionFactory cf;
SubscriberWrapper sw;
boolean end;
public SubscriberThread(Topic topic, ConnectionFactory cf, SubscriberWrapper sw) {
this.topic = topic;
this.cf = cf;
this.sw = sw;
end = false;
}
@Override
public void run() {
try {
Connection cnx = cf.createConnection();
Session sess = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c = sess.createConsumer(topic);
c.setMessageListener(new ListenerWrapper(sw));
cnx.start();
System.out.println("Matrix Reloaded.");
sw.reconnected();
while (!end) {
Thread.sleep(1000);
}
cnx.close();
System.out.println("This is the end, my only friend.");
} catch (Exception e) {
e.printStackTrace();
}
}
protected void terminate() {
end = true;
}
}
package elasticity.topics.client;
import javax.jms.ConnectionFactory;
import javax.jms.MessageListener;
import javax.jms.Topic;
/**
* Creates subscribers on specified topic.
*
* @author Ahmed El Rheddane
*
*/
public class SubscriberWrapper {
Topic topic;
ConnectionFactory cf;
/**
* Listener specified by the user.
*/
MessageListener listener;
boolean reconnecting;
SubscriberThread t0;
SubscriberThread t1;
public SubscriberWrapper(Topic topic, ConnectionFactory cf, MessageListener listener) {
this.topic = topic;
this.cf = cf;
this.listener = listener;
this.reconnecting = false;
}
public void start() {
this.t0 = new SubscriberThread(topic,cf,this);
t0.start();
}
public void stop() {
if (t0 != null) {
t0.terminate();
}
if (t1 != null) {
t1.terminate();
}
reconnecting = false;
}
void reconnect(Topic topic, ConnectionFactory cf) {
reconnecting = true;
t1 = t0;
t0 = new SubscriberThread(topic,cf,this);
t0.start();
}
void reconnected() {
reconnecting = false;
if (t1 != null) {
t1.terminate();
}
}
}
\ No newline at end of file
package elasticity.topics.eval;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
/**
* On reception, prints message latency.
*
* @author Ahmed El Rheddane
*
*/
public class Listener implements MessageListener {
@Override
public void onMessage(Message msg) {
try {
System.out.println(System.currentTimeMillis() - msg.getJMSTimestamp());
} catch (JMSException e) {}
}
}
package elasticity.topics.eval;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
* Publishes messages on specified topic.
*
* @author Ahmed El Rheddane
*
*/
public class Pub {
public static final int MSG_SIZE = 1000;
public static void main(String[] args) throws Exception {
System.out.println("[Pub] Started...");
int tid = args.length > 0 ? Integer.parseInt(args[0]) : 0;
int nbr = args.length > 1 ? Integer.parseInt(args[1]) : 100;
int prd = args.length > 2 ? Integer.parseInt(args[2]) : 1000;
Context ictx = new InitialContext();
Topic topic = (Topic) ictx.lookup("t" + tid);
ConnectionFactory cf = (ConnectionFactory) ictx.lookup("cf" + tid);
ictx.close();
Connection cnx = cf.createConnection();
Session sess = cnx.createSession(true, 0);
MessageProducer p = sess.createProducer(topic);
byte[] content = new byte[MSG_SIZE];
for (int i = 0; i < MSG_SIZE; i++) {
content[i] = (byte) i;
}
BytesMessage msg = sess.createBytesMessage();
msg.writeBytes(content);
cnx.start();
for (int i = 0; i < nbr; i++) {
long start = System.currentTimeMillis();
p.send(msg);
sess.commit();
System.out.println("Sent message #" + i);
Thread.sleep(prd - (System.currentTimeMillis() - start));
}
cnx.close();
System.out.println("[Pub] Done.");
}
}
package elasticity.topics.eval;
import java.util.Properties;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.objectweb.joram.client.jms.Topic;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.admin.Server;
import org.objectweb.joram.client.jms.admin.User;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
/**
* Class to setup experiments on tree topics.
*
* @author Ahmed El Rheddane
*
*/
public class Setup {
private static final int SERV_NBR = 4;
private static Server[] servers;
private static String getServerAddress(int id) {
for (Server s : servers) {
if (s.getId() == id)
return s.getHostName();
}
return null;
}
public static void main(String args[]) throws Exception {
System.out.println("[Setup] Started...");
// Connecting the administrator (using TcpProxyService port)
ConnectionFactory cfa = TcpConnectionFactory.create("localhost",16000);
AdminModule.connect(cfa,"root","root");
servers = AdminModule.getServers();
Topic[] t = new Topic[SERV_NBR];
Context jndiCtx = new InitialContext();
for (int i = 0; i < SERV_NBR; i++) {
User.create("anonymous", "anonymous", i);
if (i == 0) {
Properties props = new Properties();
props.setProperty("root","");
t[i] = Topic.create(i,"t" + i,"org.objectweb.joram.mom.dest.ElasticTopic",props);
} else {
t[i] = Topic.create(i,"t" + i,"org.objectweb.joram.mom.dest.ElasticTopic",null);
t[0].scale(1,t[i].getName() + ";" + getServerAddress(i) + ";" + (16000 + i));
}
t[i].setFreeWriting();
t[i].setFreeReading();
ConnectionFactory cf = TcpConnectionFactory.create(getServerAddress(i), 16000 + i);
jndiCtx.bind("t" + i, t[i]);
jndiCtx.bind("cf" + i, cf);
System.out.println("[Setup] Topic created..");
}
jndiCtx.close();
AdminModule.disconnect();
System.out.println("[Setup] Done.");
}
}
\ No newline at end of file
package elasticity.topics.eval;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
* Creates subscribers on specified topic.
*
* @author Ahmed El Rheddane
*
*/
public class Sub {
private static ConnectionFactory cf;
private static Topic topic;
private static void createConsumer() throws Exception {
Connection cnx = cf.createConnection();
Session sess = cnx.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer c = sess.createConsumer(topic);
c.setMessageListener(new Listener());
cnx.start();
}
public static void main(String[] args) throws Exception {
System.out.println("[Sub] Started...");
int tid = Integer.parseInt(args[0]);
int nbr = Integer.parseInt(args[1]);
Context ictx = new InitialContext();
topic = (Topic) ictx.lookup("t" + tid);
cf = (ConnectionFactory) ictx.lookup("cf" + tid);
ictx.close();
for (int i = 0; i < nbr; i++) {
createConsumer();
}
//System.out.println("[Sub] 'Enter' to exit..");
while(true) {
Thread.sleep(300000);
}
//System.out.println("[Sub] Done.");
}
}
\ No newline at end of file
package elasticity.topics.eval;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;
import javax.naming.InitialContext;
import elasticity.topics.client.SubscriberWrapper;
public class SubBis {
public static void main(String[] args) throws Exception {
int tid = Integer.parseInt(args[0]);
InitialContext jndiCtx = new InitialContext();
Topic topic = (Topic) jndiCtx.lookup("t" + tid);
ConnectionFactory cf = (ConnectionFactory) jndiCtx.lookup("cf" + tid);
jndiCtx.close();
SubscriberWrapper sub = new SubscriberWrapper(topic,cf,new Listener());
sub.start();
while(true) {
Thread.sleep(1000);
}
}
}
package elasticity.topics.loop;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import elasticity.topics.services.ElasticityService;
public class ControlLoop {
public static void main(String[] args) throws Exception {
ElasticityService es = new ElasticityService();
es.init(null);
BufferedReader br =
new BufferedReader(new InputStreamReader(System.in));
while(true){
String cmd = br.readLine();
System.out.println("#" + cmd + "#");
if (cmd.equals("monitor")) {
es.monitorTopics();
} else if (cmd.equals("balance")) {
es.balanceSubscribers();
} else if (cmd.equals("exit")) {
break;
}
}
}
}
\ No newline at end of file
package elasticity.topics.services;
import java.util.ArrayList;
import java.util.Properties;
import javax.jms.ConnectionFactory;
import javax.naming.InitialContext;
import org.objectweb.joram.client.jms.Topic;
import org.objectweb.joram.client.jms.admin.AdminModule;
import org.objectweb.joram.client.jms.tcp.TcpConnectionFactory;
import elasticity.interfaces.Service;
public class ElasticityService extends Service {
private Topic root;
private ArrayList<Topic> topics = new ArrayList<Topic>();
private ArrayList<Integer> subs = new ArrayList<Integer>();
@Override
protected void initService(Properties props) throws Exception {
//Setting the admin connection once and for all.
ConnectionFactory cfa = TcpConnectionFactory.create("localhost",16000);
AdminModule.connect(cfa,"root","root");
// TEST
InitialContext jndiCtx = new InitialContext();
root = (Topic) jndiCtx.lookup("t0");
for (int i = 1; i <= 3; i++) {
topics.add((Topic) jndiCtx.lookup("t" + i));
subs.add(0);
}
jndiCtx.close();
}
public void monitorTopics() throws Exception {
for (int i = 0; i < topics.size(); i++) {
int s = topics.get(i).getSubscriptions();
subs.set(i,s);
System.out.println("Topic #" + i + " has " + s + " subscribers.");
}
}
public void balanceSubscribers() throws Exception {
int sum = 0;
for (int i = 0; i < topics.size(); i++) {
sum += subs.get(i);
}
int avg = sum / topics.size();
ArrayList<Integer> more = new ArrayList<Integer>();
ArrayList<Integer> even = new ArrayList<Integer>();
ArrayList<Integer> less = new ArrayList<Integer>();
for (int i = 0; i < topics.size(); i++) {
if (subs.get(i) > avg) {
more.add(i);
subs.set(i, subs.get(i) - avg);
} else if (subs.get(i) == avg) {
even.add(i);
subs.set(i,0);
} else {
less.add(i);
subs.set(i, avg - subs.get(i));
}
}
System.out.println("more: " + more);
System.out.println("even: " + even);
System.out.println("less: " + less);
less.addAll(even);
int mod = sum % topics.size();
if (mod > more.size()) {
for (int i = 0; i < mod - more.size(); i++) {
subs.set(less.get(i), subs.get(less.get(i)) + 1);
}
mod = more.size();
}
for (int i = 0, j = 0; i < more.size(); i++) {
int a = more.get(i);
int x = subs.get(a);
if (mod > 0) {
x--;
mod--;
}
String param = a + ":";
for (; x > 0; j++) {
int b = less.get(j);
int y = subs.get(b);
if (y > x) {
param = param + b + ";" + x + ";";
subs.set(b, y - x);
x = 0;
} else {
param = param + b + ";" + y + ";";
x -= y;
}