Commit 43c7cb5f authored by Andre Freyssinet's avatar Andre Freyssinet

JORAM-377: Adds a task restarting the daemon if there is no message during

a period.
parent e453c161
/* /*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging * JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 - 2020 ScalAgent Distributed Technologies * Copyright (C) 2017 - 2021 ScalAgent Distributed Technologies
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
...@@ -50,6 +50,8 @@ import org.objectweb.joram.shared.DestinationConstants; ...@@ -50,6 +50,8 @@ import org.objectweb.joram.shared.DestinationConstants;
* <li>acquisition.period: delay between to attempts to receive a message if the acquisition * <li>acquisition.period: delay between to attempts to receive a message if the acquisition
* queue doesn't use the asyncrhonous mode, by default 100 milliseconds.</li> * queue doesn't use the asyncrhonous mode, by default 100 milliseconds.</li>
* <li>jms.destination: the name of remote JMS destination.</li> * <li>jms.destination: the name of remote JMS destination.</li>
* <li>rest.checkPeriod: verification period in seconds of acquisition handler. If no message has been received
* during this period, the handler is restarted.</li>
* </ul> * </ul>
*/ */
public class RestAcquisitionQueue { public class RestAcquisitionQueue {
...@@ -71,7 +73,9 @@ public class RestAcquisitionQueue { ...@@ -71,7 +73,9 @@ public class RestAcquisitionQueue {
// Normally each consumer resource need to be explicitly closed, this parameter allows to set the idle time // Normally each consumer resource need to be explicitly closed, this parameter allows to set the idle time
// in seconds in which the consumer context will be closed if idle. // in seconds in which the consumer context will be closed if idle.
private long idleTimeout = 60; private long idleTimeout = 60;
// Verification period in seconds of acquisition handler. If no message has been received
// during this period, the handler is restarted.
private long checkPeriod = 30 * 60;
/** /**
* @return the hostName * @return the hostName
*/ */
...@@ -198,6 +202,23 @@ public class RestAcquisitionQueue { ...@@ -198,6 +202,23 @@ public class RestAcquisitionQueue {
return this; return this;
} }
/**
* @return the checkPeriod.
*/
public long getCheckPeriod() {
return checkPeriod;
}
/**
*
* @param checkPeriod the ckeckPeriod to set.
* @return
*/
public RestAcquisitionQueue setCheckPeriod(long checkPeriod) {
this.checkPeriod = checkPeriod;
return this;
}
/** /**
* Administration method creating and deploying a REST acquisition queue on the local server. * Administration method creating and deploying a REST acquisition queue on the local server.
* <p> * <p>
...@@ -323,6 +344,8 @@ public class RestAcquisitionQueue { ...@@ -323,6 +344,8 @@ public class RestAcquisitionQueue {
props.setProperty(DestinationConstants.TIMEOUT_PROP, "" + timeout); props.setProperty(DestinationConstants.TIMEOUT_PROP, "" + timeout);
if (!props.containsKey(DestinationConstants.IDLETIMEOUT_PROP)) if (!props.containsKey(DestinationConstants.IDLETIMEOUT_PROP))
props.setProperty(DestinationConstants.IDLETIMEOUT_PROP, "" + idleTimeout); props.setProperty(DestinationConstants.IDLETIMEOUT_PROP, "" + idleTimeout);
if (!props.containsKey(DestinationConstants.CHECKPERIOD_PROP))
props.setProperty(DestinationConstants.CHECKPERIOD_PROP, "" + checkPeriod);
if (!props.containsKey(DestinationConstants.ACQUISITION_PERIOD)) if (!props.containsKey(DestinationConstants.ACQUISITION_PERIOD))
props.setProperty(DestinationConstants.ACQUISITION_PERIOD, "" + acquisitionPeriod); props.setProperty(DestinationConstants.ACQUISITION_PERIOD, "" + acquisitionPeriod);
......
/* /*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging * JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2017 - 2020 ScalAgent Distributed Technologies * Copyright (C) 2017 - 2021 ScalAgent Distributed Technologies
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
...@@ -33,6 +33,7 @@ import java.util.HashMap; ...@@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.TimerTask;
import javax.jms.JMSContext; import javax.jms.JMSContext;
import javax.ws.rs.client.Client; import javax.ws.rs.client.Client;
...@@ -57,8 +58,10 @@ import org.objectweb.util.monolog.api.Logger; ...@@ -57,8 +58,10 @@ import org.objectweb.util.monolog.api.Logger;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import fr.dyade.aaa.agent.AgentServer;
import fr.dyade.aaa.common.Daemon; import fr.dyade.aaa.common.Daemon;
import fr.dyade.aaa.common.Debug; import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.util.management.MXWrapper;
/** /**
* Asynchronous acquisition handler implementing the AcquisitionDaemon interface. * Asynchronous acquisition handler implementing the AcquisitionDaemon interface.
...@@ -94,10 +97,15 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -94,10 +97,15 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
private boolean mediaTypeJson = true; //default true, use "application/json" private boolean mediaTypeJson = true; //default true, use "application/json"
private boolean persistent = true; private boolean persistent = true;
// Verification period in seconds of acquisition handler. If no message has been received during this period,
// the handler is restarted.
private long checkPeriod = 30 * 60;
private TimerTask checkTask = null;
private XDaemon daemon = null; private XDaemon daemon = null;
@Override @Override
public void start(Properties properties, ReliableTransmitter transmitter) { public synchronized void start(Properties properties, ReliableTransmitter transmitter) {
this.properties = properties; this.properties = properties;
this.transmitter = transmitter; this.transmitter = transmitter;
...@@ -108,7 +116,12 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -108,7 +116,12 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
"Missing Destination JNDI name, should fixed property " + DestinationConstants.DESTINATION_NAME_PROP); "Missing Destination JNDI name, should fixed property " + DestinationConstants.DESTINATION_NAME_PROP);
return; return;
} }
if (checkPeriod > 0) {
checkTask = new CheckTask();
AgentServer.getTimer().schedule(checkTask, checkPeriod * 1000, checkPeriod * 1000);
}
daemon = new XDaemon("RestAcquisitionAsync.Daemon-" + destName, logger); daemon = new XDaemon("RestAcquisitionAsync.Daemon-" + destName, logger);
daemon.start(); daemon.start();
} }
...@@ -180,15 +193,29 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -180,15 +193,29 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
} }
if (properties.containsKey(DestinationConstants.TIMEOUT_PROP)) { if (properties.containsKey(DestinationConstants.TIMEOUT_PROP)) {
try {
timeout = properties.getProperty(DestinationConstants.TIMEOUT_PROP); timeout = properties.getProperty(DestinationConstants.TIMEOUT_PROP);
} catch (NumberFormatException exc) { } } else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.TIMEOUT_PROP + ", use default value: " + timeout);
} }
if (properties.containsKey(DestinationConstants.IDLETIMEOUT_PROP)) { if (properties.containsKey(DestinationConstants.IDLETIMEOUT_PROP)) {
try {
idleTimeout = properties.getProperty(DestinationConstants.IDLETIMEOUT_PROP); idleTimeout = properties.getProperty(DestinationConstants.IDLETIMEOUT_PROP);
} catch (NumberFormatException exc) { } } else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.IDLETIMEOUT_PROP + ", use default value: " + idleTimeout);
}
if (properties.containsKey(DestinationConstants.CHECKPERIOD_PROP)) {
try {
checkPeriod = Long.parseLong(properties.getProperty(DestinationConstants.CHECKPERIOD_PROP));
} catch (NumberFormatException exc) {
logger.log(BasicLevel.ERROR,
"Property " + DestinationConstants.CHECKPERIOD_PROP + " could not be parsed properly, use default value: " + checkPeriod, exc);
}
} else {
logger.log(BasicLevel.WARN,
"Missing property " + DestinationConstants.CHECKPERIOD_PROP + ", use default value: " + checkPeriod);
} }
} }
...@@ -205,6 +232,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -205,6 +232,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
if (uriCloseConsumer != null) if (uriCloseConsumer != null)
return; return;
nbCnx += 1;
URI base = UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build(); URI base = UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build();
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, logger.log(BasicLevel.DEBUG,
...@@ -290,14 +319,14 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -290,14 +319,14 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
if (userName != null) target = target.queryParam("user", userName); if (userName != null) target = target.queryParam("user", userName);
if (password != null) target = target.queryParam("password", password); if (password != null) target = target.queryParam("password", password);
} else { } else {
if (userName != null) auth.param("user", userName); if (userName != null) auth.param("user", userName);
if (password != null) auth.param("password", password); if (password != null) auth.param("password", password);
} }
if (useOldAPI) { if (useOldAPI) {
response = target.request().accept(MediaType.TEXT_PLAIN).post(null); response = target.request().accept(MediaType.TEXT_PLAIN).post(null);
} else { } else {
response = target.request().accept(MediaType.TEXT_PLAIN).post(Entity.entity(auth, MediaType.APPLICATION_FORM_URLENCODED)); response = target.request().accept(MediaType.TEXT_PLAIN).post(Entity.entity(auth, MediaType.APPLICATION_FORM_URLENCODED));
} }
} catch (Exception exc) { } catch (Exception exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
...@@ -827,7 +856,7 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -827,7 +856,7 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.closeConsumer(): -> " + response.getStatus()); logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.closeConsumer(): -> " + response.getStatus());
} catch (Exception exc) { } catch (Exception exc) {
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.WARN, "RestAcquisitionAsync.closeConsumer()", exc); logger.log(BasicLevel.WARN, "RestAcquisitionAsync.closeConsumer()", exc);
else else
logger.log(BasicLevel.WARN, "RestAcquisitionAsync.closeConsumer() - " + exc.getMessage()); logger.log(BasicLevel.WARN, "RestAcquisitionAsync.closeConsumer() - " + exc.getMessage());
return; return;
...@@ -838,29 +867,83 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -838,29 +867,83 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
} }
@Override @Override
public void stop() { public synchronized void stop() {
if (checkTask != null) {
checkTask.cancel();
checkTask = null;
}
if ((daemon != null) && daemon.isRunning()) if ((daemon != null) && daemon.isRunning())
daemon.stop(); daemon.stop();
daemon = null; daemon = null;
} }
private class XDaemon extends Daemon { public synchronized void restart() {
if ((daemon != null) && daemon.isRunning())
daemon.stop();
daemon = null;
daemon = new XDaemon("RestAcquisitionAsync.Daemon-" + destName, logger);
daemon.start();
}
private long round = 0;
private long lastActivity = 0;
private long lastMessage = 0;
private int nbCnx = 0;
public boolean isConnected() {
return uriCloseConsumer != null;
}
/**
* Task checking the acquisition mechanism. If no message has been received during this
* period, the daemon is restarted.
*/
private class CheckTask extends TimerTask {
private long lastCheckMessage = -1;
@Override
public void run() {
if (checkTask == null) return;
if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO,
"RestAcquisitionAsync.CheckTask.run(): cnx=" + nbCnx + ", msg=" + lastMessage + " (" + round + '/' + lastActivity + ')');
// TODO (AF): The check is currently done on the number of messages acquired. This involves
// restarting the daemon when there is no message for a period. There are other indicators,
// but the number of messages acquired is the most reliable.
if (lastMessage != lastCheckMessage) {
lastCheckMessage = lastMessage;
return;
}
logger.log(BasicLevel.WARN, "RestAcquisitionAsync.CheckTask.run(): restart daemon.");
restart();
}
}
/**
* Daemon for acquisition mechanism.
*/
private class XDaemon extends Daemon {
protected XDaemon(String name, Logger logmon) { protected XDaemon(String name, Logger logmon) {
super(name, logmon); super(name, logmon);
setDaemon(true); setDaemon(true);
} }
private int errors = 0;
@Override @Override
public void run() { public void run() {
if (logger.isLoggable(BasicLevel.INFO)) if (logger.isLoggable(BasicLevel.INFO))
logger.log(BasicLevel.INFO, "RestAcquisitionAsync.Daemon.run(): starting"); logger.log(BasicLevel.INFO, "RestAcquisitionAsync.Daemon.run(): starting");
try { try {
int errors = 0;
while (running) { while (running) {
canStop = false; canStop = false;
round += 1;
try { try {
createConsumer(); createConsumer();
errors = 0; errors = 0;
...@@ -879,6 +962,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -879,6 +962,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
canStop = false; canStop = false;
continue; continue;
} }
lastActivity += 1;
Message msg = null; Message msg = null;
try { try {
...@@ -897,7 +982,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { ...@@ -897,7 +982,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon {
} }
if (msg == null) continue; if (msg == null) continue;
lastMessage += 1;
if (logger.isLoggable(BasicLevel.DEBUG)) if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.Daemon.run(): receives " + msg.id); logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.Daemon.run(): receives " + msg.id);
......
/* /*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging * JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2009 - 2020 ScalAgent Distributed Technologies * Copyright (C) 2009 - 2021 ScalAgent Distributed Technologies
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
...@@ -72,6 +72,7 @@ public final class DestinationConstants { ...@@ -72,6 +72,7 @@ public final class DestinationConstants {
public static final String MEDIA_TYPE_JSON_PROP = "rest.mediaTypeJson"; public static final String MEDIA_TYPE_JSON_PROP = "rest.mediaTypeJson";
public static final String TIMEOUT_PROP = "rest.timeout"; public static final String TIMEOUT_PROP = "rest.timeout";
public static final String IDLETIMEOUT_PROP = "rest.idletimeout"; public static final String IDLETIMEOUT_PROP = "rest.idletimeout";
public static final String CHECKPERIOD_PROP = "rest.checkPeriod";
public static final String NB_MAX_MSG_PROP = "rest.maxMsgPerPeriod"; // TODO (AF): To be remove public static final String NB_MAX_MSG_PROP = "rest.maxMsgPerPeriod"; // TODO (AF): To be remove
public static final String TMPQUEUE_DMQ_ON_DELETE = "org.objectweb.joram.tempQ.dmqOnDel"; public static final String TMPQUEUE_DMQ_ON_DELETE = "org.objectweb.joram.tempQ.dmqOnDel";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment