Commit 38a83c5c authored by Guillaume Surrel's avatar Guillaume Surrel
Browse files

New collector destination using the new destination extension mechanism.

parent 96acd333
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2009 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package com.scalagent.joram.mom.dest.collector;
import java.io.IOException;
import java.util.Properties;
/**
*
*/
public interface Collector {
/**
* Check the resource depend of implementation.
* Store file in Queue or send to topic (collector destination.
*
* @throws IOException
*/
public void check() throws IOException;
/**
* set the collector destination.
*
* @param collectorDest the destination Queue or Topic.
*/
public void setCollectorDestination(CollectorDestination collectorDest);
/**
* set collector properties.
*
* @param properties The initial set of properties.
*/
public void setProperties(Properties properties);
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package com.scalagent.joram.mom.dest.collector;
import fr.dyade.aaa.common.stream.Properties;
/**
*
*/
public interface CollectorDestination {
public static final String DEFAULT_COLLECTOR = "com.scalagent.joram.mom.dest.collector.URLCollector";
public static final String CLASS_NAME = "collector.className";
public static final String PERSISTENT_MSG = "persistent";
public static final String EXPIRATION_MSG = "expiration";
public static final String PRIORITY_MSG = "priority";
/**
* send message.
*
* @param type message type.
* @param body message body.
* @param properties message properties.
*/
public void sendMessage(int type, byte[] body, Properties properties);
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package com.scalagent.joram.mom.dest.collector;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.agent.Debug;
import fr.dyade.aaa.common.stream.Properties;
/**
*
*/
public class CollectorHelper {
public static Logger logger = Debug.getLogger(CollectorHelper.class.getName());
/**
* create shared message.
*
* @param type message type.
* @param body message body.
* @param prop message properties
* @param expiration message expiration.
* @param persistent is message persistent.
* @param identifier message identifier.
* @return shared Message.
*/
public static Message createMessage(
int type,
byte[] body,
Properties prop,
long expiration,
boolean persistent,
String identifier) {
Message msg = new Message();
// set message type
if (type > 0)
msg.type = type;
else
msg.type = Message.BYTES;
msg.body = body;
msg.properties = prop;
msg.id = identifier;
return msg;
}
/**
* create client message.
*
* @param msg the shared message
* @return ClientMessages.
*/
public static ClientMessages createClientMessages(Message msg) {
return new ClientMessages(-1, -1, msg);
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package com.scalagent.joram.mom.dest.collector;
import java.util.Properties;
import org.objectweb.joram.mom.dest.DestinationImpl;
import org.objectweb.joram.mom.dest.Queue;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Debug;
/**
* A <code>CollectorQueue</code> agent is an agent hosting a MOM queue.
*/
public class CollectorQueue extends Queue {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
public static Logger logger = Debug.getLogger(CollectorQueue.class.getName());
public static final String COLLECTOR_QUEUE_TYPE = "queue.collector";
public static String getDestinationType() {
return COLLECTOR_QUEUE_TYPE;
}
/**
* Empty constructor for newInstance().
*/
public CollectorQueue() {
fixed = true;
}
/**
* Creates the <tt>CollectorQueueImpl</tt>.
*
* @param adminId Identifier of the queue administrator.
* @param prop The initial set of properties.
*/
public DestinationImpl createsImpl(AgentId adminId, Properties prop) {
return new CollectorQueueImpl(adminId, prop);
}
/**
* Gives this agent an opportunity to initialize after having been deployed,
* and each time it is loaded into memory.
*
* @param firstTime true when first called by the factory
*
* @exception Exception
* unspecialized exception
*/
protected void agentInitialize(boolean firstTime) throws Exception {
super.agentInitialize(firstTime);
}
// /**
// * Distributes the received notifications to the appropriate reactions.
// * @throws Exception
// */
// public void react(AgentId from, Notification not) throws Exception {
// if (logger.isLoggable(BasicLevel.DEBUG))
// logger.log(BasicLevel.DEBUG, "CollectorQueue.react(" + from + ',' + not + ')');
//
// super.react(from, not);
// }
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 - 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package com.scalagent.joram.mom.dest.collector;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Properties;
import java.util.Vector;
import org.objectweb.joram.mom.dest.QueueImpl;
import org.objectweb.joram.mom.notifications.ClientMessages;
import org.objectweb.joram.mom.notifications.WakeUpNot;
import org.objectweb.joram.shared.messages.ConversionHelper;
import org.objectweb.joram.shared.messages.Message;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;
import fr.dyade.aaa.agent.AgentId;
import fr.dyade.aaa.agent.Debug;
/**
* The <code>CollectorQueueImpl</code> class implements the MOM collector queue behavior,
* basically storing messages and delivering them upon clients requests.
*/
public class CollectorQueueImpl extends QueueImpl implements CollectorDestination, CollectorQueueImplMBean {
/** define serialVersionUID for interoperability */
private static final long serialVersionUID = 1L;
public static Logger logger = Debug.getLogger(CollectorQueueImpl.class.getName());
private Collector collector;
private long count = 0;
/** Tells if the messages produced are persistent. */
private boolean isPersistent = false;
/**
* Returns true if the messages produced are persistent.
*
* @return true if the messages produced are persistent.
*/
public boolean isMessagePersistent() {
return isPersistent;
}
/**
* Sets the DeliveryMode value for the produced messages.
* if the parameter is true the messages produced are persistent.
*
* @param isPersistent if true the messages produced are persistent.
*/
public void setMessagePersistent(boolean isPersistent) {
this.isPersistent = isPersistent;
}
/** The priority of produced messages. */
private int priority = 4;
/**
* Returns the priority of produced messages.
*
* @return the priority of produced messages.
*/
public int getPriority() {
return priority;
}
/**
* Sets the priority of produced messages.
*
* @param priority the priority to set.
*/
public void setPriority(int priority) {
this.priority = priority;
}
/** The duration of produced messages. */
private long expiration = -1;
/**
* Returns the expiration value for produced messages.
*
* @return the expiration value for produced messages.
*/
public long getExpiration() {
return expiration;
}
/**
* Sets the expiration value for produced messages.
*
* @param expiration the expiration to set.
*/
public void setExpiration(long expiration) {
this.expiration = expiration;
}
/**
* Constructs a <code>CollectorQueueImpl</code> instance.
*
* @param adminId Identifier of the administrator of the queue.
* @param prop The initial set of properties.
*/
public CollectorQueueImpl(AgentId adminId, Properties properties) {
super(adminId, properties);
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "CollectorQueueImpl.<init> prop = " + properties );
if (properties != null) {
Enumeration e = properties.keys();
while (e.hasMoreElements()) {
String name = (String) e.nextElement();
try {
if (name.equals(WAKEUP_PERIOD)) {
//nothing to do, see DestinationImpl
} else if (name.equals(PERSISTENT_MSG))
isPersistent = ConversionHelper.toBoolean(properties.get(PERSISTENT_MSG));
else if (name.equals(PRIORITY_MSG))
priority = ConversionHelper.toInt(properties.get(PRIORITY_MSG));
else if (name.equals(EXPIRATION_MSG))
expiration = ConversionHelper.toLong(properties.get(EXPIRATION_MSG));
else if (name.equals(CLASS_NAME)) {
String className = ConversionHelper.toString(properties.get(CLASS_NAME));
if (className == null)
className = DEFAULT_COLLECTOR;
createCollector(className, properties);
}
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "CollectorQueueImpl.<init>: bad initialization.", exc);
}
}
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "CollectorQueueImpl.<init> period = " + getPeriod() + ", collector = " + collector);
}
}
private void createCollector(String className, Properties properties)
throws ClassNotFoundException, InstantiationException, IllegalAccessException {
Class clazz;
clazz = Class.forName(className);
collector = (Collector) clazz.newInstance();
collector.setCollectorDestination(this);
collector.setProperties(properties);
}
private Properties transform(fr.dyade.aaa.common.stream.Properties properties) {
if (properties == null)
return null;
Properties prop = new Properties();
Enumeration e = properties.keys();
while (e.hasMoreElements()) {
String key = (String) e.nextElement();
prop.put(key, properties.get(key));
}
return prop;
}
/**
* create wake up task and wake up this collector.
*
* @see org.objectweb.joram.mom.dest.QueueImpl#initialize(boolean)
*/
public void initialize(boolean firstTime) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "CollectorQueueImpl.initialize(" + firstTime + ')');
super.initialize(firstTime);
if (firstTime) {
try {
if (collector != null)
collector.check();
} catch (IOException e) {
// TODO Auto-generated catch block
}
}
}
/**
* wake up the collector (do check)
* and schedule task.
*/
public void wakeUpNot(WakeUpNot not) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "CollectorQueueImpl.collectorWakeUp()");
super.wakeUpNot(not);
try {
collector.check();
} catch (IOException e) {
// TODO Auto-generated catch block
}
}
/**
* update collector properties.
*
* @see org.objectweb.joram.mom.dest.DestinationImpl#preProcess(fr.dyade.aaa.agent.AgentId, org.objectweb.joram.mom.notifications.ClientMessages)
*/
public ClientMessages preProcess(AgentId from, ClientMessages cm) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "Change collector properties. preProcess(" + from + ", " + cm + ')');
long period = getPeriod();
Vector msgs = cm.getMessages();
for (int i=0; i<msgs.size(); i++) {
Message msg = (Message) msgs.elementAt(i);
if (msg.properties != null) {
Enumeration enumProperties = msg.properties.keys();
while (enumProperties.hasMoreElements()) {
String key = (String) enumProperties.nextElement();
try {
if (key.equals(WAKEUP_PERIOD))
period = ConversionHelper.toLong(msg.properties.get(WAKEUP_PERIOD));
else if (key.equals(PERSISTENT_MSG))
isPersistent = ConversionHelper.toBoolean(msg.properties.get(PERSISTENT_MSG));
else if (key.equals(PRIORITY_MSG))
priority = ConversionHelper.toInt(msg.properties.get(PRIORITY_MSG));
else if (key.equals(EXPIRATION_MSG))
expiration = ConversionHelper.toLong(msg.properties.get(EXPIRATION_MSG));
else if (key.equals(CLASS_NAME)) {
String className = ConversionHelper.toString(msg.properties.get(CLASS_NAME));
if (className == null)
className = DEFAULT_COLLECTOR;
createCollector(className, transform(msg.properties));
}
} catch (Exception exc) {
logger.log(BasicLevel.ERROR, "CollectorQueueImpl.<init>: bad configuration.", exc);
}
}
}
msg.properties = null;
}
setPeriod(period);
return null;
}
/**
* send message and properties.
* Here just construct the client messages and call storeClientMessage.
*
* @param type message type.
* @param body the message body.
* @param properties the message properties.
*
* @see com.scalagent.joram.mom.dest.collector.CollectorDestination#sendMessage(int, byte[], java.util.Properties)
*/
public void sendMessage(int type, byte[] body, fr.dyade.aaa.common.stream.Properties properties) {
if (logger.isLoggable(BasicLevel.DEBUG))
logger.log(BasicLevel.DEBUG, "CollectorQueueImpl.sendMessage(" + type + ", " + body + ", " + properties + ')');
// create shared message
Message msg = CollectorHelper.createMessage(
type,
body,
properties,
expiration,
isPersistent,
"collectorQueue_" + count);
// increment message counter
count++;
// create client message
ClientMessages clientMsgs = CollectorHelper.createClientMessages(msg);
// store message in this queue (CollectorQueue).
addClientMessages(clientMsgs);
}
public String toString() {
return "CollectorQueueImpl:" + getId().toString();
}
}
/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2008 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*