Commit 4c4daac2 authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Initial revision of AMQP acquisition / distribution destinations.

parent 834e2d83
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.objectweb.joram</groupId>
<artifactId>joram-mom-extensions-amqp</artifactId>
<packaging>bundle</packaging>
<name>JORAM :: joram :: mom :: extensions :: amqp</name>
<description>Builds the Joram amqp extension project.</description>
<parent>
<groupId>org.objectweb.joram</groupId>
<artifactId>joram-mom-extensions</artifactId>
<version>5.5.0-SNAPSHOT</version>
</parent>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${pom.artifactId}</Bundle-SymbolicName>
<Export-Package>org.objectweb.joram.mom.dest.amqp</Export-Package>
<Embed-Dependency>amqp-client</Embed-Dependency>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.objectweb.joram</groupId>
<artifactId>joram-mom-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest.amqp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
import org.objectweb.joram.mom.dest.AcquisitionDaemon;
import org.objectweb.joram.mom.dest.ReliableTransmitter;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.LongString;
import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug;
public class AmqpAcquisition implements AcquisitionDaemon {
private static final Logger logger = Debug.getLogger(AmqpAcquisition.class.getName());
private static final String QUEUE_NAME_PROP = "amqp.QueueName";
private static final String UPDATE_PERIOD_PROP = "amqp.ConnectionUpdatePeriod";
private static ConnectionUpdater connectionUpdater;
private ReliableTransmitter transmitter;
// Use a vector as it is used by 2 different threads
private List<Channel> channels = new Vector<Channel>();
private String amqpQueue = null;
private volatile boolean closing = false;
public void start(Properties properties, ReliableTransmitter transmitter) {
this.transmitter = transmitter;
amqpQueue = properties.getProperty(QUEUE_NAME_PROP);
if (amqpQueue == null) {
logger.log(BasicLevel.ERROR, "The amqp queue name property " + QUEUE_NAME_PROP + " must be specified.");
}
long updatePeriod = 5000L;
try {
if (properties.containsKey(UPDATE_PERIOD_PROP)) {
updatePeriod = Long.parseLong(properties.getProperty(UPDATE_PERIOD_PROP));
}
} catch (NumberFormatException nfe) {
logger.log(BasicLevel.ERROR, "Property " + UPDATE_PERIOD_PROP
+ "could not be parsed properly, use default value.", nfe);
}
this.transmitter = transmitter;
if (connectionUpdater == null) {
connectionUpdater = new ConnectionUpdater(updatePeriod);
}
connectionUpdater.addUpdateListener(this);
}
public void stop() {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Stop AmqpAcquisition.");
}
connectionUpdater.removeUpdateListener(this);
closing = true;
synchronized (channels) {
for (Channel channel : channels) {
try {
channel.close();
} catch (IOException exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Error while stopping AmqpAcquisition.", exc);
}
}
}
channels.clear();
}
}
/**
* Create a new AMQP consumer for each connection available.
*/
public void onNewConnections(List<Connection> connections) {
for (Connection connection : connections) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Creating a new consumer on queue " + amqpQueue + " for connection "
+ connection);
}
try {
Channel chan = connection.createChannel();
chan.queueDeclarePassive(amqpQueue);
AmqpConsumer consumer = new AmqpConsumer(chan);
chan.basicConsume(amqpQueue, false, consumer);
channels.add(chan);
} catch (Exception e) {
logger.log(BasicLevel.ERROR, "Error while starting consumer on connection: " + connection, e);
}
}
}
private class AmqpConsumer extends DefaultConsumer {
public AmqpConsumer(Channel channel) {
super(channel);
}
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
Message message = new Message();
message.body = body;
try {
message.type = Byte.parseByte(properties.getType());
} catch (NumberFormatException nfe) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "Message Type field could not be parsed.", nfe);
}
message.type = Message.BYTES;
}
message.correlationId = properties.getCorrelationId();
if (properties.getDeliveryMode().intValue() == 1) {
message.persistent = false;
} else if (properties.getDeliveryMode().intValue() == 2) {
message.persistent = true;
}
message.priority = properties.getPriority().intValue();
message.timestamp = properties.getTimestamp().getTime();
try {
message.expiration = Long.parseLong(properties.getExpiration());
} catch (NumberFormatException nfe) {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "Expiration field could not be parsed.", nfe);
}
}
if (properties.getHeaders() != null) {
Map<String, Object> props = properties.getHeaders();
for (Map.Entry<String, Object> prop : props.entrySet()) {
try {
if (prop.getValue() instanceof LongString) {
message.setProperty(prop.getKey(), prop.getValue().toString());
} else {
message.setProperty(prop.getKey(), prop.getValue());
}
} catch (ClassCastException exc) {
if (logger.isLoggable(BasicLevel.ERROR)) {
logger.log(BasicLevel.ERROR, "Property can't be mapped to JMS message property.", exc);
}
}
}
}
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "New incoming message : " + message);
}
transmitter.transmit(message, properties.getMessageId());
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Consumer error for connection " + getChannel().getConnection());
}
if (!closing) {
channels.remove(getChannel());
}
}
}
/**
* Daemon used to periodically update the pool of connections known by the
* acquisition destinations.
*/
private static class ConnectionUpdater extends Daemon {
private List<Connection> connections = new ArrayList<Connection>();
private List<AmqpAcquisition> listeners = new ArrayList<AmqpAcquisition>();
private long period;
/** Constructs a <code>ReconnectionDaemon</code> thread. */
protected ConnectionUpdater(long updatePeriod) {
super("ConnectionUpdater", logger);
setDaemon(false);
period = updatePeriod;
if (logmon.isLoggable(BasicLevel.DEBUG)) {
logmon.log(BasicLevel.DEBUG, "ReconnectionDaemon<init>");
}
}
/** The daemon's loop. */
public void run() {
if (logmon.isLoggable(BasicLevel.DEBUG)) {
logmon.log(BasicLevel.DEBUG, "run()");
}
try {
while (running) {
canStop = true;
try {
Thread.sleep(period);
} catch (InterruptedException exc) {
if (logmon.isLoggable(BasicLevel.DEBUG)) {
logmon.log(BasicLevel.DEBUG, "Thread interrupted.");
}
continue;
}
canStop = false;
List<Connection> currentConnections = AmqpConnectionHandler.getConnections();
List<Connection> newConnections = new ArrayList<Connection>(currentConnections);
newConnections.removeAll(connections);
synchronized (listeners) {
if (listeners.size() == 0) {
stop();
}
for (AmqpAcquisition listener : listeners) {
listener.onNewConnections(newConnections);
}
}
connections = currentConnections;
}
} finally {
finish();
}
}
/** Shuts the daemon down. */
public void shutdown() {
interrupt();
}
/** Releases the daemon's resources. */
public void close() {
connections.clear();
}
protected void addUpdateListener(AmqpAcquisition listener) {
synchronized (listeners) {
listeners.add(listener);
if (listeners.size() == 1) {
start();
}
}
List<Connection> existingConnections;
existingConnections = new ArrayList<Connection>(connections);
listener.onNewConnections(existingConnections);
}
protected void removeUpdateListener(AmqpAcquisition listener) {
synchronized (listeners) {
listeners.remove(listener);
}
}
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.mom.dest.amqp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.util.management.MXWrapper;
/**
* The {@link AmqpConnectionHandler} service handles the list of known AMQP
* servers, in order to keep live connections with them.
*/
public class AmqpConnectionHandler implements AmqpConnectionHandlerMBean {
private static final Logger logger = Debug.getLogger(AmqpConnectionHandler.class.getName());
private static final String MBEAN_NAME = "type=Connections";
private static AmqpConnectionHandler singleton;
private static Set<LiveServerConnection> servers = new HashSet<LiveServerConnection>();
private AmqpConnectionHandler() {
}
public synchronized static AmqpConnectionHandler getInstance() {
if (singleton == null) {
singleton = new AmqpConnectionHandler();
try {
MXWrapper.registerMBean(singleton, "AMQP#" + AgentServer.getServerId(), MBEAN_NAME);
} catch (Exception e) {
logger.log(BasicLevel.DEBUG, "registerMBean", e);
}
}
return singleton;
}
/**
* Initializes the service. Starts a connection with one server.
*/
public static void init(String args, boolean firstTime) throws Exception {
String host = ConnectionFactory.DEFAULT_HOST;
int port = ConnectionFactory.DEFAULT_AMQP_PORT;
if (args != null) {
StringTokenizer st = new StringTokenizer(args);
if (st.hasMoreTokens()) {
host = st.nextToken();
}
if (st.hasMoreTokens()) {
port = Integer.parseInt(st.nextToken());
}
}
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
getInstance().addServer(factory);
}
/**
* Adds an AMQP server and starts a live connection with it, accessible via
* the {@link ConnectionFactory} provided. A server is uniquely identified
* with its host and port. Adding an existing server won't do anything.
*
* @param factory the factory used to access the server, configured properly
* (host, port, login, password...)
*/
public void addServer(ConnectionFactory factory) {
synchronized (servers) {
servers.add(new LiveServerConnection(factory));
}
}
/**
* Adds an AMQP server and starts a live connection with it, accessible via
* the {@link ConnectionFactory} provided. A server is uniquely identified
* with its host and port. Adding an existing server won't do anything.
*
* @param host host of the added server
* @param port port of the added server
*/
public void addServer(String host, int port) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
addServer(factory);
}
/**
* Adds an AMQP server and starts a live connection with it, accessible via
* the {@link ConnectionFactory} provided. A server is uniquely identified
* with its host and port. Adding an existing server won't do anything.
*
* @param host host of the added server
* @param port port of the added server
* @param user user name
* @param pass user password
*/
public void addServer(String host, int port, String user, String pass) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(user);
factory.setPassword(pass);
addServer(factory);
}
/**
* Removes the live connection to the specified AMQP server.
*
* @param host host of the removed server
* @param port port of the removed server
*/
public void deleteServer(String host, int port) {
synchronized (servers) {
Iterator<LiveServerConnection> iterator = servers.iterator();
while (iterator.hasNext()) {
LiveServerConnection cnx = iterator.next();
if (cnx.getConnectionFactory().getHost().equals(host) && cnx.getConnectionFactory().getPort() == port) {
cnx.stopLiveConnection();
iterator.remove();
break;
}
}
}
}
/**
* Stops all connections to AMQP servers.
*/
public static void stopService() {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Stopping AmqpConnectionHandler service.");
}
synchronized (servers) {
for (LiveServerConnection server : servers) {
server.stopLiveConnection();
}
}
}
/**
* Gets the list of currently opened connections.
*
* @return the list of usable connections.
*/
public static List<Connection> getConnections() {
List<Connection> connections = new ArrayList<Connection>();
synchronized (servers) {
for (LiveServerConnection server : servers) {
if (server.isConnectionOpen()) {
connections.add(server.getConnection());
}
}
}
return connections;
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*