Commit 7aed9510 authored by Kyriakos Kritikos's avatar Kyriakos Kritikos
Browse files

Provided support for starting metric collection via ZeroMQ + slight change to...

Provided support for starting metric collection via ZeroMQ + slight change to MC interface + logging
parent b97d13fe
Compilation
As this is a maven project, it is just essential to execute the following command: "mvn clean install" at the command line or via any software development toolkit in order to compile the code.
Execution
The code can be executed by typing "mvn exec:java -Deu.paasage.configdir=<path>" where <path> denotes the path to the directory in which two configuration files are situated: (a) the properties files required by the encompassing CDOClient (of some components) in order to be able to correctly establish sessions and communicate with the CDOServer which is named as "eu.paasage.mddb.cdo.client.properties" and (b) the properties file required for the proper configuration of the MetricCollector named as "eu.paasage.executionware.metric-collector.properties", including information about which TSDB is used at which port, what is the mode of aggregation, whether a publication server and a subscription client (for starting metric collections) must be run in case of a global MetricCollector and how to interact with the MetricCollector (see next sub-section).
Pre-Requisites
The MetricCollector exploits a particular TSDB in order to store and aggregate over measurements. Thus, this TSDB should be already running and the MetricCollector should be configured to properly connect to it. Two TSDB implementations are currently supported, InfluxDB and KairosDB. The version of InfluxDB supported is 0.8.8. For KairosDB, the latest version is supported 0.9.4.
As surely for a global MetricCollector but in most of the cases for a local MetricCollector, a CDOServer should also exist and be running as the measurements of top-level metrics or metrics needed for computing global metrics have to be stored also in it.
Configuration
The main configuration file of a Metric Collector, named as "eu.paasage.executionware.metric-collector.properties", contains the following properties that can be configured:
- db: it can take the values of influx or kairos mapping to the two TSDBs that are currently supported, i.e., InfluxDB and KairosDB, respectively.
- host: the name of the host where the TSDB resides
- port: the port number on which the TSDB listens
- mode: the operation mode of the MetricCollector which can take the values of either local or global
- runServer: a boolean value is expected here to denote whether a publication server should also be executed in order to publish measurements for (top-level) metrics. By default the value is false. This parameter should only be set on in the case of a global MetricCollector.
- runClient: a boolean value is expected here to denote whether a subscription client should also be executed in order to receive requests for measuring collections of metrics on behalf of a certain application instance. By default the value is false. This parameter should only be set on in the case of a global MetricCollector.
- directCall: a boolean value is expected to denote whether the public methods of the MetricCollector are to be called in a direct or indirect manner (e.g., via command file). By default this property takes the value of false.
- dynamic: a boolean value is expected to denote whether the public methods are dynamically called by changing the content of the command file. Obviously, this property should only be set on if the value of the directCall configuration property is false. By default, this property takes the value of false.
Interaction ways
The MetricCollector can be executed either directly or indirectly. The direct execution is the obvious case where a specific instance of the MetricCollector class is generated in a specific encompassing code and the respective methods of this instance are directly executed by this code. There are two ways you can indirectly execute a MetricCollector:
(a) via the command file which is named as " eu.paasage.executionware.metric-collector.command". This file is situated in the same directory as the one used to store the two required properties file for the configuration of the MetricCollector;
(b) by issuing/publishing startCollection messages which include the CDOID of the respective execution context at the 5550 local port - the MetricCollector should have the runClient configuration property set on in order to run its subscription client and listen to these messages. The indirect interaction in this way is limited to only the starting of a metric collection measurement for the moment.
To indicate one of the two main interaction ways (direct/indirect) to select, there are two properties in the "eu.paasage.executionware.metric-collector.properties" that can be set: (a) directCall which indicates whether the MetricCollector can be directly called or not and (b) in case of indirect interaction, the dynamic property used to indicate whether the interaction is dynamic or not (first indirect way) or the runClient configuration property (as indicated above - second indirect way). Dynamic means that the command file will be modified during the execution of the MetricCollector to indicate the different calls that have to be made while non-dynamic means that the command file is read just once in the beginning. It is obvious that the non-dynamic indirect interaction requires stopping the MetricCollector in the hard way as also embeds the dynamic configuration of the measurement system, so it is not quite preferable. The second indirect way is obviously dynamic in nature.
Testing
In order to test the MetricCollector, a specific test case has been developed. This test case involves the generation of a CAMEL model which includes all the elements that are needed in order to start a measurement on behalf of a certain application instance, including concrete deployment models, metric models and execution models. The test code executes two MetricCollectors, one local and one global. The local MetricCollector is indirectly executed via the command file. The execution of the global MetricCollector can be performed in two ways depending on a boolean input parameter that should be set and given when calling the main method. This parameter indicates whether the global MetricCollector will be informed about the particular actions to perform via the MeasurementInitiationSubscriptionClient or immediately after it is instantiated via directly calling the respective method. In order not to interfere in the execution of the two types of MetricCollectors, their configuration files are put in separate directories. The directory of the local MetricCollector is the root directory of the code while the directory of the global MetricCollector is the input sub-directory of this root directory. The test case involves measuring resource and aggregate metrics. The scheduling of these metrics is different and so is the responsibility to measure them. Particular files are created inside the root directory code when executing this test, each one mapping to a different metric. These are developed in order to indicate the measurements that are produced for each metric. The log file is another source of checking as there you can examine that not only the measurements are produced but also those of the top-level aggregate metrics are published to be received by potential subscribers. A specific SubscriptionClient runs in the background which listens to the measurement events and reports them in the log file.
To run this test case, you just have to execute as already indicated above the following command: 'mvn exec:java -Deu.paasage.configdir=<path> -Dexec.args="<true/false>"' where you can see that we have included in the end the input parameter to the main method.
A pre-requisite for properly running this test case is that a TSDB should be already running. By default, we assume that an InfluxDB is used to realise the functionality of the TSDB but the user can of course modify this in the configuration files of both the local and global MetricCollector. A CDOServer should be also running and the respective connection information should have been set at the "eu.paasage.mddb.cdo.client.properties" file.
\ No newline at end of file
......@@ -8,6 +8,8 @@ port=8080
mode=local
#publication server to be run or not - local MetricCollectors should not run any such server
runServer=false
#subscription client to be run or not - local MetricCollectors should not run any such client
runClient=false
#direct calls to MetricCollector's public methods
directCall=false
#in case directCall is false, do we dynamically watch for new commands from file named
......
#TSDB exploited determination (currently kairos & influx are supported)
db=influx
#hostname where TSDB server resides
host=localhost
#port on which TSDB Server listens
port=8080
#the operation mode of MetricCollector - can be local or global
mode=global
#publication server to be run or not
runServer=true
#subscription client to be run or not - local MetricCollectors should not run any such client
runClient=true
#direct calls to MetricCollector's public methods
directCall=true
#in case directCall is false, do we dynamically watch for new commands from file named
#as command to be stored in the same dir as properties file?
dynamic=false
#hostname where CDO Server resides
host=localhost
#port on which CDO Server listens
port=2036
#the name of the CDO repository of the server
repository=repo1
#logging to be set off or on - default is off
logging=off
......@@ -102,7 +102,12 @@
<artifactId>jeromq</artifactId>
<version>0.3.2</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<distributionManagement>
<repository>
......
......@@ -34,6 +34,8 @@ public class CDOListener implements Runnable, IListener{
private boolean publish = false;
private HashSet<CDOID> ids;
private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(CDOListener.class);
public CDOListener(String metricInstanceID, double threshold){
this.metricInstanceID = metricInstanceID;
this.threshold = threshold;
......@@ -44,7 +46,8 @@ public class CDOListener implements Runnable, IListener{
pw = new PrintWriter(metricInstanceID);
}
catch(Exception e){
e.printStackTrace();
logger.error("Something went wrong wile attempting to open file named as: " + metricInstanceID, e);
//e.printStackTrace();
}
}
......@@ -55,14 +58,14 @@ public class CDOListener implements Runnable, IListener{
cl = new CDOClient();
view = cl.openView();
cl.addListener(this);
System.out.println("CDOListener was called with ids: " + ids);
logger.info("CDOListener was called with ids: " + ids);
}
private void processMeasurement(Measurement am){
if (!publish){
if (am.getMetricInstance().getName().equals(metricInstanceID)){
double value = am.getValue();
System.out.println("MetricInstance: " + metricInstanceID + " with value: " + value);
logger.info("MetricInstance: " + metricInstanceID + " with value: " + value);
pw.println("MetricInstance: " + metricInstanceID + " with value: " + value);
if (value < threshold)
pw.println("Violation of SLO with threshold: " + threshold);
......@@ -76,12 +79,12 @@ public class CDOListener implements Runnable, IListener{
}
public void notifyEvent(IEvent event){
//System.out.println("EVENT: " + event);
//logger.info("EVENT: " + event);
if (event instanceof CDOSessionInvalidationEvent){
CDOSessionInvalidationEvent e = (CDOSessionInvalidationEvent)event;
List<CDOIDAndVersion> newObjs = e.getNewObjects();
for (CDOIDAndVersion id: newObjs){
System.out.println("Got new object with id: " + id.getID());
logger.info("Got new object with id: " + id.getID());
Object o = view.getObject(id.getID());
if (o instanceof Measurement){
Measurement am = (Measurement)o;
......@@ -97,7 +100,7 @@ public class CDOListener implements Runnable, IListener{
Thread.sleep(20000);
}
catch(Exception e){
e.printStackTrace();
//e.printStackTrace();
if (e instanceof InterruptedException){
if (pw != null){
pw.flush();
......@@ -107,6 +110,7 @@ public class CDOListener implements Runnable, IListener{
cl.closeSession();
return;
}
logger.error("Something went wrong while CDOListener was sleeping",e);
}
}
}
......
......@@ -9,6 +9,7 @@ package eu.paasage.executionware.metric_collector;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
......@@ -39,14 +40,16 @@ public class ExecutionContextHandler implements Runnable{
private PublicationServer server;
private CDOListener listener;
public ExecutionContextHandler(CDOID[] metricIDs, CDOID ID, String host, String port, MetricCollector.Mode mode, MetricCollector.DBType dbType, PublicationServer server){
private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Test.class);
public ExecutionContextHandler(Set<CDOID> metricIDs, CDOID ID, String host, String port, MetricCollector.Mode mode, MetricCollector.DBType dbType, PublicationServer server){
this.ID = ID;
this.host = host;
this.port = port;
this.mode = mode;
this.dbType = dbType;
this.server = server;
if (metricIDs != null && metricIDs.length != 0){
if (metricIDs != null && metricIDs.size() != 0){
tpe = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,ALIVE_TIME,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(CORE_POOL_SIZE));
metricToHandler = new Hashtable<CDOID,MetricHandler>();
expandMetricIDs(metricIDs);
......@@ -64,7 +67,7 @@ public class ExecutionContextHandler implements Runnable{
CompositeMetricInstance cmi = (CompositeMetricInstance)instance;
CDOID child = instance.cdoID();
if (mode == MetricCollector.Mode.LOCAL || cmi.getObjectBinding() instanceof MetricApplicationBinding){
System.out.println("Got in with mode: " + mode + " metric: " + id + " instance: " + child);
logger.info("Got in with mode: " + mode + " metric: " + id + " instance: " + child);
HashSet<CDOID> parents = childToParent.get(child);
if (parents == null) parents = new HashSet<CDOID>();
parents.add(id);
......@@ -74,10 +77,10 @@ public class ExecutionContextHandler implements Runnable{
}
else if (publish){
if (CDOUtils.canPush(child,view)){
System.out.println("Can push: " + child);
logger.info("Can push: " + child);
localMetricIDs.add(child);
}
//System.out.println("Just publishing metric instance: " + child);
//logger.info("Just publishing metric instance: " + child);
//expandMetricID(view,child,false,metricIDs,publish);
}
}
......@@ -87,7 +90,7 @@ public class ExecutionContextHandler implements Runnable{
}
//Expand IDs with child aggregated metrics and spawn all threads
private void expandMetricIDs(CDOID[] metricIDs){
private void expandMetricIDs(Set<CDOID> metricIDs){
this.metricIDs = new HashSet<CDOID>();
boolean publish = (mode == MetricCollector.Mode.GLOBAL && server != null);
if (publish) localMetricIDs = new HashSet<CDOID>();
......@@ -105,7 +108,7 @@ public class ExecutionContextHandler implements Runnable{
view.close();
client.closeSession();
if (publish && !localMetricIDs.isEmpty()){
System.out.println("Creating listeners for metric ids: " + localMetricIDs);
logger.info("Creating listeners for metric ids: " + localMetricIDs);
listener = new CDOListener(server,localMetricIDs);
tpe.execute(listener);
}
......@@ -116,7 +119,7 @@ public class ExecutionContextHandler implements Runnable{
if (mode == Mode.GLOBAL) push = CDOUtils.canPush(id,view);
MetricHandler mh = null;
if (push){
System.out.println("Can push: " + id);
logger.info("Can push: " + id);
mh = new MetricHandler(id,ID,!rootMetric,host,port,mode,dbType,server);
}
else mh = new MetricHandler(id,ID,!rootMetric,host,port,mode,dbType,null);
......@@ -131,13 +134,14 @@ public class ExecutionContextHandler implements Runnable{
Thread.sleep(10000);
}
catch(Exception e){
e.printStackTrace();
logger.error("ExecutionContextHandler was interrupted while sleeping",e);
//e.printStackTrace();
}
}
}
public void terminate(){
System.out.println("Terminating all metric handlers for ExecutionHandler: " + ID);
logger.info("Terminating all metric handlers for ExecutionHandler: " + ID);
//Just terminate all MetricHandlers
for (MetricHandler mh: metricToHandler.values()){
mh.terminate();
......@@ -167,7 +171,7 @@ public class ExecutionContextHandler implements Runnable{
}
}
public void update(CDOID[] metricIDs){
public void update(Set<CDOID> metricIDs){
CDOClient client = new CDOClient();
CDOView view = client.openView();
for (CDOID s: metricIDs){
......
package eu.paasage.executionware.metric_collector;
/* Copyright (C) 2015 KYRIAKOS KRITIKOS <kritikos@ics.forth.gr> */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/
*/
import org.eclipse.emf.cdo.common.id.CDOID;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Fake Adapter which is used for testing purposes to send requests for solving CP problems
*/
public class FakeAdapterPublisher implements Runnable{
private Context context;
private Socket socket;
private boolean run = true;
private CDOID ecID = null;
private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(FakeAdapterPublisher.class);
//Creates receiving and sending sockets on different ports
public FakeAdapterPublisher(CDOID ecID){
context = ZMQ.context(1);
socket = context.socket(ZMQ.PUB);
socket.bind("tcp://*:5550");
logger.info("Init call finished for FakeAdapterPublisher");
this.ecID = ecID;
}
public synchronized void terminate(){
run = false;
}
public void run() {
int times = 0;
while (run) {
socket.sendMore("startCollection");
socket.send(ecID.toURIFragment());
logger.info("Have sent the ecID for starting the collection of metrics");
try{
Thread.sleep(10000);
times++;
if (times == 2) break;
}
catch(Exception e){
logger.error("Thread interrupted",e);
//e.printStackTrace();
break;
}
}
socket.close();
context.term();
}
}
\ No newline at end of file
......@@ -10,6 +10,8 @@ package eu.paasage.executionware.metric_collector;
import java.io.File;
import java.io.FileInputStream;
import java.io.ObjectInputStream;
import java.util.HashSet;
import java.util.Set;
import org.eclipse.emf.cdo.common.id.CDOID;
......@@ -19,6 +21,8 @@ public class FileWatcher implements Runnable{
private long timestamp = 0;
private MetricCollection mc;
private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(FileWatcher.class);
public FileWatcher(File file, long delay, MetricCollection mc){
this.file = file;
this.delay = delay;
......@@ -52,32 +56,32 @@ public class FileWatcher implements Runnable{
for (int i = 0; i < num; i++){
ids[i]=(CDOID)br.readObject();
}*/
CDOID[] ids = (CDOID[])br.readObject();
Set<CDOID> ids = (Set<CDOID>)br.readObject();
CDOID ecID = (CDOID)br.readObject();
System.out.println("FileWatcher: calling readMetrics method");
logger.info("FileWatcher: calling readMetrics method");
mc.readMetrics(ids, ecID);
br.close();
}
else if (command.equals("deleteMetrics")){
CDOID ecID = (CDOID)br.readObject();
System.out.println("FileWatcher: calling deleteMetrics method");
logger.info("FileWatcher: calling deleteMetrics method");
mc.deleteMetrics(ecID);
br.close();
}
else if (command.equals("updateMetrics")){
int num = br.readInt();
CDOID[] ids = new CDOID[num];
Set<CDOID> ids = new HashSet<CDOID>();
for (int i = 0; i < num; i++){
ids[i]=(CDOID)br.readObject();
ids.add((CDOID)br.readObject());
}
CDOID ecID = (CDOID)br.readObject();
System.out.println("FileWatcher: calling updateMetrics method");
logger.info("FileWatcher: calling updateMetrics method");
mc.updateMetrics(ids, ecID);
br.close();
}
else if (command.equals("terminate")){
br.close();
System.out.println("FileWatcher: calling terminate method");
logger.info("FileWatcher: calling terminate method");
mc.terminate();
}
}
......
......@@ -7,14 +7,16 @@
package eu.paasage.executionware.metric_collector;
import java.util.Set;
import org.eclipse.emf.cdo.common.id.CDOID;
public interface MetricCollection {
//Read metric definitions based on their ID & start measuring
public void readMetrics(CDOID[] metricIDs, CDOID execContextId);
public void readMetrics(Set<CDOID> metricIDs, CDOID execContextId);
//Read updated metric definitions based on their ID & change measurement process
public void updateMetrics(CDOID[] metricIDs, CDOID execContextId);
public void updateMetrics(Set<CDOID> metricIDs, CDOID execContextId);
//Stop measuring metrics mapping to the specific execution context id
public void deleteMetrics(CDOID execContextId);
......
......@@ -13,12 +13,14 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Hashtable;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.emf.cdo.common.id.CDOID;
import eu.paasage.executionware.metric_collector.pubsub.MeasurementInitiationSubscriptionClient;
import eu.paasage.executionware.metric_collector.pubsub.PublicationServer;
import eu.paasage.mddb.cdo.client.CDOClient;
......@@ -48,7 +50,9 @@ public class MetricCollector implements MetricCollection{
private Mode mode = Mode.LOCAL;
private DBType dbType = DBType.KAIROS;
private PublicationServer publicationServer = null;
private MeasurementInitiationSubscriptionClient subscriptionClient = null;
private boolean runServer = false;
private boolean runClient = false;
private boolean directCall = false;
private boolean dynamic = false;
private FileWatcher fw = null;
......@@ -84,7 +88,7 @@ public class MetricCollector implements MetricCollection{
private void init(){
myId = ID++;
getConnectionInformation();
System.out.println("Metric Collector is initiated: " + myId + " mode: " + mode);
logger.info("Metric Collector is initiated: " + myId + " mode: " + mode);
cl = new CDOClient();
idToECH = new Hashtable<CDOID,ExecutionContextHandler>();
tpe = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,ALIVE_TIME,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(CORE_POOL_SIZE));
......@@ -161,10 +165,13 @@ public class MetricCollector implements MetricCollection{
String modeStr = props.getProperty("mode");
if (modeStr == null || modeStr.equals("local")) mode = Mode.LOCAL;
else if (modeStr.equals("global")) mode = Mode.GLOBAL;
System.out.println("Mode is: " + mode);
logger.info("Mode is: " + mode);
String pubStr = props.getProperty("runServer");
if (pubStr == null || pubStr.equals("false")) runServer = false;
else if (pubStr.equals("true")) runServer = true;
String subStr = props.getProperty("runClient");
if (subStr == null || subStr.equals("false")) runClient = false;
else if (subStr.equals("true")) runClient = true;
String dirCallStr = props.getProperty("directCall");
if (dirCallStr == null || dirCallStr.equals("false")) directCall = false;
else if (dirCallStr.equals("true")) directCall = true;
......@@ -173,30 +180,34 @@ public class MetricCollector implements MetricCollection{
else if (dynamicStr.equals("true")) dynamic = true;
logger.info("Got host: " + host + " port: " + port + " mode:" + modeStr + " runServer: " + runServer + " directCall: " + directCall + " dynamic: " + dynamic);
if (runServer) publicationServer = new PublicationServer();
if (runClient){
subscriptionClient = new MeasurementInitiationSubscriptionClient(this);
new Thread(subscriptionClient).start();
}
}
public void readMetrics(CDOID[] metricIDs, CDOID execContextId) {
public synchronized void readMetrics(Set<CDOID> metricIDs, CDOID execContextId) {
//logger.debug("MC: " + myId + " is reading metrics: " + metricIDs);
System.out.println("MC: " + myId + " is reading metrics: " + metricIDs);
logger.info("MC: " + myId + " is reading metrics: " + metricIDs);
ExecutionContextHandler ech = new ExecutionContextHandler(metricIDs,execContextId,host,port,mode,dbType,publicationServer);
idToECH.put(execContextId, ech);
tpe.execute(ech);
}
public void updateMetrics(CDOID[] metricIDs, CDOID execContextId) {
public synchronized void updateMetrics(Set<CDOID> metricIDs, CDOID execContextId) {
}
public void deleteMetrics(CDOID execContextId) {
public synchronized void deleteMetrics(CDOID execContextId) {
//logger.debug("MC: " + myId + " is deleting metrics for EC: " + execContextId);
System.out.println("MC: " + myId + " is deleting metrics for EC: " + execContextId);
logger.info("MC: " + myId + " is deleting metrics for EC: " + execContextId);
ExecutionContextHandler ech = idToECH.get(execContextId);
ech.terminate();
tpe.remove(ech);
}
public void terminate(){
public synchronized void terminate(){
for (ExecutionContextHandler ech: idToECH.values()){
ech.terminate();
tpe.remove(ech);
......@@ -206,5 +217,6 @@ public class MetricCollector implements MetricCollection{
idToECH.clear();
idToECH = null;
if (publicationServer != null) publicationServer.terminate();
if (subscriptionClient != null) subscriptionClient.terminate();
}
}
\ No newline at end of file
......@@ -64,15 +64,17 @@ public class MetricHandler implements Runnable{
private MetricCollector.DBType dbType;
private PublicationServer server;
private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(MetricHandler.class);
public MetricHandler(CDOID ID, CDOID ecID, boolean inTSDB, String host, String port, MetricCollector.Mode mode, MetricCollector.DBType dbType, PublicationServer server){
this.ID = ID;
this.ecID = ecID;
this.mode = mode;
this.dbType = dbType;
System.out.println("dbType is: " + dbType);
logger.info("dbType is: " + dbType);
this.server = server;
cl = new CDOClient();
System.out.println("DBClient should be available at: " + "http://" + host + ":" + port);
logger.info("DBClient should be available at: " + "http://" + host + ":" + port);
if (dbType == MetricCollector.DBType.KAIROS) kairosClient = new KairosDbClient("http://" + host + ":" + port);
else if (dbType == MetricCollector.DBType.INFLUX) influxClient = new InfluxDBClient(host,"test");
this.inTSDB = inTSDB;
......@@ -90,7 +92,8 @@ public class MetricHandler implements Runnable{
pw = new PrintWriter(f);
}
catch(Exception e){
e.printStackTrace();
logger.error("Something went wrong while attempting to create a PrintWrite on file with name: " + f.getName(),e);
//e.printStackTrace();
}
setupAggregation((CompositeMetricInstance)mi);
while (run){
......@@ -116,13 +119,14 @@ public class MetricHandler implements Runnable{
pw.flush();
}
catch(Exception e){
e.printStackTrace();
//e.printStackTrace();
if (e instanceof InterruptedException){
view.close();
cl.closeSession();
pw.close();
return;
}
logger.error("Something went wrong while running the main method of the MetricHandler",e);
}
}
view.close();
......@@ -163,20 +167,20 @@ public class MetricHandler implements Runnable{
Metric metric = mi.getMetric();
EList<MetricInstance> componentMetrics = mi.getComposingMetricInstances();
ArrayList<Metric> metrics = getMetricsFromInstances(componentMetrics);
System.out.println("MetricInstances of instance: " + mi.getName() + " are: " + metrics);
logger.info("MetricInstances of instance: " + mi.getName() + " are: " + metrics);
if (dbType == MetricCollector.DBType.KAIROS){
node = KairosAggregationNode.getKairosAggregationNode(kairosClient, cl, mode, aggregationPeriod, mi.getName(),((CompositeMetric)metric).getFormula(), metrics, componentMetrics, period, unit);
System.out.println("Node is: " + ((KairosAggregationNode)node).toString());
logger.info("Node is: " + ((KairosAggregationNode)node).toString());
}
else if (dbType == MetricCollector.DBType.INFLUX){
node = InfluxAggregationNode.getInfluxAggregationNode(influxClient, cl, mode, aggregationPeriod, mi.getName(),((CompositeMetric)metric).getFormula(), metrics, componentMetrics, period, unit);
System.out.println("Node is: " + ((InfluxAggregationNode)node).toString());
logger.info("Node is: " + ((InfluxAggregationNode)node).toString());
}
}
private void printAggregationNode(AggregationNode node, MetricInstance mi){
System.out.println("Printing AggregationInfo for MetricInstance: " + mi.getName() + " of metric: " + mi.getMetric().getName());
System.out.println(node.toString());
logger.info("Printing AggregationInfo for MetricInstance: " + mi.getName() + " of metric: " + mi.getMetric().getName());
logger.info(node.toString());
}
private int findPeriod(TimeUnit unit){
......@@ -253,7 +257,7 @@ public class MetricHandler implements Runnable{
}
public synchronized void terminate(){
System.out.println("MetricHandler: " + ID + " is about to terminate");