diff --git a/joram/joram/tools/rest/jms/src/main/java/org/objectweb/joram/tools/rest/jms/Helper.java b/joram/joram/tools/rest/jms/src/main/java/org/objectweb/joram/tools/rest/jms/Helper.java index 39d3a9548a5fb3eba04a05f113f7e517c6249635..eab9bf51ab8577f758acfeb5c5726d3bd792de82 100644 --- a/joram/joram/tools/rest/jms/src/main/java/org/objectweb/joram/tools/rest/jms/Helper.java +++ b/joram/joram/tools/rest/jms/src/main/java/org/objectweb/joram/tools/rest/jms/Helper.java @@ -1,6 +1,6 @@ /* * JORAM: Java(TM) Open Reliable Asynchronous Messaging - * Copyright (C) 2016 ScalAgent Distributed Technologies + * Copyright (C) 2016 - 2017 ScalAgent Distributed Technologies * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -64,6 +64,7 @@ public class Helper { public static final int DFLT_CLEANER_PERIOD = 15; public static Logger logger = Debug.getLogger(Helper.class.getName()); + private static final AtomicLong counter = new AtomicLong(1); private static Helper helper = null; private InitialContext ictx; @@ -79,7 +80,7 @@ public class Helper { sessionCtxs = new HashMap(); } - static public Helper getInstance() { + static public synchronized Helper getInstance() { if (helper == null) helper = new Helper(); return helper; @@ -104,9 +105,11 @@ public class Helper { jndiProps.setProperty("java.naming.factory.host", "localhost"); jndiProps.setProperty("java.naming.factory.port", "16400"); } + // TODO: use the osgi service jndi? // ServiceReference ref = context.getServiceReference(javax.naming.spi.ObjectFactory.class); // ObjectFactory jndiFactory = bundleContext.getService(ref); + if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "jndiProperties = " + jndiProps); @@ -210,7 +213,7 @@ public class Helper { return null; } - public Object lookup(String name) throws NamingException { + public synchronized Object lookup(String name) throws NamingException { if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "Helper.lookup " + name); if (ictx == null) @@ -238,7 +241,7 @@ public class Helper { * @return the producer name * @throws Exception */ - public String createProducer( + public synchronized String createProducer( String userName, String password, String clientId, @@ -289,18 +292,19 @@ public class Helper { logger.log(BasicLevel.DEBUG, "Helper.createProducer jmsContext = " + restClientCtx.getJmsContext()); } - SessionContext prodContext = sessionCtxs.get(prodId); - if ( prodContext == null) { + ProducerContext prodContext = (ProducerContext) sessionCtxs.get(prodId); + if (prodContext == null) { // create a new producer context prodContext = new ProducerContext(restClientCtx); prodContext.setJmsContext(restClientCtx.getJmsContext().createContext(sessionMode)); - JMSProducer producer = prodContext.getJmsContext().createProducer(); - producer.setDeliveryMode(deliveryMode); - if (correlationID != null) - producer.setJMSCorrelationID(correlationID); - producer.setPriority(priority); - producer.setTimeToLive(timeToLive); - producer.setDeliveryDelay(deliveryDelay); + + prodContext.setDefaultDeliveryMode(deliveryMode); + prodContext.setDefaultJMSCorrelationID(correlationID); + prodContext.setDefaultPriority(priority); + prodContext.setDefaultTimeToLive(timeToLive); + prodContext.setDefaultDeliveryDelay(deliveryDelay); + + JMSProducer producer = prodContext.getJmsContext().createProducer(); ((ProducerContext) prodContext).setProducer(producer); sessionCtxs.put(prodId, prodContext); restClientCtx.addSessionCtxNames(prodId); @@ -316,10 +320,11 @@ public class Helper { } prodContext.setDest(destination); } + return prodId; } - public String createConsumer( + public synchronized String createConsumer( String userName, String password, String clientId, @@ -426,7 +431,7 @@ public class Helper { return consId; } - private void setMapMessage(Map jsonMap, MapMessage msg) throws Exception { + static final void setMapMessage(Map jsonMap, MapMessage msg) throws Exception { if (jsonMap == null) return; @@ -500,7 +505,7 @@ public class Helper { } } - private Object getValue(Map map, String key) throws Exception { + static final Object getValue(Map map, String key) throws Exception { Object value = map.get(key); if (value instanceof ArrayList) { ArrayList array =(ArrayList) value; @@ -526,7 +531,7 @@ public class Helper { * @param jmsProps * @param jmsBody * @param deliveryMode - * @param deliveryTime + * @param deliveryDelay * @param priority * @param timeToLive * @param correlationID @@ -540,7 +545,7 @@ public class Helper { Map jmsProps, Object jmsBody, int deliveryMode, - long deliveryTime, + long deliveryDelay, int priority, long timeToLive, String correlationID) throws Exception { @@ -550,152 +555,10 @@ public class Helper { if (producerCtx == null) throw new Exception(prodName + " not found."); - Message msg = null; - - if (type.equals(TextMessage.class.getSimpleName())) { - if (logger.isLoggable(BasicLevel.DEBUG)) - logger.log(BasicLevel.DEBUG, "send text message = " + jmsBody); - - // create the text message - msg = producerCtx.getJmsContext().createTextMessage((String) jmsBody); - } else if(type.equals(BytesMessage.class.getSimpleName())) { - // create the byte message - if (jmsBody instanceof ArrayList) { - if (logger.isLoggable(BasicLevel.DEBUG)) - logger.log(BasicLevel.DEBUG, "send bytes message"); - - msg = producerCtx.getJmsContext().createBytesMessage(); - byte[] bytes = new byte[((ArrayList) jmsBody).size()]; - for (int i = 0; i < ((ArrayList) jmsBody).size(); i++) { - Object value = ((ArrayList) jmsBody).get(i); - bytes[i] = ((Number) value).byteValue(); - } - ((BytesMessage) msg).writeBytes(bytes); - ((BytesMessage) msg).reset(); - } else { - throw new Exception("BytesMessage: invalid jmsBody = " + jmsBody.getClass().getName()); - } - } else if(type.equals(MapMessage.class.getSimpleName())) { - if (logger.isLoggable(BasicLevel.DEBUG)) - logger.log(BasicLevel.DEBUG, "send map message"); - - // create the map message - if (jmsBody instanceof Map) { - msg = producerCtx.getJmsContext().createMapMessage(); - setMapMessage((Map) jmsBody, (MapMessage) msg); - } else { - throw new Exception("MapMessage: invalid jmsBody = " + jmsBody.getClass().getName()); - } - } else if(type.equals(ObjectMessage.class.getSimpleName())) { - throw new Exception("type: " + type + ", not yet implemented"); - } else if(type.equals(StreamMessage.class.getSimpleName())) { - throw new Exception("type: " + type + ", not yet implemented"); - } else { - throw new Exception("Unknown message type: " + type); - } - - if (jmsHeaders != null) { - // Header - if (deliveryMode == -1) { - Integer value = (Integer) getValue(jmsHeaders, "DeliveryMode"); - if (value != null) - msg.setJMSDeliveryMode(value); - } - if (deliveryTime == -1) { - Long value = (Long) getValue(jmsHeaders, "DeliveryTime"); - if (value != null) - msg.setJMSDeliveryTime(value); - } - if (priority == -1) { - Integer value = (Integer) getValue(jmsHeaders, "Priority"); - if (value != null) - msg.setJMSPriority(value); - } - if (timeToLive == -1) { - Long value = (Long) getValue(jmsHeaders, "Expiration"); - if (value != null) - msg.setJMSExpiration(value); - } - if (correlationID == null) { - String value = (String) getValue(jmsHeaders, "CorrelationID"); - if (value != null) - msg.setJMSCorrelationID(value); - } - } - - if (deliveryMode > -1) - msg.setJMSDeliveryMode(deliveryMode); - if (deliveryTime > -1) - msg.setJMSDeliveryTime(deliveryTime); - if (priority > -1) - msg.setJMSPriority(priority); - if (timeToLive > -1) - msg.setJMSExpiration(timeToLive); - if (correlationID != null) - msg.setJMSCorrelationID(correlationID); - - if (jmsProps != null) { - // Properties - for (String key : jmsProps.keySet()) { - Object value = null; - try { - value = getValue(jmsProps, key); - } catch (Exception e) { - if (logger.isLoggable(BasicLevel.ERROR)) - logger.log(BasicLevel.ERROR, "ignore set jms properties(" + key + ", " + value + ") : " + e.getMessage()); - continue; - } - if (value == null) - continue; - if (logger.isLoggable(BasicLevel.DEBUG)) - logger.log(BasicLevel.DEBUG, "set jms properties: " + key + ", value = " + value + ", " + value.getClass().getSimpleName()); - - switch (value.getClass().getSimpleName()) { - case "String": - msg.setStringProperty(key, (String) value); - break; - case "Boolean": - msg.setBooleanProperty(key, (Boolean)value); - break; - case "Integer": - msg.setIntProperty(key, (Integer)value); - break; - case "Long": - msg.setLongProperty(key, (Long)value); - break; - case "Double": - msg.setDoubleProperty(key, (Double)value); - break; - case "Float": - msg.setFloatProperty(key, (Float)value); - break; - case "Short": - msg.setShortProperty(key, (Short)value); - break; - case "Byte": - msg.setByteProperty(key, (Byte)value); - break; - - default: - try { - msg.setObjectProperty(key, value); - } catch (Exception e) { - if (logger.isLoggable(BasicLevel.ERROR)) - logger.log(BasicLevel.ERROR, "ignore jms setObjectProperties(" + key + ", " + value + ") : " + e.getMessage()); - } - break; - } - } - } - - // send the message - producerCtx.getProducer().send(producerCtx.getDest(), msg); - // Increment the last id - producerCtx.incLastId(); - //update activity - producerCtx.getClientCtx().setLastActivity(System.currentTimeMillis()); - - return producerCtx.getLastId(); + return producerCtx.send(type, + jmsHeaders, jmsProps, + jmsBody, deliveryMode, deliveryDelay, priority, timeToLive, + correlationID); } catch (Exception e) { if (logger.isLoggable(BasicLevel.WARN)) logger.log(BasicLevel.WARN, e); @@ -715,39 +578,12 @@ public class Helper { ConsumerContext consumerCtx = (ConsumerContext) sessionCtxs.get(consName); if (consumerCtx == null) throw new Exception(consName + " not found."); - - //update activity - consumerCtx.getClientCtx().setLastActivity(System.currentTimeMillis()); - + Message message = consumerCtx.getMessage(msgId); if (message != null) return message; - if (timeout > 0) - message = consumerCtx.getConsumer().receive(timeout); - else if (timeout == 0) - message = consumerCtx.getConsumer().receiveNoWait(); - else { - message = consumerCtx.getConsumer().receive(); - if (message == null) { - throw new JMSException("The consumer expire (timeout)"); - } - } - - //update activity - consumerCtx.getClientCtx().setLastActivity(System.currentTimeMillis()); - - if (message != null) { - if (consumerCtx.getJmsContext().getSessionMode() == JMSContext.CLIENT_ACKNOWLEDGE) { - long id = msgId; - if (id == -1) - id = consumerCtx.incLastId(); - consumerCtx.put(id, message); - } else { - consumerCtx.incLastId(); - } - } - + message = consumerCtx.receive(timeout, msgId); return message; }