diff --git a/prediction_orchestrator/pom.xml b/prediction_orchestrator/pom.xml index 7f92fed555f8d0acef5ff381297ab2eb32d694e6..63dd52cc468aa2ecedd540029cf64131998d2a89 100644 --- a/prediction_orchestrator/pom.xml +++ b/prediction_orchestrator/pom.xml @@ -45,7 +45,15 @@ 2.4.1 compile - + + org.springframework.boot + spring-boot-starter-webflux + + + org.influxdb + influxdb-java + 2.15 + org.springframework.boot spring-boot-starter-test @@ -93,6 +101,12 @@ junit test + + org.projectreactor + reactor-spring + 1.0.1.RELEASE + + diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java similarity index 83% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java index 63c16f36e3609e3736df961d80558ee1c70e4de6..ccd792ace96a3baa09e8b9441537dc63f4cd0e82 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java @@ -1,20 +1,19 @@ -package eu.morphemic.prediction_orchestrator.communication; +package eu.morphemic.prediction_orchestrator.communication.activemq; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.melodic.event.brokerclient.BrokerClient; import eu.melodic.event.brokerclient.BrokerPublisher; -import eu.morphemic.prediction_orchestrator.communication.listeners.ActiveMQListener; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PooledPredictionMessage; +import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; import eu.morphemic.prediction_orchestrator.service.Coordinator; -import eu.morphemic.prediction_orchestrator.communication.listeners.PredictionListener; -import eu.morphemic.prediction_orchestrator.communication.listeners.ReceiveMetricsListener; -import eu.morphemic.prediction_orchestrator.communication.listeners.RetrainedModelListener; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.properties.Properties; import eu.morphemic.prediction_orchestrator.service.MetricHandler; +import eu.morphemic.prediction_orchestrator.communication.activemq.listeners.*; import lombok.extern.slf4j.Slf4j; @@ -25,14 +24,14 @@ import java.util.Objects; @Slf4j /** We need to have a different client for each topic as we want them to run each consumer(subscriber) in a different thread (session) */ -public class CommunicationService { +public class ActiveMQService { private Properties properties; //topicName -> brokerClientSubscriber/publisher private HashMap brokerClients = new HashMap<>(); private HashMap brokerPublishers = new HashMap<>(); - public CommunicationService(Properties properties) { + public ActiveMQService(Properties properties) { this.properties = properties; } @@ -53,13 +52,16 @@ public class CommunicationService { public synchronized void startReceivingPredictions(String metricName, String methodName, PredictionRegistry predictionRegistry, - MetricHandler metricHandler) { + MetricHandler metricHandler, + InfluxService influxService) { log.info("Starting Receiving PredictionFromForecasterMessage from {} concerning metric {}", methodName, metricName); String topicName = TopicFactory.getReceivePredictionTopic(metricName, methodName); PredictionListener predictionListener = new PredictionListener( topicName, predictionRegistry, - metricHandler); + metricHandler, + methodName, + influxService); subscribeWithRetries(topicName, predictionListener); } @@ -81,6 +83,19 @@ public class CommunicationService { ).publish(result); } + public synchronized void publishStartEnsembling(StartEnsemblingMessage startEnsemblingMessage) throws JMSException { + String topicName = TopicFactory.getPublishStartEnsembler(); + String result = null; + try { + result = new ObjectMapper().writeValueAsString(startEnsemblingMessage); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + brokerPublishers.computeIfAbsent(topicName, key -> + getNewPublisher(topicName) + ).publish(result); + } + public synchronized void publishPooledPrediction(Prediction prediction, String metricName) { String topicName = TopicFactory.getPublishPooledPrediction(metricName); String result = null; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java similarity index 97% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java index cb069f1372829b984cce69efdc2cb949e6f909cf..ece3684603c992cc291f8e9b85beac51c0a65d45 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication; +package eu.morphemic.prediction_orchestrator.communication.activemq; import eu.melodic.event.brokerclient.templates.TopicNames; @@ -48,6 +48,10 @@ class TopicFactory { return TopicNames.forecasting_methods_to_prediction_orchestrator_training_topic; } + static String getPublishStartEnsembler() { + return "start_ensembler"; + } + static String getPublishStartForecasting(String methodName) { switch(methodName) { case "nbeats" : { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java similarity index 95% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java index ed407b4d69f332b557e731dea466bb7fbc5f2d2f..d60183053b7552c8dad88405e78cfd2a5717249d 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication.listeners; +package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java similarity index 72% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java index ffd1e90c906ad9cb3f6d410b2645de135c1db525..f55da951f100f8813fa41c4368cf296d1c80b021 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java @@ -1,5 +1,6 @@ -package eu.morphemic.prediction_orchestrator.communication.listeners; +package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionFromForecasterMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; @@ -14,12 +15,17 @@ public class PredictionListener extends ActiveMQListener { private PredictionRegistry predictionRegistry; private MetricHandler metricHandler; - public PredictionListener(String topicName, PredictionRegistry predictionRegistry, MetricHandler metricHandler) { + private InfluxService influxService; + private String methodName; + + public PredictionListener(String topicName, PredictionRegistry predictionRegistry, MetricHandler metricHandler, + String methodName, InfluxService influxService) { super(topicName); this.predictionRegistry = predictionRegistry; this.metricHandler = metricHandler; - + this.influxService = influxService; + this.methodName = methodName; log.debug("PredictionListener.: topic={}", topicName); } @@ -29,9 +35,11 @@ public class PredictionListener extends ActiveMQListener { log.debug("Listener of topic {}: Converting event payload to PredictionFromForecasterMessage instance...", topicName); PredictionFromForecasterMessage predictionFromForecasterMessage = gson.fromJson(payload, PredictionFromForecasterMessage.class); log.info("Listener of topic {}: PredictionFromForecasterMessage instance: {}", topicName, predictionFromForecasterMessage.toString()); - int valueResult = predictionRegistry.processPrediction(new Prediction(predictionFromForecasterMessage)); + Prediction prediction = new Prediction(predictionFromForecasterMessage); + int valueResult = predictionRegistry.processPrediction(prediction); if ((valueResult == PredictionRegistry.VALUE_ADDED) || (valueResult == PredictionRegistry.VALUE_UPDATED)) { + influxService.saveToInflux(prediction.getAsPoint(metricHandler.getMetricName(), methodName)); metricHandler.launchMethodsPoolingIfNecessary(predictionFromForecasterMessage.getPredictionTime()); } } catch (JMSException e) { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java similarity index 94% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java index 6e5174e8ddaa3d0dd5d118b807d0aa6f5d33053d..2801d7c5d470835bbbe91997fe81309061e9496f 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication.listeners; +package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage; import eu.morphemic.prediction_orchestrator.service.Coordinator; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java similarity index 93% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java index f0587aec043604271de7bb759f1f92e056e37b30..13563a00c360ffb4cf80023a02ba5c1761ac2b47 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication.listeners; +package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; import eu.morphemic.prediction_orchestrator.service.Coordinator; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.RetrainedModelMessage; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java new file mode 100644 index 0000000000000000000000000000000000000000..c0f910ef7c53a005af04350fabd4a6a54673aa7b --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java @@ -0,0 +1,50 @@ +package eu.morphemic.prediction_orchestrator.communication.ensembler; + +import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionsEnsembledMessage; +import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +; +import org.springframework.web.reactive.function.client.UnknownHttpStatusCodeException; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; + +@Slf4j +public class EnsemblerService { + + private String ensemblerUri; + + private final WebClient client; + + public EnsemblerService(String baseUrl, String ensemblerUri) { + //http client is only used for requests logging + HttpClient httpClient = HttpClient.create() + .wiretap(true); + this.ensemblerUri = ensemblerUri; + this.client = WebClient.builder().baseUrl(baseUrl) + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .build(); + } + + public PredictionsEnsembledMessage ensemble(PredictionsToEnsembleMessage predictionsToEnsembleMessage) { + try { + return client.post() + .uri(ensemblerUri) + .bodyValue(predictionsToEnsembleMessage) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus(HttpStatus::isError, response -> Mono.empty()) + .bodyToMono(PredictionsEnsembledMessage.class) + .block(); + } catch (UnknownHttpStatusCodeException e) { + log.error("Unknown HTTP status code received: {}", e.getRawStatusCode()); + return null; + } catch (RuntimeException e) { + log.error(e.getMessage()); + return null; + } + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java new file mode 100644 index 0000000000000000000000000000000000000000..39a356401553809a768c70b23c6684993d7b74ff --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java @@ -0,0 +1,48 @@ +package eu.morphemic.prediction_orchestrator.communication.influx; + +import eu.morphemic.prediction_orchestrator.properties.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import okhttp3.OkHttpClient; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Point; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.TimeUnit; + +@Configuration +@Getter +@Slf4j +public class InfluxService { + public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 50; + public static final int DEFAULT_BATCH_INTERVAL_DURATION = 50; + + private InfluxDB influxDB; + + private String database; + + public InfluxService(Properties properties) { + this.database = properties.getInfluxDatabase(); + OkHttpClient.Builder okHttpbuilder = new OkHttpClient.Builder(); + okHttpbuilder.readTimeout(10, TimeUnit.SECONDS); + okHttpbuilder.writeTimeout(10, TimeUnit.SECONDS); + + String url = properties.getInfluxHostname() + ":" + properties.getInfluxPort(); + log.info("Connecting to influx: {}", url); + influxDB = InfluxDBFactory.connect(url, okHttpbuilder); + log.info("Connected"); + influxDB.enableBatch(DEFAULT_BATCH_ACTIONS_LIMIT, DEFAULT_BATCH_INTERVAL_DURATION, + TimeUnit.SECONDS, runnable -> { + Thread thread = new Thread(runnable); + thread.setDaemon(true); + return thread; + }); + Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close)); + } + + public synchronized void saveToInflux(Point point) { + influxDB.write(database,"",point); + } + +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java index 7622b53313f210032203964bd7efb01a4e36d7d5..34162b6529e817e3c7c0bce4c2f0d67a4bcf2d11 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java @@ -24,6 +24,6 @@ public class MetricNeedingPredictingMessage { @JsonProperty(EventFields.TranslatorToForecastingMethodsFieldsPerMetric.publish_rate) @NonNull @Min(1) - private String publish_rate; + private int publish_rate; } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java index 21bbe79d2a790cdf929dbc2a24fc53199004e606..0242f0df2a29e2a90441276743e639ec6ccb42c1 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java @@ -15,7 +15,6 @@ import java.util.List; @ToString public class PredictionFromForecasterMessage { @JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.metric_value) - @NonNull @Min(0) private double metricValue; @@ -24,12 +23,10 @@ public class PredictionFromForecasterMessage { private int level; @JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.timestamp) - @NonNull @Min(1) private long timestamp; @JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.probability) - @NonNull @Min(0) @Max(1) private double probability; @@ -40,7 +37,6 @@ public class PredictionFromForecasterMessage { private List confidence_interval; @JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.prediction_time) - @NonNull @Min(1) private long predictionTime; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..12a668f54de4190ae3ec0ebec9ba0689eb132b4f --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java @@ -0,0 +1,23 @@ +package eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.*; + +@AllArgsConstructor +@NoArgsConstructor +@Data +public class PredictionsEnsembledMessage { + + @JsonProperty("ensembledValue") + @NonNull + private double ensembledValue; + + @JsonProperty("timestamp") + @NonNull + private long timestamp; + + @JsonProperty("predictionTime") + @NonNull + private long predictionTime; + +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..492391ca8d4e7641cba6062868f61766b5f09419 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java @@ -0,0 +1,27 @@ +package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.NonNull; + +import java.util.Map; + +@AllArgsConstructor +public class PredictionsToEnsembleMessage { + + @JsonProperty("method") + @NonNull + private String ensemblingMethod; + + @JsonProperty("metric") + @NonNull + private String metric; + + @JsonProperty("predictionTime") + @NonNull + private Long predictionTime; + + @JsonProperty("predictionsToEnsemble") + @NonNull + private Map predictionsByForecaster; +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..7c34964ee6a9de1f726762905fdc13a805b5d71b --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java @@ -0,0 +1,20 @@ +package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages; + +import com.fasterxml.jackson.annotation.JsonProperty; +import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage; +import lombok.AllArgsConstructor; +import lombok.NonNull; + +import java.util.List; + +@AllArgsConstructor +public class StartEnsemblingMessage { + + @JsonProperty("metrics") + @NonNull + private List metrics; + + @JsonProperty("models") + @NonNull + private List methodNames; +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java index b3b8932925b0626c8cc9e18673b7ac2b1c72f876..ca69612eda52bfa13b96e3e0407b3c912e086010 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java @@ -1,11 +1,12 @@ package eu.morphemic.prediction_orchestrator.model; - import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionFromForecasterMessage; import lombok.AllArgsConstructor; import lombok.Data; +import org.influxdb.dto.Point; import java.util.List; +import java.util.concurrent.TimeUnit; @Data @AllArgsConstructor @@ -45,4 +46,14 @@ public class Prediction { this.cloud = prediction.getCloud(); this.provider = prediction.getProvider(); } + + public Point getAsPoint(String metricName, String methodName) { + return Point.measurement(metricName + "Predictions") + .time(timestamp, TimeUnit.SECONDS) + .addField(methodName + "_value", metricValue) + .addField(methodName + "_probability", probability) + .addField(methodName + "_left_conf", confidence_interval.get(0)) + .addField(methodName + "_right_conf", confidence_interval.get(1)) + .build(); + } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java index c5bfd3aef0f631b01c73e04d1085b567540d57c6..c94ee5ce8be149d6036f0085ce92f39e44b617a4 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java @@ -1,6 +1,7 @@ package eu.morphemic.prediction_orchestrator.properties; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategyType; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.EnsemblerType; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifierType; import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -29,11 +30,14 @@ public class Properties { @Value("${jms.client.broker_properties_configuration_file_location}") private String broker_properties_configuration_file_location; - @Value("${pooling.poolingStrategy}") - private PoolingStrategyType poolingStrategyType; + @Value("${ensembling.forecasterNumberVerifier.type}") + private ForecastersNumberVerifierType forecastersNumberVerifierType; - @Value("${pooling.threshold}") - private double poolingDataThreshold; + @Value("${ensembling.forecasterNumberVerifier.threshold}") + private double forecasterNumberThreshold; + + @Value("${ensembling.ensemblerType}") + private EnsemblerType ensemblerType; @Value("${startingMethodsList}") private List startingMethodsList; @@ -44,4 +48,25 @@ public class Properties { @Value("${activemq.restartcount:20}") private int activeMqRestartCount; + @Value("${ensembler.base-url}") + private String ensemblerBaseUrl; + + @Value("${ensembler.uri}") + private String ensemblerUri; + + @Value("${influx.hostname}") + private String influxHostname; + + @Value("${influx.port}") + private String influxPort; + + @Value("${influx.database}") + private String influxDatabase; + + @Value("${influx.username}") + private String influxUsername; + + @Value("${influx.password}") + private String influxPassword; + } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index 418d19b6aa9efd34424032d690ab9e6e10922582..0aed9101a122738cd88ddbbe13194783c4eb8467 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -1,7 +1,9 @@ package eu.morphemic.prediction_orchestrator.service; -import eu.morphemic.prediction_orchestrator.communication.CommunicationService; +import eu.morphemic.prediction_orchestrator.communication.activemq.ActiveMQService; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage; +import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage; import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.model.Prediction; @@ -16,6 +18,7 @@ import javax.annotation.PostConstruct; import javax.jms.JMSException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; @Slf4j @Component @@ -27,14 +30,16 @@ public class Coordinator { @Autowired private Properties properties; - private CommunicationService communicationService; + private ActiveMQService activeMQService; + private InfluxService influxService; @PostConstruct public void start() throws JMSException { log.info(properties.toString()); - this.communicationService = new CommunicationService(properties); - communicationService.startReceivingMetricList(this); - communicationService.startReceivingModelRetrained(this); + this.activeMQService = new ActiveMQService(properties); + activeMQService.startReceivingMetricList(this); + activeMQService.startReceivingModelRetrained(this); + this.influxService = new InfluxService(properties); } /** As we need to check multiple values at once and block them after publishing, we need to make sure that only one thread is using @@ -61,13 +66,13 @@ public class Coordinator { if (lastValueUpdate == PredictionRegistry.VALUE_ADDED) { metricHandlers.forEach((metricName, metricHandler) -> { Prediction prediction = metricHandler.getPooledPrediction(predictionTime); - communicationService.publishPooledPrediction(prediction, metricName); + activeMQService.publishPooledPrediction(prediction, metricName); }); } else { //we send only updated value as at this point the entire vector has been send earlier Prediction prediction = metricHandlers.get(callingMetricHandler) .getPooledPrediction(predictionTime); - communicationService.publishPooledPrediction(prediction, callingMetricHandler); + activeMQService.publishPooledPrediction(prediction, callingMetricHandler); } } } @@ -111,7 +116,8 @@ public class Coordinator { this, forecastingConfiguration, properties, - communicationService + activeMQService, + influxService ); for (String methodName : properties.getStartingMethodsList()) { newMetricHandler.addMethodHandler(methodName, forecastingConfiguration); @@ -123,17 +129,24 @@ public class Coordinator { //We add new methods if necessary and inform forecasters for(Map.Entry> entry : metricsByMethod.entrySet()) { - List methodsPredictingTheMetric = entry.getValue(); + List metricsPredictedByMethod = entry.getValue(); StartForecastingMessage startForecastingMessage = new StartForecastingMessage( - methodsPredictingTheMetric, + metricsPredictedByMethod, System.currentTimeMillis() / 1000, epochStartingForecast, forwardPredictionNumber, predictionHorizon ); - communicationService.publishStartForecasting(startForecastingMessage, entry.getKey()); + activeMQService.publishStartForecasting(startForecastingMessage, entry.getKey()); } + //Inform the Ensembler + StartEnsemblingMessage startEnsemblingMessage = new StartEnsemblingMessage( + metricNeedingPredictingMessageList, + properties.getStartingMethodsList() + ); + activeMQService.publishStartEnsembling(startEnsemblingMessage); + //ToDo Stop metrics not included in new configuration file } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index cb4703d713552106bf11082871c2df72e2b40ad3..4a9e42376c6c4dcdc062b18aec20870918022f38 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -1,12 +1,13 @@ package eu.morphemic.prediction_orchestrator.service; -import eu.morphemic.prediction_orchestrator.communication.CommunicationService; +import eu.morphemic.prediction_orchestrator.communication.activemq.ActiveMQService; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService; import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.properties.Properties; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategy; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategyFactory; +import eu.morphemic.prediction_orchestrator.service.ensembling.EnsemblingMechanism; +import eu.morphemic.prediction_orchestrator.service.ensembling.EnsemblingMechanismFactory; import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; import lombok.Getter; import lombok.NoArgsConstructor; @@ -16,6 +17,7 @@ import javax.jms.JMSException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; @@ -31,34 +33,37 @@ public class MetricHandler { private PredictionRegistry pooledPredictionsRegistry; - private PoolingStrategy poolingStrategy; - private CommunicationService communicationService; + private EnsemblingMechanism ensemblingMechanism; + private ActiveMQService activeMQService; + private InfluxService influxService; public MetricHandler(String metricName, Coordinator coordinator, ForecastingConfiguration forecastingConfiguration, - Properties properties, CommunicationService communicationService) { + Properties properties, ActiveMQService activeMQService, InfluxService influxService) { this.metricName = metricName; - this.poolingStrategy = PoolingStrategyFactory.getPoolingStrategy(properties); + this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties); this.coordinator = coordinator; this.pooledPredictionsRegistry = new PredictionRegistry( forecastingConfiguration, PredictionRegistry.getPooledPredictionsRegistryName(metricName) ); - this.communicationService = communicationService; + this.activeMQService = activeMQService; + this.influxService = influxService; } public void launchMethodsPoolingIfNecessary(long predictionTime) throws JMSException { - List predictions = methodHandlers.values().stream() - .map(MethodHandler::getPredictionRegistry) - .map(registry -> registry.getCopyPrediction(predictionTime)) - .collect(Collectors.toList()); + Map predictionsByMethod = methodHandlers.entrySet().stream() + .collect(HashMap::new, + (m, e) -> m.put(e.getKey(), e.getValue().getPredictionRegistry().getCopyPrediction(predictionTime)), + HashMap::putAll); - Prediction newPooledPrediction = poolingStrategy.poolPredictions(predictions); - if (newPooledPrediction != PoolingStrategy.POOLED_PREDICTION_NOT_CREATED) { + Prediction newPooledPrediction = ensemblingMechanism.poolPredictions(predictionsByMethod, metricName); + if (newPooledPrediction != EnsemblingMechanism.POOLED_PREDICTION_NOT_CREATED) { log.info("Pooling function has created pooled prediction for metric {} and predictionTime {}", metricName, PredictionTimeFormatter.rawDateFormat(predictionTime)); int valueResult = pooledPredictionsRegistry.processPrediction(newPooledPrediction); if ((valueResult == PredictionRegistry.VALUE_UPDATED) || (valueResult == PredictionRegistry.VALUE_ADDED)) { + influxService.saveToInflux(newPooledPrediction.getAsPoint(metricName, "ensembled")); coordinator.publishIfPooledValuesProvidedForAllMetrics(predictionTime, valueResult, metricName); } } else { @@ -85,12 +90,13 @@ public class MetricHandler { ); MethodHandler methodHandler = new MethodHandler(null,null, predictionRegistry); this.methodHandlers.put(methodName, methodHandler); - this.communicationService.startReceivingPredictions(metricName, methodName, predictionRegistry, this); + this.activeMQService.startReceivingPredictions(metricName, methodName, predictionRegistry, + this, influxService); } public void removeMethodHandler(String methodName) throws JMSException { this.methodHandlers.remove(methodName); - this.communicationService.stopReceivingPredictions(metricName, methodName); + this.activeMQService.stopReceivingPredictions(metricName, methodName); } List getConnectedMethods() { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java new file mode 100644 index 0000000000000000000000000000000000000000..d68ba512da63310c44cfe01a35fca6aa2ac88c66 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java @@ -0,0 +1,71 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling; + +import eu.morphemic.prediction_orchestrator.model.Prediction; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.Ensembler; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifier; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.*; +import java.util.stream.Collectors; + +@Slf4j +@AllArgsConstructor +public class EnsemblingMechanism { + public static Prediction POOLED_PREDICTION_NOT_CREATED = null; + + private Ensembler ensembler; + private ForecastersNumberVerifier forecastersNumberVerifier; + + public Prediction poolPredictions(Map predictionsByMethod, String metricName) { + int expectedForecastersDataCount = predictionsByMethod.size(); + Map nonNullPredictions = predictionsByMethod.entrySet() + .stream() + .filter(e -> Objects.nonNull(e.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + int numberOfValidData = nonNullPredictions.size(); + if (notEnoughDataToCreatePooledValue(numberOfValidData, expectedForecastersDataCount)) { + return POOLED_PREDICTION_NOT_CREATED; + } + Set confidence_interval_values = nonNullPredictions.values().stream() + .map(Prediction::getConfidence_interval) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + List confidence_interval = Arrays.asList( + confidence_interval_values.stream().min(Double::compareTo).get(), + confidence_interval_values.stream().max(Double::compareTo).get() + ); + + Prediction anyPrediction = nonNullPredictions.values().stream() + .findAny() + .get(); + try { + + return new Prediction( + ensembler.ensembleValues(predictionsByMethod, metricName), + System.currentTimeMillis() / 1000, + anyPrediction.getPredictionTime(), + nonNullPredictions.values().stream() + .mapToDouble(Prediction::getProbability) + .average() + .orElse(Double.NaN), + confidence_interval, + anyPrediction.getLevel(), + anyPrediction.getRefersTo(), + anyPrediction.getCloud(), + anyPrediction.getProvider() + ); + } catch (Exception e) { + return POOLED_PREDICTION_NOT_CREATED; + } + } + + private boolean notEnoughDataToCreatePooledValue(int numberOfValidData, int expectedForecastersDataCount) { + //We should always pool if we have only one forecaster attached + if ((numberOfValidData == 1) && (expectedForecastersDataCount == 1)) { + return false; + } else { + return !forecastersNumberVerifier.isDataSufficient(numberOfValidData, expectedForecastersDataCount); + } + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..038484ca738d75ad60a7ca146c0bf3a64ebfcf65 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java @@ -0,0 +1,55 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling; + +import eu.morphemic.prediction_orchestrator.communication.ensembler.EnsemblerService; +import eu.morphemic.prediction_orchestrator.properties.Properties; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.AverageValuesEnsembler; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.Ensembler; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.OuterEnsembler; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifier; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.PercentageForecastersNumberVerifier; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.StaticForecastersNumberVerifier; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EnsemblingMechanismFactory { + + public static EnsemblingMechanism getEnsemblingMechanism(Properties properties) { + Ensembler ensembler; + ForecastersNumberVerifier forecastersNumberVerifier; + + switch (properties.getForecastersNumberVerifierType()) { + case STATIC_FORECASTERS_COUNT_NEEDED: { + forecastersNumberVerifier = + new StaticForecastersNumberVerifier(properties.getForecasterNumberThreshold()); + break; + } + case PERCENTAGE_FORECASTERS_COUNT_NEEDED: { + forecastersNumberVerifier = + new PercentageForecastersNumberVerifier(properties.getForecasterNumberThreshold()); + break; + } + default: { + throw new IllegalArgumentException("Pooling strategy not present in the system"); + } + } + + switch (properties.getEnsemblerType()) { + case AVERAGE: { + ensembler = new AverageValuesEnsembler(); + break; + } + case OUTER: { + ensembler = new OuterEnsembler(new EnsemblerService( + properties.getEnsemblerBaseUrl(), + properties.getEnsemblerUri() + )); + break; + } + default: { + throw new IllegalArgumentException("Pooling strategy not present in the system"); + } + } + + return new EnsemblingMechanism(ensembler, forecastersNumberVerifier); + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java new file mode 100644 index 0000000000000000000000000000000000000000..e5bc5e7c9ee42c58743b931418a82949e1e3276f --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java @@ -0,0 +1,19 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; + +import eu.morphemic.prediction_orchestrator.model.Prediction; + +import java.util.Map; +import java.util.Objects; + +public class AverageValuesEnsembler extends Ensembler { + + + @Override + public double ensembleValues(Map predictionsByMethod, String metricName) { + return predictionsByMethod.values().stream() + .filter(Objects::nonNull) + .mapToDouble(Prediction::getMetricValue) + .average() + .orElse(Double.NaN); + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java new file mode 100644 index 0000000000000000000000000000000000000000..863d8b6e5e02d44b9b64ddb249d1593cd0eb6edc --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java @@ -0,0 +1,11 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; + +import eu.morphemic.prediction_orchestrator.model.Prediction; + +import java.util.Map; + +public abstract class Ensembler { + + public abstract double ensembleValues(Map predictionsByMethod, String metricName); + +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java new file mode 100644 index 0000000000000000000000000000000000000000..62bb5548010406ab24b06128de58db2e1b067767 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java @@ -0,0 +1,13 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; + +public enum EnsemblerType { + AVERAGE("average"), + + OUTER("outer"); + + private String type; + + EnsemblerType(String type) { + this.type = type; + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java new file mode 100644 index 0000000000000000000000000000000000000000..9dfd005128112e6125533ff185a20ec187c0e9a0 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -0,0 +1,52 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; + +import eu.morphemic.prediction_orchestrator.communication.ensembler.EnsemblerService; +import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionsEnsembledMessage; +import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; +import eu.morphemic.prediction_orchestrator.model.Prediction; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; + +@AllArgsConstructor +@Slf4j +public class OuterEnsembler extends Ensembler { + + private EnsemblerService ensemblerService; + + @Override + public double ensembleValues(Map predictionsByMethod, String metricName) { + long predictionTime = predictionsByMethod.values().stream() + .findFirst() + .get() + .getPredictionTime(); + PredictionsToEnsembleMessage predictionsToEnsembleMessage = new PredictionsToEnsembleMessage( + "BestSubset", + metricName, + predictionTime, + predictionsByMethod.entrySet().stream() + .collect(HashMap::new, + (m, v) -> m.put(v.getKey(), v.getValue() == null ? null : v.getValue().getMetricValue()), + HashMap::putAll) + ); + PredictionsEnsembledMessage ensembledMessage; + try { + ensembledMessage = ensemblerService.ensemble(predictionsToEnsembleMessage); + } catch (Exception e) { + log.info("Error when sending ensembling request: {}", e.getMessage()); + throw new IllegalArgumentException("Error while sending message to the Ensember"); + } + + if (ensembledMessage == null) { + log.warn("Message from ensembler is null"); + throw new IllegalArgumentException("Message from the Ensembler is null"); + } + if (ensembledMessage.getPredictionTime() != predictionTime) { + log.warn("Message from the Ensembler has wrong predictionTime"); + throw new IllegalArgumentException("Message from the Ensembler has wrong predictionTime"); + } + return ensembledMessage.getEnsembledValue(); + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java new file mode 100644 index 0000000000000000000000000000000000000000..285cf2a248899bec1a34a9b1c426f8e8c9aed058 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java @@ -0,0 +1,6 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier; + +public abstract class ForecastersNumberVerifier { + + abstract public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount); +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java similarity index 53% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java index cfb2f8de98259f4ac26c3bf51e4b8ae335da9b44..7467a1e9a5dd325ff68bf0a0e3f90259cf6c703c 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java @@ -1,13 +1,13 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; +package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier; -public enum PoolingStrategyType { +public enum ForecastersNumberVerifierType { STATIC_FORECASTERS_COUNT_NEEDED("staticForecastersCountNeeded"), PERCENTAGE_FORECASTERS_COUNT_NEEDED("percentageForecastersCountNeeded"); private String type; - PoolingStrategyType(String type) { + ForecastersNumberVerifierType(String type) { this.type = type; } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java new file mode 100644 index 0000000000000000000000000000000000000000..928f563bf26e923a743e44f920bea089c4bdc84d --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java @@ -0,0 +1,15 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class PercentageForecastersNumberVerifier extends ForecastersNumberVerifier { + + private final double percentageThreshold; + + @Override + public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) { + return (((double) numberOfValidData) / ((double) expectedForecastersDataCount)) + > percentageThreshold; + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java new file mode 100644 index 0000000000000000000000000000000000000000..54ccd8ae99dd7a1e6efd74b2616911d3a2ad84f4 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java @@ -0,0 +1,14 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier; + +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class StaticForecastersNumberVerifier extends ForecastersNumberVerifier { + + private final double staticThreshold; + + @Override + public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) { + return numberOfValidData >= staticThreshold; + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java deleted file mode 100644 index 24d451467d96e285fe4ad6f5be7b8fabad6861ce..0000000000000000000000000000000000000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java +++ /dev/null @@ -1,12 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; - -import eu.morphemic.prediction_orchestrator.model.Prediction; - -import java.util.List; - -public interface PoolingStrategy { - - Prediction POOLED_PREDICTION_NOT_CREATED = null; - - Prediction poolPredictions(List predictions); -} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java deleted file mode 100644 index 01fc73b98ba03e5f472133b1c7d7444b21c61dca..0000000000000000000000000000000000000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java +++ /dev/null @@ -1,21 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; - -import eu.morphemic.prediction_orchestrator.properties.Properties; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.PercentageForecastersCountNeededStrategy; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.StaticForecastersCountNeededStrategy; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class PoolingStrategyFactory { - - public static PoolingStrategy getPoolingStrategy(Properties properties) { - switch (properties.getPoolingStrategyType()) { - case STATIC_FORECASTERS_COUNT_NEEDED: - return new StaticForecastersCountNeededStrategy(properties.getPoolingDataThreshold()); - case PERCENTAGE_FORECASTERS_COUNT_NEEDED: - return new PercentageForecastersCountNeededStrategy(properties.getPoolingDataThreshold()); - default: - throw new IllegalArgumentException("Pooling strategy not present in the system"); - } - } -} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java deleted file mode 100644 index 2f030e9c2d15fcdc3f99692b964105c82292de97..0000000000000000000000000000000000000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java +++ /dev/null @@ -1,15 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; - -import lombok.AllArgsConstructor; - -@AllArgsConstructor -public class PercentageForecastersCountNeededStrategy extends ThresholdCountForecastersNeededStrategy { - - private final double percentageThreshold; - - @Override - boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) { - return (((double) numberOfValidData) / ((double) expectedForecastersDataCount)) - > percentageThreshold; - } -} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java deleted file mode 100644 index 5763666d4e01148e74dd4f10246852652f18ecc2..0000000000000000000000000000000000000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java +++ /dev/null @@ -1,14 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; - -import lombok.AllArgsConstructor; - -@AllArgsConstructor -public class StaticForecastersCountNeededStrategy extends ThresholdCountForecastersNeededStrategy{ - - private final double staticThreshold; - - @Override - boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) { - return numberOfValidData >= staticThreshold; - } -} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java deleted file mode 100644 index 45aef574c98624b33253d08769e4a18e4df553d0..0000000000000000000000000000000000000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java +++ /dev/null @@ -1,56 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; - -import eu.morphemic.prediction_orchestrator.model.Prediction; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategy; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.util.*; -import java.util.stream.Collectors; - -@Slf4j -@AllArgsConstructor -public abstract class ThresholdCountForecastersNeededStrategy implements PoolingStrategy { - - @Override - public Prediction poolPredictions(List predictions) { - int expectedForecastersDataCount = predictions.size(); - List validPredictions = predictions.stream().filter(Objects::nonNull).collect(Collectors.toList()); - int numberOfValidData = validPredictions.size(); - if (notEnoughDataToCreatePooledValue(numberOfValidData, expectedForecastersDataCount)) { - return POOLED_PREDICTION_NOT_CREATED; - } - Set confidence_interval_values = validPredictions.stream() - .map(Prediction::getConfidence_interval) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - List confidence_interval = Arrays.asList( - confidence_interval_values.stream().min(Double::compareTo).get(), - confidence_interval_values.stream().max(Double::compareTo).get() - ); - - return new Prediction( - validPredictions.stream().mapToDouble(Prediction::getMetricValue).average().orElse(Double.NaN), - System.currentTimeMillis() / 1000, - validPredictions.get(0).getPredictionTime(), - validPredictions.stream().mapToDouble(Prediction::getProbability).average().orElse(Double.NaN), - confidence_interval, - validPredictions.get(0).getLevel(), - validPredictions.get(0).getRefersTo(), - validPredictions.get(0).getCloud(), - validPredictions.get(0).getProvider() - ); - } - - private boolean notEnoughDataToCreatePooledValue(int numberOfValidData, int expectedForecastersDataCount) { - //We should always pool if we have only one forecaster attached - if ((numberOfValidData == 1) && (expectedForecastersDataCount == 1)) { - return false; - } else { - return !isDataSufficient(numberOfValidData, expectedForecastersDataCount); - } - } - - abstract boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount); - -} diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties index 14676ebc12db0223f4185dda5b8ea1048fd48072..6e93c62b551c6278d15ac8d26f7c68a0230f3602 100644 --- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties +++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties @@ -35,8 +35,9 @@ forecasting_configuration.starting_forecasting_delay=200 # With the first option threshold property is in percents #With the second it is a static count #For example with threshold threshold=2 at least 2 synchronised forecasters are needed to be able to merge predictions and send them forward -pooling.poolingStrategy=STATIC_FORECASTERS_COUNT_NEEDED -pooling.threshold=2 +ensembling.forecasterNumberVerifier.type=STATIC_FORECASTERS_COUNT_NEEDED +ensembling.forecasterNumberVerifier.threshold=2 +ensembling.ensemblerType=OUTER jms.client.broker_properties_configuration_file_location=${MELODIC_CONFIG_DIR}/eu.melodic.event.brokerclient.properties @@ -49,3 +50,16 @@ logging.zone_id=Europe/Warsaw #Details of restarting connection to amq upon failure activemq.restartinterval=10000 activemq.restartcount=20 + +ensembler.base-url=http://ensembler:8000 +ensembler.uri=/ensemble + + +influx.hostname=http://ui-influxdb +influx.port=8086 +influx.database=morphemic +influx.username=morphemic +influx.password=password + +#ensembler communication logging +logging.level.reactor.netty.http.client=false diff --git a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java similarity index 99% rename from prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java rename to prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java index df388e5c7522ffd623cca25f4bbdd46b304bd2fa..185315e2443ae972288230d7452d52805c3eeefa 100644 --- a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java +++ b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication; +package eu.morphemic.prediction_orchestrator.communication.activemq; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor;