Commit 1d11ab94 authored by afreyssin's avatar afreyssin

Fix potential synchronization issues.

Moves send and receive code in Producer and Consumer contexts.
parent 4ced5536
/* /*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging * JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2016 ScalAgent Distributed Technologies * Copyright (C) 2016 - 2017 ScalAgent Distributed Technologies
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
...@@ -64,6 +64,7 @@ public class Helper { ...@@ -64,6 +64,7 @@ public class Helper {
public static final int DFLT_CLEANER_PERIOD = 15; public static final int DFLT_CLEANER_PERIOD = 15;
public static Logger logger = Debug.getLogger(Helper.class.getName()); public static Logger logger = Debug.getLogger(Helper.class.getName());
private static final AtomicLong counter = new AtomicLong(1); private static final AtomicLong counter = new AtomicLong(1);
private static Helper helper = null; private static Helper helper = null;
private InitialContext ictx; private InitialContext ictx;
...@@ -79,7 +80,7 @@ public class Helper { ...@@ -79,7 +80,7 @@ public class Helper {
sessionCtxs = new HashMap<String, SessionContext>(); sessionCtxs = new HashMap<String, SessionContext>();
} }
static public Helper getInstance() { static public synchronized Helper getInstance() {
if (helper == null) if (helper == null)
helper = new Helper(); helper = new Helper();
return helper; return helper;
...@@ -104,9 +105,11 @@ public class Helper { ...@@ -104,9 +105,11 @@ public class Helper {
jndiProps.setProperty("java.naming.factory.host", "localhost"); jndiProps.setProperty("java.naming.factory.host", "localhost");
jndiProps.setProperty("java.naming.factory.port", "16400"); jndiProps.setProperty("java.naming.factory.port", "16400");
} }
// TODO: use the osgi service jndi? // TODO: use the osgi service jndi?
// ServiceReference<ObjectFactory> ref = context.getServiceReference(javax.naming.spi.ObjectFactory.class); // ServiceReference<ObjectFactory> ref = context.getServiceReference(javax.naming.spi.ObjectFactory.class);
// ObjectFactory jndiFactory = bundleContext.getService(ref); // ObjectFactory jndiFactory = bundleContext.getService(ref);
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "jndiProperties = " + jndiProps); logger.log(BasicLevel.DEBUG, "jndiProperties = " + jndiProps);
...@@ -210,7 +213,7 @@ public class Helper { ...@@ -210,7 +213,7 @@ public class Helper {
return null; return null;
} }
public Object lookup(String name) throws NamingException { public synchronized Object lookup(String name) throws NamingException {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Helper.lookup " + name); logger.log(BasicLevel.DEBUG, "Helper.lookup " + name);
if (ictx == null) if (ictx == null)
...@@ -238,7 +241,7 @@ public class Helper { ...@@ -238,7 +241,7 @@ public class Helper {
* @return the producer name * @return the producer name
* @throws Exception * @throws Exception
*/ */
public String createProducer( public synchronized String createProducer(
String userName, String userName,
String password, String password,
String clientId, String clientId,
...@@ -289,18 +292,19 @@ public class Helper { ...@@ -289,18 +292,19 @@ public class Helper {
logger.log(BasicLevel.DEBUG, "Helper.createProducer jmsContext = " + restClientCtx.getJmsContext()); logger.log(BasicLevel.DEBUG, "Helper.createProducer jmsContext = " + restClientCtx.getJmsContext());
} }
SessionContext prodContext = sessionCtxs.get(prodId); ProducerContext prodContext = (ProducerContext) sessionCtxs.get(prodId);
if ( prodContext == null) { if (prodContext == null) {
// create a new producer context // create a new producer context
prodContext = new ProducerContext(restClientCtx); prodContext = new ProducerContext(restClientCtx);
prodContext.setJmsContext(restClientCtx.getJmsContext().createContext(sessionMode)); prodContext.setJmsContext(restClientCtx.getJmsContext().createContext(sessionMode));
JMSProducer producer = prodContext.getJmsContext().createProducer();
producer.setDeliveryMode(deliveryMode); prodContext.setDefaultDeliveryMode(deliveryMode);
if (correlationID != null) prodContext.setDefaultJMSCorrelationID(correlationID);
producer.setJMSCorrelationID(correlationID); prodContext.setDefaultPriority(priority);
producer.setPriority(priority); prodContext.setDefaultTimeToLive(timeToLive);
producer.setTimeToLive(timeToLive); prodContext.setDefaultDeliveryDelay(deliveryDelay);
producer.setDeliveryDelay(deliveryDelay);
JMSProducer producer = prodContext.getJmsContext().createProducer();
((ProducerContext) prodContext).setProducer(producer); ((ProducerContext) prodContext).setProducer(producer);
sessionCtxs.put(prodId, prodContext); sessionCtxs.put(prodId, prodContext);
restClientCtx.addSessionCtxNames(prodId); restClientCtx.addSessionCtxNames(prodId);
...@@ -316,10 +320,11 @@ public class Helper { ...@@ -316,10 +320,11 @@ public class Helper {
} }
prodContext.setDest(destination); prodContext.setDest(destination);
} }
return prodId; return prodId;
} }
public String createConsumer( public synchronized String createConsumer(
String userName, String userName,
String password, String password,
String clientId, String clientId,
...@@ -426,7 +431,7 @@ public class Helper { ...@@ -426,7 +431,7 @@ public class Helper {
return consId; return consId;
} }
private void setMapMessage(Map<String, Object> jsonMap, MapMessage msg) throws Exception { static final void setMapMessage(Map<String, Object> jsonMap, MapMessage msg) throws Exception {
if (jsonMap == null) if (jsonMap == null)
return; return;
...@@ -500,7 +505,7 @@ public class Helper { ...@@ -500,7 +505,7 @@ public class Helper {
} }
} }
private Object getValue(Map map, String key) throws Exception { static final Object getValue(Map map, String key) throws Exception {
Object value = map.get(key); Object value = map.get(key);
if (value instanceof ArrayList) { if (value instanceof ArrayList) {
ArrayList<String> array =(ArrayList<String>) value; ArrayList<String> array =(ArrayList<String>) value;
...@@ -526,7 +531,7 @@ public class Helper { ...@@ -526,7 +531,7 @@ public class Helper {
* @param jmsProps * @param jmsProps
* @param jmsBody * @param jmsBody
* @param deliveryMode * @param deliveryMode
* @param deliveryTime * @param deliveryDelay
* @param priority * @param priority
* @param timeToLive * @param timeToLive
* @param correlationID * @param correlationID
...@@ -540,7 +545,7 @@ public class Helper { ...@@ -540,7 +545,7 @@ public class Helper {
Map<String, Object> jmsProps, Map<String, Object> jmsProps,
Object jmsBody, Object jmsBody,
int deliveryMode, int deliveryMode,
long deliveryTime, long deliveryDelay,
int priority, int priority,
long timeToLive, long timeToLive,
String correlationID) throws Exception { String correlationID) throws Exception {
...@@ -550,152 +555,10 @@ public class Helper { ...@@ -550,152 +555,10 @@ public class Helper {
if (producerCtx == null) if (producerCtx == null)
throw new Exception(prodName + " not found."); throw new Exception(prodName + " not found.");
Message msg = null; return producerCtx.send(type,
jmsHeaders, jmsProps,
if (type.equals(TextMessage.class.getSimpleName())) { jmsBody, deliveryMode, deliveryDelay, priority, timeToLive,
if (logger.isLoggable(BasicLevel.DEBUG)) correlationID);
logger.log(BasicLevel.DEBUG, "send text message = " + jmsBody);
// create the text message
msg = producerCtx.getJmsContext().createTextMessage((String) jmsBody);
} else if(type.equals(BytesMessage.class.getSimpleName())) {
// create the byte message
if (jmsBody instanceof ArrayList) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "send bytes message");
msg = producerCtx.getJmsContext().createBytesMessage();
byte[] bytes = new byte[((ArrayList) jmsBody).size()];
for (int i = 0; i < ((ArrayList) jmsBody).size(); i++) {
Object value = ((ArrayList) jmsBody).get(i);
bytes[i] = ((Number) value).byteValue();
}
((BytesMessage) msg).writeBytes(bytes);
((BytesMessage) msg).reset();
} else {
throw new Exception("BytesMessage: invalid jmsBody = " + jmsBody.getClass().getName());
}
} else if(type.equals(MapMessage.class.getSimpleName())) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "send map message");
// create the map message
if (jmsBody instanceof Map) {
msg = producerCtx.getJmsContext().createMapMessage();
setMapMessage((Map) jmsBody, (MapMessage) msg);
} else {
throw new Exception("MapMessage: invalid jmsBody = " + jmsBody.getClass().getName());
}
} else if(type.equals(ObjectMessage.class.getSimpleName())) {
throw new Exception("type: " + type + ", not yet implemented");
} else if(type.equals(StreamMessage.class.getSimpleName())) {
throw new Exception("type: " + type + ", not yet implemented");
} else {
throw new Exception("Unknown message type: " + type);
}
if (jmsHeaders != null) {
// Header
if (deliveryMode == -1) {
Integer value = (Integer) getValue(jmsHeaders, "DeliveryMode");
if (value != null)
msg.setJMSDeliveryMode(value);
}
if (deliveryTime == -1) {
Long value = (Long) getValue(jmsHeaders, "DeliveryTime");
if (value != null)
msg.setJMSDeliveryTime(value);
}
if (priority == -1) {
Integer value = (Integer) getValue(jmsHeaders, "Priority");
if (value != null)
msg.setJMSPriority(value);
}
if (timeToLive == -1) {
Long value = (Long) getValue(jmsHeaders, "Expiration");
if (value != null)
msg.setJMSExpiration(value);
}
if (correlationID == null) {
String value = (String) getValue(jmsHeaders, "CorrelationID");
if (value != null)
msg.setJMSCorrelationID(value);
}
}
if (deliveryMode > -1)
msg.setJMSDeliveryMode(deliveryMode);
if (deliveryTime > -1)
msg.setJMSDeliveryTime(deliveryTime);
if (priority > -1)
msg.setJMSPriority(priority);
if (timeToLive > -1)
msg.setJMSExpiration(timeToLive);
if (correlationID != null)
msg.setJMSCorrelationID(correlationID);
if (jmsProps != null) {
// Properties
for (String key : jmsProps.keySet()) {
Object value = null;
try {
value = getValue(jmsProps, key);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "ignore set jms properties(" + key + ", " + value + ") : " + e.getMessage());
continue;
}
if (value == null)
continue;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "set jms properties: " + key + ", value = " + value + ", " + value.getClass().getSimpleName());
switch (value.getClass().getSimpleName()) {
case "String":
msg.setStringProperty(key, (String) value);
break;
case "Boolean":
msg.setBooleanProperty(key, (Boolean)value);
break;
case "Integer":
msg.setIntProperty(key, (Integer)value);
break;
case "Long":
msg.setLongProperty(key, (Long)value);
break;
case "Double":
msg.setDoubleProperty(key, (Double)value);
break;
case "Float":
msg.setFloatProperty(key, (Float)value);
break;
case "Short":
msg.setShortProperty(key, (Short)value);
break;
case "Byte":
msg.setByteProperty(key, (Byte)value);
break;
default:
try {
msg.setObjectProperty(key, value);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "ignore jms setObjectProperties(" + key + ", " + value + ") : " + e.getMessage());
}
break;
}
}
}
// send the message
producerCtx.getProducer().send(producerCtx.getDest(), msg);
// Increment the last id
producerCtx.incLastId();
//update activity
producerCtx.getClientCtx().setLastActivity(System.currentTimeMillis());
return producerCtx.getLastId();
} catch (Exception e) { } catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN)) if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, e); logger.log(BasicLevel.WARN, e);
...@@ -715,39 +578,12 @@ public class Helper { ...@@ -715,39 +578,12 @@ public class Helper {
ConsumerContext consumerCtx = (ConsumerContext) sessionCtxs.get(consName); ConsumerContext consumerCtx = (ConsumerContext) sessionCtxs.get(consName);
if (consumerCtx == null) if (consumerCtx == null)
throw new Exception(consName + " not found."); throw new Exception(consName + " not found.");
//update activity
consumerCtx.getClientCtx().setLastActivity(System.currentTimeMillis());
Message message = consumerCtx.getMessage(msgId); Message message = consumerCtx.getMessage(msgId);
if (message != null) if (message != null)
return message; return message;
if (timeout > 0) message = consumerCtx.receive(timeout, msgId);
message = consumerCtx.getConsumer().receive(timeout);
else if (timeout == 0)
message = consumerCtx.getConsumer().receiveNoWait();
else {
message = consumerCtx.getConsumer().receive();
if (message == null) {
throw new JMSException("The consumer expire (timeout)");
}
}
//update activity
consumerCtx.getClientCtx().setLastActivity(System.currentTimeMillis());
if (message != null) {
if (consumerCtx.getJmsContext().getSessionMode() == JMSContext.CLIENT_ACKNOWLEDGE) {
long id = msgId;
if (id == -1)
id = consumerCtx.incLastId();
consumerCtx.put(id, message);
} else {
consumerCtx.incLastId();
}
}
return message; return message;
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment