diff --git a/joram/joram/client/jms/src/main/java/org/objectweb/joram/client/jms/admin/RestAcquisitionQueue.java b/joram/joram/client/jms/src/main/java/org/objectweb/joram/client/jms/admin/RestAcquisitionQueue.java index 3806ea5f3b95c4734f1daf3db219af36d61ef4a6..a358e15ee75d5bfeac6da767d015d0aa99da0ff7 100644 --- a/joram/joram/client/jms/src/main/java/org/objectweb/joram/client/jms/admin/RestAcquisitionQueue.java +++ b/joram/joram/client/jms/src/main/java/org/objectweb/joram/client/jms/admin/RestAcquisitionQueue.java @@ -1,6 +1,6 @@ /* * JORAM: Java(TM) Open Reliable Asynchronous Messaging - * Copyright (C) 2017 - 2020 ScalAgent Distributed Technologies + * Copyright (C) 2017 - 2021 ScalAgent Distributed Technologies * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -50,6 +50,8 @@ import org.objectweb.joram.shared.DestinationConstants; *
@@ -323,6 +344,8 @@ public class RestAcquisitionQueue { props.setProperty(DestinationConstants.TIMEOUT_PROP, "" + timeout); if (!props.containsKey(DestinationConstants.IDLETIMEOUT_PROP)) props.setProperty(DestinationConstants.IDLETIMEOUT_PROP, "" + idleTimeout); + if (!props.containsKey(DestinationConstants.CHECKPERIOD_PROP)) + props.setProperty(DestinationConstants.CHECKPERIOD_PROP, "" + checkPeriod); if (!props.containsKey(DestinationConstants.ACQUISITION_PERIOD)) props.setProperty(DestinationConstants.ACQUISITION_PERIOD, "" + acquisitionPeriod); diff --git a/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RestAcquisitionAsync.java b/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RestAcquisitionAsync.java index 150123fafdd7ddeafde128d762b0232c5dec6829..710ce819e87277238648f57010cf512571590898 100644 --- a/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RestAcquisitionAsync.java +++ b/joram/joram/mom/extensions/restbridge/src/main/java/com/scalagent/joram/mom/dest/rest/RestAcquisitionAsync.java @@ -1,6 +1,6 @@ /* * JORAM: Java(TM) Open Reliable Asynchronous Messaging - * Copyright (C) 2017 - 2020 ScalAgent Distributed Technologies + * Copyright (C) 2017 - 2021 ScalAgent Distributed Technologies * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -33,6 +33,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.TimerTask; import javax.jms.JMSContext; import javax.ws.rs.client.Client; @@ -57,8 +58,10 @@ import org.objectweb.util.monolog.api.Logger; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import fr.dyade.aaa.agent.AgentServer; import fr.dyade.aaa.common.Daemon; import fr.dyade.aaa.common.Debug; +import fr.dyade.aaa.util.management.MXWrapper; /** * Asynchronous acquisition handler implementing the AcquisitionDaemon interface. @@ -94,10 +97,15 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { private boolean mediaTypeJson = true; //default true, use "application/json" private boolean persistent = true; + // Verification period in seconds of acquisition handler. If no message has been received during this period, + // the handler is restarted. + private long checkPeriod = 30 * 60; + private TimerTask checkTask = null; + private XDaemon daemon = null; @Override - public void start(Properties properties, ReliableTransmitter transmitter) { + public synchronized void start(Properties properties, ReliableTransmitter transmitter) { this.properties = properties; this.transmitter = transmitter; @@ -108,7 +116,12 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { "Missing Destination JNDI name, should fixed property " + DestinationConstants.DESTINATION_NAME_PROP); return; } - + + if (checkPeriod > 0) { + checkTask = new CheckTask(); + AgentServer.getTimer().schedule(checkTask, checkPeriod * 1000, checkPeriod * 1000); + } + daemon = new XDaemon("RestAcquisitionAsync.Daemon-" + destName, logger); daemon.start(); } @@ -180,15 +193,29 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { } if (properties.containsKey(DestinationConstants.TIMEOUT_PROP)) { - try { timeout = properties.getProperty(DestinationConstants.TIMEOUT_PROP); - } catch (NumberFormatException exc) { } + } else { + logger.log(BasicLevel.WARN, + "Missing property " + DestinationConstants.TIMEOUT_PROP + ", use default value: " + timeout); } if (properties.containsKey(DestinationConstants.IDLETIMEOUT_PROP)) { - try { idleTimeout = properties.getProperty(DestinationConstants.IDLETIMEOUT_PROP); - } catch (NumberFormatException exc) { } + } else { + logger.log(BasicLevel.WARN, + "Missing property " + DestinationConstants.IDLETIMEOUT_PROP + ", use default value: " + idleTimeout); + } + + if (properties.containsKey(DestinationConstants.CHECKPERIOD_PROP)) { + try { + checkPeriod = Long.parseLong(properties.getProperty(DestinationConstants.CHECKPERIOD_PROP)); + } catch (NumberFormatException exc) { + logger.log(BasicLevel.ERROR, + "Property " + DestinationConstants.CHECKPERIOD_PROP + " could not be parsed properly, use default value: " + checkPeriod, exc); + } + } else { + logger.log(BasicLevel.WARN, + "Missing property " + DestinationConstants.CHECKPERIOD_PROP + ", use default value: " + checkPeriod); } } @@ -205,6 +232,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { if (uriCloseConsumer != null) return; + nbCnx += 1; + URI base = UriBuilder.fromUri("http://" + hostName + ":" + port + "/joram/").build(); if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, @@ -290,14 +319,14 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { if (userName != null) target = target.queryParam("user", userName); if (password != null) target = target.queryParam("password", password); } else { - if (userName != null) auth.param("user", userName); - if (password != null) auth.param("password", password); + if (userName != null) auth.param("user", userName); + if (password != null) auth.param("password", password); } if (useOldAPI) { response = target.request().accept(MediaType.TEXT_PLAIN).post(null); } else { - response = target.request().accept(MediaType.TEXT_PLAIN).post(Entity.entity(auth, MediaType.APPLICATION_FORM_URLENCODED)); + response = target.request().accept(MediaType.TEXT_PLAIN).post(Entity.entity(auth, MediaType.APPLICATION_FORM_URLENCODED)); } } catch (Exception exc) { if (logger.isLoggable(BasicLevel.DEBUG)) @@ -827,7 +856,7 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.closeConsumer(): -> " + response.getStatus()); } catch (Exception exc) { if (logger.isLoggable(BasicLevel.DEBUG)) - logger.log(BasicLevel.WARN, "RestAcquisitionAsync.closeConsumer()", exc); + logger.log(BasicLevel.WARN, "RestAcquisitionAsync.closeConsumer()", exc); else logger.log(BasicLevel.WARN, "RestAcquisitionAsync.closeConsumer() - " + exc.getMessage()); return; @@ -838,29 +867,83 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { } @Override - public void stop() { + public synchronized void stop() { + if (checkTask != null) { + checkTask.cancel(); + checkTask = null; + } + if ((daemon != null) && daemon.isRunning()) daemon.stop(); daemon = null; } - private class XDaemon extends Daemon { + public synchronized void restart() { + if ((daemon != null) && daemon.isRunning()) + daemon.stop(); + daemon = null; + daemon = new XDaemon("RestAcquisitionAsync.Daemon-" + destName, logger); + daemon.start(); + } + + private long round = 0; + private long lastActivity = 0; + private long lastMessage = 0; + private int nbCnx = 0; + + public boolean isConnected() { + return uriCloseConsumer != null; + } + + /** + * Task checking the acquisition mechanism. If no message has been received during this + * period, the daemon is restarted. + */ + private class CheckTask extends TimerTask { + private long lastCheckMessage = -1; + + @Override + public void run() { + if (checkTask == null) return; + + if (logger.isLoggable(BasicLevel.INFO)) + logger.log(BasicLevel.INFO, + "RestAcquisitionAsync.CheckTask.run(): cnx=" + nbCnx + ", msg=" + lastMessage + " (" + round + '/' + lastActivity + ')'); + + // TODO (AF): The check is currently done on the number of messages acquired. This involves + // restarting the daemon when there is no message for a period. There are other indicators, + // but the number of messages acquired is the most reliable. + if (lastMessage != lastCheckMessage) { + lastCheckMessage = lastMessage; + return; + } + + logger.log(BasicLevel.WARN, "RestAcquisitionAsync.CheckTask.run(): restart daemon."); + restart(); + } + } + + /** + * Daemon for acquisition mechanism. + */ + private class XDaemon extends Daemon { protected XDaemon(String name, Logger logmon) { super(name, logmon); setDaemon(true); } - private int errors = 0; - @Override public void run() { if (logger.isLoggable(BasicLevel.INFO)) logger.log(BasicLevel.INFO, "RestAcquisitionAsync.Daemon.run(): starting"); try { + int errors = 0; + while (running) { canStop = false; + round += 1; try { createConsumer(); errors = 0; @@ -879,6 +962,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { canStop = false; continue; } + + lastActivity += 1; Message msg = null; try { @@ -897,7 +982,8 @@ public class RestAcquisitionAsync implements AcquisitionDaemon { } if (msg == null) continue; - + lastMessage += 1; + if (logger.isLoggable(BasicLevel.DEBUG)) logger.log(BasicLevel.DEBUG, "RestAcquisitionAsync.Daemon.run(): receives " + msg.id); diff --git a/joram/joram/shared/src/main/java/org/objectweb/joram/shared/DestinationConstants.java b/joram/joram/shared/src/main/java/org/objectweb/joram/shared/DestinationConstants.java index 5ff6fe3e5f8ec6cedd0be357e89f639b5a032b53..8f3ed170986461c6212684ba5943b31e27c86d5f 100644 --- a/joram/joram/shared/src/main/java/org/objectweb/joram/shared/DestinationConstants.java +++ b/joram/joram/shared/src/main/java/org/objectweb/joram/shared/DestinationConstants.java @@ -1,6 +1,6 @@ /* * JORAM: Java(TM) Open Reliable Asynchronous Messaging - * Copyright (C) 2009 - 2020 ScalAgent Distributed Technologies + * Copyright (C) 2009 - 2021 ScalAgent Distributed Technologies * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public @@ -72,6 +72,7 @@ public final class DestinationConstants { public static final String MEDIA_TYPE_JSON_PROP = "rest.mediaTypeJson"; public static final String TIMEOUT_PROP = "rest.timeout"; public static final String IDLETIMEOUT_PROP = "rest.idletimeout"; + public static final String CHECKPERIOD_PROP = "rest.checkPeriod"; public static final String NB_MAX_MSG_PROP = "rest.maxMsgPerPeriod"; // TODO (AF): To be remove public static final String TMPQUEUE_DMQ_ON_DELETE = "org.objectweb.joram.tempQ.dmqOnDel";