diff --git a/binding-components/one-way-bc/src/main/resources/Mqttconfig_block2_interface_1 b/binding-components/one-way-bc/src/main/resources/Mqttconfig_block2_interface_1 index 8f0d29afa6cfacc0773bab4e7c2d3a71ee3ff9de..52e7a151888302320dee817a9f904033cefc23cd 100644 --- a/binding-components/one-way-bc/src/main/resources/Mqttconfig_block2_interface_1 +++ b/binding-components/one-way-bc/src/main/resources/Mqttconfig_block2_interface_1 @@ -1 +1 @@ -{"invocation_address":"128.93.64.90","service_port":"8891","subcomponent_port":"8891","subcomponent_address":"128.93.64.90","service_name":"","target_namespace":""} +{"invocation_address":"127.0.0.1","service_port":"8891","subcomponent_port":"8891","subcomponent_address":"127.0.0.1","service_name":"","target_namespace":""} diff --git a/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/listener/MeasureListener.java b/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/listener/MeasureListener.java index bfdc30c993758a8e9dc1a332c1d064a3d91b8259..c5227f9efb9e2afaee24306d4a11671db94bdee0 100644 --- a/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/listener/MeasureListener.java +++ b/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/listener/MeasureListener.java @@ -59,8 +59,7 @@ public class MeasureListener implements NotificationListener { final Object new_value = acn.getNewValue(); // final Object old_value = acn.getOldValue(); String timestamp_built = (String) new_value; - - + String array[] = timestamp_built.split("-"); Long timestamp = Long.valueOf(array[0]).longValue(); int message_id = Integer.valueOf(array[1]); @@ -68,36 +67,40 @@ public class MeasureListener implements NotificationListener { Long timestamp_array[] = new Long[] { 0l, 0l, 0l, 0l, 0l, 0l }; if (!timestampMap.containsKey(message_id)) { - - synchronized (timestampMap){ - + + synchronized (timestampMap) { + timestampMap.put(message_id, timestamp_array); } } - switch (timestamp_name){ + switch (timestamp_name) { case "timestamp_1": - Long[] time_array = timestampMap.get(message_id); - if (time_array != null) { + synchronized (timestampMap) { - if (time_array[0] == 0l) { + Long[] time_array = timestampMap.get(message_id); + if (time_array != null) { - time_array[0] = timestamp; - timestampMap.put(message_id, time_array); + if (time_array[0] == 0l) { - } + time_array[0] = timestamp; + timestampMap.put(message_id, time_array); - } else { + } + + } else { + + time_array = new Long[6]; + timestamp_array[0] = timestamp; + timestampMap.put(message_id, time_array); + } - time_array = new Long[6]; - timestamp_array[0] = timestamp; - timestampMap.put(message_id, time_array); } break; case "timestamp_2": - + synchronized (timestampMap) { Long[] time_array1 = timestampMap.get(message_id); if (time_array1 != null) { @@ -115,13 +118,14 @@ public class MeasureListener implements NotificationListener { time_array1[1] = timestamp; timestampMap.put(message_id, time_array1); } - + + } break; case "timestamp_3": - + synchronized (timestampMap) { Long[] time_array2 = timestampMap.get(message_id); if (time_array2 != null) { @@ -140,10 +144,13 @@ public class MeasureListener implements NotificationListener { timestampMap.put(message_id, time_array2); } + } + break; case "timestamp_4": + synchronized (timestampMap) { Long[] time_array3 = timestampMap.get(message_id); if (time_array3 != null) { @@ -162,10 +169,12 @@ public class MeasureListener implements NotificationListener { timestampMap.put(message_id, time_array3); } + } + break; case "timestamp_5": - + synchronized (timestampMap) { Long[] time_array4 = timestampMap.get(message_id); if (time_array4 != null) { @@ -183,10 +192,12 @@ public class MeasureListener implements NotificationListener { time_array4[4] = timestamp; timestampMap.put(message_id, time_array4); } + } break; case "timestamp_6": + synchronized (timestampMap) { Long[] time_array5 = timestampMap.get(message_id); if (time_array5 != null) { @@ -204,6 +215,7 @@ public class MeasureListener implements NotificationListener { time_array5[5] = timestamp; timestampMap.put(message_id, time_array5); } + } break; default: diff --git a/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/monitor/MonitorMain.java b/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/monitor/MonitorMain.java index 6e0fd2abe3b593ac094d93d7b4067870628ba2b8..12a7b807fcf20e7c61852e124c5a3031b0ca2d02 100644 --- a/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/monitor/MonitorMain.java +++ b/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/monitor/MonitorMain.java @@ -24,17 +24,16 @@ public class MonitorMain { public static void main(String[] args) { // TODO Auto-generated method stub + +// if (args.length != 1) { +// +// System.err.println("Missing arguments"); +// System.err.println("java -jar MonitorMain-jar-with-dependencies.jar steadystateTimer"); +// System.exit(0); +// } - - if (args.length != 1) { - - System.err.println("Missing arguments"); - System.err.println("java -jar MonitorMain-jar-with-dependencies.jar steadystateTimer"); - System.exit(0); - } - - long waitSteadystate = Long.valueOf(args[0]); + long waitSteadystate = 1000; ///Long.valueOf(args[0]); ExperimentListener experimentListener = new ExperimentListener("StartExperiment", MonitorConstant.M2, "9006"); @@ -92,6 +91,7 @@ public class MonitorMain { long experimentMsgSent = 0l; long experimentMsgReceive = 0l; + long experimentMsgReceiveBestFitnesss = 0l; System.out.print("Start Monitoring."); while (experimentListener.experimentRunning){ @@ -128,7 +128,7 @@ public class MonitorMain { long BC1 = 0l; long BC2 = 0l; long EtoE = 0l; - // System.out.print("\t\t t0 \t\t \t\t t1 \t\t \t\t t2 \t\t \t\t t3 \t\t \t\t t4 \t\t \t\t t5 \t\t \t\t t1-t0 \t\t \t\t t2-t1 \t\t \t\t t3-t2 \t\t \t\t t4-t3 \t\t \t\t t5-t4 \t\t \t\t t5-t0 \t\t\n"); +// System.out.print("\t\t t0 \t\t \t\t t1 \t\t \t\t t2 \t\t \t\t t3 \t\t \t\t t4 \t\t \t\t t5 \t\t \t\t t1-t0 \t\t \t\t t2-t1 \t\t \t\t t3-t2 \t\t \t\t t4-t3 \t\t \t\t t5-t4 \t\t \t\t t5-t0 \t\t\n"); @@ -151,12 +151,17 @@ public class MonitorMain { t3 = time_array[3]; t4 = time_array[4]; t5 = time_array[5]; - BC1 += (t2 - t1); - BC2 += (t4 - t3); - EtoE += (t5 - t0); - System.out.print(t0 + "\t" + t1 + "\t" + t2 + "\t" + t3 + "\t" + t4 + "\t" + t5 + "\t" + (t1 - t0) + "\t" - + (t2 - t1) + "\t" + (t3 - t2) + "\t" + (t4 - t3) + "\t" + (t5 - t4) + "\t" + (t5 - t0) + "\n"); - experimentMsgReceive++; + if(t0 != 0 && t1 != 0 && t2 != 0 && t3 != 0 && t4 != 0 && t5 != 0 ){ + + BC1 += (t2 - t1); + BC2 += (t4 - t3); + EtoE += (t5 - t0); + System.out.print(t0 + "\t" + t1 + "\t" + t2 + "\t" + t3 + "\t" + t4 + "\t" + t5 + "\t" + (t1 - t0) + "\t" + + (t2 - t1) + "\t" + (t3 - t2) + "\t" + (t4 - t3) + "\t" + (t5 - t4) + "\t" + (t5 - t0) + "\n"); + experimentMsgReceiveBestFitnesss++; + + } + experimentMsgReceive++; } // DBObject doc = new BasicDBObject().append("timestamp_0", t0).append("timestamp_1", t1) @@ -166,9 +171,9 @@ public class MonitorMain { // collection.insert(doc); - long AVG_BC1 = BC1 / (experimentMsgReceive); - long AVG_BC2 = BC2 / (experimentMsgReceive); - long AVG_EtoE = EtoE / (experimentMsgReceive); + long AVG_BC1 = BC1 / (experimentMsgReceiveBestFitnesss); + long AVG_BC2 = BC2 / (experimentMsgReceiveBestFitnesss); + long AVG_EtoE = EtoE / (experimentMsgReceiveBestFitnesss); System.out.println("\n=========================================================== "); System.out.println("Start time: " +start ); @@ -176,9 +181,11 @@ public class MonitorMain { System.out.println("Messages sent: " + experimentListener.experimentMsgCounter ); System.out.println("Messages receive: " + experimentMsgReceive); System.out.println("Messages lost: " + (experimentListener.experimentMsgCounter - experimentMsgReceive) ); + System.out.println("Messages take for computing : " +experimentMsgReceiveBestFitnesss ); long effectiveDuration = experimentListener.experimentDuration - waitSteadystate; - System.out.println("Throughput : " + experimentMsgReceive /effectiveDuration ); - System.out.println("Losses : " + (experimentListener.experimentMsgCounter / experimentMsgReceive)); + System.out.println("Monitoring duration : " +effectiveDuration ); + System.out.println("Throughput : " + (double)( (double)experimentMsgReceive /(double)effectiveDuration) ); + System.out.println("Losses : " + ( (double) ((double)experimentListener.experimentMsgCounter / (double)experimentMsgReceive))); System.out.println("Message / seconde : " + experimentListener.experimentAvgMessagesSent ); System.out.println("BC1 Avg latency: " + AVG_BC1 ); System.out.println("BC2 Avg latency: " + AVG_BC2 ); diff --git a/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/monitor/util/MonitorConstant.java b/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/monitor/util/MonitorConstant.java index f771068f3e7c4ef4380148c6b2bde0895d194962..80a2f44a9213aac6ab3771dd544835710170b891 100644 --- a/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/monitor/util/MonitorConstant.java +++ b/gmdl-tools/monitor/src/main/java/eu/chorevolution/vsb/monitor/util/MonitorConstant.java @@ -2,21 +2,22 @@ package eu.chorevolution.vsb.monitor.util; public class MonitorConstant { - public static final String TIME_SERVER = "128.93.64.246"; - public static final int TIME_SERVER_PORT = 9999; +// public static final String TIME_SERVER = "128.93.64.246"; // public static final String M1 = "128.93.65.234"; // public static final String M2 = "128.93.65.233"; // public static final String M3 = "128.93.64.1"; // public static final String M4 = "128.93.64.90"; // public static final String MDefault = "127.0.0.1"; + public static final String TIME_SERVER = "127.0.0.1"; public static final String M1 = "127.0.0.1"; public static final String M2 = "127.0.0.1"; public static final String M3 = "127.0.0.1"; public static final String M4 = "127.0.0.1"; public static final String MDefault = "127.0.0.1"; + public static final int TIME_SERVER_PORT = 9999; public static final int timestamp_1_port_listener = 9000; public static final int timestamp_2_port_listener = 9001; public static final int timestamp_3_port_listener = 9002; diff --git a/playgrounds/client-server/rest-playground/src/main/java/eu/chorevolution/vsb/playgrounds/clientserver/rest/RestServer.java b/playgrounds/client-server/rest-playground/src/main/java/eu/chorevolution/vsb/playgrounds/clientserver/rest/RestServer.java index 5bc91fa4a9b291d3404dbbbb44932e17e4bade56..d7ab927044df94607efb630094b5f7b6287ef2d2 100644 --- a/playgrounds/client-server/rest-playground/src/main/java/eu/chorevolution/vsb/playgrounds/clientserver/rest/RestServer.java +++ b/playgrounds/client-server/rest-playground/src/main/java/eu/chorevolution/vsb/playgrounds/clientserver/rest/RestServer.java @@ -3,7 +3,6 @@ package eu.chorevolution.vsb.playgrounds.clientserver.rest; import java.io.IOException; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; - import org.restlet.Component; import org.restlet.Server; import org.restlet.data.Protocol; diff --git a/playgrounds/pub-sub/mqtt-playground/pom.xml b/playgrounds/pub-sub/mqtt-playground/pom.xml index fdb65fecfd31ba06a4c3d18960ee21f61eca5523..3a25a1893b7e797456d8d03598500409daf7054a 100644 --- a/playgrounds/pub-sub/mqtt-playground/pom.xml +++ b/playgrounds/pub-sub/mqtt-playground/pom.xml @@ -73,6 +73,16 @@ <artifactId>gm-websocket</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> + <dependency> + <groupId>eu.chorevolution.vsb</groupId> + <artifactId>monitor</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>eu.chorevolution.vsb</groupId> + <artifactId>experiment</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> </dependencies> <build> <plugins> @@ -124,6 +134,30 @@ </execution> </executions> </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>MqttStartExperiment</id> + <phase>package</phase> + <goals> + <goal>assembly</goal> + </goals> + <configuration> + <archive> + <manifest> + <addClasspath>true</addClasspath> + <mainClass>eu.chorevolution.vsb.playgrounds.mqtt.StartExperiment</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <finalName>MqttStartExperiment</finalName> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> </project> diff --git a/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/BrokerMqtt.java b/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/BrokerMqtt.java index eee5e0e277c9c0f92a88fe3226d7986728ee69b4..14c77d1b54afe0064b2ad1a260177ca88d94e864 100644 --- a/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/BrokerMqtt.java +++ b/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/BrokerMqtt.java @@ -7,7 +7,6 @@ package eu.chorevolution.vsb.playgrounds.mqtt; import org.apache.activemq.broker.*; import org.fusesource.mqtt.client.BlockingConnection; - import eu.chorevolution.vsb.monitor.util.MonitorConstant; public class BrokerMqtt{ diff --git a/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/PublisherMqtt.java b/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/PublisherMqtt.java index 0d70fdbcacdd01938a6de2a0071d280012d67e31..097edf408c4a210c3747ed9946fed78dd22b0703 100644 --- a/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/PublisherMqtt.java +++ b/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/PublisherMqtt.java @@ -5,26 +5,33 @@ import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.json.simple.JSONObject; +import eu.chorevolution.vsb.agent.MeasureAgent; +import eu.chorevolution.vsb.monitor.util.MonitorConstant; + public class PublisherMqtt { - public static void main(String[] args){ - MqttClient client = null; - + private String url = null; + MeasureAgent agent = null; + public PublisherMqtt(String host, String port){ - JSONObject obj = new JSONObject(); - obj.put("lat", "12584589.78541238"); - obj.put("lon", "84589.78541238"); - obj.put("op_name", "bridgeNextClosure"); - - String content = obj.toJSONString(); - + this.url = "tcp://"+host+":"+port; + agent = new MeasureAgent("timestamp_1", System.currentTimeMillis(), MonitorConstant.M2, + MonitorConstant.timestamp_1_port_listener); + } + + public void publish(String messageContent, String scope){ + MqttClient client = null; + try{ - client = new MqttClient("tcp://localhost:8893", MqttClient.generateClientId()); + client = new MqttClient(url, MqttClient.generateClientId()); client.connect(); MqttMessage message = new MqttMessage(); - message.setPayload(content.getBytes()); - client.publish("bridgeNextClosure", message); + message.setQos(0); + message.setPayload(messageContent.getBytes()); + client.publish(scope, message); + String message_id = agent.getMessageID(messageContent); + agent.fire("" + System.currentTimeMillis() + "-" + message_id); client.disconnect(); }catch (MqttException e){ diff --git a/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/StartExperiment.java b/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/StartExperiment.java new file mode 100644 index 0000000000000000000000000000000000000000..c1748100742c81f508ce060c41295a4a26fd3eb8 --- /dev/null +++ b/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/StartExperiment.java @@ -0,0 +1,79 @@ +package eu.chorevolution.vsb.playgrounds.mqtt; + +import eu.chorevolution.vsb.agent.ExperimentAgent; +import eu.chorevolution.vsb.experiment.utils.Parameters; +import eu.chorevolution.vsb.monitor.util.MonitorConstant; +import eu.chorevolution.vsb.experiment.utils.Exp; +import eu.chorevolution.vsb.experiment.utils.MessageGenerator; + + + +public class StartExperiment { + + public static boolean experimentRunning = true; + public static long experimentStartTime = 0l; + public static Long msgCounter = 0L; + public static ExperimentAgent experimentAgent = null; + public static Exp waitDuration = null; + public static Long averageMsgSize = 0L; + public static int threadNumber = 5; + public static MessageGenerator msgGen = null; + + public static void main(String[] args) { + // TODO Auto-generated method stub + +// if (args.length < 3) { +// +// System.err.println("Missing arguments"); +// System.err.println("java -jar StartExperiment-jar-with-dependencies.jar duration threadNumber rate"); +// System.exit(0); +// } + + Parameters.experimentDuration = 2*60*1000;//Long.valueOf(args[0]); + threadNumber = 2;//Integer.valueOf(args[1]); + Parameters.msgSendParam = 2;//Double.valueOf(args[2]); + + waitDuration = new Exp(Parameters.msgSendParam); + int counter = 0; + msgGen = new MessageGenerator(); + experimentAgent = new ExperimentAgent("StartExperiment", MonitorConstant.M2, 9006); + PublisherMqtt publisher = new PublisherMqtt(MonitorConstant.M4,"8891"); + + + + try { + + Thread.sleep(30000); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + experimentStartTime = System.currentTimeMillis(); + + while (counter < threadNumber) { + + StartSourceApplication source = new StartSourceApplication(publisher, waitDuration, averageMsgSize, msgGen); + source.start(); + counter++; + } + + new java.util.Timer().schedule(new java.util.TimerTask() { + + @Override + public void run() { + + // TODO Auto-generated method stub + experimentRunning = false; + experimentAgent.fireExperimentRunning(experimentRunning); + experimentAgent.fireExperimentStartTime(experimentStartTime); + experimentAgent.fireExperimentFinishTime(System.currentTimeMillis()); + experimentAgent.fireExperimentMsgCounter(msgCounter); + experimentAgent.fireExperimentAvgMessagesSent(waitDuration.average()); + + } + }, Parameters.experimentDuration); + + } + +} diff --git a/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/StartSourceApplication.java b/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/StartSourceApplication.java new file mode 100644 index 0000000000000000000000000000000000000000..1ecd32e34a68865aedeb473264701d33d2a92a19 --- /dev/null +++ b/playgrounds/pub-sub/mqtt-playground/src/main/java/eu/chorevolution/vsb/playgrounds/mqtt/StartSourceApplication.java @@ -0,0 +1,56 @@ +package eu.chorevolution.vsb.playgrounds.mqtt; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import org.json.simple.JSONObject; +import eu.chorevolution.vsb.experiment.utils.Exp; +import eu.chorevolution.vsb.experiment.utils.MessageGenerator; + +public class StartSourceApplication extends Thread { + + private PublisherMqtt publisher = null; + private String msg = ""; + private Exp waitDuration; + private long averageMsgSize = 0l; + private MessageGenerator msgGen = null; + + public StartSourceApplication(PublisherMqtt publisher, Exp waitDuration, long averageMsgSize, MessageGenerator msgGen) { + + this.publisher = publisher; + this.waitDuration = waitDuration; + this.averageMsgSize = averageMsgSize; + this.msgGen = msgGen; + + } + + public void run() { + + System.out.println("Thread sender started"); + int count = 0; + while (StartExperiment.experimentRunning) { + + synchronized (publisher){ + + publisher.publish(msgGen.getMessage(), "bridgeNextClosure"); + StartExperiment.msgCounter++; + } + + + try { +// this.sleep(1000); + double delay = waitDuration.next() * 1000; + this.sleep((long)delay); + + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + this.interrupt(); + System.out.println("StartSourceApplication-" + this.getId() + " finish the job and dies "); + } + + +} diff --git a/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/db.data b/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/db.data index e93927bb18aa0dde68da441b1329123fd9a75a95..89e6d5eb63a46a9d7182f3562d559183f4a9beae 100644 Binary files a/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/db.data and b/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/db.data differ diff --git a/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/db.redo b/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/db.redo index 75977876e43578e0402918cee28c3e67389c79a6..ec060845f6d0522a78a290529cc4b8f59b312a18 100644 Binary files a/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/db.redo and b/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/db.redo differ diff --git a/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/lock b/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/lock index c08af52faa45b2d28f617d75cd7e6adadcd38964..180fd381a0875829eb5799e29988429776c5bb37 100644 Binary files a/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/lock and b/playgrounds/pub-sub/mqtt-playground/target2/localhost/KahaDB/lock differ diff --git a/playgrounds/str/websockets-playground/pom.xml b/playgrounds/str/websockets-playground/pom.xml index ab348d1f3fbdc6e3340952deb3843242d982b419..7fcc49927ee105602e9a5f815009a9ca9d7284ae 100644 --- a/playgrounds/str/websockets-playground/pom.xml +++ b/playgrounds/str/websockets-playground/pom.xml @@ -87,6 +87,30 @@ </execution> </executions> </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>StartWebSocketConsummer</id> + <phase>package</phase> + <goals> + <goal>assembly</goal> + </goals> + <configuration> + <archive> + <manifest> + <addClasspath>true</addClasspath> + <mainClass>eu.chorevolution.vsb.playgrounds.str.websockets.StartWebSocketConsummer</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + <finalName>StartWebSocketConsummer</finalName> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> </project> diff --git a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playground/websocket/experiment/StartExperiment.java b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playground/websocket/experiment/StartExperiment.java index b8dd8f18e39fc5b4810e8baab986a3b9d56f18ed..b86053368898f8bd9d86384642c101a2034f5450 100644 --- a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playground/websocket/experiment/StartExperiment.java +++ b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playground/websocket/experiment/StartExperiment.java @@ -29,7 +29,7 @@ public class StartExperiment { // System.exit(0); // } - Parameters.experimentDuration = 2*60*1000;//Long.valueOf(args[0]); + Parameters.experimentDuration = 20*60*1000;//Long.valueOf(args[0]); threadNumber = 2;//Integer.valueOf(args[1]); Parameters.msgSendParam =2;// Double.valueOf(args[2]); @@ -37,7 +37,6 @@ public class StartExperiment { int counter = 0; msgGen = new MessageGenerator(); experimentAgent = new ExperimentAgent("StartExperiment", MonitorConstant.M2, 9006); - experimentAgent.fireExperimentDuration(Parameters.experimentDuration); StartServerWebSocket server = new StartServerWebSocket(); server.start(); @@ -64,12 +63,14 @@ public class StartExperiment { public void run() { // TODO Auto-generated method stub + experimentAgent.fireExperimentFinishTime(System.currentTimeMillis()); experimentRunning = false; experimentAgent.fireExperimentRunning(experimentRunning); experimentAgent.fireExperimentStartTime(experimentStartTime); experimentAgent.fireExperimentFinishTime(System.currentTimeMillis()); experimentAgent.fireExperimentMsgCounter(msgCounter); experimentAgent.fireExperimentAvgMessagesSent(waitDuration.average()); + experimentAgent.fireExperimentDuration(Parameters.experimentDuration); } }, Parameters.experimentDuration); diff --git a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartExperiment.java b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartExperiment.java deleted file mode 100644 index 8cb9bf5b9a5433a8fe3eae1a5396e86ddfecc7c2..0000000000000000000000000000000000000000 --- a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartExperiment.java +++ /dev/null @@ -1,82 +0,0 @@ -package eu.chorevolution.vsb.playgrounds.str.websockets; -///* -// * To change this template, choose Tools | Templates -// * and open the template in the editor. -// */ -//package eu.chorevolution.vsb.playgrounds.str.websockets.test; -// -//import java.util.HashMap; -//import java.util.logging.Level; -//import java.util.logging.Logger; -// -//import eu.chorevolution.vsb.playgrounds.str.websockets.test.utils.Parameters; -// -//public class StartExperiment { -// -// public static boolean experimentRunning = true; -// public static HashMap<Long, Long> startTimeMap = new HashMap<Long, Long>(); -// public static HashMap<Long, Long> endTimeMap = new HashMap<Long, Long>(); -// public static Long messagesReceived = 0L; -// public static boolean DEBUG = true; -// public static long experimentStartTime = 0l; -// -// public static void main(String[] args) throws Exception { -// -// // just to ensure initialization so that no time wasted in first msg sent -// StartExperiment.startTimeMap.put(-1L, -1L); -// StartExperiment.endTimeMap.put(-1L, -1L); -// -// StartServer server = new StartServer(); -// server.start(); -// -// StartClient client = new StartClient(); -// client.connect(); -// -// Thread.sleep(1000); -// -// experimentStartTime = System.nanoTime(); -// -// StartSourceApplication source = new StartSourceApplication(server.server); -// new Thread((Runnable)source).start(); -// -// -// new Thread((Runnable)client).start(); -// -// new java.util.Timer().schedule(new java.util.TimerTask() { -// @Override -// public void run() { -// StartExperiment.experimentRunning = false; -// -// try { -// Thread.sleep(2000); -// } catch (InterruptedException ex) { -// Logger.getLogger(StartExperiment.class.getName()).log(Level.SEVERE, null, ex); -// } -// -// System.out.println("Packets Sent: " + StartSourceApplication.counter); -// System.out.println("Packets Received: " + StartExperiment.messagesReceived); -// System.out.println("Packet Loss: " + (StartSourceApplication.counter - StartExperiment.messagesReceived)); -// -// Long dur = 0L; -// -// if(StartExperiment.endTimeMap.containsKey(0L)) -// StartExperiment.endTimeMap.remove(0L); -// -// for(java.util.Map.Entry<Long, Long> e: StartExperiment.endTimeMap.entrySet()) { -// dur += (e.getValue() - StartExperiment.startTimeMap.get(e.getKey())); -// } -// -// System.out.println("Average time: " + dur.doubleValue()/StartExperiment.endTimeMap.size()); -// -// System.out.println("on duration: " + StartClient.onParameter.average()); -// System.out.println("off duration: " + StartClient.offParameter.average()); -// System.out.println("msgs: " + StartSourceApplication.waitDuration.average()); -// -// } -// }, Parameters.experimentDuration); -// -// -// -// } -// -//} \ No newline at end of file diff --git a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartServerWebSocket.java b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartServerWebSocket.java index ffafe8c7aa267f963b46f8e76266545118574610..a42b65699cbb651794451bdd60f4667b4ab10abd 100644 --- a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartServerWebSocket.java +++ b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartServerWebSocket.java @@ -1,41 +1,41 @@ package eu.chorevolution.vsb.playgrounds.str.websockets; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; - import org.json.simple.JSONObject; - import eu.chorevolution.vsb.agent.MeasureAgent; import eu.chorevolution.vsb.monitor.util.MonitorConstant; import eu.chorevolution.vsb.playgrounds.str.websocketsImp.WsServer; import java.sql.Timestamp; -public class StartServerWebSocket{ - +public class StartServerWebSocket { + public WsServer server = null; private MeasureAgent agent = null; - + public StartServerWebSocket() { + // create a server listening on port 8090 server = new WsServer(new InetSocketAddress(9082)); - agent = new MeasureAgent("timestamp_1",System.currentTimeMillis(),MonitorConstant.M2,MonitorConstant.timestamp_1_port_listener); + agent = new MeasureAgent("timestamp_1", System.currentTimeMillis(), MonitorConstant.M2, + MonitorConstant.timestamp_1_port_listener); + } - - public void start(){ - + + public void start() { + server.start(); - + } - - public void send(String message){ - + + public void send(String message) { + server.send(message); String message_id = agent.getMessageID(message); - agent.fire(""+System.currentTimeMillis()+"-"+message_id); + agent.fire("" + System.currentTimeMillis() + "-" + message_id); + } - - + } diff --git a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartSourceApplication.java b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartSourceApplication.java deleted file mode 100644 index 9b36093ab9a2d2ba28118b153d822ad73636d58d..0000000000000000000000000000000000000000 --- a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartSourceApplication.java +++ /dev/null @@ -1,46 +0,0 @@ -package eu.chorevolution.vsb.playgrounds.str.websockets; -//package eu.chorevolution.vsb.playgrounds.str.websockets.test; -// -//import eu.chorevolution.vsb.playgrounds.str.websocketsImp.WsServer; -//import eu.chorevolution.vsb.playgrounds.str.websockets.test.utils.Exp; -//import eu.chorevolution.vsb.playgrounds.str.websockets.test.utils.Parameters; -// -//public class StartSourceApplication implements Runnable { -// -// private WsServer wsServer = null; -// public static Long counter = 0L; -// public static Exp waitDuration = new Exp(Parameters.msgPostParam); -// -// public StartSourceApplication(WsServer wsServer) { -// this.wsServer = wsServer; -// } -// -// void sendMsg() { -// String msg = "Msg " + counter; -// if(StartExperiment.DEBUG) { -// synchronized (StartExperiment.startTimeMap) { -// StartExperiment.startTimeMap.put(counter, System.nanoTime()); -// } -// } -// else { -// StartExperiment.startTimeMap.put(counter, System.nanoTime()); -// } -// wsServer.send(msg); -// counter++; -// } -// -// public void run() { -// -// while (StartExperiment.experimentRunning) { -// sendMsg(); -// try { -// Thread.sleep((long)waitDuration.next() * 1000); -//// Thread.sleep(1000); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } -// -// } -// -//} diff --git a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartWebSocketConsummer.java b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartWebSocketConsummer.java new file mode 100644 index 0000000000000000000000000000000000000000..b44ba7ca336feeab0af68390c211af7df6e47628 --- /dev/null +++ b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/StartWebSocketConsummer.java @@ -0,0 +1,63 @@ +package eu.chorevolution.vsb.playgrounds.str.websockets; + +import java.net.URI; +import java.net.URISyntaxException; + +import eu.chorevolution.vsb.agent.MeasureAgent; +import eu.chorevolution.vsb.monitor.util.MonitorConstant; + +public class StartWebSocketConsummer extends Thread { + + WebSocketConsummer webSocketConsummer = null; + + private MeasureAgent agent = null; + + public StartWebSocketConsummer(WebSocketConsummer webSocketConsummer) { + + this.webSocketConsummer = webSocketConsummer; + this.agent = new MeasureAgent("timestamp_2", System.currentTimeMillis(), MonitorConstant.M3, + MonitorConstant.timestamp_2_port_listener); + } + + public void run() { + + webSocketConsummer.connect(); + while (true) { + + String msg = null; + + try { + msg = webSocketConsummer.msgQueue.take(); + + if (!msg.isEmpty()) { + + String message_id = msg.split("-")[1]; + String message = msg.split("-")[0]; + agent.fire(""+System.currentTimeMillis()+"-"+message_id); + System.out.println(msg); + } + + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + } + + public static void main(String[] args) { + + URI uri = null; + try { + + uri = new URI("http://"+MonitorConstant.M2+":"+9082); + + } catch (URISyntaxException e){e.printStackTrace();} + WebSocketConsummer consummer = new WebSocketConsummer(uri); + StartWebSocketConsummer startconsummer = new StartWebSocketConsummer(consummer); + startconsummer.start(); + + } + +} diff --git a/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/WebSocketConsummer.java b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/WebSocketConsummer.java new file mode 100644 index 0000000000000000000000000000000000000000..3527562eb3f27b1278077eb8b49e29ec8670a50e --- /dev/null +++ b/playgrounds/str/websockets-playground/src/main/java/eu/chorevolution/vsb/playgrounds/str/websockets/WebSocketConsummer.java @@ -0,0 +1,51 @@ +package eu.chorevolution.vsb.playgrounds.str.websockets; + +import java.net.URI; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.java_websocket.handshake.ServerHandshake; + +public class WebSocketConsummer extends org.java_websocket.client.WebSocketClient implements Runnable { + + + + public BlockingQueue<String> msgQueue; + + public WebSocketConsummer(URI serverURI) { + super(serverURI); + msgQueue = new LinkedBlockingQueue<String>(); + + } + + @Override + public void onOpen(ServerHandshake handshakedata) { + System.out.println("Client opens a stream"); + } + + @Override + public void onMessage(String message) { + + msgQueue.add(message); + message += " at " + System.currentTimeMillis(); + //System.out.println("Client receives : " + message); + + } + + @Override + public void onClose(int code, String reason, boolean remote) { + + System.err.println("stream closed"); + } + + @Override + public void onError(Exception ex) { + + System.err.println("an error occured " + ex.getStackTrace() + " " + ex.getMessage()); + } + + + +} + + diff --git a/protocol-pool/gm-rest/src/main/java/eu/chorevolution/vsb/gm/protocols/rest/BcRestSubcomponent.java b/protocol-pool/gm-rest/src/main/java/eu/chorevolution/vsb/gm/protocols/rest/BcRestSubcomponent.java index 1ed9a1e85c052864e52074f4119cc12c0365729f..eb2472f248afc1bd486381584d7a768f51e20346 100644 --- a/protocol-pool/gm-rest/src/main/java/eu/chorevolution/vsb/gm/protocols/rest/BcRestSubcomponent.java +++ b/protocol-pool/gm-rest/src/main/java/eu/chorevolution/vsb/gm/protocols/rest/BcRestSubcomponent.java @@ -168,7 +168,7 @@ public class BcRestSubcomponent extends BcGmSubcomponent { } } - String message = msgObj.toJSONString(); + String message = msgObj.toJSONString()+"-"+message_id; executor.execute(new Runnable() { @@ -296,7 +296,7 @@ public class BcRestSubcomponent extends BcGmSubcomponent { e1.printStackTrace(); } - // System.err.println("Server responded: ss"); +// System.err.println(receivedText); processing(receivedText); return new StringRepresentation(""); diff --git a/protocol-pool/gm-websocket/src/main/java/eu/chorevolution/vsb/websocket/BcWebsocketSubcomponent.java b/protocol-pool/gm-websocket/src/main/java/eu/chorevolution/vsb/websocket/BcWebsocketSubcomponent.java index bf9669f4aeecdae507c49bfda7bf37ecb2bef353..40b4fcbad39c553631cfa60f32f95fe02c97428f 100644 --- a/protocol-pool/gm-websocket/src/main/java/eu/chorevolution/vsb/websocket/BcWebsocketSubcomponent.java +++ b/protocol-pool/gm-websocket/src/main/java/eu/chorevolution/vsb/websocket/BcWebsocketSubcomponent.java @@ -34,7 +34,6 @@ public class BcWebsocketSubcomponent extends BcGmSubcomponent{ switch (this.bcConfiguration.getSubcomponentRole()) { case SERVER: - URI uri = null; try {