Commit 12d1043f authored by Nicolas Tachker's avatar Nicolas Tachker
Browse files

JORAM-245: Add a REST API for JMS.

parent a86ce5ef
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>joram-tools-rest-jms</artifactId>
<packaging>bundle</packaging>
<name>JORAM :: joram :: tools :: mqtt :: jms</name>
<description>Builds the Joram REST JMS.</description>
<parent>
<groupId>org.ow2.joram</groupId>
<artifactId>joram-tools-rest</artifactId>
<version>5.12.0-SNAPSHOT</version>
</parent>
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>${maven.bundle.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Activator>org.objectweb.joram.tools.rest.jms.Activator</Bundle-Activator>
<Import-Package>
fr.dyade.aaa.common,
javax.jms,
javax.naming,
org.objectweb.util.monolog,
org.objectweb.util.monolog.api,
org.eclipse.jetty.server</Import-Package>
<!-- TODO remove DynamicImport-Package -->
<DynamicImport-Package>*</DynamicImport-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.ow2.spec.ee</groupId>
<artifactId>ow2-jms-2.0-spec</artifactId>
</dependency>
<dependency>
<groupId>org.ow2.joram</groupId>
<artifactId>a3-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet-core</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.tools.rest.jms;
import java.util.Dictionary;
import java.util.Hashtable;
import org.glassfish.jersey.servlet.ServletContainer;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.service.http.HttpContext;
import org.osgi.service.http.HttpService;
import fr.dyade.aaa.common.Debug;
/**
*
*/
public class Activator implements BundleActivator {
public static Logger logger = Debug.getLogger(Activator.class.getName());
public static final String SERVICE_NAME = "rest.service.name";
private BundleContext context = null;
private HttpService httpService;
public String servletAlias = "/joram";
private CleanerTask cleanerTask;
public void start(BundleContext bundleContext) throws Exception {
this.context = bundleContext;
ServiceReference<HttpService> reference = bundleContext.getServiceReference(HttpService.class);
httpService = bundleContext.getService(reference);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "rest.jms.activator httpService = " + httpService);
ClassLoader myClassLoader = getClass().getClassLoader();
ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(myClassLoader);
String serviceName = bundleContext.getProperty(SERVICE_NAME);
if (serviceName != null && !serviceName.isEmpty()) {
servletAlias = serviceName.startsWith("/") ? serviceName : "/" + serviceName;
}
// set properties from felix configuration
Helper.getInstance().setGlobalProperties(bundleContext);
Dictionary<String, String> jerseyServletParams = new Hashtable<>();
jerseyServletParams.put("javax.ws.rs.Application", JmsJerseyApplication.class.getName());
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "start: REGISTERING SERVLETS " + servletAlias);
HttpContext httpContext = null;
// register the servlet
httpService.registerServlet(servletAlias, new ServletContainer(), jerseyServletParams, httpContext);
String period = bundleContext.getProperty(Helper.BUNDLE_CLEANER_PERIOD_PROP);
if (period != null && !period.isEmpty()) {
cleanerTask = new CleanerTask();
cleanerTask.setPeriod(Integer.parseInt(period));
cleanerTask.start();
}
} finally {
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
}
}
public void stop(BundleContext bundleContext) throws Exception {
if (cleanerTask != null)
cleanerTask.stop();
Helper.getInstance().closeAll();
if (httpService != null) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "stop: UNREGISTERING SERVLETS " + servletAlias);
httpService.unregister(servletAlias);
}
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.tools.rest.jms;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug;
public class CleanerTask implements Callable<Boolean> {
public static Logger logger = Debug.getLogger(CleanerTask.class.getName());
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
private int period = 60;//in s
private int timeOut = 3600;//in s
private ScheduledFuture<?> callableHandle;
private HashMap<String, RestClientContext> restClientCtxs = Helper.getInstance().getRestClientCtxs();
private Helper helper = Helper.getInstance();
/**
* @return the period
*/
public int getPeriod() {
return period;
}
/**
* @param period the period to set
*/
public void setPeriod(int period) {
this.period = period;
}
/**
* @return the timeOut
*/
public int getTimeOut() {
return timeOut;
}
/**
* @param timeOut the timeOut to set
*/
public void setTimeOut(int timeOut) {
this.timeOut = timeOut;
}
public void start() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "cleanerTask.start period = " + period);
if (scheduler != null && scheduler.isTerminated())
scheduler = Executors.newScheduledThreadPool(2);
if (callableHandle != null && !callableHandle.isCancelled()) {
callableHandle.cancel(true);
}
callableHandle = scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
submitCallTask();
}
}, 0, period, TimeUnit.SECONDS);
}
public void stop() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "cleanerTask.stop");
if (callableHandle != null)
callableHandle.cancel(true);
scheduler.shutdown();
}
public void submitCallTask() {
Boolean res = false;
Future<Boolean> future = scheduler.submit(this);
try {
res = future.get(timeOut, TimeUnit.SECONDS);
} catch (InterruptedException e) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "==== cleanerTask.submitCallTask InterruptedException");
} catch (ExecutionException e) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "==== cleanerTask.submitCallTask ExecutionException");
} catch (TimeoutException e) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "==== cleanerTask.submitCallTask TimeoutException");
future.cancel(true);
}
}
@Override
public Boolean call() throws Exception {
ArrayList<String> toClose = new ArrayList<String>();
for (RestClientContext restClientCtx : restClientCtxs.values()) {
if (restClientCtx.getIdleTimeout() < 1) {
// never close
continue;
}
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "cleanerTask.call : " + restClientCtx.getClientId() + ", idleTimeout = " + restClientCtx.getIdleTimeout());
if (restClientCtx.getLastActivity() + restClientCtx.getIdleTimeout() < System.currentTimeMillis()) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "cleanerTask.call close : " + restClientCtx.getClientId());
if (!toClose.contains(restClientCtx.getClientId()))
toClose.add(restClientCtx.getClientId());
}
}
for (String clientId : toClose) {
helper.close(clientId);
}
return true;
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.tools.rest.jms;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSConsumer;
import javax.jms.JMSException;
import javax.jms.Message;
public class ConsumerContext extends SessionContext {
private JMSConsumer consumer;
private ConcurrentHashMap<Long, Message> messages;
public ConsumerContext(RestClientContext clientCtx) {
super(clientCtx);
messages = new ConcurrentHashMap<Long, Message>();
}
/**
* @return the consumer
*/
public JMSConsumer getConsumer() {
return consumer;
}
/**
* @param consumer
* the consumer to set
*/
public void setConsumer(JMSConsumer consumer) {
this.consumer = consumer;
}
public long getId(Message message) throws JMSException {
if (message == null || !messages.containsValue(message))
return -1;
for (Entry<Long, Message> entry : messages.entrySet()) {
if (message.getJMSMessageID().equals(entry.getValue().getJMSMessageID())) {
return entry.getKey();
}
}
return -1;
}
public void put(long id, Message msg) {
if (msg == null)
return;
if (id > getLastId())
setLastId(id);
messages.put(id, msg);
}
public Message getMessage(long id) {
return messages.get(id);
}
public Message removeMessage(long id) {
return messages.remove(id);
}
public void clear() {
messages.clear();
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.tools.rest.jms;
import java.util.HashSet;
import java.util.Set;
import javax.ws.rs.core.Application;
public class JmsJerseyApplication extends Application {
static private Set<Class<?>> result = new HashSet<Class<?>>();
@Override
public Set<Class<?>> getClasses() {
result.add(JmsService.class);
result.add(JndiService.class);
result.add(JmsContextService.class);
result.add(RootService.class);
return result;
}
}
\ No newline at end of file
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2016 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.tools.rest.jms;
import javax.jms.JMSContext;
import javax.jms.JMSSecurityRuntimeException;
import javax.jms.Message;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HEAD;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug;
@Path("/"+ JmsService.JMS)
public class JmsService {
public static Logger logger = Debug.getLogger(JmsService.class.getName());
private final Helper helper = Helper.getInstance();
public static final String JMS = "jms";
public static final String JMS_CREATE_PROD = "create-producer";
public static final String JMS_CREATE_CONS = "create-consumer";
@GET
@Produces(MediaType.TEXT_HTML)
public String info(@Context UriInfo uriInfo) {
StringBuilder buff = new StringBuilder();
buff.append("<html>");
buff.append("<body>");
buff.append("<h3>create a producer (HEAD)</h3>");
buff.append("<pre>");
buff.append(uriInfo.getAbsolutePathBuilder() + "/[queue|topic]/{<b>destination-name</b>}/"+JMS_CREATE_PROD);
buff.append("\n<b>options:</b>");
buff.append("\n <b>client-id:</b> The client identifier for the JMSContext's connection");
buff.append("\n <b>name:</b> The producer name for the producer JMSContext");
buff.append("\n <b>session-mode:</b> AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE or SESSION_TRANSACTED");
buff.append("\n <b>delivery-mode:</b> Specifies the delivery mode of messages that are sent using this JMSProducer");
buff.append("\n <b>delivery-delay:</b> Sets the minimum length of time in milliseconds that must elapse after a message is sent before the JMS provider may deliver the message to a consumer");
buff.append("\n <b>correlation-id:</b> Specifies that messages sent using this JMSProducer will have their JMSCorrelationID header value set to the specified correlation ID");
buff.append("\n <b>priority:</b> Specifies the priority of messages that are sent using this JMSProducer");
buff.append("\n <b>timeTo-live:</b> Specifies the time to live of messages that are sent using this JMSProducer");
buff.append("\n <b>idle-timeout:</b> Allows to set the idle time in milliseconds in which the producer context will be closed if idle");
buff.append("\n <b>user:</b> Specifies the userName for the JMS connection");
buff.append("\n <b>password:</b> Specifies the password for the JMS connection");
buff.append("</pre>");
buff.append("<h3>create a consumer (HEAD)</h3>");
buff.append("<pre>");
buff.append(uriInfo.getAbsolutePathBuilder() + "/[queue|topic]/{<b>destination-name</b>}/"+JMS_CREATE_CONS);
buff.append("\n<b>options:</b>");
buff.append("\n <b>client-id:</b> The client identifier for the JMSContext's connection");
buff.append("\n <b>name:</b> The producer name for the producer JMSContext");
buff.append("\n <b>session-mode:</b> AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE or SESSION_TRANSACTED");
buff.append("\n <b>selector:</b> Only messages with properties matching the message selector expression are delivered");
//TODO: buff.append("\n no-local:</b> if true then any messages published to the topic using this session's connection");
buff.append("\n <b>durable:</b> true to creates an durable subscription on the specified topic");
buff.append("\n <b>shared:</b> true for shared");
buff.append("\n <b>sub-name:</b> the name used to identify this subscription");
buff.append("\n <b>idle-timeout:</b> Allows to set the idle time in milliseconds in which the consumer context will be closed if idle");
buff.append("\n <b>user:</b> Specifies the userName for the JMS connection");
buff.append("\n <b>password:</b> Specifies the password for the JMS connection");
buff.append("</pre>");
buff.append("<h3>close a producer or a consumer (DELETE)</h3>");
buff.append("<pre>");
buff.append(uriInfo.getAbsolutePathBuilder() + "/{<b>name</b>}");
buff.append("</pre>");
buff.append("</body>");
buff.append("</html>");
return buff.toString();
}
@HEAD
@Path("/topic/{destName}/"+JMS_CREATE_PROD)