Commit 8c6b986a authored by Andre Freyssinet's avatar Andre Freyssinet
Browse files

Factorizes JMS_JORAM property names.

Fix an issue with null values in setProperty.
parent d9a2b3cf
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2006 - 2018 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.shared.messages;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Vector;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import org.objectweb.joram.shared.admin.AbstractAdminMessage;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.common.encoding.Decoder;
import fr.dyade.aaa.common.encoding.Encodable;
import fr.dyade.aaa.common.encoding.EncodableHelper;
import fr.dyade.aaa.common.encoding.Encoder;
import fr.dyade.aaa.common.stream.Properties;
import fr.dyade.aaa.common.stream.StreamUtil;
import fr.dyade.aaa.common.stream.Streamable;
/**
* Implements the <code>Message</code> data structure.
*/
public final class Message implements Cloneable, Serializable, Streamable, Encodable {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 3L;
// default value from javax.jms.Message (jms 1.1)
public static final int NON_PERSISTENT = 1;
public static final int PERSISTENT = 2;
public static final int DEFAULT_DELIVERY_MODE = PERSISTENT;
public static final int DEFAULT_PRIORITY = 4;
public static final long DEFAULT_TIME_TO_LIVE = 0;
/** logger */
public static Logger logger = Debug.getLogger(Message.class.getName());
/**
* Constructs a bright new <code>Message</code>.
*/
public Message() {}
/**
* Body of the message.
* on client side, used getBody and setBody instead of direct access to the body.
*/
public transient byte[] body = null;
/**
* The offset of the subarray in <code>body</code> to be used;
* must be non-negative and no larger than <code>array.length</code>.
*/
public transient int bodyOffset;
/**
* The length of the subarray in <code>body</code> to be used;
* must be non-negative and no larger than <code>array.length - offset</code>.
* Value <code>-1</code> means that there is no subarray in <code>body</code>
* and <code>bodyOffset</code> is ignored.
* Default value is <code>bodyOffset</code>.
*/
public transient int bodyLength = -1;
/** The message properties table. */
public transient Properties properties = null;
/**
* Returns a property as an object.
*
* @param name The property name.
*/
public Object getProperty(String name) {
if (properties == null) return null;
return properties.get(name);
}
/**
* Sets a property value.
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2006 - 2021 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package org.objectweb.joram.shared.messages;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Vector;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import org.objectweb.joram.shared.admin.AbstractAdminMessage;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.common.encoding.Decoder;
import fr.dyade.aaa.common.encoding.Encodable;
import fr.dyade.aaa.common.encoding.EncodableHelper;
import fr.dyade.aaa.common.encoding.Encoder;
import fr.dyade.aaa.common.stream.Properties;
import fr.dyade.aaa.common.stream.StreamUtil;
import fr.dyade.aaa.common.stream.Streamable;
/**
* Implements the <code>Message</code> data structure.
*/
public final class Message implements Cloneable, Serializable, Streamable, Encodable {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 3L;
// Default values from javax.jms.Message (JMS 1.1)
public static final int NON_PERSISTENT = 1;
public static final int PERSISTENT = 2;
public static final int DEFAULT_DELIVERY_MODE = PERSISTENT;
public static final int DEFAULT_PRIORITY = 4;
public static final long DEFAULT_TIME_TO_LIVE = 0;
// Reserved property names for Joram
public static final String SWAPALLOWED = "JMS_JORAM_SWAPALLOWED";
public static final String ERRORCOUNT = "JMS_JORAM_ERRORCOUNT";
public static final String ERRORCAUSE_PREFIX = "JMS_JORAM_ERRORCAUSE_";
public static final String ERRORCODE_PREFIX= "JMS_JORAM_ERRORCODE_";
public static final String CORRELATION_ID = "JMS_JORAM_CORRELATIONID";
/** logger */
public static Logger logger = Debug.getLogger(Message.class.getName());
/**
* Constructs a bright new <code>Message</code>.
*/
public Message() {}
/**
* Body of the message.
* on client side, used getBody and setBody instead of direct access to the body.
*/
public transient byte[] body = null;
/**
* The offset of the subarray in <code>body</code> to be used;
* must be non-negative and no larger than <code>array.length</code>.
*/
public transient int bodyOffset;
/**
* The length of the subarray in <code>body</code> to be used;
* must be non-negative and no larger than <code>array.length - offset</code>.
* Value <code>-1</code> means that there is no subarray in <code>body</code>
* and <code>bodyOffset</code> is ignored.
* Default value is <code>bodyOffset</code>.
*/
public transient int bodyLength = -1;
/** The message properties table. */
public transient Properties properties = null;
/**
* Returns a property as an object.
*
* @param name The property name.
*/
public Object getProperty(String name) {
if (properties == null) return null;
return properties.get(name);
}
/**
* Sets a property value.
* If the value is not a Java primitive object (Boolean, Number, String or byte[]) its string representation is used.
*
* @param name The property name.
* @param value The property value.
*
* @exception IllegalArgumentException If the key name is illegal (null or empty string).
*/
public void setProperty(String name, Object value) {
if (name == null || name.equals(""))
throw new IllegalArgumentException("Invalid property name: " + name);
if (properties == null)
properties = new Properties();
if (value instanceof Boolean || value instanceof Number || value instanceof String || value instanceof byte[]) {
properties.put(name, value);
} else {
properties.put(name, value.toString());
}
}
/** The message identifier. */
public transient String id = null;
/** <code>true</code> if the message must be persisted. */
public transient boolean persistent = true;
/** A simple message carries an empty body. */
public static final int SIMPLE = 0;
/** A text message carries a String body. */
public static final int TEXT = 1;
/** An object message carries a serializable object. */
public static final int OBJECT = 2;
/** A map message carries an hashtable. */
public static final int MAP = 3;
/** A stream message carries a bytes stream. */
public static final int STREAM = 4;
/** A bytes message carries an array of bytes. */
public static final int BYTES = 5;
/** A admin message carries a streamable object. */
public static final int ADMIN = 6;
/**
* The client message type: SIMPLE, TEXT, OBJECT, MAP, STREAM, BYTES, ADMIN.
* By default, the message type is SIMPLE.
* Be careful, this type is coded on 4 bits (see writeTo and readFrom methods).
*/
public transient int type = SIMPLE;
/**
* The JMSType header field contains a message type identifier supplied by a
* client when a message is sent.
*/
public transient String jmsType = null;
/**
* The message priority from 0 to 9, 9 being the highest.
* By default, the priority is 4.
* Be careful, this type is coded on 4 bits (see writeTo and readFrom methods).
*/
public transient int priority = DEFAULT_PRIORITY;
/** The message expiration time, by default 0 for infinite time-to-live. */
public transient long expiration = 0;
/** The message time stamp. */
public transient long timestamp;
/**
* <code>true</code> if the message has been denied at least once by a
* consumer.
*/
public transient boolean redelivered = false;
/** The message destination identifier. */
public transient String toId = null;
/** The message destination name. */
public transient String toName = null;
/** The message destination type. */
public transient byte toType;
/** <code>true</code> if compressed body. */
public transient boolean compressed;
/**
* If the message body is upper than the <code>compressedMinSize</code>,
* this message body is compressed.
*/
public transient int compressedMinSize;
public transient int compressionLevel = Deflater.BEST_SPEED;
/** the message delivery time value. */
public transient long deliveryTime;
/** The client connection identification */
public transient String clientID;
/**
* Sets the message destination.
*
* @param id The destination identifier.
* @param name The destination name.
* @param type The type of the destination.
*/
public final void setDestination(String id, String name, byte type) {
toId = id;
toName = name;
toType = type;
}
// /** Returns the message destination identifier. */
// public final String getDestinationId() {
// return toId;
// }
//
// /** Returns <code>true</code> if the destination is a queue. */
// public final String getDestinationType() {
// return toType;
// }
/** The reply to destination identifier. */
public transient String replyToId = null;
/** The reply to destination name. */
public transient String replyToName = null;
/** <code>true</code> if the "reply to" destination is a queue. */
public transient byte replyToType;
// /** Returns the destination id the reply should be sent to. */
// public final String getReplyToId() {
// return replyToId;
// }
//
// /** Returns <code>true</code> if the reply to destination is a queue. */
// public final String replyToType() {
// return replyToType;
// }
/**
* Sets the destination to which a reply should be sent.
*
* @param id The destination identifier.
* @param type The destination type.
*/
public final void setReplyTo(String id, String name, byte type) {
replyToId = id;
replyToName = name;
replyToType = type;
}
/** The correlation identifier field. */
public transient String correlationId = null;
/**
* Gets the correlation ID as an array of bytes for the message.
*
* @return the correlation ID for the message as an array of bytes.
*/
public final byte[] getJMSCorrelationIDAsBytes() {
if (correlationId != null)
return correlationId.getBytes();
return null;
}
/**
* Sets the correlation ID as an array of bytes for the message.
*
* @param correlationID the message ID value as an array of bytes.
*/
public final void setJMSCorrelationIDAsBytes(byte[] correlationID) {
if (correlationID != null)
this.correlationId = new String(correlationID);
else
this.correlationId = null;
}
/** The number of delivery attempts for this message. */
public transient int deliveryCount = 0;
/**
* convert serializable object to byte[]
*
* @param object the serializable object
* @return the byte array
* @throws IOException In case of error
*/
private byte[] toBytes(Serializable object) throws IOException {
if (object == null)
return null;
ByteArrayOutputStream baos = null;
ObjectOutputStream oos = null;
try {
baos = new ByteArrayOutputStream();
oos = new ObjectOutputStream(baos);
oos.writeObject(object);
oos.flush();
return baos.toByteArray();
} finally {
if (oos != null)
oos.close();
if (baos != null)
baos.close();
}
}
/**
* convert byte[] to serializable object
*
* @param body the byte array
* @return the serializable object
* @throws Exception In case of error
*/
private Serializable fromBytes(byte[] body) throws Exception {
if (body == null)
return null;
ByteArrayInputStream bais = null;
ObjectInputStream ois = null;
Object obj = null;
try {
try {
bais = new ByteArrayInputStream(body);
ois = new ObjectInputStream(bais);
obj = ois.readObject();
} catch (ClassNotFoundException cnfexc) {
// Could not build serialized object: reason could be linked to
// class loaders hierarchy in an application server.
class Specialized_OIS extends ObjectInputStream {
Specialized_OIS(InputStream is) throws IOException {
super(is);
}
protected Class resolveClass(ObjectStreamClass osc) throws IOException, ClassNotFoundException {
String n = osc.getName();
return Class.forName(n, false, Thread.currentThread().getContextClassLoader());
}
}
bais = new ByteArrayInputStream(body);
ois = new Specialized_OIS(bais);
obj = ois.readObject();
}
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "ERROR: fromBytes(body) -> " + id, exc);
else
logger.log(BasicLevel.WARN, "ERROR: fromBytes(body) -> " + id);
throw exc;
} finally {
try {
ois.close();
} catch (Exception e) {}
try {
bais.close();
} catch (Exception e) {}
}
return (Serializable) obj;
}
/**
* Sets a String as the body of the message.
* @throws IOException In case of an error while setting the text
*/
public void setText(String text) throws IOException {
setBody(toBytes(text));
}
/**
* Returns the text body of the message.
* @throws Exception In case of an error while getting the text
*/
public String getText() throws Exception {
if (body == null) {
return null;
}
return (String) fromBytes(getBody());
}
/**
* Sets an object as the body of the message.
*
* @exception IOException In case of an error while setting the object.
*/
public void setObject(Serializable object) throws IOException {
type = Message.OBJECT;
setBody(toBytes(object));
}
/**
* Returns the object body of the message.
*
* @exception Exception In case of an error while getting the object.
*/
public Serializable getObject() throws Exception {
// TODO (AF): May be, we should verify that it is an Object message!!
return fromBytes(getBody());
}
/**
* Sets an AbstractAdminMessage as the body of the message.
*
* @exception IOException In case of an error while setting the object.
*/
public void setAdminMessage(AbstractAdminMessage adminMsg) throws IOException {
type = Message.ADMIN;
if (adminMsg == null) {
body = null;
} else {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
AbstractAdminMessage.write(adminMsg, baos);
baos.flush();
setBody(baos.toByteArray());
baos.close();
}
}
/**
* Returns the AbstractAdminMessage body of the message.
*
* @exception IOException In case of an error while getting the object.
* @exception ClassNotFoundException If the object class is unknown.
*/
public AbstractAdminMessage getAdminMessage() {
if (body == null) return null;
ByteArrayInputStream bais = null;
AbstractAdminMessage adminMsg = null;
try {
bais = new ByteArrayInputStream(getBody());
adminMsg = AbstractAdminMessage.read(bais);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "ERROR: getAdminMessage()", e);
}
return adminMsg;
}
/**
* set the body.
* compress if the body length > compressedMinSize
*
* @param body a byte array
* @throws IOException if an I/O error has occurred
*/
public void setBody(byte[] body) throws IOException {
if (compressedMinSize > 0 && body != null && body.length > compressedMinSize) {
long length = body.length;
this.body = compress(body, compressionLevel);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, type + " : setBody: compressedMinSize = " + compressedMinSize +