Commit 598ad3f1 authored by Andre Freyssinet's avatar Andre Freyssinet

JORAM-323: Handling of temporary destinations (usage of DMQ on deletion).

JORAM-326: Allows to use strict values for counters.
parent bcde7202
......@@ -36,6 +36,7 @@ import java.util.Vector;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.objectweb.joram.mom.dest.AdminTopic.DestinationDesc;
import org.objectweb.joram.mom.messages.Message;
import org.objectweb.joram.mom.messages.MessageJMXWrapper;
import org.objectweb.joram.mom.messages.MessageView;
......@@ -414,6 +415,8 @@ public class Queue extends Destination implements QueueMBean {
if (dmqManager == null)
dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
message.delete();
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.EXPIRED);
......@@ -643,6 +646,7 @@ public class Queue extends Destination implements QueueMBean {
* @param lastime true if the destination is deleted
*/
protected void finalize(boolean lastTime) {
setSave();
// averageLoadTask.cancel();
// averageLoadTask = null;
}
......@@ -883,6 +887,8 @@ public class Queue extends Destination implements QueueMBean {
// setSave();
nbMsgsDeniedSinceCreation += 1;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
iterator.remove();
if (not.isRedelivered())
......@@ -903,6 +909,7 @@ public class Queue extends Destination implements QueueMBean {
if (dmqManager == null)
dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Already done above.
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.UNDELIVERABLE);
} else {
try {
......@@ -1085,10 +1092,11 @@ public class Queue extends Destination implements QueueMBean {
not.getRequestMsgId(),
not.getReplyMsgId());
} else if (adminRequest instanceof GetDeliveredMessages) {
replyToTopic(new GetNumberReply((int)nbMsgsDeliverSinceCreation),
not.getReplyTo(),
not.getRequestMsgId(),
not.getReplyMsgId());
// TODO (AF): Should be getNbMsgsDeliverSinceCreation?
replyToTopic(new GetNumberReply((int)nbMsgsDeliverSinceCreation),
not.getReplyTo(),
not.getRequestMsgId(),
not.getReplyMsgId());
} else {
super.handleAdminRequestNot(from, not);
}
......@@ -1135,6 +1143,7 @@ public class Queue extends Destination implements QueueMBean {
AgentId replyTo,
String requestMsgId,
String replyMsgId) {
// Sends remaining messages to DMQ
for (int i = 0; i < messages.size(); i++) {
Message message = (Message) messages.get(i);
if (message.getId().equals(request.getMessageId())) {
......@@ -1142,6 +1151,8 @@ public class Queue extends Destination implements QueueMBean {
message.delete();
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.ADMIN_DELETED);
dmqManager.sendToDMQ();
break;
......@@ -1159,6 +1170,8 @@ public class Queue extends Destination implements QueueMBean {
Message message = (Message) messages.get(i);
message.delete();
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.ADMIN_DELETED);
}
dmqManager.sendToDMQ();
......@@ -1230,12 +1243,20 @@ public class Queue extends Destination implements QueueMBean {
org.objectweb.joram.shared.messages.Message message = (org.objectweb.joram.shared.messages.Message) msgs.next();
// set the destination name
message.setProperty("JoramDestinationName", getName());
// JORAM-323: If it is a temporary destination there is no need to persistent messages.
if (temporary) {
message.persistent = false;
}
// interceptors process
org.objectweb.joram.shared.messages.Message m = processInterceptors(message);
if (m == null) {
// send message to the DMQ
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
dmqManager.addDeadMessage(message, MessageErrorConstants.INTERCEPTORS);
dmqManager.sendToDMQ();
new Message(message).releaseFullMessage();
......@@ -1312,6 +1333,8 @@ public class Queue extends Destination implements QueueMBean {
}
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
dmqManager.addDeadMessage(msg.getFullMessage(), MessageErrorConstants.QUEUE_FULL);
dmqManager.sendToDMQ();
return;
......@@ -1413,6 +1436,8 @@ public class Queue extends Destination implements QueueMBean {
if (dmqManager == null)
dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.UNDELIVERABLE);
} else {
try {
......@@ -1457,17 +1482,24 @@ public class Queue extends Destination implements QueueMBean {
"Requester " + rec.requester + " notified of the queue deletion.");
forward(rec.requester, excRep);
}
// Sending the remaining messages to the DMQ, if needed:
if (! messages.isEmpty()) {
Message message;
DMQManager dmqManager = new DMQManager(dmqId, getId());
while (! messages.isEmpty()) {
message = (Message) messages.remove(0);
message.delete();
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.DELETED_DEST);
// JORAM-323: If it is a temporary destination do not send message to DMQ.
if (temporary && (! AgentServer.getBoolean(DestinationConstants.TMPQUEUE_DMQ_ON_DELETE))) {
logger.log(BasicLevel.INFO,
"Removes temporary queue (" + getName() + ") and deletes " + messages.size() + " remaining messages.");
} else {
// Sending the remaining messages to the DMQ, if needed:
if (! messages.isEmpty()) {
Message message;
DMQManager dmqManager = new DMQManager(dmqId, getId());
while (! messages.isEmpty()) {
message = (Message) messages.remove(0);
message.delete();
nbMsgsSentToDMQSinceCreation++;
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.DELETED_DEST);
}
dmqManager.sendToDMQ();
}
dmqManager.sendToDMQ();
}
// Deleting the messages:
......@@ -1569,6 +1601,8 @@ public class Queue extends Destination implements QueueMBean {
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
dmqManager.addDeadMessage(message.getFullMessage(), MessageErrorConstants.QUEUE_FULL);
dmqManager.sendToDMQ();
return false;
......@@ -1705,6 +1739,8 @@ public class Queue extends Destination implements QueueMBean {
checkDelivery(message.getHeaderMessage())) {
message.incDeliveryCount();
nbMsgsDeliverSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.getMessages() -> " + j + ',' + message.getId());
......@@ -1758,6 +1794,8 @@ public class Queue extends Destination implements QueueMBean {
if (checkDelivery(message.getHeaderMessage())) {
message.incDeliveryCount();
nbMsgsDeliverSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
// use in sub class see ClusterQueue
messageDelivered(message.getId());
......@@ -1960,6 +1998,8 @@ public class Queue extends Destination implements QueueMBean {
// send message to the DMQ
DMQManager dmqManager = new DMQManager(dmqId, getId());
nbMsgsSentToDMQSinceCreation++;
// JORAM-326: Saves the queue to avoid bad counter value at restart.
if (strictCounters) setSave();
dmqManager.addDeadMessage(msg.getFullMessage(), MessageErrorConstants.INTERCEPTORS);
dmqManager.sendToDMQ();
msg.releaseFullMessage();
......@@ -2013,6 +2053,7 @@ public class Queue extends Destination implements QueueMBean {
protected fr.dyade.aaa.common.stream.Properties getStats() {
fr.dyade.aaa.common.stream.Properties stats = new fr.dyade.aaa.common.stream.Properties();
// TODO (AF): Should be getNbMsgsDeliverSinceCreation?
stats.put("NbMsgsDeliverSinceCreation", nbMsgsDeliverSinceCreation);
stats.put("PendingMessageCount", getPendingMessageCount());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment