Commit bfcd62f0 authored by Kyriakos Kritikos's avatar Kyriakos Kritikos
Browse files

Modifications for metric-collector to be synchronised with UULM code

parent 7aed9510
......@@ -24,6 +24,7 @@ import eu.paasage.camel.metric.MetricCondition;
import eu.paasage.camel.metric.MetricFormula;
import eu.paasage.camel.metric.MetricFormulaParameter;
import eu.paasage.camel.metric.MetricInstance;
import eu.paasage.camel.requirement.OptimisationRequirement;
import eu.paasage.camel.requirement.Requirement;
import eu.paasage.camel.requirement.RequirementGroup;
import eu.paasage.camel.requirement.ServiceLevelObjective;
......@@ -74,19 +75,18 @@ public class CDOUtils {
//Checking if SLO contains condition on metric of instance
List<MetricCondition> mc = view.createQuery("hql", "select mc from MetricCondition mc, ServiceLevelObjective slo, ExecutionContext ec where ec.name='" + ecId + "' and slo member of ec.requirementGroup.requirements and slo.customServiceLevel=mc and mc.metricContext.metric.name='" + metricName + "'").getResult(MetricCondition.class);
if (mc != null && !mc.isEmpty()) return true;
else{
//Checking if exists non-functional event in scalability rule with condition on metric of instance
List<NonFunctionalEvent> events = view.createQuery("hql","select ev from NonFunctionalEvent ev, ExecutionContext ec, User u, ScalabilityRule sr, MetricCondition mc where ec.name='" + ecId +"' and ev.metricCondition=mc and mc.metricContext.metric.name='" + metricName + "' and u member of sr.entity and ec.deploymentModel member of u.deploymentModels and sr.event=ev").getResult(NonFunctionalEvent.class);
if (events != null && !events.isEmpty()) return true;
else{
//Checking if exists event pattern in scalability rule containing a non-functional event with condition on metric of instance
List<EventPattern> eps = view.createQuery("hql","select ep from EventPattern ep, ExecutionContext ec, User u, ScalabilityRule sr where ec.name='" + ecId +"' and u member of sr.entity and ec.deploymentModel member of u.deploymentModels and sr.event=ep").getResult(EventPattern.class);
if (eps != null && !eps.isEmpty()){
for (EventPattern ep: eps) {
boolean contained = patternIncludesEvent(ep,m);
if (contained) return true;
}
}
//Checking if exists optimisation requirement on metric of instance
List<OptimisationRequirement> or = view.createQuery("hql", "select or from OptimisationRequirement or, ExecutionContext ec where ec.name='" + ecId + "' and or member of ec.requirementGroup.requirements and or.metric.name='" + metricName + "'").getResult(OptimisationRequirement.class);
if (or != null && !or.isEmpty()) return true;
//Checking if exists non-functional event in scalability rule with condition on metric of instance
List<NonFunctionalEvent> events = view.createQuery("hql","select ev from NonFunctionalEvent ev, ExecutionContext ec, User u, ScalabilityRule sr, MetricCondition mc where ec.name='" + ecId +"' and ev.metricCondition=mc and mc.metricContext.metric.name='" + metricName + "' and u member of sr.entity and ec.deploymentModel member of u.deploymentModels and sr.event=ev").getResult(NonFunctionalEvent.class);
if (events != null && !events.isEmpty()) return true;
//Checking if exists event pattern in scalability rule containing a non-functional event with condition on metric of instance
List<EventPattern> eps = view.createQuery("hql","select ep from EventPattern ep, ExecutionContext ec, User u, ScalabilityRule sr where ec.name='" + ecId +"' and u member of sr.entity and ec.deploymentModel member of u.deploymentModels and sr.event=ep").getResult(EventPattern.class);
if (eps != null && !eps.isEmpty()){
for (EventPattern ep: eps) {
boolean contained = patternIncludesEvent(ep,m);
if (contained) return true;
}
}
return false;
......
......@@ -36,7 +36,7 @@ import eu.paasage.camel.type.FloatsValue;
import eu.paasage.camel.type.IntegerValue;
import eu.paasage.camel.type.SingleValue;
import eu.paasage.executionware.metric_collector.MetricCollector.Mode;
import eu.paasage.executionware.metric_collector.MetricStorage.MeasurementType;
import eu.paasage.executionware.metric_collector.SynchronisedMetricStorage.MeasurementType;
import eu.paasage.executionware.metric_collector.influxdb.InfluxAggregationNode;
import eu.paasage.executionware.metric_collector.influxdb.InfluxDBClient;
import eu.paasage.executionware.metric_collector.kairosdb.KairosAggregationNode;
......@@ -114,7 +114,7 @@ public class MetricHandler implements Runnable{
Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
}
//MetricStorageWithBlockingQueue.storeMeasurement(value,ID,ecID,measurementType,object,object2);
MetricStorage.storeMeasurement(value,ID,ecID,measurementType,object,object2);
SynchronisedMetricStorage.storeMeasurement(value,ID,ecID,measurementType,object,object2);
pw.println("" + value);
pw.flush();
}
......
......@@ -28,12 +28,11 @@ import eu.paasage.camel.execution.InternalComponentMeasurement;
import eu.paasage.camel.execution.Measurement;
import eu.paasage.camel.execution.VMMeasurement;
import eu.paasage.camel.metric.MetricInstance;
import eu.paasage.executionware.metric_collector.MetricStorage.MeasurementType;
import eu.paasage.executionware.metric_collector.SynchronisedMetricStorage.MeasurementType;
import eu.paasage.mddb.cdo.client.CDOClient;
public class MetricStorageWithBlockingQueue {
public class MetricStorageWithBlockingQueue extends MetricStorage{
private static CDOTransaction trans = null;
private static CDOClient cl = null;
private static int transNum = 0;
private static ArrayBlockingQueue<StorageRequest> queue = new ArrayBlockingQueue<StorageRequest>(10000);
......@@ -41,7 +40,7 @@ public class MetricStorageWithBlockingQueue {
private static final Integer maxTrans = 5;
private static final int maxItems = 100;
private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(MetricStorageWithBlockingQueue.class);
//private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(MetricStorageWithBlockingQueue.class);
private static class StorageRequest{
public double value;
......@@ -123,43 +122,7 @@ public class MetricStorageWithBlockingQueue {
private static void enforceMeasurementRequest(StorageRequest sr){
checkTransaction();
ExecutionContext ec = (ExecutionContext)trans.getObject(sr.ecID);
ExecutionModel em = (ExecutionModel)ec.eContainer();
MetricInstance mi = (MetricInstance)trans.getObject(sr.ID);
Measurement m = null;
//Create MeasurementObject to be stored
switch(sr.measurementType){
case APPLICATION_MEASUREMENT:
ApplicationMeasurement am = ExecutionFactory.eINSTANCE.createApplicationMeasurement();
am.setApplication((Application)trans.getObject(sr.object));
m = am;
break;
case VM_MEASUREMENT:
VMMeasurement rm = ExecutionFactory.eINSTANCE.createVMMeasurement();
rm.setVmInstance((VMInstance)trans.getObject(sr.object));
m = rm;
break;
case COMPONENT_MEASUREMENT:
InternalComponentMeasurement cm = ExecutionFactory.eINSTANCE.createInternalComponentMeasurement();
cm.setInternalComponentInstance((InternalComponentInstance)trans.getObject(sr.object));
m = cm;
break;
case COMMUNICATION_MEASUREMENT:
CommunicationMeasurement rcm = ExecutionFactory.eINSTANCE.createCommunicationMeasurement();
rcm.setSourceVMInstance((VMInstance)trans.getObject(sr.object));
rcm.setDestinationVMInstance((VMInstance)trans.getObject(sr.object2));
m = rcm;
break;
}
m.setMetricInstance(mi);
m.setExecutionContext(ec);
m.setValue(sr.value);
m.setMeasurementTime(new Date());
m.setName(CDOIDUtil.createUUID().toString());
//Add measurement
em.getMeasurements().add(m);
MetricStorage.storeMeasurement(sr.value, sr.ID, sr.ecID, sr.measurementType, sr.object, sr.object2);
}
public static void commitTransaction(){
......
......@@ -44,10 +44,11 @@ public class StorageRunnable implements Runnable{
logger.info("Starting my work: " + v);
for (int i = 0; i < times; i++){
//logger.info("Running the " + i + " measurement for thread: " + v);
/* Change to MetricStorage.storeMeasurement to run the other test for measurement
/* Change to SynchronisedMetricStorage.storeMeasurement to run the other test for measurement
* storage according to the other implementation
*/
MetricStorageWithBlockingQueue.storeMeasurement(v, id, ecID, MetricStorage.MeasurementType.APPLICATION_MEASUREMENT, appID, null);
MetricStorageWithBlockingQueue.storeMeasurement(v, id, ecID, SynchronisedMetricStorage.MeasurementType.APPLICATION_MEASUREMENT, appID, null);
//SynchronisedMetricStorage.storeMeasurement(v, id, ecID, SynchronisedMetricStorage.MeasurementType.APPLICATION_MEASUREMENT, appID, null);
}
logger.info("Ended my work: " + v);
}
......
/* Copyright (C) 2015 KYRIAKOS KRITIKOS <kritikos@ics.forth.gr> */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/
*/
package eu.paasage.executionware.metric_collector;
import java.util.Date;
import org.eclipse.emf.cdo.common.id.CDOID;
import org.eclipse.emf.cdo.common.id.CDOIDUtil;
import org.eclipse.emf.cdo.transaction.CDOTransaction;
import eu.paasage.camel.Application;
import eu.paasage.camel.deployment.InternalComponentInstance;
import eu.paasage.camel.deployment.VMInstance;
import eu.paasage.camel.execution.ApplicationMeasurement;
import eu.paasage.camel.execution.CommunicationMeasurement;
import eu.paasage.camel.execution.ExecutionContext;
import eu.paasage.camel.execution.ExecutionFactory;
import eu.paasage.camel.execution.ExecutionModel;
import eu.paasage.camel.execution.InternalComponentMeasurement;
import eu.paasage.camel.execution.Measurement;
import eu.paasage.camel.execution.VMMeasurement;
import eu.paasage.camel.metric.MetricInstance;
import eu.paasage.mddb.cdo.client.CDOClient;
public class SynchronisedMetricStorage extends MetricStorage{
private static CDOClient cl = null;
private static int transNum = 0;
private static int itemNum = 0;
private static int deadline = 600;
private static TimeThread tr = null;
private static final Integer maxTrans = 5;
private static final int maxItems = 100;
//private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SynchronisedMetricStorage.class);
public enum MeasurementType {
VM_MEASUREMENT,
APPLICATION_MEASUREMENT,
COMMUNICATION_MEASUREMENT,
COMPONENT_MEASUREMENT
}
private static synchronized void checkTransaction(){
if (cl == null){
cl = new CDOClient();
trans = cl.openTransaction();
transNum = 0;
itemNum = 0;
}
else if (trans == null){
trans = cl.openTransaction();
}
if (tr == null){
tr = new TimeThread(deadline);
tr.start();
}
}
public static void storeMeasurement(double value, CDOID ID, CDOID ecID, MeasurementType measurementType, CDOID object, CDOID object2){
checkTransaction();
synchronized(maxTrans){
MetricStorage.storeMeasurement(value, ID, ecID, measurementType, object, object2);
if ((++itemNum) == maxItems){
commitTransaction(false);
}
}
}
public static void commitTransaction(boolean timerCall){
synchronized(maxTrans){
if (!timerCall) tr.interrupt();
if (itemNum > 0){
logger.info("Trying to commit with param: " + timerCall + " " + itemNum + " " + transNum);
try{
trans.commit();
}
catch(Exception e){
logger.error("Something went wrong while attempting to commit the transaction",e);
//e.printStackTrace();
}
trans.close();
transNum ++;
if (transNum == maxTrans){
cl.closeSession();
cl = new CDOClient();
transNum = 0;
}
trans = cl.openTransaction();
itemNum = 0;
}
tr = new TimeThread(deadline);
tr.start();
}
}
}
......@@ -11,6 +11,7 @@ import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -676,6 +677,7 @@ public class Test {
* (a) comment next line code and (b) uncomment the last
* commented lines of code
*/
//MetricStorage.initServer(1, 5563);
//runExcessiveTest(ids.iterator().next());
//ExecutorService lis = Executors.newFixedThreadPool(3);
//startCDOListeners("test",lis);
......@@ -710,6 +712,9 @@ public class Test {
thr.shutdownNow();
subscribers.shutdown();
if (runClient) fap.terminate();
deleteAllMetrics(ecID);
//lis.shutdownNow();
//MetricStorage.terminateServer();
mc1.terminate();
mc2.terminate();
}
......
......@@ -21,7 +21,7 @@ public class TimeThread extends Thread{
try{
Thread.sleep(deadline);
logger.info("Will now commit transaction");
MetricStorage.commitTransaction(true);
SynchronisedMetricStorage.commitTransaction(true);
}
catch(Exception e){
//e.printStackTrace();
......
......@@ -34,6 +34,13 @@ public class PublicationServer{
socket.bind("tcp://*:5563");
}
public PublicationServer(int threadNum,int portNum){
this.threadNum = threadNum;
context = ZMQ.context(threadNum);
socket = context.socket(ZMQ.PUB);
socket.bind("tcp://*:" + portNum);
}
public synchronized void submitValue(String metricId, double value){
if (run){
logger.info("PublicationServer submitting value: " + value + " for metricInstance: " + metricId);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment