diff --git a/prediction_orchestrator/pom.xml b/prediction_orchestrator/pom.xml
index 7f92fed555f8d0acef5ff381297ab2eb32d694e6..63dd52cc468aa2ecedd540029cf64131998d2a89 100644
--- a/prediction_orchestrator/pom.xml
+++ b/prediction_orchestrator/pom.xml
@@ -45,7 +45,15 @@
2.4.1
compile
-
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+ org.influxdb
+ influxdb-java
+ 2.15
+
org.springframework.boot
spring-boot-starter-test
@@ -93,6 +101,12 @@
junit
test
+
+ org.projectreactor
+ reactor-spring
+ 1.0.1.RELEASE
+
+
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java
similarity index 83%
rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java
rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java
index 63c16f36e3609e3736df961d80558ee1c70e4de6..ccd792ace96a3baa09e8b9441537dc63f4cd0e82 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java
@@ -1,20 +1,19 @@
-package eu.morphemic.prediction_orchestrator.communication;
+package eu.morphemic.prediction_orchestrator.communication.activemq;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.melodic.event.brokerclient.BrokerClient;
import eu.melodic.event.brokerclient.BrokerPublisher;
-import eu.morphemic.prediction_orchestrator.communication.listeners.ActiveMQListener;
+import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService;
import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PooledPredictionMessage;
+import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage;
import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry;
import eu.morphemic.prediction_orchestrator.service.Coordinator;
-import eu.morphemic.prediction_orchestrator.communication.listeners.PredictionListener;
-import eu.morphemic.prediction_orchestrator.communication.listeners.ReceiveMetricsListener;
-import eu.morphemic.prediction_orchestrator.communication.listeners.RetrainedModelListener;
import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage;
import eu.morphemic.prediction_orchestrator.model.Prediction;
import eu.morphemic.prediction_orchestrator.properties.Properties;
import eu.morphemic.prediction_orchestrator.service.MetricHandler;
+import eu.morphemic.prediction_orchestrator.communication.activemq.listeners.*;
import lombok.extern.slf4j.Slf4j;
@@ -25,14 +24,14 @@ import java.util.Objects;
@Slf4j
/** We need to have a different client for each topic as we want them to run each consumer(subscriber) in a different thread (session)
*/
-public class CommunicationService {
+public class ActiveMQService {
private Properties properties;
//topicName -> brokerClientSubscriber/publisher
private HashMap brokerClients = new HashMap<>();
private HashMap brokerPublishers = new HashMap<>();
- public CommunicationService(Properties properties) {
+ public ActiveMQService(Properties properties) {
this.properties = properties;
}
@@ -53,13 +52,16 @@ public class CommunicationService {
public synchronized void startReceivingPredictions(String metricName, String methodName,
PredictionRegistry predictionRegistry,
- MetricHandler metricHandler) {
+ MetricHandler metricHandler,
+ InfluxService influxService) {
log.info("Starting Receiving PredictionFromForecasterMessage from {} concerning metric {}", methodName, metricName);
String topicName = TopicFactory.getReceivePredictionTopic(metricName, methodName);
PredictionListener predictionListener = new PredictionListener(
topicName,
predictionRegistry,
- metricHandler);
+ metricHandler,
+ methodName,
+ influxService);
subscribeWithRetries(topicName, predictionListener);
}
@@ -81,6 +83,19 @@ public class CommunicationService {
).publish(result);
}
+ public synchronized void publishStartEnsembling(StartEnsemblingMessage startEnsemblingMessage) throws JMSException {
+ String topicName = TopicFactory.getPublishStartEnsembler();
+ String result = null;
+ try {
+ result = new ObjectMapper().writeValueAsString(startEnsemblingMessage);
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ brokerPublishers.computeIfAbsent(topicName, key ->
+ getNewPublisher(topicName)
+ ).publish(result);
+ }
+
public synchronized void publishPooledPrediction(Prediction prediction, String metricName) {
String topicName = TopicFactory.getPublishPooledPrediction(metricName);
String result = null;
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java
similarity index 97%
rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java
rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java
index cb069f1372829b984cce69efdc2cb949e6f909cf..ece3684603c992cc291f8e9b85beac51c0a65d45 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java
@@ -1,4 +1,4 @@
-package eu.morphemic.prediction_orchestrator.communication;
+package eu.morphemic.prediction_orchestrator.communication.activemq;
import eu.melodic.event.brokerclient.templates.TopicNames;
@@ -48,6 +48,10 @@ class TopicFactory {
return TopicNames.forecasting_methods_to_prediction_orchestrator_training_topic;
}
+ static String getPublishStartEnsembler() {
+ return "start_ensembler";
+ }
+
static String getPublishStartForecasting(String methodName) {
switch(methodName) {
case "nbeats" : {
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java
similarity index 95%
rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java
rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java
index ed407b4d69f332b557e731dea466bb7fbc5f2d2f..d60183053b7552c8dad88405e78cfd2a5717249d 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java
@@ -1,4 +1,4 @@
-package eu.morphemic.prediction_orchestrator.communication.listeners;
+package eu.morphemic.prediction_orchestrator.communication.activemq.listeners;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java
similarity index 72%
rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java
rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java
index ffd1e90c906ad9cb3f6d410b2645de135c1db525..f55da951f100f8813fa41c4368cf296d1c80b021 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java
@@ -1,5 +1,6 @@
-package eu.morphemic.prediction_orchestrator.communication.listeners;
+package eu.morphemic.prediction_orchestrator.communication.activemq.listeners;
+import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService;
import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionFromForecasterMessage;
import eu.morphemic.prediction_orchestrator.model.Prediction;
import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry;
@@ -14,12 +15,17 @@ public class PredictionListener extends ActiveMQListener {
private PredictionRegistry predictionRegistry;
private MetricHandler metricHandler;
- public PredictionListener(String topicName, PredictionRegistry predictionRegistry, MetricHandler metricHandler) {
+ private InfluxService influxService;
+ private String methodName;
+
+ public PredictionListener(String topicName, PredictionRegistry predictionRegistry, MetricHandler metricHandler,
+ String methodName, InfluxService influxService) {
super(topicName);
this.predictionRegistry = predictionRegistry;
this.metricHandler = metricHandler;
-
+ this.influxService = influxService;
+ this.methodName = methodName;
log.debug("PredictionListener.: topic={}", topicName);
}
@@ -29,9 +35,11 @@ public class PredictionListener extends ActiveMQListener {
log.debug("Listener of topic {}: Converting event payload to PredictionFromForecasterMessage instance...", topicName);
PredictionFromForecasterMessage predictionFromForecasterMessage = gson.fromJson(payload, PredictionFromForecasterMessage.class);
log.info("Listener of topic {}: PredictionFromForecasterMessage instance: {}", topicName, predictionFromForecasterMessage.toString());
- int valueResult = predictionRegistry.processPrediction(new Prediction(predictionFromForecasterMessage));
+ Prediction prediction = new Prediction(predictionFromForecasterMessage);
+ int valueResult = predictionRegistry.processPrediction(prediction);
if ((valueResult == PredictionRegistry.VALUE_ADDED) ||
(valueResult == PredictionRegistry.VALUE_UPDATED)) {
+ influxService.saveToInflux(prediction.getAsPoint(metricHandler.getMetricName(), methodName));
metricHandler.launchMethodsPoolingIfNecessary(predictionFromForecasterMessage.getPredictionTime());
}
} catch (JMSException e) {
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java
similarity index 94%
rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java
rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java
index 6e5174e8ddaa3d0dd5d118b807d0aa6f5d33053d..2801d7c5d470835bbbe91997fe81309061e9496f 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java
@@ -1,4 +1,4 @@
-package eu.morphemic.prediction_orchestrator.communication.listeners;
+package eu.morphemic.prediction_orchestrator.communication.activemq.listeners;
import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage;
import eu.morphemic.prediction_orchestrator.service.Coordinator;
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java
similarity index 93%
rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java
rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java
index f0587aec043604271de7bb759f1f92e056e37b30..13563a00c360ffb4cf80023a02ba5c1761ac2b47 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java
@@ -1,4 +1,4 @@
-package eu.morphemic.prediction_orchestrator.communication.listeners;
+package eu.morphemic.prediction_orchestrator.communication.activemq.listeners;
import eu.morphemic.prediction_orchestrator.service.Coordinator;
import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.RetrainedModelMessage;
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java
new file mode 100644
index 0000000000000000000000000000000000000000..c0f910ef7c53a005af04350fabd4a6a54673aa7b
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java
@@ -0,0 +1,50 @@
+package eu.morphemic.prediction_orchestrator.communication.ensembler;
+
+import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionsEnsembledMessage;
+import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+;
+import org.springframework.web.reactive.function.client.UnknownHttpStatusCodeException;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+
+@Slf4j
+public class EnsemblerService {
+
+ private String ensemblerUri;
+
+ private final WebClient client;
+
+ public EnsemblerService(String baseUrl, String ensemblerUri) {
+ //http client is only used for requests logging
+ HttpClient httpClient = HttpClient.create()
+ .wiretap(true);
+ this.ensemblerUri = ensemblerUri;
+ this.client = WebClient.builder().baseUrl(baseUrl)
+ .clientConnector(new ReactorClientHttpConnector(httpClient))
+ .build();
+ }
+
+ public PredictionsEnsembledMessage ensemble(PredictionsToEnsembleMessage predictionsToEnsembleMessage) {
+ try {
+ return client.post()
+ .uri(ensemblerUri)
+ .bodyValue(predictionsToEnsembleMessage)
+ .accept(MediaType.APPLICATION_JSON)
+ .retrieve()
+ .onStatus(HttpStatus::isError, response -> Mono.empty())
+ .bodyToMono(PredictionsEnsembledMessage.class)
+ .block();
+ } catch (UnknownHttpStatusCodeException e) {
+ log.error("Unknown HTTP status code received: {}", e.getRawStatusCode());
+ return null;
+ } catch (RuntimeException e) {
+ log.error(e.getMessage());
+ return null;
+ }
+ }
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java
new file mode 100644
index 0000000000000000000000000000000000000000..39a356401553809a768c70b23c6684993d7b74ff
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java
@@ -0,0 +1,48 @@
+package eu.morphemic.prediction_orchestrator.communication.influx;
+
+import eu.morphemic.prediction_orchestrator.properties.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.OkHttpClient;
+import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.concurrent.TimeUnit;
+
+@Configuration
+@Getter
+@Slf4j
+public class InfluxService {
+ public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 50;
+ public static final int DEFAULT_BATCH_INTERVAL_DURATION = 50;
+
+ private InfluxDB influxDB;
+
+ private String database;
+
+ public InfluxService(Properties properties) {
+ this.database = properties.getInfluxDatabase();
+ OkHttpClient.Builder okHttpbuilder = new OkHttpClient.Builder();
+ okHttpbuilder.readTimeout(10, TimeUnit.SECONDS);
+ okHttpbuilder.writeTimeout(10, TimeUnit.SECONDS);
+
+ String url = properties.getInfluxHostname() + ":" + properties.getInfluxPort();
+ log.info("Connecting to influx: {}", url);
+ influxDB = InfluxDBFactory.connect(url, okHttpbuilder);
+ log.info("Connected");
+ influxDB.enableBatch(DEFAULT_BATCH_ACTIONS_LIMIT, DEFAULT_BATCH_INTERVAL_DURATION,
+ TimeUnit.SECONDS, runnable -> {
+ Thread thread = new Thread(runnable);
+ thread.setDaemon(true);
+ return thread;
+ });
+ Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close));
+ }
+
+ public synchronized void saveToInflux(Point point) {
+ influxDB.write(database,"",point);
+ }
+
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java
index 7622b53313f210032203964bd7efb01a4e36d7d5..34162b6529e817e3c7c0bce4c2f0d67a4bcf2d11 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java
@@ -24,6 +24,6 @@ public class MetricNeedingPredictingMessage {
@JsonProperty(EventFields.TranslatorToForecastingMethodsFieldsPerMetric.publish_rate)
@NonNull
@Min(1)
- private String publish_rate;
+ private int publish_rate;
}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java
index 21bbe79d2a790cdf929dbc2a24fc53199004e606..0242f0df2a29e2a90441276743e639ec6ccb42c1 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java
@@ -15,7 +15,6 @@ import java.util.List;
@ToString
public class PredictionFromForecasterMessage {
@JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.metric_value)
- @NonNull
@Min(0)
private double metricValue;
@@ -24,12 +23,10 @@ public class PredictionFromForecasterMessage {
private int level;
@JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.timestamp)
- @NonNull
@Min(1)
private long timestamp;
@JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.probability)
- @NonNull
@Min(0)
@Max(1)
private double probability;
@@ -40,7 +37,6 @@ public class PredictionFromForecasterMessage {
private List confidence_interval;
@JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.prediction_time)
- @NonNull
@Min(1)
private long predictionTime;
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..12a668f54de4190ae3ec0ebec9ba0689eb132b4f
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java
@@ -0,0 +1,23 @@
+package eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.*;
+
+@AllArgsConstructor
+@NoArgsConstructor
+@Data
+public class PredictionsEnsembledMessage {
+
+ @JsonProperty("ensembledValue")
+ @NonNull
+ private double ensembledValue;
+
+ @JsonProperty("timestamp")
+ @NonNull
+ private long timestamp;
+
+ @JsonProperty("predictionTime")
+ @NonNull
+ private long predictionTime;
+
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..492391ca8d4e7641cba6062868f61766b5f09419
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java
@@ -0,0 +1,27 @@
+package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+
+import java.util.Map;
+
+@AllArgsConstructor
+public class PredictionsToEnsembleMessage {
+
+ @JsonProperty("method")
+ @NonNull
+ private String ensemblingMethod;
+
+ @JsonProperty("metric")
+ @NonNull
+ private String metric;
+
+ @JsonProperty("predictionTime")
+ @NonNull
+ private Long predictionTime;
+
+ @JsonProperty("predictionsToEnsemble")
+ @NonNull
+ private Map predictionsByForecaster;
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java
new file mode 100644
index 0000000000000000000000000000000000000000..7c34964ee6a9de1f726762905fdc13a805b5d71b
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java
@@ -0,0 +1,20 @@
+package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage;
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+
+import java.util.List;
+
+@AllArgsConstructor
+public class StartEnsemblingMessage {
+
+ @JsonProperty("metrics")
+ @NonNull
+ private List metrics;
+
+ @JsonProperty("models")
+ @NonNull
+ private List methodNames;
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java
index b3b8932925b0626c8cc9e18673b7ac2b1c72f876..ca69612eda52bfa13b96e3e0407b3c912e086010 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java
@@ -1,11 +1,12 @@
package eu.morphemic.prediction_orchestrator.model;
-
import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionFromForecasterMessage;
import lombok.AllArgsConstructor;
import lombok.Data;
+import org.influxdb.dto.Point;
import java.util.List;
+import java.util.concurrent.TimeUnit;
@Data
@AllArgsConstructor
@@ -45,4 +46,14 @@ public class Prediction {
this.cloud = prediction.getCloud();
this.provider = prediction.getProvider();
}
+
+ public Point getAsPoint(String metricName, String methodName) {
+ return Point.measurement(metricName + "Predictions")
+ .time(timestamp, TimeUnit.SECONDS)
+ .addField(methodName + "_value", metricValue)
+ .addField(methodName + "_probability", probability)
+ .addField(methodName + "_left_conf", confidence_interval.get(0))
+ .addField(methodName + "_right_conf", confidence_interval.get(1))
+ .build();
+ }
}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java
index c5bfd3aef0f631b01c73e04d1085b567540d57c6..c94ee5ce8be149d6036f0085ce92f39e44b617a4 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java
@@ -1,6 +1,7 @@
package eu.morphemic.prediction_orchestrator.properties;
-import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategyType;
+import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.EnsemblerType;
+import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifierType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@@ -29,11 +30,14 @@ public class Properties {
@Value("${jms.client.broker_properties_configuration_file_location}")
private String broker_properties_configuration_file_location;
- @Value("${pooling.poolingStrategy}")
- private PoolingStrategyType poolingStrategyType;
+ @Value("${ensembling.forecasterNumberVerifier.type}")
+ private ForecastersNumberVerifierType forecastersNumberVerifierType;
- @Value("${pooling.threshold}")
- private double poolingDataThreshold;
+ @Value("${ensembling.forecasterNumberVerifier.threshold}")
+ private double forecasterNumberThreshold;
+
+ @Value("${ensembling.ensemblerType}")
+ private EnsemblerType ensemblerType;
@Value("${startingMethodsList}")
private List startingMethodsList;
@@ -44,4 +48,25 @@ public class Properties {
@Value("${activemq.restartcount:20}")
private int activeMqRestartCount;
+ @Value("${ensembler.base-url}")
+ private String ensemblerBaseUrl;
+
+ @Value("${ensembler.uri}")
+ private String ensemblerUri;
+
+ @Value("${influx.hostname}")
+ private String influxHostname;
+
+ @Value("${influx.port}")
+ private String influxPort;
+
+ @Value("${influx.database}")
+ private String influxDatabase;
+
+ @Value("${influx.username}")
+ private String influxUsername;
+
+ @Value("${influx.password}")
+ private String influxPassword;
+
}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java
index 418d19b6aa9efd34424032d690ab9e6e10922582..0aed9101a122738cd88ddbbe13194783c4eb8467 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java
@@ -1,7 +1,9 @@
package eu.morphemic.prediction_orchestrator.service;
-import eu.morphemic.prediction_orchestrator.communication.CommunicationService;
+import eu.morphemic.prediction_orchestrator.communication.activemq.ActiveMQService;
+import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService;
import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage;
+import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage;
import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage;
import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter;
import eu.morphemic.prediction_orchestrator.model.Prediction;
@@ -16,6 +18,7 @@ import javax.annotation.PostConstruct;
import javax.jms.JMSException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
@Slf4j
@Component
@@ -27,14 +30,16 @@ public class Coordinator {
@Autowired
private Properties properties;
- private CommunicationService communicationService;
+ private ActiveMQService activeMQService;
+ private InfluxService influxService;
@PostConstruct
public void start() throws JMSException {
log.info(properties.toString());
- this.communicationService = new CommunicationService(properties);
- communicationService.startReceivingMetricList(this);
- communicationService.startReceivingModelRetrained(this);
+ this.activeMQService = new ActiveMQService(properties);
+ activeMQService.startReceivingMetricList(this);
+ activeMQService.startReceivingModelRetrained(this);
+ this.influxService = new InfluxService(properties);
}
/** As we need to check multiple values at once and block them after publishing, we need to make sure that only one thread is using
@@ -61,13 +66,13 @@ public class Coordinator {
if (lastValueUpdate == PredictionRegistry.VALUE_ADDED) {
metricHandlers.forEach((metricName, metricHandler) -> {
Prediction prediction = metricHandler.getPooledPrediction(predictionTime);
- communicationService.publishPooledPrediction(prediction, metricName);
+ activeMQService.publishPooledPrediction(prediction, metricName);
});
} else {
//we send only updated value as at this point the entire vector has been send earlier
Prediction prediction = metricHandlers.get(callingMetricHandler)
.getPooledPrediction(predictionTime);
- communicationService.publishPooledPrediction(prediction, callingMetricHandler);
+ activeMQService.publishPooledPrediction(prediction, callingMetricHandler);
}
}
}
@@ -111,7 +116,8 @@ public class Coordinator {
this,
forecastingConfiguration,
properties,
- communicationService
+ activeMQService,
+ influxService
);
for (String methodName : properties.getStartingMethodsList()) {
newMetricHandler.addMethodHandler(methodName, forecastingConfiguration);
@@ -123,17 +129,24 @@ public class Coordinator {
//We add new methods if necessary and inform forecasters
for(Map.Entry> entry : metricsByMethod.entrySet()) {
- List methodsPredictingTheMetric = entry.getValue();
+ List metricsPredictedByMethod = entry.getValue();
StartForecastingMessage startForecastingMessage = new StartForecastingMessage(
- methodsPredictingTheMetric,
+ metricsPredictedByMethod,
System.currentTimeMillis() / 1000,
epochStartingForecast,
forwardPredictionNumber,
predictionHorizon
);
- communicationService.publishStartForecasting(startForecastingMessage, entry.getKey());
+ activeMQService.publishStartForecasting(startForecastingMessage, entry.getKey());
}
+ //Inform the Ensembler
+ StartEnsemblingMessage startEnsemblingMessage = new StartEnsemblingMessage(
+ metricNeedingPredictingMessageList,
+ properties.getStartingMethodsList()
+ );
+ activeMQService.publishStartEnsembling(startEnsemblingMessage);
+
//ToDo Stop metrics not included in new configuration file
}
}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java
index cb4703d713552106bf11082871c2df72e2b40ad3..4a9e42376c6c4dcdc062b18aec20870918022f38 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java
@@ -1,12 +1,13 @@
package eu.morphemic.prediction_orchestrator.service;
-import eu.morphemic.prediction_orchestrator.communication.CommunicationService;
+import eu.morphemic.prediction_orchestrator.communication.activemq.ActiveMQService;
+import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService;
import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter;
import eu.morphemic.prediction_orchestrator.properties.Properties;
import eu.morphemic.prediction_orchestrator.model.Prediction;
import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry;
-import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategy;
-import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategyFactory;
+import eu.morphemic.prediction_orchestrator.service.ensembling.EnsemblingMechanism;
+import eu.morphemic.prediction_orchestrator.service.ensembling.EnsemblingMechanismFactory;
import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -16,6 +17,7 @@ import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
@@ -31,34 +33,37 @@ public class MetricHandler {
private PredictionRegistry pooledPredictionsRegistry;
- private PoolingStrategy poolingStrategy;
- private CommunicationService communicationService;
+ private EnsemblingMechanism ensemblingMechanism;
+ private ActiveMQService activeMQService;
+ private InfluxService influxService;
public MetricHandler(String metricName, Coordinator coordinator, ForecastingConfiguration forecastingConfiguration,
- Properties properties, CommunicationService communicationService) {
+ Properties properties, ActiveMQService activeMQService, InfluxService influxService) {
this.metricName = metricName;
- this.poolingStrategy = PoolingStrategyFactory.getPoolingStrategy(properties);
+ this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties);
this.coordinator = coordinator;
this.pooledPredictionsRegistry = new PredictionRegistry(
forecastingConfiguration,
PredictionRegistry.getPooledPredictionsRegistryName(metricName)
);
- this.communicationService = communicationService;
+ this.activeMQService = activeMQService;
+ this.influxService = influxService;
}
public void launchMethodsPoolingIfNecessary(long predictionTime) throws JMSException {
- List predictions = methodHandlers.values().stream()
- .map(MethodHandler::getPredictionRegistry)
- .map(registry -> registry.getCopyPrediction(predictionTime))
- .collect(Collectors.toList());
+ Map predictionsByMethod = methodHandlers.entrySet().stream()
+ .collect(HashMap::new,
+ (m, e) -> m.put(e.getKey(), e.getValue().getPredictionRegistry().getCopyPrediction(predictionTime)),
+ HashMap::putAll);
- Prediction newPooledPrediction = poolingStrategy.poolPredictions(predictions);
- if (newPooledPrediction != PoolingStrategy.POOLED_PREDICTION_NOT_CREATED) {
+ Prediction newPooledPrediction = ensemblingMechanism.poolPredictions(predictionsByMethod, metricName);
+ if (newPooledPrediction != EnsemblingMechanism.POOLED_PREDICTION_NOT_CREATED) {
log.info("Pooling function has created pooled prediction for metric {} and predictionTime {}",
metricName, PredictionTimeFormatter.rawDateFormat(predictionTime));
int valueResult = pooledPredictionsRegistry.processPrediction(newPooledPrediction);
if ((valueResult == PredictionRegistry.VALUE_UPDATED) ||
(valueResult == PredictionRegistry.VALUE_ADDED)) {
+ influxService.saveToInflux(newPooledPrediction.getAsPoint(metricName, "ensembled"));
coordinator.publishIfPooledValuesProvidedForAllMetrics(predictionTime, valueResult, metricName);
}
} else {
@@ -85,12 +90,13 @@ public class MetricHandler {
);
MethodHandler methodHandler = new MethodHandler(null,null, predictionRegistry);
this.methodHandlers.put(methodName, methodHandler);
- this.communicationService.startReceivingPredictions(metricName, methodName, predictionRegistry, this);
+ this.activeMQService.startReceivingPredictions(metricName, methodName, predictionRegistry,
+ this, influxService);
}
public void removeMethodHandler(String methodName) throws JMSException {
this.methodHandlers.remove(methodName);
- this.communicationService.stopReceivingPredictions(metricName, methodName);
+ this.activeMQService.stopReceivingPredictions(metricName, methodName);
}
List getConnectedMethods() {
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java
new file mode 100644
index 0000000000000000000000000000000000000000..d68ba512da63310c44cfe01a35fca6aa2ac88c66
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java
@@ -0,0 +1,71 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling;
+
+import eu.morphemic.prediction_orchestrator.model.Prediction;
+import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.Ensembler;
+import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifier;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Slf4j
+@AllArgsConstructor
+public class EnsemblingMechanism {
+ public static Prediction POOLED_PREDICTION_NOT_CREATED = null;
+
+ private Ensembler ensembler;
+ private ForecastersNumberVerifier forecastersNumberVerifier;
+
+ public Prediction poolPredictions(Map predictionsByMethod, String metricName) {
+ int expectedForecastersDataCount = predictionsByMethod.size();
+ Map nonNullPredictions = predictionsByMethod.entrySet()
+ .stream()
+ .filter(e -> Objects.nonNull(e.getValue()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ int numberOfValidData = nonNullPredictions.size();
+ if (notEnoughDataToCreatePooledValue(numberOfValidData, expectedForecastersDataCount)) {
+ return POOLED_PREDICTION_NOT_CREATED;
+ }
+ Set confidence_interval_values = nonNullPredictions.values().stream()
+ .map(Prediction::getConfidence_interval)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
+ List confidence_interval = Arrays.asList(
+ confidence_interval_values.stream().min(Double::compareTo).get(),
+ confidence_interval_values.stream().max(Double::compareTo).get()
+ );
+
+ Prediction anyPrediction = nonNullPredictions.values().stream()
+ .findAny()
+ .get();
+ try {
+
+ return new Prediction(
+ ensembler.ensembleValues(predictionsByMethod, metricName),
+ System.currentTimeMillis() / 1000,
+ anyPrediction.getPredictionTime(),
+ nonNullPredictions.values().stream()
+ .mapToDouble(Prediction::getProbability)
+ .average()
+ .orElse(Double.NaN),
+ confidence_interval,
+ anyPrediction.getLevel(),
+ anyPrediction.getRefersTo(),
+ anyPrediction.getCloud(),
+ anyPrediction.getProvider()
+ );
+ } catch (Exception e) {
+ return POOLED_PREDICTION_NOT_CREATED;
+ }
+ }
+
+ private boolean notEnoughDataToCreatePooledValue(int numberOfValidData, int expectedForecastersDataCount) {
+ //We should always pool if we have only one forecaster attached
+ if ((numberOfValidData == 1) && (expectedForecastersDataCount == 1)) {
+ return false;
+ } else {
+ return !forecastersNumberVerifier.isDataSufficient(numberOfValidData, expectedForecastersDataCount);
+ }
+ }
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java
new file mode 100644
index 0000000000000000000000000000000000000000..038484ca738d75ad60a7ca146c0bf3a64ebfcf65
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java
@@ -0,0 +1,55 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling;
+
+import eu.morphemic.prediction_orchestrator.communication.ensembler.EnsemblerService;
+import eu.morphemic.prediction_orchestrator.properties.Properties;
+import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.AverageValuesEnsembler;
+import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.Ensembler;
+import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.OuterEnsembler;
+import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifier;
+import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.PercentageForecastersNumberVerifier;
+import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.StaticForecastersNumberVerifier;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class EnsemblingMechanismFactory {
+
+ public static EnsemblingMechanism getEnsemblingMechanism(Properties properties) {
+ Ensembler ensembler;
+ ForecastersNumberVerifier forecastersNumberVerifier;
+
+ switch (properties.getForecastersNumberVerifierType()) {
+ case STATIC_FORECASTERS_COUNT_NEEDED: {
+ forecastersNumberVerifier =
+ new StaticForecastersNumberVerifier(properties.getForecasterNumberThreshold());
+ break;
+ }
+ case PERCENTAGE_FORECASTERS_COUNT_NEEDED: {
+ forecastersNumberVerifier =
+ new PercentageForecastersNumberVerifier(properties.getForecasterNumberThreshold());
+ break;
+ }
+ default: {
+ throw new IllegalArgumentException("Pooling strategy not present in the system");
+ }
+ }
+
+ switch (properties.getEnsemblerType()) {
+ case AVERAGE: {
+ ensembler = new AverageValuesEnsembler();
+ break;
+ }
+ case OUTER: {
+ ensembler = new OuterEnsembler(new EnsemblerService(
+ properties.getEnsemblerBaseUrl(),
+ properties.getEnsemblerUri()
+ ));
+ break;
+ }
+ default: {
+ throw new IllegalArgumentException("Pooling strategy not present in the system");
+ }
+ }
+
+ return new EnsemblingMechanism(ensembler, forecastersNumberVerifier);
+ }
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java
new file mode 100644
index 0000000000000000000000000000000000000000..e5bc5e7c9ee42c58743b931418a82949e1e3276f
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java
@@ -0,0 +1,19 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler;
+
+import eu.morphemic.prediction_orchestrator.model.Prediction;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class AverageValuesEnsembler extends Ensembler {
+
+
+ @Override
+ public double ensembleValues(Map predictionsByMethod, String metricName) {
+ return predictionsByMethod.values().stream()
+ .filter(Objects::nonNull)
+ .mapToDouble(Prediction::getMetricValue)
+ .average()
+ .orElse(Double.NaN);
+ }
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java
new file mode 100644
index 0000000000000000000000000000000000000000..863d8b6e5e02d44b9b64ddb249d1593cd0eb6edc
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java
@@ -0,0 +1,11 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler;
+
+import eu.morphemic.prediction_orchestrator.model.Prediction;
+
+import java.util.Map;
+
+public abstract class Ensembler {
+
+ public abstract double ensembleValues(Map predictionsByMethod, String metricName);
+
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java
new file mode 100644
index 0000000000000000000000000000000000000000..62bb5548010406ab24b06128de58db2e1b067767
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java
@@ -0,0 +1,13 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler;
+
+public enum EnsemblerType {
+ AVERAGE("average"),
+
+ OUTER("outer");
+
+ private String type;
+
+ EnsemblerType(String type) {
+ this.type = type;
+ }
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java
new file mode 100644
index 0000000000000000000000000000000000000000..9dfd005128112e6125533ff185a20ec187c0e9a0
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java
@@ -0,0 +1,52 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler;
+
+import eu.morphemic.prediction_orchestrator.communication.ensembler.EnsemblerService;
+import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionsEnsembledMessage;
+import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage;
+import eu.morphemic.prediction_orchestrator.model.Prediction;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@AllArgsConstructor
+@Slf4j
+public class OuterEnsembler extends Ensembler {
+
+ private EnsemblerService ensemblerService;
+
+ @Override
+ public double ensembleValues(Map predictionsByMethod, String metricName) {
+ long predictionTime = predictionsByMethod.values().stream()
+ .findFirst()
+ .get()
+ .getPredictionTime();
+ PredictionsToEnsembleMessage predictionsToEnsembleMessage = new PredictionsToEnsembleMessage(
+ "BestSubset",
+ metricName,
+ predictionTime,
+ predictionsByMethod.entrySet().stream()
+ .collect(HashMap::new,
+ (m, v) -> m.put(v.getKey(), v.getValue() == null ? null : v.getValue().getMetricValue()),
+ HashMap::putAll)
+ );
+ PredictionsEnsembledMessage ensembledMessage;
+ try {
+ ensembledMessage = ensemblerService.ensemble(predictionsToEnsembleMessage);
+ } catch (Exception e) {
+ log.info("Error when sending ensembling request: {}", e.getMessage());
+ throw new IllegalArgumentException("Error while sending message to the Ensember");
+ }
+
+ if (ensembledMessage == null) {
+ log.warn("Message from ensembler is null");
+ throw new IllegalArgumentException("Message from the Ensembler is null");
+ }
+ if (ensembledMessage.getPredictionTime() != predictionTime) {
+ log.warn("Message from the Ensembler has wrong predictionTime");
+ throw new IllegalArgumentException("Message from the Ensembler has wrong predictionTime");
+ }
+ return ensembledMessage.getEnsembledValue();
+ }
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java
new file mode 100644
index 0000000000000000000000000000000000000000..285cf2a248899bec1a34a9b1c426f8e8c9aed058
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java
@@ -0,0 +1,6 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier;
+
+public abstract class ForecastersNumberVerifier {
+
+ abstract public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount);
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java
similarity index 53%
rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java
rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java
index cfb2f8de98259f4ac26c3bf51e4b8ae335da9b44..7467a1e9a5dd325ff68bf0a0e3f90259cf6c703c 100644
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java
@@ -1,13 +1,13 @@
-package eu.morphemic.prediction_orchestrator.service.pooling_strategy;
+package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier;
-public enum PoolingStrategyType {
+public enum ForecastersNumberVerifierType {
STATIC_FORECASTERS_COUNT_NEEDED("staticForecastersCountNeeded"),
PERCENTAGE_FORECASTERS_COUNT_NEEDED("percentageForecastersCountNeeded");
private String type;
- PoolingStrategyType(String type) {
+ ForecastersNumberVerifierType(String type) {
this.type = type;
}
}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java
new file mode 100644
index 0000000000000000000000000000000000000000..928f563bf26e923a743e44f920bea089c4bdc84d
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java
@@ -0,0 +1,15 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier;
+
+import lombok.AllArgsConstructor;
+
+@AllArgsConstructor
+public class PercentageForecastersNumberVerifier extends ForecastersNumberVerifier {
+
+ private final double percentageThreshold;
+
+ @Override
+ public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) {
+ return (((double) numberOfValidData) / ((double) expectedForecastersDataCount))
+ > percentageThreshold;
+ }
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java
new file mode 100644
index 0000000000000000000000000000000000000000..54ccd8ae99dd7a1e6efd74b2616911d3a2ad84f4
--- /dev/null
+++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java
@@ -0,0 +1,14 @@
+package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier;
+
+import lombok.AllArgsConstructor;
+
+@AllArgsConstructor
+public class StaticForecastersNumberVerifier extends ForecastersNumberVerifier {
+
+ private final double staticThreshold;
+
+ @Override
+ public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) {
+ return numberOfValidData >= staticThreshold;
+ }
+}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java
deleted file mode 100644
index 24d451467d96e285fe4ad6f5be7b8fabad6861ce..0000000000000000000000000000000000000000
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package eu.morphemic.prediction_orchestrator.service.pooling_strategy;
-
-import eu.morphemic.prediction_orchestrator.model.Prediction;
-
-import java.util.List;
-
-public interface PoolingStrategy {
-
- Prediction POOLED_PREDICTION_NOT_CREATED = null;
-
- Prediction poolPredictions(List predictions);
-}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java
deleted file mode 100644
index 01fc73b98ba03e5f472133b1c7d7444b21c61dca..0000000000000000000000000000000000000000
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package eu.morphemic.prediction_orchestrator.service.pooling_strategy;
-
-import eu.morphemic.prediction_orchestrator.properties.Properties;
-import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.PercentageForecastersCountNeededStrategy;
-import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.StaticForecastersCountNeededStrategy;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class PoolingStrategyFactory {
-
- public static PoolingStrategy getPoolingStrategy(Properties properties) {
- switch (properties.getPoolingStrategyType()) {
- case STATIC_FORECASTERS_COUNT_NEEDED:
- return new StaticForecastersCountNeededStrategy(properties.getPoolingDataThreshold());
- case PERCENTAGE_FORECASTERS_COUNT_NEEDED:
- return new PercentageForecastersCountNeededStrategy(properties.getPoolingDataThreshold());
- default:
- throw new IllegalArgumentException("Pooling strategy not present in the system");
- }
- }
-}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java
deleted file mode 100644
index 2f030e9c2d15fcdc3f99692b964105c82292de97..0000000000000000000000000000000000000000
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl;
-
-import lombok.AllArgsConstructor;
-
-@AllArgsConstructor
-public class PercentageForecastersCountNeededStrategy extends ThresholdCountForecastersNeededStrategy {
-
- private final double percentageThreshold;
-
- @Override
- boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) {
- return (((double) numberOfValidData) / ((double) expectedForecastersDataCount))
- > percentageThreshold;
- }
-}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java
deleted file mode 100644
index 5763666d4e01148e74dd4f10246852652f18ecc2..0000000000000000000000000000000000000000
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl;
-
-import lombok.AllArgsConstructor;
-
-@AllArgsConstructor
-public class StaticForecastersCountNeededStrategy extends ThresholdCountForecastersNeededStrategy{
-
- private final double staticThreshold;
-
- @Override
- boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) {
- return numberOfValidData >= staticThreshold;
- }
-}
diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java
deleted file mode 100644
index 45aef574c98624b33253d08769e4a18e4df553d0..0000000000000000000000000000000000000000
--- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl;
-
-import eu.morphemic.prediction_orchestrator.model.Prediction;
-import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategy;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-@Slf4j
-@AllArgsConstructor
-public abstract class ThresholdCountForecastersNeededStrategy implements PoolingStrategy {
-
- @Override
- public Prediction poolPredictions(List predictions) {
- int expectedForecastersDataCount = predictions.size();
- List validPredictions = predictions.stream().filter(Objects::nonNull).collect(Collectors.toList());
- int numberOfValidData = validPredictions.size();
- if (notEnoughDataToCreatePooledValue(numberOfValidData, expectedForecastersDataCount)) {
- return POOLED_PREDICTION_NOT_CREATED;
- }
- Set confidence_interval_values = validPredictions.stream()
- .map(Prediction::getConfidence_interval)
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- List confidence_interval = Arrays.asList(
- confidence_interval_values.stream().min(Double::compareTo).get(),
- confidence_interval_values.stream().max(Double::compareTo).get()
- );
-
- return new Prediction(
- validPredictions.stream().mapToDouble(Prediction::getMetricValue).average().orElse(Double.NaN),
- System.currentTimeMillis() / 1000,
- validPredictions.get(0).getPredictionTime(),
- validPredictions.stream().mapToDouble(Prediction::getProbability).average().orElse(Double.NaN),
- confidence_interval,
- validPredictions.get(0).getLevel(),
- validPredictions.get(0).getRefersTo(),
- validPredictions.get(0).getCloud(),
- validPredictions.get(0).getProvider()
- );
- }
-
- private boolean notEnoughDataToCreatePooledValue(int numberOfValidData, int expectedForecastersDataCount) {
- //We should always pool if we have only one forecaster attached
- if ((numberOfValidData == 1) && (expectedForecastersDataCount == 1)) {
- return false;
- } else {
- return !isDataSufficient(numberOfValidData, expectedForecastersDataCount);
- }
- }
-
- abstract boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount);
-
-}
diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties
index 14676ebc12db0223f4185dda5b8ea1048fd48072..6e93c62b551c6278d15ac8d26f7c68a0230f3602 100644
--- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties
+++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties
@@ -35,8 +35,9 @@ forecasting_configuration.starting_forecasting_delay=200
# With the first option threshold property is in percents
#With the second it is a static count
#For example with threshold threshold=2 at least 2 synchronised forecasters are needed to be able to merge predictions and send them forward
-pooling.poolingStrategy=STATIC_FORECASTERS_COUNT_NEEDED
-pooling.threshold=2
+ensembling.forecasterNumberVerifier.type=STATIC_FORECASTERS_COUNT_NEEDED
+ensembling.forecasterNumberVerifier.threshold=2
+ensembling.ensemblerType=OUTER
jms.client.broker_properties_configuration_file_location=${MELODIC_CONFIG_DIR}/eu.melodic.event.brokerclient.properties
@@ -49,3 +50,16 @@ logging.zone_id=Europe/Warsaw
#Details of restarting connection to amq upon failure
activemq.restartinterval=10000
activemq.restartcount=20
+
+ensembler.base-url=http://ensembler:8000
+ensembler.uri=/ensemble
+
+
+influx.hostname=http://ui-influxdb
+influx.port=8086
+influx.database=morphemic
+influx.username=morphemic
+influx.password=password
+
+#ensembler communication logging
+logging.level.reactor.netty.http.client=false
diff --git a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java
similarity index 99%
rename from prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java
rename to prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java
index df388e5c7522ffd623cca25f4bbdd46b304bd2fa..185315e2443ae972288230d7452d52805c3eeefa 100644
--- a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java
+++ b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java
@@ -1,4 +1,4 @@
-package eu.morphemic.prediction_orchestrator.communication;
+package eu.morphemic.prediction_orchestrator.communication.activemq;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;