Commit e568adea authored by afreyssin's avatar afreyssin
Browse files

Bug fix: JORAM-198, JORAM-199 and JORAM-200.

parent 46ab6fdd
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 - ScalAgent Distributed Technologies
* Copyright (C) 2011 - 2014 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -23,6 +23,7 @@
package org.objectweb.joram.mom.dest;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.objectweb.joram.shared.messages.Message;
......@@ -36,10 +37,6 @@ import fr.dyade.aaa.common.EmptyQueueException;
import fr.dyade.aaa.common.Queue;
public class DistributionDaemon extends Daemon {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
public static Logger logger = Debug.getLogger(DistributionDaemon.class.getName());
/** Holds the distribution logic. */
......@@ -60,6 +57,41 @@ public class DistributionDaemon extends Daemon {
logger.log(BasicLevel.DEBUG, "DistributionDaemon<> distributionHandler = " + distributionHandler + ", txDest = " + txDest);
}
class ComparatorMessage implements Comparator {
@Override
public int compare(Object o1, Object o2) {
if (((Message) o1).id.equals(o2)) return 0;
return (o1.hashCode() - o2.hashCode());
}
}
class ComparatorString implements Comparator {
@Override
public int compare(Object o1, Object o2) {
if (o1.equals(o2)) return 0;
return (o1.hashCode() - o2.hashCode());
}
}
synchronized boolean isHandling(String id) {
if (distributeQueue.search(new ComparatorMessage(), id)) return true;
if (ackQueue.search(new ComparatorString(), id)) return true;
return false;
}
synchronized void ackMessage(String id) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon run: distributeQueue.pop = " + id);
// delete the message from the distributeQueue
distributeQueue.pop();
// add message id to the ackQueue
ackQueue.push(id);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon run: ackQueue.push : " + id);
}
public void run() {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon run()");
......@@ -96,16 +128,7 @@ public class DistributionDaemon extends Daemon {
try {
// distribute the message
distributionHandler.distribute(msg);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon run: distributeQueue.pop = " + msg.id);
// delete the message from the distributeQueue
distributeQueue.pop();
// add message id to the ackQueue
ackQueue.push(msg.id);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon run: ackQueue.push : " + msg.id);
ackMessage(msg.id);
// transaction delete the message
String txName = txDest.getTxName(msg.id);
......@@ -120,10 +143,10 @@ public class DistributionDaemon extends Daemon {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon run: " + msg.id + " deleted.");
} else {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "DistributionDaemon run: txName == null for msg " + msg.id + " can't be delete.");
// The destination is a DistributionTopic.
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "DistributionDaemon run: txName == null for msg " + msg.id + " can't be delete.");
}
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "DistributionDaemon run()", e);
......@@ -171,7 +194,7 @@ public class DistributionDaemon extends Daemon {
public void push(Message msg) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon distributeQueue.push(" + msg.id + ')');
logger.log(BasicLevel.DEBUG, "DistributionDaemon.push(" + msg.id + ')');
distributeQueue.push(msg);
}
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2010 - 2011 ScalAgent Distributed Technologies
* Copyright (C) 2010 - 2014 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -220,15 +220,19 @@ public class DistributionQueue extends Queue {
logger.log(BasicLevel.WARN, "DistributionQueue.preProcess: distribution error.", exc);
}
} else {
// a processMessage exception is normal with async mode.
if (distributionDaemon != null) {
// use msg.id ?
distributionDaemon.push(msg);
} else {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN, "DistributionQueue.preProcess: distribution distributionDaemon = null but we are in async distribution mode.", exc);
}
}
// Bug fix (JORAM-198): Avoid parallelism access during message serialization (save method).
// Do nothing, the handling of message is now in post-process.
// // a processMessage exception is normal with async mode.
// if (distributionDaemon != null) {
// // use msg.id ?
// distributionDaemon.push(msg);
// } else {
// if (logger.isLoggable(BasicLevel.WARN)) {
// logger.log(BasicLevel.WARN,
// "DistributionQueue.preProcess: distribution distributionDaemon = null in async mode.", exc);
// }
// }
}
// if we don't do batch distribution, stop on first error
if (!batchDistribution) {
......@@ -243,34 +247,76 @@ public class DistributionQueue extends Queue {
return null;
}
private void removeAndDeleteMessages(List ackList) {
logger.log(BasicLevel.DEBUG,
"DistributionQueue.wakeUpNot() - Handles AckList: " + ackList);
String id = null;
Iterator itMessages = ackList.iterator();
while (itMessages.hasNext()) {
id = (String) itMessages.next();
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - Acked: " + id);
int i = 0;
org.objectweb.joram.mom.messages.Message message = null;
while (i < messages.size()) {
message = (org.objectweb.joram.mom.messages.Message) messages.get(i);
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - handles: " + message.getId());
if (id.equals(message.getId())) {
messages.remove(i);
message.delete();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.removeAndDeleteMessages() - removes " + id);
break;
}
// Bug fix (JORAM-199): avoid infinite loop!!
i++;
}
}
}
@Override
protected void postProcess(ClientMessages msgs) {
super.postProcess(msgs);
protected void postProcess(ClientMessages cm) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.postProcess(...)");
super.postProcess(cm);
// Bug fix (JORAM-198): Avoid parallelism access during message serialization (save method).
// The handling of messages is no longer in pre-process.
if (isAsyncDistribution) {
List msgs = cm.getMessages();
for (Iterator ite = msgs.iterator(); ite.hasNext();) {
Message msg = (Message) ite.next();
// a processMessage exception is normal with async mode.
if (distributionDaemon != null) {
// use msg.id ?
distributionDaemon.push(msg);
} else {
if (logger.isLoggable(BasicLevel.WARN)) {
logger.log(BasicLevel.WARN,
"DistributionQueue.postProcess: distribution distributionDaemon = null in async mode.");
}
}
}
}
if (distributionDaemon != null) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.postProcess(...)");
// Cleans message list using ackList from daemon.
List ackList = distributionDaemon.getAckList();
if (ackList != null) {
// Bug fix (JORAM-74): delete anew the forwarded messages
// Replaces the call to removeMessages(ackList) by a similar code deleting the
// related messages.
String id = null;
Iterator itMessages = ackList.iterator();
while (itMessages.hasNext()) {
id = (String) itMessages.next();
int i = 0;
org.objectweb.joram.mom.messages.Message message = null;
while (i < messages.size()) {
message = (org.objectweb.joram.mom.messages.Message) messages.get(i);
if (id.equals(message.getId())) {
messages.remove(i);
message.delete();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.postProcess removes " + id);
break;
}
}
}
// Bug fix (JORAM-74): delete anew the forwarded messages, replacing the call to removeMessages(ackList)
// by a similar code deleting the related messages. Since the fix of JORAM-198 it should not be longer
// useful.
removeAndDeleteMessages(ackList);
}
}
}
......@@ -283,39 +329,23 @@ public class DistributionQueue extends Queue {
* wake up, and cleans the queue.
*/
public void wakeUpNot(WakeUpNot not) {
if (logger.isLoggable(BasicLevel.DEBUG) && !isAsyncDistribution) {
if (logger.isLoggable(BasicLevel.DEBUG) && !isAsyncDistribution) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot(" + not + ')');
}
// Cleans outdated waiting messages
}
// Cleans outdated waiting messages
super.wakeUpNot(not);
// delete the ackQueue
if (distributionDaemon != null) {
List ackList = distributionDaemon.getAckList();
if (ackList != null) {
// Bug fix (JORAM-74): delete anew the forwarded messages
// Replaces the call to removeMessages(ackList) by a similar code deleting the
// related messages.
String id = null;
Iterator itMessages = ackList.iterator();
while (itMessages.hasNext()) {
id = (String) itMessages.next();
int i = 0;
org.objectweb.joram.mom.messages.Message message = null;
while (i < messages.size()) {
message = (org.objectweb.joram.mom.messages.Message) messages.get(i);
if (id.equals(message.getId())) {
messages.remove(i);
message.delete();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot removes " + id);
break;
}
}
}
}
List ackList = distributionDaemon.getAckList();
if (ackList != null) {
// Bug fix (JORAM-74): delete anew the forwarded messages, replacing the call to removeMessages(ackList)
// by a similar code deleting the related messages. Since the fix of JORAM-198 it should not be longer
// useful.
removeAndDeleteMessages(ackList);
}
}
for (Iterator ite = messages.iterator(); ite.hasNext();) {
org.objectweb.joram.mom.messages.Message msg = (org.objectweb.joram.mom.messages.Message) ite.next();
try {
......@@ -325,16 +355,17 @@ public class DistributionQueue extends Queue {
msg.delete();
} catch (Exception exc) {
if (logger.isLoggable(BasicLevel.DEBUG) && !isAsyncDistribution) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot redelivery number " + msg.getDeliveryCount()
+ " failed.", exc);
logger.log(BasicLevel.DEBUG,
"DistributionQueue.wakeUpNot redelivery number " + msg.getDeliveryCount() + " failed.", exc);
} else if (logger.isLoggable(BasicLevel.DEBUG)) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot redelivery " + msg.getId() + " number " + msg.getDeliveryCount());
logger.log(BasicLevel.DEBUG,
"DistributionQueue.wakeUpNot redelivery " + msg.getId() + " number " + msg.getDeliveryCount());
}
// increase the delivery count
if (distributionDaemon == null)
msg.incDeliveryCount();
msg.incDeliveryCount();
// If message considered as undeliverable, add it to the list of dead messages:
if (isUndeliverable(msg)) {
if (logger.isLoggable(BasicLevel.DEBUG)) {
......@@ -348,30 +379,36 @@ public class DistributionQueue extends Queue {
dmqManager.sendToDMQ();
continue;
}
if (distributionDaemon != null) {
synchronized (distributionDaemon) {
// Wakeup the daemon, because the distributionDaemon can wait
// after a distribution exception
distributionDaemon.notify();
}
synchronized (distributionDaemon) {
// Wakeup the daemon, because the distributionDaemon can wait
// after a distribution exception
distributionDaemon.notify();
}
}
if (logger.isLoggable(BasicLevel.DEBUG) && distributionDaemon != null) {
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot distributionDaemon = " + distributionDaemon +
", distributionDaemon.isEmpty() = " + distributionDaemon.isEmpty());
", distributionDaemon.isEmpty() = " + distributionDaemon.isEmpty());
}
if (distributionDaemon != null) {
if (!distributionDaemon.isEmpty()) {
// needless to push an other message
// because the distributionDaemon can't distribute message now.
break;
} else {
distributionDaemon.push(msg.getFullMessage());
}
if (!distributionDaemon.isEmpty()) {
// needless to push an other message
// because the distributionDaemon can't distribute message now.
break;
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionQueue.wakeUpNot " + msg.getId());
// Bug fix (JORAM-200): Avoid to duplicate a message already known by the daemon (either in
// the distributeQueue or the ackedQueue).
if (! distributionDaemon.isHandling(msg.getId()))
distributionDaemon.push(msg.getFullMessage());
}
}
if (!batchDistribution) {
break;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment