Commit eabeda52 authored by Andre Freyssinet's avatar Andre Freyssinet

Allows to store byte[] in message properties.

parent 7bc833e1
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>joram</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.maven.ide.eclipse.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
<nature>org.maven.ide.eclipse.maven2Nature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>joram</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.m2e.core.maven2Builder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.m2e.core.maven2Nature</nature>
</natures>
</projectDescription>
......@@ -82,7 +82,7 @@ public class SerializableWrapper implements Encodable {
public void decode(Decoder decoder) throws Exception {
byte[] bytes = decoder.decodeByteArray();
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bais);
ObjectInputStream ois = new MigrationControlInputStream(bais);
value = (Serializable) ois.readObject();
}
......
......@@ -242,7 +242,7 @@ public class Properties implements Serializable, Cloneable, Encodable {
* @return the previous value of the specified key in this property object, or null if it did not have one.
*/
public Object setProperty(String key, Object value) throws ClassCastException {
if ((value instanceof Number) || (value instanceof String)) {
if ((value instanceof Boolean) || (value instanceof Number) || (value instanceof String)) {
return put(key, value);
}
throw new ClassCastException("Bad property value: " + value.getClass());
......
......@@ -1430,15 +1430,21 @@ public final class AgentServer {
try {
// then restores all messages.
String[] list = transaction.getList("@");
// BatchEngine and BatchNetwork need a global validate operation after insertion.
Vector<MessageConsumer> toValidate = new Vector<MessageConsumer>();
for (int i=0; i<list.length; i++) {
Message msg = Message.load(list[i]);
if (msg.getSource() == serverId) {
// The update has been locally generated, the message is ready to
// deliver to its consumer (Engine or Network component). So we have
// to insert it in the queue of this consumer.
// The update has been locally generated, the message is ready to deliver to its
// consumer (Engine or Network component). So we have to insert it in the queue
// of this consumer.
try {
getServerDesc(msg.getDest()).getDomain().insert(msg);
MessageConsumer cons = getServerDesc(msg.getDest()).getDomain();
cons.insert(msg);
// BatchEngine and BatchNetwork need a global validate operation after insertion.
if (! toValidate.contains(cons))
toValidate.add(cons);
} catch (UnknownServerException exc) {
logmon.log(BasicLevel.ERROR,
getName() + ", discard message to unknown server id#" +
......@@ -1469,6 +1475,9 @@ public final class AgentServer {
continue;
}
}
// BatchEngine and BatchNetwork need a global validate operation after insertion.
for (MessageConsumer cons : toValidate)
cons.validate();
} catch (ClassNotFoundException exc) {
logmon.log(BasicLevel.FATAL,
getName() + ", can't restore messages", exc);
......
......@@ -64,6 +64,7 @@ public class Channel {
logmon.log(BasicLevel.DEBUG, toString() + " created.");
}
// Temporary list of consumers involved in the current transaction and requiring a validate.
static Vector<MessageConsumer> consumers = null;
/**
......
......@@ -457,6 +457,7 @@ public abstract class AbstractTransaction extends BaseTransaction {
*/
@Override
public boolean useLoadAll() {
// TODO (AF): Changes when loadAll implementation will allows good filtering of names.
return false;
}
......
......@@ -276,8 +276,7 @@ public abstract class DBTransaction extends AbstractTransaction implements DBTra
*/
@Override
public boolean useLoadAll() {
// TODO (AF): Changes when loadAll implementation will allows good filtering of names.
return false;
return true;
}
/**
......@@ -294,7 +293,7 @@ public abstract class DBTransaction extends AbstractTransaction implements DBTra
try {
// Creating a statement lets us issue commands against the connection.
Statement s = conn.createStatement();
ResultSet rs = s.executeQuery("SELECT name, content FROM " + dbtable + " WHERE name LIKE '" + prefix + "%'");
ResultSet rs = s.executeQuery("SELECT name, content FROM " + dbtable + " WHERE ((name LIKE '" + prefix + "%') AND (name NOT LIKE '%B'))");
while (rs.next()) {
String name = rs.getString(1);
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2019 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2020 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
......@@ -549,8 +549,7 @@ public class Connection implements javax.jms.Connection, ConnectionMBean {
/**
* returns the list of IN message interceptors.
* <br>Each IN message interceptor is {@link MessageInterceptor#handle(javax.jms.Message) called}
* when {@link Session#receive() receiving} a message.
* <br>Each IN message interceptor is called when receiving a message.
* <br>The execution follows the order of the elements within the list.
* @return the list of the IN message interceptors.
*/
......@@ -559,8 +558,7 @@ public class Connection implements javax.jms.Connection, ConnectionMBean {
}
/**
* returns the list of OUT message interceptors.
* <br>Each OUT message interceptor is {@link MessageInterceptor#handle(javax.jms.Message) called}
* when {@link Session#send() sending} a message.
* <br>Each OUT message interceptor is called when sending a message.
* <br>The execution follows the order of the elements within the list.
* @return the list of the OUT message interceptors.
*/
......
......@@ -240,7 +240,11 @@ public class Queue extends Destination implements QueueMBean {
Channel.sendTo(getId(), new QueueDeliveryTimeNot(null, false));
}
/** <code>true</code> if all the stored messages have the same priority. */
/**
* <code>true</code> if all the stored messages have the same priority.
* Note: <code>messages</code> list is ordered by priorities, so we could test if
* first and last message have the same priority.
*/
private boolean samePriorities;
/** Number of stored messages with an expiration date. */
......@@ -594,7 +598,7 @@ public class Queue extends Destination implements QueueMBean {
messages = Message.loadAll(getMsgTxPrefix().toString(), Integer.MAX_VALUE);
long currentTime = System.currentTimeMillis();
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ", start retrieves messages " + currentTime);
logmsg.log(BasicLevel.INFO, getName() + ", start retrieves messages");
for (int index=0; index < messages.size(); ) {
Message persistedMsg = messages.get(index);
......@@ -603,6 +607,9 @@ public class Queue extends Destination implements QueueMBean {
QueueDelivery queueDelivery = deliveryTable.get(persistedMsg.getId());
if (queueDelivery == null) {
if (persistedMsg.hasExpiration())
nbExpirations += 1;
if (persistedMsg.getDeliveryTime() > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, getName() + ": schedule delayed message " + persistedMsg.getId());
......@@ -620,12 +627,15 @@ public class Queue extends Destination implements QueueMBean {
getName() + ": Adds message " + persistedMsg.getId() + " in the list of messages to deliver.");
}
} else {
// The message has been delivered before stop.
queueDelivery.setMessage(persistedMsg);
if (isLocal(queueDelivery.getConsumerId())) {
// The delivery is aborted.
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, " -> deny " + persistedMsg.getId());
deliveryTable.remove(persistedMsg.getId());
if (persistedMsg.hasExpiration())
nbExpirations += 1;
} else {
// The delivery is always active, remove message from the list of messages to deliver.
messages.remove(index);
......@@ -637,9 +647,15 @@ public class Queue extends Destination implements QueueMBean {
index += 1;
}
currentTime = System.currentTimeMillis();
if (! messages.isEmpty()) {
if (messages.get(0).getPriority() == messages.get(messages.size() -1).getPriority()) {
samePriorities = true;
priority = messages.get(0).getPriority();
}
}
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ", end retrieves messages " + currentTime);
logmsg.log(BasicLevel.INFO, getName() + ", end retrieves messages -> " + (System.currentTimeMillis() - currentTime));
}
// protected void initialize(boolean firstTime) throws Exception {
......
......@@ -112,6 +112,18 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
return msg;
}
/**
* Defines if the queue restoration at startup must use the loadAll transaction
* mechanism when it is implemented.
* <p>
* Default value is false.
* <p>
* This property can be fixed either from <code>java</code> launching
* command, or in <code>a3servers.xml</code> configuration file.
*/
private static final boolean useLoadALL =
AgentServer.getBoolean("org.objectweb.joram.mom.messages.USELOADALL");
/** SoftReference to the body of the MOM message. */
private transient SoftReference bodySoftRef = null;
......@@ -127,7 +139,7 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
* Note: the message swapping can be finely configured using the
* <code>JMS_JORAM_SWAPALLOWED</code> property of the JMS message.
* <p>
* Theses properties can be fixed either from <code>java</code> launching
* This property can be fixed either from <code>java</code> launching
* command, or in <code>a3servers.xml</code> configuration file.
*/
private static final boolean globalUseSoftRef =
......@@ -543,7 +555,40 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
// TODO (AF): Use DBTransaction optimized loadAll method to get all messages from a unique request.
// Currently this optimization results in an issue if messages swap is allowed, it needs to filter
// body objects that have the same prefix).
// if (! tx.useLoadAll()) {
if (tx.useLoadAll() && useLoadALL) {
// Retrieving the saved messages from persistency.
long start = System.currentTimeMillis();
Map<String, Message> msgsMap = new HashMap<String, Message>(32000);
tx.loadAll(msgTxname + "", msgsMap);
long end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: all messages loaded (" + msgsMap.size() + ") -> " + (end - start));
start = System.currentTimeMillis();
for (Map.Entry<String, Message> entry : msgsMap.entrySet()) {
String name = entry.getKey();
if (name.charAt(name.length() - 1) == 'B')
// Never happen, the corresponding objects are bytes arrays and throw exceptions in loadAll
continue;
Message msg = entry.getValue();
msg.txname = name;
if (msg.msg == null) {
// TODO (AF): retrieve body in Map && msg.soft ?
}
messages.add(msg);
}
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: insert (" + messages.size() + ") -> " + (end - start));
start = System.currentTimeMillis();
// Sort messages list in reverse order to optimize queue restoration (JORAM-358)
Collections.sort(messages);
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: sort -> " + (end - start));
} else {
// Retrieving the names of the persistence message previously saved.
long start = System.currentTimeMillis();
String[] names = tx.getList(msgTxname);
......@@ -633,40 +678,7 @@ public final class Message implements Comparable<Message>, Serializable, Encodab
end = System.currentTimeMillis();
logger.log(BasicLevel.INFO,
"Message.loadAll: sort -> " + (end - start));
// TODO (AF): Use of Transaction of loadAll
// } else {
// // Retrieving the saved messages from persistency.
// long start = System.currentTimeMillis();
// Map<String, Message> msgsMap = new HashMap<String, Message>(32000);
// tx.loadAll(msgTxname, msgsMap);
// long end = System.currentTimeMillis();
//
// logger.log(BasicLevel.INFO,
// "Message.loadAll: all messages loaded (" + msgsMap.size() + ") -> " + (end - start));
//
// start = System.currentTimeMillis();
// for (Map.Entry<String, Message> entry : msgsMap.entrySet()) {
// String name = entry.getKey();
// if (name.charAt(name.length() - 1) == 'B')
// continue;
// Message msg = entry.getValue();
// msg.txname = name;
// if (msg.msg == null) {
// // TODO (AF): retrieve body in Map && msg.soft ?
// }
// messages.add(msg);
// }
// end = System.currentTimeMillis();
// logger.log(BasicLevel.INFO,
// "Message.loadAll: insert (" + messages.size() + ") -> " + (end - start));
//
// start = System.currentTimeMillis();
// // Sort messages list in reverse order to optimize queue restoration (JORAM-358)
// Collections.sort(messages);
// end = System.currentTimeMillis();
// logger.log(BasicLevel.INFO,
// "Message.loadAll: sort -> " + (end - start));
// }
}
return messages;
}
......
......@@ -107,7 +107,7 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
/**
* Sets a property value.
* If the value is not a Java primitive object its string representation is used.
* If the value is not a Java primitive object (Boolean, Number, String or byte[]) its string representation is used.
*
* @param name The property name.
* @param value The property value.
......@@ -121,7 +121,7 @@ public final class Message implements Cloneable, Serializable, Streamable, Encod
if (properties == null)
properties = new Properties();
if (value instanceof Boolean || value instanceof Number || value instanceof String) {
if (value instanceof Boolean || value instanceof Number || value instanceof String || value instanceof byte[]) {
properties.put(name, value);
} else {
properties.put(name, value.toString());
......
# JORAM: Java(TM) Open Reliable Asynchronous Messaging
# Copyright (C) 2001 - 2012 ScalAgent Distributed Technologies
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA.
monolog.classname org.objectweb.util.monolog.wrapper.javaLog.LoggerFactory
# -----------------------------------------------------------------------
# tty : console handler
# -----------------------------------------------------------------------
handler.tty.type Console
handler.tty.output System.out
handler.tty.pattern %d : %O{1}.%M : %m%n
# -----------------------------------------------------------------------
# logf : rolling file handler
# -----------------------------------------------------------------------
handler.logf.type RollingFile
handler.logf.output server.log
handler.logf.pattern %h %l %d, %m%n
handler.logf.fileNumber 10
handler.logf.maxSize 10000000
# -----------------------------------------------------------------------
# logger definitions
# -----------------------------------------------------------------------
#logger.root.handler.0 tty
logger.root.handler.1 logf
logger.root.level ERROR
------
# JORAM
#-------
logger.fr.dyade.aaa.level WARN
#logger.fr.dyade.aaa.agent.Agent.level DEBUG
#logger.fr.dyade.aaa.agent.Engine.level DEBUG
#logger.fr.dyade.aaa.util.Transaction.level DEBUG
#logger.fr.dyade.aaa.agent.Network.level DEBUG
#logger.fr.dyade.aaa.agent.Service.level DEBUG
#logger.fr.dyade.aaa.jndi2.client.level DEBUG
#logger.fr.dyade.aaa.jndi2.server.level DEBUG
logger.org.objectweb.joram.level WARN
#logger.org.objectweb.joram.mom.level DEBUG
#logger.org.objectweb.joram.client.jms.level DEBUG
#logger.org.objectweb.joram.client.connector.level DEBUG
#logger.org.objectweb.joram.shared.level DEBUG
#logger.org.objectweb.joram.client.jms.Session.Message.level INFO
#logger.org.objectweb.joram.client.jms.Connection.tracker.level DEBUG
#logger.org.objectweb.joram.client.jms.Session.tracker.level DEBUG
#logger.org.ow2.joram.jmxconnector.level DEBUG
#logger.org.objectweb.joram.tools.rest.level INFO
# JORAM: Java(TM) Open Reliable Asynchronous Messaging
# Copyright (C) 2001 - 2012 ScalAgent Distributed Technologies
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA.
monolog.classname org.objectweb.util.monolog.wrapper.javaLog.LoggerFactory
# -----------------------------------------------------------------------
# tty : console handler
# -----------------------------------------------------------------------
handler.tty.type Console
handler.tty.output System.out
handler.tty.pattern %d : %O{1}.%M : %m%n
# -----------------------------------------------------------------------
# logf : rolling file handler
# -----------------------------------------------------------------------
handler.logf.type RollingFile
handler.logf.output server.log
handler.logf.pattern %h %l %d, %m%n
handler.logf.fileNumber 10
handler.logf.maxSize 10000000
# -----------------------------------------------------------------------
# logger definitions
# -----------------------------------------------------------------------
#logger.root.handler.0 tty
logger.root.handler.1 logf
logger.root.level ERROR
------
# JORAM
#-------
logger.fr.dyade.aaa.level INFO
#logger.fr.dyade.aaa.agent.Agent.level DEBUG
#logger.fr.dyade.aaa.agent.Engine.level DEBUG
logger.fr.dyade.aaa.util.Transaction.level DEBUG
#logger.fr.dyade.aaa.agent.Network.level DEBUG
#logger.fr.dyade.aaa.agent.Service.level DEBUG
#logger.fr.dyade.aaa.jndi2.client.level DEBUG
#logger.fr.dyade.aaa.jndi2.server.level DEBUG
logger.org.objectweb.joram.level WARN
#logger.org.objectweb.joram.mom.level DEBUG
#logger.org.objectweb.joram.client.jms.level DEBUG
#logger.org.objectweb.joram.client.connector.level DEBUG
#logger.org.objectweb.joram.shared.level DEBUG
#logger.org.objectweb.joram.client.jms.Session.Message.level INFO
#logger.org.objectweb.joram.client.jms.Connection.tracker.level DEBUG
#logger.org.objectweb.joram.client.jms.Session.tracker.level DEBUG
#logger.org.ow2.joram.jmxconnector.level DEBUG
#logger.org.objectweb.joram.tools.rest.level INFO
......@@ -144,6 +144,15 @@
<param name="osgi.conf" value="${conf.dir}/config_derby.properties"/>
</antcall>
</target>
<target name="efluid_server" depends="init"
description="--> Starts a single server with NGTransaction component">
<antcall target="server">
<param name="sid" value="0" />
<param name="a3.conf" value="${conf.dir}/efluid_a3servers.xml" />
<param name="osgi.conf" value="${conf.dir}/config.properties" />
</antcall>
</target>
<!-- Start a single server with JDBCTransaction and Derby data store -->
<!-- Be careful: Needs the derby.jar bundle -->
......@@ -646,6 +655,16 @@
<arg line="queue"/>
</java>
</target>
<!-- Runs the efluid consumer client for queue -->
<target name="efluid_consumer" depends="simple_init"
description="--> Starts a Consumer client">
<java classname="classic.efluidConsumer" failonerror="no" fork="yes"
dir="${run.dir}">
<classpath path="${project.class.path}" />
<arg line="queue" />
</java>
</target>
<!-- Runs the classic producer client for queue -->
<target name="producer_queue" depends="simple_init"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment