Commit 0ba511da authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

Fix: Remove static transaction variable, as it can change when agent server stops and restarts.

Fix: msgCounter was not properly initialized when loading messages from disk.
parent 4e76674d
......@@ -84,7 +84,6 @@ public class Queue implements Serializable {
// Use LinkedHashMap because ordering is necessary for round robin consumers
private transient Map<SubscriptionKey, Subscription> consumers = new LinkedHashMap<SubscriptionKey, Subscription>();
private static Transaction transaction = AgentServer.getTransaction();
public static String PREFIX_QUEUE = "Queue_";
private static final String PREFIX_MSG = "M.";
private static final String PREFIX_BOUND_EXCHANGE = "BE_";
......@@ -435,7 +434,7 @@ public class Queue implements Serializable {
}
if (durable) {
transaction.delete(PREFIX_QUEUE + Naming.getLocalName(queueName));
AgentServer.getTransaction().delete(PREFIX_QUEUE + Naming.getLocalName(queueName));
deleteAllMessage(toDeliver);
deleteAllMessage(toAck);
Iterator<String> iterBoundExchanges = boundExchanges.iterator();
......@@ -510,6 +509,7 @@ public class Queue implements Serializable {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.loadQueue(" + name + ')');
Transaction transaction = AgentServer.getTransaction();
// load Queue
Queue queue = (Queue) transaction.load(name);
try {
......@@ -528,7 +528,7 @@ public class Queue implements Serializable {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.loadQueue: msg.queueMsgId = " + msg.queueMsgId + ", name = " + list[i]);
queue.toDeliver.add(msg);
queue.msgCounter = msg.queueMsgId;
queue.msgCounter = msg.queueMsgId + 1;
}
}
if (logger.isLoggable(BasicLevel.DEBUG))
......@@ -549,7 +549,7 @@ public class Queue implements Serializable {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.saveQueue(" + queue + ')');
try {
transaction.create(queue, PREFIX_QUEUE + Naming.getLocalName(queue.name));
AgentServer.getTransaction().create(queue, PREFIX_QUEUE + Naming.getLocalName(queue.name));
} catch (IOException e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Queue.saveQueue ERROR::", e);
......@@ -562,7 +562,7 @@ public class Queue implements Serializable {
logger.log(BasicLevel.DEBUG, "Queue.saveBoundExchange(" + exchange + ')');
try {
// replace / to . for transaction need.
transaction.create(exchange, prefixBE + exchange.replace('/', '.'));
AgentServer.getTransaction().create(exchange, prefixBE + exchange.replace('/', '.'));
} catch (IOException e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Queue.saveBoundExchange ERROR::", e);
......@@ -574,7 +574,7 @@ public class Queue implements Serializable {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.deleteBoundExchange(" + exchangeName + ')');
// replace / to . for transaction need.
transaction.delete(prefixBE + exchangeName.replace('/', '.'));
AgentServer.getTransaction().delete(prefixBE + exchangeName.replace('/', '.'));
}
private void deleteAllMessage(TreeSet<Message> messages) {
......@@ -593,7 +593,7 @@ public class Queue implements Serializable {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.saveMessage(" + msg.queueMsgId + ')');
try {
transaction.create(msg, prefixMsg + msg.queueMsgId);
AgentServer.getTransaction().create(msg, prefixMsg + msg.queueMsgId);
} catch (IOException e) {
if (logger.isLoggable(BasicLevel.ERROR))
logger.log(BasicLevel.ERROR, "Queue.saveMessage ERROR::", e);
......@@ -604,7 +604,7 @@ public class Queue implements Serializable {
private void deleteMessage(long msgId) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.deleteMessage(" + msgId + ')');
transaction.delete(prefixMsg + msgId);
AgentServer.getTransaction().delete(prefixMsg + msgId);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment