Commit 39e8a57a authored by afreyssin's avatar afreyssin
Browse files

Bug fixes: Avoids potential duplication of messages (JORAM-233, JORAM-234).

parent 9ccf294a
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2011 - 2014 ScalAgent Distributed Technologies
* Copyright (C) 2011 - 2015 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -22,9 +22,7 @@
*/
package org.objectweb.joram.mom.dest;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
......@@ -45,16 +43,18 @@ public class DistributionDaemon extends Daemon {
private Queue distributeQueue;
private Queue ackQueue;
private TxDestination txDest;
private Destination dest;
private String acklistTxName;
public DistributionDaemon(DistributionHandler distributionHandler, String destinationName, TxDestination txDest) {
public DistributionDaemon(DistributionHandler distributionHandler, String destinationId, String destinationName, Destination dest) {
super("DistributionDaemon_" + destinationName, logger);
this.distributionHandler = distributionHandler;
this.acklistTxName = destinationId + "AL";
distributeQueue = new Queue();
ackQueue = new Queue();
this.txDest = txDest;
this.dest = dest;
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon<> distributionHandler = " + distributionHandler + ", txDest = " + txDest);
logger.log(BasicLevel.DEBUG, "DistributionDaemon<> distributionHandler = " + distributionHandler + ", txDest = " + dest);
}
class ComparatorMessage implements Comparator {
......@@ -130,23 +130,10 @@ public class DistributionDaemon extends Daemon {
distributionHandler.distribute(msg);
ackMessage(msg.id);
// transaction delete the message
String txName = txDest.getTxName(msg.id);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon run: txName(" + msg.id + ")=" + txName);
if (txName != null) {
org.objectweb.joram.mom.messages.Message momMsg = new org.objectweb.joram.mom.messages.Message(msg);
momMsg.setTxName(txName);
momMsg.delete();
AgentServer.getTransaction().begin();
AgentServer.getTransaction().commit(true);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "DistributionDaemon run: " + msg.id + " deleted.");
} else {
// The destination is a DistributionTopic.
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "DistributionDaemon run: txName == null for msg " + msg.id + " can't be delete.");
}
// transaction save the ack list
AgentServer.getTransaction().save(ackQueue.list(), acklistTxName);
AgentServer.getTransaction().begin();
AgentServer.getTransaction().commit(true);
} catch (Exception e) {
if (logger.isLoggable(BasicLevel.WARN))
logger.log(BasicLevel.WARN, "DistributionDaemon run()", e);
......@@ -198,14 +185,18 @@ public class DistributionDaemon extends Daemon {
distributeQueue.push(msg);
}
public List getAckList() {
List ackList = new ArrayList();
while (!ackQueue.isEmpty()) {
try {
ackList.add(ackQueue.getAndPop());
} catch (InterruptedException e) { }
}
return ackList;
public synchronized String getNextAck() {
if (!ackQueue.isEmpty()) {
try {
return (String) ackQueue.getAndPop();
} catch (InterruptedException exc) {
}
}
return null;
}
public synchronized void cleanAckList() {
ackQueue.clear();
}
public boolean isEmpty() {
......@@ -217,8 +208,8 @@ public class DistributionDaemon extends Daemon {
return true;
int threshold = 0;
if (txDest instanceof DistributionQueue) {
threshold =((DistributionQueue)txDest).getThreshold();
if (dest instanceof DistributionQueue) {
threshold =((DistributionQueue)dest).getThreshold();
}
if (logger.isLoggable(BasicLevel.DEBUG))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment