Commit 539bd5ae authored by David Feliot's avatar David Feliot
Browse files

JORAM-241: allows to set the offset and length of a message body.

parent 3d142248
......@@ -526,7 +526,11 @@ public final class Message implements Serializable, MessageView, Encodable {
out.writeBoolean(soft);
msg.writeHeaderTo(out);
StreamUtil.writeTo(msg.body, out);
if (msg.bodyLength < 0) {
StreamUtil.writeTo(msg.body, out);
} else {
StreamUtil.writeTo(msg.body, msg.bodyOffset, msg.bodyLength, out);
}
}
/**
......@@ -542,6 +546,10 @@ public final class Message implements Serializable, MessageView, Encodable {
msg = new org.objectweb.joram.shared.messages.Message();
msg.readHeaderFrom(in);
msg.body = StreamUtil.readByteArrayFrom(in);
msg.bodyOffset = 0;
if (msg.body != null) {
msg.bodyLength = msg.body.length;
}
}
public String getText() {
......
......@@ -193,7 +193,7 @@ public class AmqpDistribution implements DistributionHandler {
if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "Sending message on " + cnxName);
}
chan.basicPublish("", amqpQueue, props, message.body);
chan.basicPublish("", amqpQueue, props, message.getBody());
channels.get(cnxName); // Access the used connection to update the LRU map
return;
} catch (IOException exc) {
......
......@@ -76,6 +76,21 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
* on client side, used getBody and setBody instead of direct access to the body.
*/
public transient byte[] body = null;
/**
* The offset of the subarray in <code>body</code> to be used;
* must be non-negative and no larger than <code>array.length</code>.
*/
public transient int bodyOffset;
/**
* The length of the subarray in <code>body</code> to be used;
* must be non-negative and no larger than <code>array.length - offset</code>.
* Value <code>-1</code> means that there is no subarray in <code>body</code>
* and <code>bodyOffset</code> is ignored.
* Default value is <code>bodyOffset</code>.
*/
public transient int bodyLength = -1;
/** The message properties table. */
public transient Properties properties = null;
......@@ -434,6 +449,22 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
compressed = false;
this.body = body;
}
bodyOffset = 0;
if (this.body != null) {
bodyLength = this.body.length;
}
}
public void trimBody() {
if (body != null && bodyLength >= 0
&& (bodyOffset > 0 || bodyLength != body.length)) {
// Create a new byte array that fits the body
byte[] newBody = new byte[bodyLength];
System.arraycopy(body, bodyOffset, newBody, 0, bodyLength);
// Replace the body with the new body
body = newBody;
bodyOffset = 0;
}
}
/**
......@@ -444,8 +475,11 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
* @throws IOException if an I/O error has occurred
*/
public byte[] getBody() throws IOException {
trimBody();
if (compressed && (body != null)) {
body = uncompress(body);
bodyLength = body.length;
compressed = false;
}
return body;
......@@ -456,6 +490,7 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
*/
public void clearBody() {
body = null;
bodyLength = -1;
}
......@@ -471,7 +506,11 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
*/
public int getBodyLength() {
if (body == null) return 0;
return body.length;
if (bodyLength < 0) {
return body.length;
} else {
return bodyLength;
}
}
/**
......@@ -569,8 +608,15 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
Message clone = (Message) super.clone();
if (body != null) {
// AF: May be we can share the body as it should be RO.
clone.body = new byte[body.length];
System.arraycopy(body, 0, clone.body, 0, body.length);
if (bodyLength < 0) {
clone.body = new byte[body.length];
System.arraycopy(body, 0, clone.body, 0, body.length);
} else {
clone.body = new byte[bodyLength];
clone.bodyOffset = 0;
clone.bodyLength = bodyLength;
System.arraycopy(body, bodyOffset, clone.body, 0, bodyLength);
}
}
if (properties != null) {
clone.properties = (Properties) properties.clone();
......@@ -606,7 +652,11 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
*/
public void writeTo(OutputStream os) throws IOException {
writeHeaderTo(os);
StreamUtil.writeTo(body, os);
if (bodyLength < 0) {
StreamUtil.writeTo(body, os);
} else {
StreamUtil.writeTo(body, bodyOffset, bodyLength, os);
}
}
public void writeHeaderTo(OutputStream os) throws IOException {
......@@ -661,6 +711,9 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
public void readFrom(InputStream is) throws IOException {
readHeaderFrom(is);
body = StreamUtil.readByteArrayFrom(is);
if (body != null) {
bodyLength = body.length;
}
}
public void readHeaderFrom(InputStream is) throws IOException {
......@@ -802,7 +855,12 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
if (deliveryCount != 0) { encodedSize += INT_ENCODED_SIZE; }
if (jmsType != null) { encodedSize += EncodableHelper.getStringEncodedSize(jmsType); }
encodedSize += EncodableHelper.getNullableByteArrayEncodedSize(body);
if (bodyLength < 0) {
encodedSize += EncodableHelper.getNullableByteArrayEncodedSize(body);
} else {
encodedSize += EncodableHelper.getNullableByteArrayEncodedSize(body,
bodyLength);
}
encodedSize += EncodableHelper.getNullableStringEncodedSize(clientID);
return encodedSize;
......@@ -846,7 +904,11 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
if (deliveryCount != 0) { encoder.encodeUnsignedInt(deliveryCount); }
if (jmsType != null) { encoder.encodeString(jmsType); }
encoder.encodeNullableByteArray(body);
if (bodyLength < 0) {
encoder.encodeNullableByteArray(body);
} else {
encoder.encodeNullableByteArray(body, bodyOffset, bodyLength);
}
encoder.encodeNullableString(clientID);
}
......@@ -882,6 +944,10 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
persistent = (s & persistentFlag) != 0;
body = decoder.decodeNullableByteArray();
bodyOffset = 0;
if (body != null) {
bodyLength = body.length;
}
clientID = decoder.decodeNullableString();
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment