Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
joram
joram
Commits
b367b363
Commit
b367b363
authored
Feb 04, 2011
by
Guillaume Surrel
Browse files
Fix array marshalling/unmarshalling.
parent
d6d4f306
Changes
2
Hide whitespace changes
Inline
Side-by-side
mom-amqp/org.ow2.joram.mom.amqp/src/main/java/org/ow2/joram/mom/amqp/marshalling/AMQPInputStream.java
View file @
b367b363
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 20
09
ScalAgent Distributed Technologies
* Copyright (C) 2008 - 20
11
ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2009 CNES
*
* This library is free software; you can redistribute it and/or
...
...
@@ -27,10 +27,13 @@ import java.io.IOException;
import
java.io.InputStream
;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.objectweb.util.monolog.api.BasicLevel
;
import
org.objectweb.util.monolog.api.Logger
;
import
org.ow2.joram.mom.amqp.exceptions.SyntaxErrorException
;
...
...
@@ -112,206 +115,119 @@ public class AMQPInputStream {
return
result
;
}
public
final
Map
readTable
()
throws
IOException
,
SyntaxErrorException
{
clearBits
();
if
(
in
.
available
()
<
1
)
return
null
;
long
tableLength
=
StreamUtil
.
readUnsignedIntFrom
(
in
);
if
(
tableLength
<
1
)
return
null
;
Map
table
=
new
HashMap
();
InputStream
oldIn
=
in
;
in
=
new
TruncatedInputStream
(
this
.
in
,
tableLength
);
private
final
Object
readFieldValue
()
throws
IOException
,
SyntaxErrorException
{
Object
value
=
null
;
while
(
in
.
available
()
>
0
)
{
String
name
=
readShortstr
();
int
type
=
StreamUtil
.
readUnsignedByteFrom
(
in
);
switch
(
type
)
{
case
'S'
:
value
=
readLongstr
();
break
;
case
'I'
:
value
=
new
Integer
(
StreamUtil
.
readIntFrom
(
in
));
break
;
case
'D'
:
int
scale
=
StreamUtil
.
readUnsignedByteFrom
(
in
);
byte
[]
unscaled
=
StreamUtil
.
readByteArrayFrom
(
in
,
4
);
value
=
new
BigDecimal
(
new
BigInteger
(
unscaled
),
scale
);
break
;
case
'T'
:
value
=
readTimestamp
();
break
;
case
'F'
:
value
=
readTable
();
break
;
// Taken from AMQP 0.9.1 spec
case
'B'
:
value
=
new
Byte
(
StreamUtil
.
readByteFrom
(
in
));
break
;
case
'U'
:
value
=
new
Short
(
StreamUtil
.
readShortFrom
(
in
));
break
;
case
'L'
:
value
=
new
Long
(
StreamUtil
.
readLongFrom
(
in
));
break
;
case
'A'
:
value
=
readArray
();
break
;
case
'f'
:
value
=
new
Float
(
StreamUtil
.
readFloatFrom
(
in
));
break
;
case
'd'
:
value
=
new
Double
(
StreamUtil
.
readDoubleFrom
(
in
));
break
;
case
'b'
:
value
=
new
Byte
(
StreamUtil
.
readByteFrom
(
in
));
break
;
case
'l'
:
value
=
new
Long
(
StreamUtil
.
readLongFrom
(
in
));
break
;
case
't'
:
value
=
new
Boolean
(
readBoolean
());
break
;
case
'V'
:
value
=
null
;
break
;
case
's'
:
value
=
new
Boolean
(
StreamUtil
.
readShortStringFrom
(
in
));
break
;
default
:
throw
new
SyntaxErrorException
(
"Unrecognised object type in table: "
+
(
char
)
type
);
}
if
(!
table
.
containsKey
(
name
))
table
.
put
(
name
,
value
);
}
in
=
oldIn
;
return
table
;
}
private
final
Object
[]
readArray
()
throws
IOException
,
SyntaxErrorException
{
long
arrayLength
=
StreamUtil
.
readUnsignedIntFrom
(
in
);
if
(
arrayLength
<
1
)
return
null
;
// DataInputStream in = new DataInputStream(new TruncatedInputStream(in, arrayLength));
InputStream
oldIn
=
in
;
in
=
new
TruncatedInputStream
(
this
.
in
,
arrayLength
);
int
typeCode
=
StreamUtil
.
readUnsignedByteFrom
(
in
);
int
count
=
StreamUtil
.
readIntFrom
(
in
);
Object
[]
value
=
null
;
switch
(
typeCode
)
{
int
type
=
StreamUtil
.
readUnsignedByteFrom
(
in
);
switch
(
type
)
{
case
'S'
:
value
=
new
LongString
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
readLongstr
();
}
value
=
readLongstr
();
break
;
case
'I'
:
value
=
new
Integer
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Integer
(
StreamUtil
.
readIntFrom
(
in
));
}
break
;
case
'U'
:
value
=
new
Short
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Short
(
StreamUtil
.
readShortFrom
(
in
));
}
break
;
case
'B'
:
value
=
new
Byte
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Byte
(
StreamUtil
.
readByteFrom
(
in
));
}
value
=
new
Integer
(
StreamUtil
.
readIntFrom
(
in
));
break
;
case
'D'
:
value
=
new
BigDecimal
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
int
scale
=
StreamUtil
.
readUnsignedByteFrom
(
in
);
byte
[]
unscaled
=
StreamUtil
.
readByteArrayFrom
(
in
,
4
);
value
[
i
]
=
new
BigDecimal
(
new
BigInteger
(
unscaled
),
scale
);
}
int
scale
=
StreamUtil
.
readUnsignedByteFrom
(
in
);
byte
[]
unscaled
=
StreamUtil
.
readByteArrayFrom
(
in
,
4
);
value
=
new
BigDecimal
(
new
BigInteger
(
unscaled
),
scale
);
break
;
case
'T'
:
value
=
new
Date
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
readTimestamp
();
}
value
=
readTimestamp
();
break
;
case
'F'
:
value
=
new
HashMap
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
readTable
();
}
value
=
readTable
();
break
;
case
'B'
:
value
=
new
Byte
(
StreamUtil
.
readByteFrom
(
in
));
break
;
case
'U'
:
value
=
new
Short
(
StreamUtil
.
readShortFrom
(
in
));
break
;
case
'L'
:
value
=
new
Long
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Long
(
StreamUtil
.
readLongFrom
(
in
));
}
value
=
new
Long
(
StreamUtil
.
readLongFrom
(
in
));
break
;
case
'A'
:
value
=
new
Object
[
count
][];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
readArray
();
}
value
=
readArray
();
break
;
case
'f'
:
value
=
new
Float
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Float
(
StreamUtil
.
readFloatFrom
(
in
));
}
value
=
new
Float
(
StreamUtil
.
readFloatFrom
(
in
));
break
;
case
'd'
:
value
=
new
Double
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Double
(
StreamUtil
.
readDoubleFrom
(
in
));
}
value
=
new
Double
(
StreamUtil
.
readDoubleFrom
(
in
));
break
;
case
'b'
:
value
=
new
Byte
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Byte
(
StreamUtil
.
readByteFrom
(
in
));
}
value
=
new
Byte
(
StreamUtil
.
readByteFrom
(
in
));
break
;
case
'l'
:
value
=
new
Long
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Long
(
StreamUtil
.
readLongFrom
(
in
));
}
value
=
new
Long
(
StreamUtil
.
readLongFrom
(
in
));
break
;
case
't'
:
value
=
new
Boolean
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
new
Boolean
(
readBoolean
());
}
value
=
new
Boolean
(
readBoolean
());
break
;
case
'V'
:
value
=
n
ew
Object
[
count
]
;
value
=
n
ull
;
break
;
case
's'
:
value
=
new
String
[
count
];
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
value
[
i
]
=
StreamUtil
.
readShortStringFrom
(
in
);
}
value
=
new
Boolean
(
StreamUtil
.
readShortStringFrom
(
in
));
break
;
default
:
throw
new
SyntaxErrorException
(
"Unrecognised type in table: "
+
(
char
)
type
Code
);
throw
new
SyntaxErrorException
(
"Unrecognised
object
type in table: "
+
(
char
)
type
);
}
return
value
;
}
if
(
in
.
available
()
>
0
)
{
throw
new
IOException
(
"Incorrect array size."
);
public
final
Map
readTable
()
throws
IOException
,
SyntaxErrorException
{
if
(
logger
.
isLoggable
(
BasicLevel
.
DEBUG
))
{
logger
.
log
(
BasicLevel
.
DEBUG
,
"readTable()"
);
}
clearBits
();
if
(
in
.
available
()
<
1
)
return
null
;
long
tableLength
=
StreamUtil
.
readUnsignedIntFrom
(
in
);
if
(
tableLength
<
0
)
throw
new
SyntaxErrorException
(
"Negative table size."
);
Map
table
=
new
HashMap
();
InputStream
oldIn
=
in
;
in
=
new
TruncatedInputStream
(
this
.
in
,
tableLength
);
Object
value
=
null
;
while
(
in
.
available
()
>
0
)
{
String
name
=
readShortstr
();
value
=
readFieldValue
();
if
(!
table
.
containsKey
(
name
))
table
.
put
(
name
,
value
);
}
in
=
oldIn
;
return
value
;
if
(
logger
.
isLoggable
(
BasicLevel
.
DEBUG
))
{
logger
.
log
(
BasicLevel
.
DEBUG
,
"readTable -> "
+
table
);
}
return
table
;
}
private
final
Object
[]
readArray
()
throws
IOException
,
SyntaxErrorException
{
if
(
logger
.
isLoggable
(
BasicLevel
.
DEBUG
))
{
logger
.
log
(
BasicLevel
.
DEBUG
,
"readArray()"
);
}
long
arrayLength
=
StreamUtil
.
readIntFrom
(
in
);
if
(
arrayLength
<
0
)
throw
new
SyntaxErrorException
(
"Negative array size."
);
InputStream
oldIn
=
in
;
in
=
new
TruncatedInputStream
(
this
.
in
,
arrayLength
);
List
<
Object
>
list
=
new
ArrayList
<
Object
>();
while
(
in
.
available
()
>
0
)
{
list
.
add
(
readFieldValue
());
}
in
=
oldIn
;
if
(
logger
.
isLoggable
(
BasicLevel
.
DEBUG
))
{
logger
.
log
(
BasicLevel
.
DEBUG
,
"readArray -> "
+
list
);
}
return
list
.
toArray
();
}
public
final
int
readOctet
()
throws
IOException
{
...
...
mom-amqp/org.ow2.joram.mom.amqp/src/main/java/org/ow2/joram/mom/amqp/marshalling/AMQPOutputStream.java
View file @
b367b363
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 20
09
ScalAgent Distributed Technologies
* Copyright (C) 2008 - 20
11
ScalAgent Distributed Technologies
* Copyright (C) 2008 - 2009 CNES
*
* This library is free software; you can redistribute it and/or
...
...
@@ -30,10 +30,12 @@ import java.io.UnsupportedEncodingException;
import
java.math.BigDecimal
;
import
java.math.BigInteger
;
import
java.sql.Timestamp
;
import
java.util.Arrays
;
import
java.util.Date
;
import
java.util.Iterator
;
import
java.util.Map
;
import
org.objectweb.util.monolog.api.BasicLevel
;
import
org.objectweb.util.monolog.api.Logger
;
import
fr.dyade.aaa.common.Debug
;
...
...
@@ -138,6 +140,9 @@ public class AMQPOutputStream {
}
public
final
void
writeTable
(
Map
table
)
throws
IOException
{
if
(
logger
.
isLoggable
(
BasicLevel
.
DEBUG
))
{
logger
.
log
(
BasicLevel
.
DEBUG
,
"writeTable -> "
+
table
);
}
bitflush
();
if
(
table
==
null
)
{
StreamUtil
.
writeTo
(
0
,
out
);
...
...
@@ -146,151 +151,75 @@ public class AMQPOutputStream {
for
(
Iterator
entries
=
table
.
entrySet
().
iterator
();
entries
.
hasNext
();)
{
Map
.
Entry
entry
=
(
Map
.
Entry
)
entries
.
next
();
writeShortstr
((
String
)
entry
.
getKey
());
Object
value
=
entry
.
getValue
();
if
(
value
==
null
)
{
writeOctet
(
'V'
);
}
else
if
(
value
instanceof
String
)
{
writeOctet
(
's'
);
writeShortstr
((
String
)
value
);
}
else
if
(
value
instanceof
LongString
)
{
writeOctet
(
'S'
);
writeLongstr
((
LongString
)
value
);
}
else
if
(
value
instanceof
Integer
)
{
writeOctet
(
'I'
);
writeLong
(((
Integer
)
value
).
intValue
());
}
else
if
(
value
instanceof
Short
)
{
writeOctet
(
'U'
);
writeShort
(((
Short
)
value
).
shortValue
());
}
else
if
(
value
instanceof
Byte
)
{
writeOctet
(
'b'
);
writeByte
(((
Byte
)
value
).
byteValue
());
}
else
if
(
value
instanceof
BigDecimal
)
{
writeOctet
(
'D'
);
BigDecimal
decimal
=
(
BigDecimal
)
value
;
writeOctet
(
decimal
.
scale
());
BigInteger
unscaled
=
decimal
.
unscaledValue
();
if
(
unscaled
.
bitLength
()
>
32
)
/* Integer.SIZE in Java 1.5 */
throw
new
IllegalArgumentException
(
"BigDecimal too large to be encoded"
);
writeLong
(
decimal
.
unscaledValue
().
intValue
());
}
else
if
(
value
instanceof
Date
)
{
writeOctet
(
'T'
);
writeTimestamp
((
Date
)
value
);
}
else
if
(
value
instanceof
Boolean
)
{
writeOctet
(
't'
);
writeBoolean
(((
Boolean
)
value
).
booleanValue
());
}
else
if
(
value
instanceof
Map
)
{
writeOctet
(
'F'
);
writeTable
((
Map
)
value
);
}
else
if
(
value
instanceof
Long
)
{
writeOctet
(
'l'
);
writeLonglong
(((
Long
)
value
).
longValue
());
}
else
if
(
value
instanceof
Double
)
{
writeOctet
(
'd'
);
writeDouble
(((
Double
)
value
).
doubleValue
());
}
else
if
(
value
instanceof
Float
)
{
writeOctet
(
'f'
);
writeFloat
(((
Float
)
value
).
floatValue
());
}
else
if
(
value
instanceof
Object
[])
{
writeOctet
(
'A'
);
writeArray
((
Object
[])
value
);
}
else
{
throw
new
IllegalArgumentException
(
"Invalid value type: "
+
value
.
getClass
().
getName
()
+
" for key "
+
entry
.
getKey
());
}
writeFieldValue
(
entry
.
getValue
());
}
}
}
private
void
writeArray
(
Object
[]
value
)
throws
IOException
{
int
len
=
value
.
length
;
StreamUtil
.
writeTo
(
arraySize
(
value
),
out
);
if
(
value
instanceof
String
[])
{
private
void
writeFieldValue
(
Object
value
)
throws
IOException
{
if
(
value
==
null
)
{
writeOctet
(
'V'
);
}
else
if
(
value
instanceof
String
)
{
writeOctet
(
's'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeShortstr
((
String
)
value
[
i
]);
}
}
else
if
(
value
instanceof
LongString
[])
{
writeShortstr
((
String
)
value
);
}
else
if
(
value
instanceof
LongString
)
{
writeOctet
(
'S'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeLongstr
((
LongString
)
value
[
i
]);
}
}
else
if
(
value
instanceof
Integer
[])
{
writeLongstr
((
LongString
)
value
);
}
else
if
(
value
instanceof
Integer
)
{
writeOctet
(
'I'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeLong
(((
Integer
)
value
[
i
]).
intValue
());
}
}
else
if
(
value
instanceof
Short
[])
{
writeLong
(((
Integer
)
value
).
intValue
());
}
else
if
(
value
instanceof
Short
)
{
writeOctet
(
'U'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeShort
(((
Short
)
value
[
i
]).
shortValue
());
}
}
else
if
(
value
instanceof
Byte
[])
{
writeShort
(((
Short
)
value
).
shortValue
());
}
else
if
(
value
instanceof
Byte
)
{
writeOctet
(
'b'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeByte
(((
Byte
)
value
[
i
]).
byteValue
());
}
}
else
if
(
value
instanceof
BigDecimal
[])
{
writeByte
(((
Byte
)
value
).
byteValue
());
}
else
if
(
value
instanceof
BigDecimal
)
{
writeOctet
(
'D'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
BigDecimal
decimal
=
(
BigDecimal
)
value
[
i
];
StreamUtil
.
writeTo
((
byte
)
decimal
.
scale
(),
out
);
BigInteger
unscaled
=
decimal
.
unscaledValue
();
if
(
unscaled
.
bitLength
()
>
32
)
/* Integer.SIZE in Java 1.5 */
throw
new
IllegalArgumentException
(
"BigDecimal too large to be encoded"
);
StreamUtil
.
writeTo
(
decimal
.
unscaledValue
().
intValue
(),
out
);
}
}
else
if
(
value
instanceof
Date
[])
{
BigDecimal
decimal
=
(
BigDecimal
)
value
;
writeOctet
(
decimal
.
scale
());
BigInteger
unscaled
=
decimal
.
unscaledValue
();
if
(
unscaled
.
bitLength
()
>
32
)
/* Integer.SIZE in Java 1.5 */
throw
new
IllegalArgumentException
(
"BigDecimal too large to be encoded"
);
writeLong
(
decimal
.
unscaledValue
().
intValue
());
}
else
if
(
value
instanceof
Date
)
{
writeOctet
(
'T'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeTimestamp
((
Date
)
value
[
i
]);
}
}
else
if
(
value
instanceof
Boolean
[])
{
writeTimestamp
((
Date
)
value
);
}
else
if
(
value
instanceof
Boolean
)
{
writeOctet
(
't'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
write
Boolean
(((
Boolean
)
value
[
i
]).
booleanValue
()
);
}
}
else
if
(
value
instanceof
Long
[]
)
{
writeBoolean
(((
Boolean
)
value
).
booleanValue
()
);
}
else
if
(
value
instanceof
Map
)
{
write
Octet
(
'F'
);
writeTable
((
Map
)
value
);
}
else
if
(
value
instanceof
Long
)
{
writeOctet
(
'l'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeLonglong
(((
Long
)
value
[
i
]).
longValue
());
}
}
else
if
(
value
instanceof
Double
[])
{
writeLonglong
(((
Long
)
value
).
longValue
());
}
else
if
(
value
instanceof
Double
)
{
writeOctet
(
'd'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeDouble
(((
Double
)
value
[
i
]).
doubleValue
());
}
}
else
if
(
value
instanceof
Float
[])
{
writeDouble
(((
Double
)
value
).
doubleValue
());
}
else
if
(
value
instanceof
Float
)
{
writeOctet
(
'f'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeFloat
(((
Float
)
value
[
i
]).
floatValue
());
}
}
else
if
(
value
instanceof
Map
[])
{
writeOctet
(
'F'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeTable
((
Map
)
value
[
i
]);
}
}
else
if
(
value
instanceof
Object
[][])
{
writeFloat
(((
Float
)
value
).
floatValue
());
}
else
if
(
value
instanceof
Object
[])
{
writeOctet
(
'A'
);
StreamUtil
.
writeTo
(
len
,
out
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
writeArray
((
Object
[])
value
[
i
]);
}
writeArray
((
Object
[])
value
);
}
else
{
throw
new
IllegalArgumentException
(
"invalid value in table: "
+
value
.
getClass
().
getName
());
throw
new
IllegalArgumentException
(
"Invalid value type: "
+
value
.
getClass
().
getName
());
}
}
private
void
writeArray
(
Object
[]
array
)
throws
IOException
{
if
(
logger
.
isLoggable
(
BasicLevel
.
DEBUG
))
{
logger
.
log
(
BasicLevel
.
DEBUG
,
"writeArray -> "
+
Arrays
.
toString
(
array
));
}
StreamUtil
.
writeTo
(
arraySize
(
array
),
out
);
if
(
logger
.
isLoggable
(
BasicLevel
.
DEBUG
))
{
logger
.
log
(
BasicLevel
.
DEBUG
,
"arrayLength -> "
+
arraySize
(
array
));
}
for
(
int
i
=
0
;
i
<
array
.
length
;
i
++)
{
writeFieldValue
(
array
[
i
]);
}
}
...
...
@@ -328,75 +257,48 @@ public class AMQPOutputStream {
long
acc
=
0
;
for
(
Iterator
entries
=
table
.
entrySet
().
iterator
();
entries
.
hasNext
();)
{
Map
.
Entry
entry
=
(
Map
.
Entry
)
entries
.
next
();
acc
+=
shortStrSize
((
String
)
entry
.
getKey
());
acc
+=
1
;