Commit b30d5de7 authored by Andre Freyssinet's avatar Andre Freyssinet

Enhancements in the handling of delayed messages (JORAM-321, JORAM-322).

parent ad04270c
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2001 - 2018 ScalAgent Distributed Technologies
* Copyright (C) 2001 - 2019 ScalAgent Distributed Technologies
* Copyright (C) 1996 - 2000 Dyade
*
* This library is free software; you can redistribute it and/or
......@@ -109,7 +109,6 @@ public class Queue extends Destination implements QueueMBean {
public static Logger logmsg = Debug.getLogger(Queue.class.getName() + ".Message");
public static final String DELIVERY_TABLE_PREFIX = "DT_";
public static final String DELIVERY_TIME_TABLE_PREFIX = "DTT_";
public static final String ARRIVAL_STATE_PREFIX = "AS_";
/** Static value holding the default DMQ identifier for a server. */
......@@ -236,6 +235,7 @@ public class Queue extends Destination implements QueueMBean {
this.pause = pause;
setSave();
// Sends a QueueDeliveryTimeNot in order to trigger message delivery.
if (!pause)
Channel.sendTo(getId(), new QueueDeliveryTimeNot(null, false));
}
......@@ -255,9 +255,6 @@ public class Queue extends Destination implements QueueMBean {
/** List holding the requests before reply or expiry. */
protected List<ReceiveRequest> requests = new Vector();
/** Table keeping the message deliveries */
protected transient QueueDeliveryTable deliveryTimeTable;
public Queue() {
super();
}
......@@ -334,7 +331,6 @@ public class Queue extends Destination implements QueueMBean {
super.agentSave();
arrivalState.save();
deliveryTable.save();
deliveryTimeTable.save();
}
/**
......@@ -503,16 +499,15 @@ public class Queue extends Destination implements QueueMBean {
return 0;
}
transient int delayedMessageCount = 0;
/**
* Returns the number of messages delivery time.
* Returns the number of messages waiting for a delay.
*
* @return The number of messages delivery time.
* @return The number of messages waiting for a delay.
*/
public final int getDeliveryTimeMessageCount() {
if (deliveryTimeTable != null) {
return deliveryTimeTable.size();
}
return 0;
public final int getDelayedMessageCount() {
return delayedMessageCount;
}
protected long nbMsgsDeniedSinceCreation = 0;
......@@ -575,20 +570,20 @@ public class Queue extends Destination implements QueueMBean {
String arrivalStateTxName = ARRIVAL_STATE_PREFIX + getId().toString();
String deliveryTableTxName = DELIVERY_TABLE_PREFIX + getId().toString();
String deliveryTimeTableTxName = DELIVERY_TIME_TABLE_PREFIX + getId().toString();
if (firstTime) {
arrivalState = new QueueArrivalState(arrivalStateTxName);
deliveryTable = new QueueDeliveryTable(deliveryTableTxName);
deliveryTimeTable = new QueueDeliveryTable(deliveryTimeTableTxName);
return;
} else {
arrivalState = QueueArrivalState.load(arrivalStateTxName);
deliveryTable = QueueDeliveryTable.load(deliveryTableTxName);
deliveryTimeTable = new QueueDeliveryTable(deliveryTimeTableTxName);
}
// Retrieving the persisted messages, if any.
List persistedMsgs = Message.loadAll(getMsgTxPrefix().toString());
long currentTime = System.currentTimeMillis();
if (logmsg.isLoggable(BasicLevel.DEBUG))
logmsg.log(BasicLevel.DEBUG, getName() + ", retrieves messages " + currentTime);
if (persistedMsgs != null) {
Message persistedMsg;
......@@ -599,19 +594,18 @@ public class Queue extends Destination implements QueueMBean {
queueDelivery = deliveryTable.get(persistedMsg.getId());
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId());
logmsg.log(BasicLevel.INFO, getName() + ": retrieves message " + persistedMsg.getId() + " -> " + persistedMsg.getDeliveryTime());
QueueDelivery queueDeliveryTime = deliveryTimeTable.get(persistedMsg.getId());
if (queueDelivery == null && queueDeliveryTime != null) {
if (persistedMsg.getDeliveryTime() > System.currentTimeMillis()) {
if (queueDelivery == null) {
if (persistedMsg.getDeliveryTime() > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, getName() + ": schedule " + persistedMsg.getClientID());
AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), persistedMsg, false),
new Date(persistedMsg.getDeliveryTime()));
logger.log(BasicLevel.DEBUG, getName() + ": schedule " + persistedMsg.getId());
delayedMessageCount += 1;
AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), persistedMsg, false), new Date(persistedMsg.getDeliveryTime()));
} else {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, getName() + ": delay expire, Adds a message " +
persistedMsg.getClientID() + " in the list of messages to deliver.");
logger.log(BasicLevel.DEBUG,
getName() + ": delay expire, Adds a message " + persistedMsg.getId() + " in the list of messages to deliver.");
// Adds a message in the list of messages to deliver.
addMessage(persistedMsg, false);
//TODO: delete persistedMsg if the queue is full ?
......@@ -1283,8 +1277,6 @@ public class Queue extends Destination implements QueueMBean {
sharedMsg.deliveryTime = deliveryTime;
if (sharedMsg.deliveryTime > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.doClientMessages: deliveryTimeTable.put " + msg + ')');
addDeliveryTimeMessage(msg, not.getClientContext(), throwsExceptionOnFullDest, false);
} else {
storeMessage(msg, throwsExceptionOnFullDest);
......@@ -1308,8 +1300,11 @@ public class Queue extends Destination implements QueueMBean {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.addDeliveryTimeMessage(" + msg + ", " + clientCtx + ')');
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": gets new delayed message " + msg.getId() + ", " + msg.order);
// queue is full
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size() + deliveryTimeTable.size())) {
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size() + getDelayedMessageCount())) {
if (throwsExceptionOnFullDest && isSyncExceptionOnFullDest()) {
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "addDeliveryTimeMessage " + msg.getId() + " throws Exception: The queue \"" + getName() + "\" is full (syncExceptionOnFullDest).");
......@@ -1321,8 +1316,7 @@ public class Queue extends Destination implements QueueMBean {
dmqManager.sendToDMQ();
return;
}
// add to the delivery time table
deliveryTimeTable.put(msg.getId(), new QueueDelivery(getId(), clientCtx, msg));
if (isHeader) {
msg.saveHeader();
msg.releaseFullMessage();
......@@ -1334,21 +1328,27 @@ public class Queue extends Destination implements QueueMBean {
msg.releaseFullMessage();
}
}
delayedMessageCount += 1;
//schedule
AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), msg, throwsExceptionOnFullDest),new Date(msg.getDeliveryTime()));
AgentServer.getTimer().schedule(new QueueDeliveryTimeTask(getId(), msg, throwsExceptionOnFullDest), new Date(msg.getDeliveryTime()));
}
void processDeliveryTime(AgentId from, QueueDeliveryTimeNot not) throws AccessException {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.processDeliveryTime(" + from + ", " + not + ')');
if (not.msg != null) {
if (logmsg.isLoggable(BasicLevel.INFO))
logmsg.log(BasicLevel.INFO, getName() + ": adds new delayed message " + not.msg.getId() + ", " + not.msg.order);
// Adds a message in the list of messages to deliver.
addMessage(not.msg, not.throwsExceptionOnFullDest);
delayedMessageCount -= 1;
//msg.order = arrivalState.getAndIncrementArrivalCount(msg.isPersistent());
// Remove msgId to the deliveryTimeTable
deliveryTimeTable.remove(not.msg.getId());
}
// Launching a delivery sequence:
......@@ -1559,7 +1559,7 @@ public class Queue extends Destination implements QueueMBean {
*/
protected final boolean addMessage(Message message, boolean throwsExceptionOnFullDest) throws AccessException {
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size() + deliveryTimeTable.size())) {
if (nbMaxMsg > -1 && nbMaxMsg <= (messages.size() + deliveryTable.size() + getDelayedMessageCount())) {
if (throwsExceptionOnFullDest && isSyncExceptionOnFullDest()) {
if (logger.isLoggable(BasicLevel.INFO))
......@@ -1970,8 +1970,6 @@ public class Queue extends Destination implements QueueMBean {
}
if (msg.getDeliveryTime() > currentTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Queue.doClientMessages: deliveryTimeTable.put " + msg + ')');
// TODO: We can not set the client context id, fix to -1. Is it a problem?
addDeliveryTimeMessage(msg, -1, throwsExceptionOnFullDest, false);
} else {
......
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2013 - 2016 ScalAgent Distributed Technologies
* Copyright (C) 2013 - 2019 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
......@@ -27,17 +27,16 @@ import org.objectweb.joram.mom.messages.Message;
import fr.dyade.aaa.agent.Notification;
/**
*
* Notification allowing to trigger delivery of delayed message.
*/
public class QueueDeliveryTimeNot extends Notification {
private static final long serialVersionUID = 1L;
public Message msg;
public boolean throwsExceptionOnFullDest = false;
public transient Message msg;
public transient boolean throwsExceptionOnFullDest = false;
public QueueDeliveryTimeNot(Message msg, boolean throwsExceptionOnFullDest) {
this.msg = msg;
this.throwsExceptionOnFullDest = throwsExceptionOnFullDest;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment