From f5842635efe9f5d77b09ab048b747a7c1eadba77 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 22 Dec 2021 11:07:14 +0100 Subject: [PATCH 01/23] added start ensembler publish --- .../communication/CommunicationService.java | 15 ++++++++ .../communication/TopicFactory.java | 4 +++ .../StartEnsemblingMessage.java | 36 +++++++++++++++++++ .../service/Coordinator.java | 19 ++++++++-- 4 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java index 63c16f36..e22a8159 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java @@ -6,6 +6,7 @@ import eu.melodic.event.brokerclient.BrokerClient; import eu.melodic.event.brokerclient.BrokerPublisher; import eu.morphemic.prediction_orchestrator.communication.listeners.ActiveMQListener; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PooledPredictionMessage; +import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; import eu.morphemic.prediction_orchestrator.service.Coordinator; import eu.morphemic.prediction_orchestrator.communication.listeners.PredictionListener; @@ -81,6 +82,20 @@ public class CommunicationService { ).publish(result); } + public synchronized void publishStartEnsembling(StartEnsemblingMessage startEnsemblingMessage) throws JMSException { + String topicName = TopicFactory.getPublishStartEnsembler(); + String result = null; + try { + result = new ObjectMapper().writeValueAsString(startEnsemblingMessage); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + brokerPublishers.computeIfAbsent(topicName, key -> + getNewPublisher(topicName) + ).publish(result); + + } + public synchronized void publishPooledPrediction(Prediction prediction, String metricName) { String topicName = TopicFactory.getPublishPooledPrediction(metricName); String result = null; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java index cb069f13..e4bd192b 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java @@ -48,6 +48,10 @@ class TopicFactory { return TopicNames.forecasting_methods_to_prediction_orchestrator_training_topic; } + static String getPublishStartEnsembler() { + return "start_ensembler"; + } + static String getPublishStartForecasting(String methodName) { switch(methodName) { case "nbeats" : { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java new file mode 100644 index 00000000..c5ea2aec --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java @@ -0,0 +1,36 @@ +package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages; + +import com.fasterxml.jackson.annotation.JsonProperty; +import eu.melodic.event.brokerclient.templates.EventFields; +import lombok.AllArgsConstructor; +import lombok.NonNull; + +import java.util.List; + +@AllArgsConstructor +public class StartEnsemblingMessage { + + @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.metrics) + @NonNull + private List metrics; + + @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.timestamp) + @NonNull + private long timestamp; + + @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.epoch_start) + @NonNull + private long epoch_start; + + @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.number_of_forward_predictions) + @NonNull + private int number_of_forward_predictions; + + @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.prediction_horizon) + @NonNull + private int prediction_horizon; + + @JsonProperty("models") + @NonNull + private List methodNames; +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index 418d19b6..443ff985 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -2,6 +2,7 @@ package eu.morphemic.prediction_orchestrator.service; import eu.morphemic.prediction_orchestrator.communication.CommunicationService; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage; +import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage; import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.model.Prediction; @@ -16,6 +17,7 @@ import javax.annotation.PostConstruct; import javax.jms.JMSException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; @Slf4j @Component @@ -123,9 +125,9 @@ public class Coordinator { //We add new methods if necessary and inform forecasters for(Map.Entry> entry : metricsByMethod.entrySet()) { - List methodsPredictingTheMetric = entry.getValue(); + List metricsPredictedByMethod = entry.getValue(); StartForecastingMessage startForecastingMessage = new StartForecastingMessage( - methodsPredictingTheMetric, + metricsPredictedByMethod, System.currentTimeMillis() / 1000, epochStartingForecast, forwardPredictionNumber, @@ -134,6 +136,19 @@ public class Coordinator { communicationService.publishStartForecasting(startForecastingMessage, entry.getKey()); } + //Inform the Ensembler + StartEnsemblingMessage startEnsemblingMessage = new StartEnsemblingMessage( + metricNeedingPredictingMessageList.stream() + .map(MetricNeedingPredictingMessage::getMetric) + .collect(Collectors.toList()), + System.currentTimeMillis() / 1000, + epochStartingForecast, + forwardPredictionNumber, + predictionHorizon, + properties.getStartingMethodsList() + ); + communicationService.publishStartEnsembling(startEnsemblingMessage); + //ToDo Stop metrics not included in new configuration file } } -- GitLab From 564db266be798709f40134cab261e43fffe79c53 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 22 Dec 2021 12:45:17 +0100 Subject: [PATCH 02/23] added ensembler web service --- prediction_orchestrator/pom.xml | 10 ++++ .../communication/EnsemblerWebClient.java | 52 +++++++++++++++++++ .../PredictionsEnsembledMessage.java | 22 ++++++++ .../PredictionsToEnsembleMessage.java | 27 ++++++++++ ...orphemic.predictionorchestrator.properties | 3 ++ 5 files changed, 114 insertions(+) create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java diff --git a/prediction_orchestrator/pom.xml b/prediction_orchestrator/pom.xml index 7f92fed5..82838fbe 100644 --- a/prediction_orchestrator/pom.xml +++ b/prediction_orchestrator/pom.xml @@ -45,6 +45,10 @@ 2.4.1 compile + + org.springframework.boot + spring-boot-starter-webflux + org.springframework.boot @@ -93,6 +97,12 @@ junit test + + org.projectreactor + reactor-spring + 1.0.1.RELEASE + + diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java new file mode 100644 index 00000000..67aaa1de --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java @@ -0,0 +1,52 @@ +package eu.morphemic.prediction_orchestrator.communication; + +import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionsEnsembledMessage; +import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.UnknownHttpStatusCodeException; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; + +@Service +@Slf4j +public class EnsemblerWebClient { + + private String ensemblerUri; + + private final WebClient client; + + public EnsemblerWebClient(@Value("${ensembler.base-url}") String baseUrl, @Value("${ensembler.uri}") String ensemblerUri) { + //http client is only used for logging requests + HttpClient httpClient = HttpClient.create() + .wiretap(true); + this.ensemblerUri = ensemblerUri; + this.client = WebClient.builder().baseUrl(baseUrl) + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .build(); + } + + public PredictionsEnsembledMessage ensemble(PredictionsToEnsembleMessage predictionsToEnsembleMessage) { + try { + return client.post() + .uri(ensemblerUri) + .bodyValue(predictionsToEnsembleMessage) + .accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus(HttpStatus::isError, response -> Mono.empty()) + .bodyToMono(PredictionsEnsembledMessage.class) + .block(); + } catch (UnknownHttpStatusCodeException e) { + log.error("Unknown HTTP status code received: {}", e.getRawStatusCode()); + return null; + } catch (RuntimeException e) { + log.error(e.getMessage()); + return null; + } + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java new file mode 100644 index 00000000..1c359ab7 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java @@ -0,0 +1,22 @@ +package eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.NonNull; + +@AllArgsConstructor +public class PredictionsEnsembledMessage { + + @JsonProperty("metricValue") + @NonNull + private double ensembledValue; + + @JsonProperty("timestamp") + @NonNull + private long timestamp; + + @JsonProperty("predictionTime") + @NonNull + private long predictionTime; + +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java new file mode 100644 index 00000000..db518db7 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java @@ -0,0 +1,27 @@ +package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.NonNull; + +import java.util.Map; + +@AllArgsConstructor +public class PredictionsToEnsembleMessage { + + @JsonProperty("method") + @NonNull + private String ensemblingMethod; + + @JsonProperty("metric") + @NonNull + private String metric; + + @JsonProperty("predictionTime") + @NonNull + private Long predictionTime; + + @JsonProperty("predictionToEnsemble") + @NonNull + private Map predictionsByForecaster; +} diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties index 14676ebc..49a849fe 100644 --- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties +++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties @@ -49,3 +49,6 @@ logging.zone_id=Europe/Warsaw #Details of restarting connection to amq upon failure activemq.restartinterval=10000 activemq.restartcount=20 + +ensembler.url= +ensembler.uri= -- GitLab From cac7c8ebbe987d7b2dbb61235779144051b0c687 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 22 Dec 2021 16:03:07 +0100 Subject: [PATCH 03/23] reworked pooling strategies, added communication with web ensembler --- .../communication/EnsemblerWebClient.java | 3 +- .../PredictionsEnsembledMessage.java | 2 + .../properties/Properties.java | 14 +++- .../service/MetricHandler.java | 24 ++++--- .../pooling_strategy/EnsemblerType.java | 13 ++++ .../pooling_strategy/EnsemblingMechanism.java | 67 +++++++++++++++++++ .../EnsemblingMechanismFactory.java | 50 ++++++++++++++ ...castersNumberSufficiencyVerifierType.java} | 4 +- .../pooling_strategy/PoolingStrategy.java | 12 ---- .../PoolingStrategyFactory.java | 21 ------ .../impl/AverageValuesEnsembler.java | 17 +++++ .../pooling_strategy/impl/Ensembler.java | 11 +++ .../ForecastersNumberSufficiencyVerifier.java | 6 ++ .../pooling_strategy/impl/OuterEnsembler.java | 30 +++++++++ ...ForecastersNumberSufficiencyVerifier.java} | 4 +- ...ForecastersNumberSufficiencyVerifier.java} | 4 +- ...resholdCountForecastersNeededStrategy.java | 56 ---------------- 17 files changed, 228 insertions(+), 110 deletions(-) create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblerType.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanism.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanismFactory.java rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/{PoolingStrategyType.java => ForecastersNumberSufficiencyVerifierType.java} (70%) delete mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java delete mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/AverageValuesEnsembler.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/Ensembler.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ForecastersNumberSufficiencyVerifier.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/OuterEnsembler.java rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/{PercentageForecastersCountNeededStrategy.java => PercentageForecastersNumberSufficiencyVerifier.java} (61%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/{StaticForecastersCountNeededStrategy.java => StaticForecastersNumberSufficiencyVerifier.java} (55%) delete mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java index 67aaa1de..75ca93e6 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java @@ -13,7 +13,6 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; -@Service @Slf4j public class EnsemblerWebClient { @@ -21,7 +20,7 @@ public class EnsemblerWebClient { private final WebClient client; - public EnsemblerWebClient(@Value("${ensembler.base-url}") String baseUrl, @Value("${ensembler.uri}") String ensemblerUri) { + public EnsemblerWebClient(String baseUrl, String ensemblerUri) { //http client is only used for logging requests HttpClient httpClient = HttpClient.create() .wiretap(true); diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java index 1c359ab7..f855ed15 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java @@ -2,9 +2,11 @@ package eu.morphemic.prediction_orchestrator.communication.messages.incoming_mes import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; +import lombok.Getter; import lombok.NonNull; @AllArgsConstructor +@Getter public class PredictionsEnsembledMessage { @JsonProperty("metricValue") diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java index c5bfd3ae..1a3c6572 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java @@ -1,6 +1,7 @@ package eu.morphemic.prediction_orchestrator.properties; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategyType; +import eu.morphemic.prediction_orchestrator.service.pooling_strategy.EnsemblerType; +import eu.morphemic.prediction_orchestrator.service.pooling_strategy.ForecastersNumberSufficiencyVerifierType; import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -30,11 +31,14 @@ public class Properties { private String broker_properties_configuration_file_location; @Value("${pooling.poolingStrategy}") - private PoolingStrategyType poolingStrategyType; + private ForecastersNumberSufficiencyVerifierType forecastersNumberSufficiencyVerifierType; @Value("${pooling.threshold}") private double poolingDataThreshold; + @Value("${ensembling.ensemblerType}") + private EnsemblerType ensemblerType; + @Value("${startingMethodsList}") private List startingMethodsList; @@ -44,4 +48,10 @@ public class Properties { @Value("${activemq.restartcount:20}") private int activeMqRestartCount; + @Value("${ensembler.base-url}") + private String ensemblerBaseUrl; + + @Value("${ensembler.uri}") + private String ensemblerUri; + } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index cb4703d7..08ea8157 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -5,8 +5,8 @@ import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.properties.Properties; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategy; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategyFactory; +import eu.morphemic.prediction_orchestrator.service.pooling_strategy.EnsemblingMechanism; +import eu.morphemic.prediction_orchestrator.service.pooling_strategy.EnsemblingMechanismFactory; import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; import lombok.Getter; import lombok.NoArgsConstructor; @@ -16,6 +16,7 @@ import javax.jms.JMSException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; @@ -31,13 +32,13 @@ public class MetricHandler { private PredictionRegistry pooledPredictionsRegistry; - private PoolingStrategy poolingStrategy; + private EnsemblingMechanism ensemblingMechanism; private CommunicationService communicationService; public MetricHandler(String metricName, Coordinator coordinator, ForecastingConfiguration forecastingConfiguration, Properties properties, CommunicationService communicationService) { this.metricName = metricName; - this.poolingStrategy = PoolingStrategyFactory.getPoolingStrategy(properties); + this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties); this.coordinator = coordinator; this.pooledPredictionsRegistry = new PredictionRegistry( forecastingConfiguration, @@ -47,13 +48,14 @@ public class MetricHandler { } public void launchMethodsPoolingIfNecessary(long predictionTime) throws JMSException { - List predictions = methodHandlers.values().stream() - .map(MethodHandler::getPredictionRegistry) - .map(registry -> registry.getCopyPrediction(predictionTime)) - .collect(Collectors.toList()); - - Prediction newPooledPrediction = poolingStrategy.poolPredictions(predictions); - if (newPooledPrediction != PoolingStrategy.POOLED_PREDICTION_NOT_CREATED) { + Map predictionsByMethod = methodHandlers.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> e.getValue().getPredictionRegistry().getCopyPrediction(predictionTime) + )); + + Prediction newPooledPrediction = ensemblingMechanism.poolPredictions(predictionsByMethod, metricName); + if (newPooledPrediction != EnsemblingMechanism.POOLED_PREDICTION_NOT_CREATED) { log.info("Pooling function has created pooled prediction for metric {} and predictionTime {}", metricName, PredictionTimeFormatter.rawDateFormat(predictionTime)); int valueResult = pooledPredictionsRegistry.processPrediction(newPooledPrediction); diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblerType.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblerType.java new file mode 100644 index 00000000..408ce314 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblerType.java @@ -0,0 +1,13 @@ +package eu.morphemic.prediction_orchestrator.service.pooling_strategy; + +public enum EnsemblerType { + AVERAGE("average"), + + OUTER("outer"); + + private String type; + + EnsemblerType(String type) { + this.type = type; + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanism.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanism.java new file mode 100644 index 00000000..7f25d039 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanism.java @@ -0,0 +1,67 @@ +package eu.morphemic.prediction_orchestrator.service.pooling_strategy; + +import eu.morphemic.prediction_orchestrator.model.Prediction; +import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.Ensembler; +import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.ForecastersNumberSufficiencyVerifier; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.*; +import java.util.stream.Collectors; + +@Slf4j +@AllArgsConstructor +public class EnsemblingMechanism { + public static Prediction POOLED_PREDICTION_NOT_CREATED = null; + + private Ensembler ensembler; + private ForecastersNumberSufficiencyVerifier forecastersNumberSufficiencyVerifier; + + public Prediction poolPredictions(Map predictionsByMethod, String metricName) { + int expectedForecastersDataCount = predictionsByMethod.size(); + Map validPredictions = predictionsByMethod.entrySet() + .stream() + .filter(e -> Objects.nonNull(e.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + int numberOfValidData = validPredictions.size(); + if (notEnoughDataToCreatePooledValue(numberOfValidData, expectedForecastersDataCount)) { + return POOLED_PREDICTION_NOT_CREATED; + } + Set confidence_interval_values = validPredictions.values().stream() + .map(Prediction::getConfidence_interval) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + List confidence_interval = Arrays.asList( + confidence_interval_values.stream().min(Double::compareTo).get(), + confidence_interval_values.stream().max(Double::compareTo).get() + ); + + Prediction anyPrediction = validPredictions.values().stream() + .findAny() + .get(); + + return new Prediction( + ensembler.ensembleValues(predictionsByMethod, metricName), + System.currentTimeMillis() / 1000, + anyPrediction.getPredictionTime(), + validPredictions.values().stream() + .mapToDouble(Prediction::getProbability) + .average() + .orElse(Double.NaN), + confidence_interval, + anyPrediction.getLevel(), + anyPrediction.getRefersTo(), + anyPrediction.getCloud(), + anyPrediction.getProvider() + ); + } + + private boolean notEnoughDataToCreatePooledValue(int numberOfValidData, int expectedForecastersDataCount) { + //We should always pool if we have only one forecaster attached + if ((numberOfValidData == 1) && (expectedForecastersDataCount == 1)) { + return false; + } else { + return !forecastersNumberSufficiencyVerifier.isDataSufficient(numberOfValidData, expectedForecastersDataCount); + } + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanismFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanismFactory.java new file mode 100644 index 00000000..c1978ea4 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanismFactory.java @@ -0,0 +1,50 @@ +package eu.morphemic.prediction_orchestrator.service.pooling_strategy; + +import eu.morphemic.prediction_orchestrator.communication.EnsemblerWebClient; +import eu.morphemic.prediction_orchestrator.properties.Properties; +import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.*; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EnsemblingMechanismFactory { + + public static EnsemblingMechanism getEnsemblingMechanism(Properties properties) { + Ensembler ensembler; + ForecastersNumberSufficiencyVerifier forecastersNumberSufficiencyVerifier; + + switch (properties.getForecastersNumberSufficiencyVerifierType()) { + case STATIC_FORECASTERS_COUNT_NEEDED: { + forecastersNumberSufficiencyVerifier = + new StaticForecastersNumberSufficiencyVerifier(properties.getPoolingDataThreshold()); + break; + } + case PERCENTAGE_FORECASTERS_COUNT_NEEDED: { + forecastersNumberSufficiencyVerifier = + new PercentageForecastersNumberSufficiencyVerifier(properties.getPoolingDataThreshold()); + break; + } + default: { + throw new IllegalArgumentException("Pooling strategy not present in the system"); + } + } + + switch (properties.getEnsemblerType()) { + case AVERAGE: { + ensembler = new AverageValuesEnsembler(); + break; + } + case OUTER: { + ensembler = new OuterEnsembler(new EnsemblerWebClient( + properties.getEnsemblerBaseUrl(), + properties.getEnsemblerUri() + )); + break; + } + default: { + throw new IllegalArgumentException("Pooling strategy not present in the system"); + } + } + + return new EnsemblingMechanism(ensembler, forecastersNumberSufficiencyVerifier); + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/ForecastersNumberSufficiencyVerifierType.java similarity index 70% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/ForecastersNumberSufficiencyVerifierType.java index cfb2f8de..4d89f838 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyType.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/ForecastersNumberSufficiencyVerifierType.java @@ -1,13 +1,13 @@ package eu.morphemic.prediction_orchestrator.service.pooling_strategy; -public enum PoolingStrategyType { +public enum ForecastersNumberSufficiencyVerifierType { STATIC_FORECASTERS_COUNT_NEEDED("staticForecastersCountNeeded"), PERCENTAGE_FORECASTERS_COUNT_NEEDED("percentageForecastersCountNeeded"); private String type; - PoolingStrategyType(String type) { + ForecastersNumberSufficiencyVerifierType(String type) { this.type = type; } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java deleted file mode 100644 index 24d45146..00000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategy.java +++ /dev/null @@ -1,12 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; - -import eu.morphemic.prediction_orchestrator.model.Prediction; - -import java.util.List; - -public interface PoolingStrategy { - - Prediction POOLED_PREDICTION_NOT_CREATED = null; - - Prediction poolPredictions(List predictions); -} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java deleted file mode 100644 index 01fc73b9..00000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/PoolingStrategyFactory.java +++ /dev/null @@ -1,21 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; - -import eu.morphemic.prediction_orchestrator.properties.Properties; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.PercentageForecastersCountNeededStrategy; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.StaticForecastersCountNeededStrategy; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class PoolingStrategyFactory { - - public static PoolingStrategy getPoolingStrategy(Properties properties) { - switch (properties.getPoolingStrategyType()) { - case STATIC_FORECASTERS_COUNT_NEEDED: - return new StaticForecastersCountNeededStrategy(properties.getPoolingDataThreshold()); - case PERCENTAGE_FORECASTERS_COUNT_NEEDED: - return new PercentageForecastersCountNeededStrategy(properties.getPoolingDataThreshold()); - default: - throw new IllegalArgumentException("Pooling strategy not present in the system"); - } - } -} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/AverageValuesEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/AverageValuesEnsembler.java new file mode 100644 index 00000000..faef2803 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/AverageValuesEnsembler.java @@ -0,0 +1,17 @@ +package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; + +import eu.morphemic.prediction_orchestrator.model.Prediction; + +import java.util.Map; + +public class AverageValuesEnsembler extends Ensembler { + + + @Override + public double ensembleValues(Map predictionsByMethod, String metricName) { + return predictionsByMethod.values().stream() + .mapToDouble(Prediction::getMetricValue) + .average() + .orElse(Double.NaN); + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/Ensembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/Ensembler.java new file mode 100644 index 00000000..a8bb992e --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/Ensembler.java @@ -0,0 +1,11 @@ +package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; + +import eu.morphemic.prediction_orchestrator.model.Prediction; + +import java.util.Map; + +public abstract class Ensembler { + + public abstract double ensembleValues(Map predictionsByMethod, String metricName); + +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ForecastersNumberSufficiencyVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ForecastersNumberSufficiencyVerifier.java new file mode 100644 index 00000000..ad5fb318 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ForecastersNumberSufficiencyVerifier.java @@ -0,0 +1,6 @@ +package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; + +public abstract class ForecastersNumberSufficiencyVerifier { + + abstract public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount); +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/OuterEnsembler.java new file mode 100644 index 00000000..5e576500 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/OuterEnsembler.java @@ -0,0 +1,30 @@ +package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; + +import eu.morphemic.prediction_orchestrator.communication.EnsemblerWebClient; +import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; +import eu.morphemic.prediction_orchestrator.model.Prediction; +import lombok.AllArgsConstructor; + +import java.util.Map; +import java.util.stream.Collectors; + +@AllArgsConstructor +public class OuterEnsembler extends Ensembler { + + private EnsemblerWebClient ensemblerWebClient; + + @Override + public double ensembleValues(Map predictionsByMethod, String metricName) { + PredictionsToEnsembleMessage predictionsToEnsembleMessage = new PredictionsToEnsembleMessage( + null, + metricName, + predictionsByMethod.values().stream() + .findFirst() + .get() + .getPredictionTime(), + predictionsByMethod.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getMetricValue())) + ); + return ensemblerWebClient.ensemble(predictionsToEnsembleMessage).getEnsembledValue(); + } +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersNumberSufficiencyVerifier.java similarity index 61% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersNumberSufficiencyVerifier.java index 2f030e9c..f7b18c12 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersCountNeededStrategy.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersNumberSufficiencyVerifier.java @@ -3,12 +3,12 @@ package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; import lombok.AllArgsConstructor; @AllArgsConstructor -public class PercentageForecastersCountNeededStrategy extends ThresholdCountForecastersNeededStrategy { +public class PercentageForecastersNumberSufficiencyVerifier extends ForecastersNumberSufficiencyVerifier { private final double percentageThreshold; @Override - boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) { + public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) { return (((double) numberOfValidData) / ((double) expectedForecastersDataCount)) > percentageThreshold; } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersNumberSufficiencyVerifier.java similarity index 55% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersNumberSufficiencyVerifier.java index 5763666d..554c2337 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersCountNeededStrategy.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersNumberSufficiencyVerifier.java @@ -3,12 +3,12 @@ package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; import lombok.AllArgsConstructor; @AllArgsConstructor -public class StaticForecastersCountNeededStrategy extends ThresholdCountForecastersNeededStrategy{ +public class StaticForecastersNumberSufficiencyVerifier extends ForecastersNumberSufficiencyVerifier { private final double staticThreshold; @Override - boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) { + public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount) { return numberOfValidData >= staticThreshold; } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java deleted file mode 100644 index 45aef574..00000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ThresholdCountForecastersNeededStrategy.java +++ /dev/null @@ -1,56 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; - -import eu.morphemic.prediction_orchestrator.model.Prediction; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategy; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import java.util.*; -import java.util.stream.Collectors; - -@Slf4j -@AllArgsConstructor -public abstract class ThresholdCountForecastersNeededStrategy implements PoolingStrategy { - - @Override - public Prediction poolPredictions(List predictions) { - int expectedForecastersDataCount = predictions.size(); - List validPredictions = predictions.stream().filter(Objects::nonNull).collect(Collectors.toList()); - int numberOfValidData = validPredictions.size(); - if (notEnoughDataToCreatePooledValue(numberOfValidData, expectedForecastersDataCount)) { - return POOLED_PREDICTION_NOT_CREATED; - } - Set confidence_interval_values = validPredictions.stream() - .map(Prediction::getConfidence_interval) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - List confidence_interval = Arrays.asList( - confidence_interval_values.stream().min(Double::compareTo).get(), - confidence_interval_values.stream().max(Double::compareTo).get() - ); - - return new Prediction( - validPredictions.stream().mapToDouble(Prediction::getMetricValue).average().orElse(Double.NaN), - System.currentTimeMillis() / 1000, - validPredictions.get(0).getPredictionTime(), - validPredictions.stream().mapToDouble(Prediction::getProbability).average().orElse(Double.NaN), - confidence_interval, - validPredictions.get(0).getLevel(), - validPredictions.get(0).getRefersTo(), - validPredictions.get(0).getCloud(), - validPredictions.get(0).getProvider() - ); - } - - private boolean notEnoughDataToCreatePooledValue(int numberOfValidData, int expectedForecastersDataCount) { - //We should always pool if we have only one forecaster attached - if ((numberOfValidData == 1) && (expectedForecastersDataCount == 1)) { - return false; - } else { - return !isDataSufficient(numberOfValidData, expectedForecastersDataCount); - } - } - - abstract boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount); - -} -- GitLab From bf1deac2444939bc1c557281f4f83e9678202d9e Mon Sep 17 00:00:00 2001 From: mriedl Date: Thu, 23 Dec 2021 10:19:32 +0100 Subject: [PATCH 04/23] changed packages names, updated properties --- .../properties/Properties.java | 12 +++++----- .../service/MetricHandler.java | 4 ++-- .../EnsemblingMechanism.java | 10 ++++---- .../EnsemblingMechanismFactory.java | 23 +++++++++++-------- .../ensembler}/AverageValuesEnsembler.java | 2 +- .../ensembler}/Ensembler.java | 2 +- .../ensembler}/EnsemblerType.java | 2 +- .../ensembler}/OuterEnsembler.java | 2 +- .../ForecastersNumberVerifier.java | 6 +++++ .../ForecastersNumberVerifierType.java} | 6 ++--- .../PercentageForecastersNumberVerifier.java} | 4 ++-- .../StaticForecastersNumberVerifier.java} | 4 ++-- .../ForecastersNumberSufficiencyVerifier.java | 6 ----- ...orphemic.predictionorchestrator.properties | 5 ++-- 14 files changed, 47 insertions(+), 41 deletions(-) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy => ensembling}/EnsemblingMechanism.java (84%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy => ensembling}/EnsemblingMechanismFactory.java (53%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy/impl => ensembling/ensembler}/AverageValuesEnsembler.java (85%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy/impl => ensembling/ensembler}/Ensembler.java (75%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy => ensembling/ensembler}/EnsemblerType.java (69%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy/impl => ensembling/ensembler}/OuterEnsembler.java (93%) create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy/ForecastersNumberSufficiencyVerifierType.java => ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java} (52%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy/impl/PercentageForecastersNumberSufficiencyVerifier.java => ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java} (65%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/{pooling_strategy/impl/StaticForecastersNumberSufficiencyVerifier.java => ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java} (60%) delete mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ForecastersNumberSufficiencyVerifier.java diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java index 1a3c6572..6da46b6a 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java @@ -1,7 +1,7 @@ package eu.morphemic.prediction_orchestrator.properties; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.EnsemblerType; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.ForecastersNumberSufficiencyVerifierType; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.EnsemblerType; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifierType; import lombok.Getter; import lombok.Setter; import lombok.ToString; @@ -30,11 +30,11 @@ public class Properties { @Value("${jms.client.broker_properties_configuration_file_location}") private String broker_properties_configuration_file_location; - @Value("${pooling.poolingStrategy}") - private ForecastersNumberSufficiencyVerifierType forecastersNumberSufficiencyVerifierType; + @Value("${ensembling.forecasterNumberVerifier.type}") + private ForecastersNumberVerifierType forecastersNumberVerifierType; - @Value("${pooling.threshold}") - private double poolingDataThreshold; + @Value("${ensembling.forecasterNumberVerifier.threshold}") + private double forecasterNumberThreshold; @Value("${ensembling.ensemblerType}") private EnsemblerType ensemblerType; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index 08ea8157..d4ff38eb 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -5,8 +5,8 @@ import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.properties.Properties; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.EnsemblingMechanism; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.EnsemblingMechanismFactory; +import eu.morphemic.prediction_orchestrator.service.ensembling.EnsemblingMechanism; +import eu.morphemic.prediction_orchestrator.service.ensembling.EnsemblingMechanismFactory; import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; import lombok.Getter; import lombok.NoArgsConstructor; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanism.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java similarity index 84% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanism.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java index 7f25d039..fe7fe7d2 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanism.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java @@ -1,8 +1,8 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; +package eu.morphemic.prediction_orchestrator.service.ensembling; import eu.morphemic.prediction_orchestrator.model.Prediction; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.Ensembler; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.ForecastersNumberSufficiencyVerifier; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.Ensembler; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifier; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -15,7 +15,7 @@ public class EnsemblingMechanism { public static Prediction POOLED_PREDICTION_NOT_CREATED = null; private Ensembler ensembler; - private ForecastersNumberSufficiencyVerifier forecastersNumberSufficiencyVerifier; + private ForecastersNumberVerifier forecastersNumberVerifier; public Prediction poolPredictions(Map predictionsByMethod, String metricName) { int expectedForecastersDataCount = predictionsByMethod.size(); @@ -61,7 +61,7 @@ public class EnsemblingMechanism { if ((numberOfValidData == 1) && (expectedForecastersDataCount == 1)) { return false; } else { - return !forecastersNumberSufficiencyVerifier.isDataSufficient(numberOfValidData, expectedForecastersDataCount); + return !forecastersNumberVerifier.isDataSufficient(numberOfValidData, expectedForecastersDataCount); } } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanismFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java similarity index 53% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanismFactory.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java index c1978ea4..fe138188 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblingMechanismFactory.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java @@ -1,8 +1,13 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; +package eu.morphemic.prediction_orchestrator.service.ensembling; import eu.morphemic.prediction_orchestrator.communication.EnsemblerWebClient; import eu.morphemic.prediction_orchestrator.properties.Properties; -import eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl.*; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.AverageValuesEnsembler; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.Ensembler; +import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.OuterEnsembler; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.ForecastersNumberVerifier; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.PercentageForecastersNumberVerifier; +import eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier.StaticForecastersNumberVerifier; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -10,17 +15,17 @@ public class EnsemblingMechanismFactory { public static EnsemblingMechanism getEnsemblingMechanism(Properties properties) { Ensembler ensembler; - ForecastersNumberSufficiencyVerifier forecastersNumberSufficiencyVerifier; + ForecastersNumberVerifier forecastersNumberVerifier; - switch (properties.getForecastersNumberSufficiencyVerifierType()) { + switch (properties.getForecastersNumberVerifierType()) { case STATIC_FORECASTERS_COUNT_NEEDED: { - forecastersNumberSufficiencyVerifier = - new StaticForecastersNumberSufficiencyVerifier(properties.getPoolingDataThreshold()); + forecastersNumberVerifier = + new StaticForecastersNumberVerifier(properties.getForecasterNumberThreshold()); break; } case PERCENTAGE_FORECASTERS_COUNT_NEEDED: { - forecastersNumberSufficiencyVerifier = - new PercentageForecastersNumberSufficiencyVerifier(properties.getPoolingDataThreshold()); + forecastersNumberVerifier = + new PercentageForecastersNumberVerifier(properties.getForecasterNumberThreshold()); break; } default: { @@ -45,6 +50,6 @@ public class EnsemblingMechanismFactory { } } - return new EnsemblingMechanism(ensembler, forecastersNumberSufficiencyVerifier); + return new EnsemblingMechanism(ensembler, forecastersNumberVerifier); } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/AverageValuesEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java similarity index 85% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/AverageValuesEnsembler.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java index faef2803..dd532c7e 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/AverageValuesEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; +package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; import eu.morphemic.prediction_orchestrator.model.Prediction; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/Ensembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java similarity index 75% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/Ensembler.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java index a8bb992e..863d8b6e 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/Ensembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/Ensembler.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; +package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; import eu.morphemic.prediction_orchestrator.model.Prediction; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblerType.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java similarity index 69% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblerType.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java index 408ce314..62bb5548 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/EnsemblerType.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/EnsemblerType.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; +package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; public enum EnsemblerType { AVERAGE("average"), diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java similarity index 93% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/OuterEnsembler.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java index 5e576500..a15d9d63 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/OuterEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; +package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; import eu.morphemic.prediction_orchestrator.communication.EnsemblerWebClient; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java new file mode 100644 index 00000000..285cf2a2 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifier.java @@ -0,0 +1,6 @@ +package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier; + +public abstract class ForecastersNumberVerifier { + + abstract public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount); +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/ForecastersNumberSufficiencyVerifierType.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java similarity index 52% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/ForecastersNumberSufficiencyVerifierType.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java index 4d89f838..7467a1e9 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/ForecastersNumberSufficiencyVerifierType.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/ForecastersNumberVerifierType.java @@ -1,13 +1,13 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy; +package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier; -public enum ForecastersNumberSufficiencyVerifierType { +public enum ForecastersNumberVerifierType { STATIC_FORECASTERS_COUNT_NEEDED("staticForecastersCountNeeded"), PERCENTAGE_FORECASTERS_COUNT_NEEDED("percentageForecastersCountNeeded"); private String type; - ForecastersNumberSufficiencyVerifierType(String type) { + ForecastersNumberVerifierType(String type) { this.type = type; } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersNumberSufficiencyVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java similarity index 65% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersNumberSufficiencyVerifier.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java index f7b18c12..928f563b 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/PercentageForecastersNumberSufficiencyVerifier.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/PercentageForecastersNumberVerifier.java @@ -1,9 +1,9 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; +package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier; import lombok.AllArgsConstructor; @AllArgsConstructor -public class PercentageForecastersNumberSufficiencyVerifier extends ForecastersNumberSufficiencyVerifier { +public class PercentageForecastersNumberVerifier extends ForecastersNumberVerifier { private final double percentageThreshold; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersNumberSufficiencyVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java similarity index 60% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersNumberSufficiencyVerifier.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java index 554c2337..54ccd8ae 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/StaticForecastersNumberSufficiencyVerifier.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/forecaster_number_verifier/StaticForecastersNumberVerifier.java @@ -1,9 +1,9 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; +package eu.morphemic.prediction_orchestrator.service.ensembling.forecaster_number_verifier; import lombok.AllArgsConstructor; @AllArgsConstructor -public class StaticForecastersNumberSufficiencyVerifier extends ForecastersNumberSufficiencyVerifier { +public class StaticForecastersNumberVerifier extends ForecastersNumberVerifier { private final double staticThreshold; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ForecastersNumberSufficiencyVerifier.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ForecastersNumberSufficiencyVerifier.java deleted file mode 100644 index ad5fb318..00000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/pooling_strategy/impl/ForecastersNumberSufficiencyVerifier.java +++ /dev/null @@ -1,6 +0,0 @@ -package eu.morphemic.prediction_orchestrator.service.pooling_strategy.impl; - -public abstract class ForecastersNumberSufficiencyVerifier { - - abstract public boolean isDataSufficient(int numberOfValidData, int expectedForecastersDataCount); -} diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties index 49a849fe..509f5886 100644 --- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties +++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties @@ -35,8 +35,9 @@ forecasting_configuration.starting_forecasting_delay=200 # With the first option threshold property is in percents #With the second it is a static count #For example with threshold threshold=2 at least 2 synchronised forecasters are needed to be able to merge predictions and send them forward -pooling.poolingStrategy=STATIC_FORECASTERS_COUNT_NEEDED -pooling.threshold=2 +ensembling.forecasterNumberVerifier.type=STATIC_FORECASTERS_COUNT_NEEDED +ensembling.forecasterNumberVerifier.threshold=2 +ensembling.ensemblerType=OUTER jms.client.broker_properties_configuration_file_location=${MELODIC_CONFIG_DIR}/eu.melodic.event.brokerclient.properties -- GitLab From f464bdd22a70b1eb874e2373c993305cc56a4c8c Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 27 Dec 2021 15:23:07 +0100 Subject: [PATCH 05/23] added influx connection and renamed services --- prediction_orchestrator/pom.xml | 6 ++- .../ActiveMQService.java} | 11 ++--- .../{ => activemq}/TopicFactory.java | 2 +- .../listeners/ActiveMQListener.java | 2 +- .../listeners/PredictionListener.java | 2 +- .../listeners/ReceiveMetricsListener.java | 2 +- .../listeners/RetrainedModelListener.java | 2 +- .../EnsemblerService.java} | 6 +-- .../communication/influx/InfluxService.java | 43 +++++++++++++++++++ .../communication/influx/InfluxWebClient.java | 37 ++++++++++++++++ .../model/Prediction.java | 12 ++++++ .../properties/Properties.java | 15 +++++++ .../service/Coordinator.java | 20 ++++----- .../service/MetricHandler.java | 12 +++--- .../EnsemblingMechanismFactory.java | 4 +- .../ensembling/ensembler/OuterEnsembler.java | 6 +-- ...orphemic.predictionorchestrator.properties | 7 +++ .../{ => activemq}/PublishReceiveTest.java | 2 +- 18 files changed, 153 insertions(+), 38 deletions(-) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/{CommunicationService.java => activemq/ActiveMQService.java} (92%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/{ => activemq}/TopicFactory.java (98%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/{ => activemq}/listeners/ActiveMQListener.java (95%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/{ => activemq}/listeners/PredictionListener.java (95%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/{ => activemq}/listeners/ReceiveMetricsListener.java (94%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/{ => activemq}/listeners/RetrainedModelListener.java (93%) rename prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/{EnsemblerWebClient.java => ensembler/EnsemblerService.java} (92%) create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java create mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java rename prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/{ => activemq}/PublishReceiveTest.java (99%) diff --git a/prediction_orchestrator/pom.xml b/prediction_orchestrator/pom.xml index 82838fbe..8f4b06a4 100644 --- a/prediction_orchestrator/pom.xml +++ b/prediction_orchestrator/pom.xml @@ -49,7 +49,11 @@ org.springframework.boot spring-boot-starter-webflux - + + com.influxdb + influxdb-client-java + 3.4.0 + org.springframework.boot spring-boot-starter-test diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java similarity index 92% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java index e22a8159..038194fd 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/CommunicationService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java @@ -1,21 +1,18 @@ -package eu.morphemic.prediction_orchestrator.communication; +package eu.morphemic.prediction_orchestrator.communication.activemq; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.melodic.event.brokerclient.BrokerClient; import eu.melodic.event.brokerclient.BrokerPublisher; -import eu.morphemic.prediction_orchestrator.communication.listeners.ActiveMQListener; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PooledPredictionMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; import eu.morphemic.prediction_orchestrator.service.Coordinator; -import eu.morphemic.prediction_orchestrator.communication.listeners.PredictionListener; -import eu.morphemic.prediction_orchestrator.communication.listeners.ReceiveMetricsListener; -import eu.morphemic.prediction_orchestrator.communication.listeners.RetrainedModelListener; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.properties.Properties; import eu.morphemic.prediction_orchestrator.service.MetricHandler; +import eu.morphemic.prediction_orchestrator.communication.activemq.listeners.*; import lombok.extern.slf4j.Slf4j; @@ -26,14 +23,14 @@ import java.util.Objects; @Slf4j /** We need to have a different client for each topic as we want them to run each consumer(subscriber) in a different thread (session) */ -public class CommunicationService { +public class ActiveMQService { private Properties properties; //topicName -> brokerClientSubscriber/publisher private HashMap brokerClients = new HashMap<>(); private HashMap brokerPublishers = new HashMap<>(); - public CommunicationService(Properties properties) { + public ActiveMQService(Properties properties) { this.properties = properties; } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java similarity index 98% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java index e4bd192b..ece36846 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/TopicFactory.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/TopicFactory.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication; +package eu.morphemic.prediction_orchestrator.communication.activemq; import eu.melodic.event.brokerclient.templates.TopicNames; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java similarity index 95% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java index ed407b4d..d6018305 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ActiveMQListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ActiveMQListener.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication.listeners; +package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java similarity index 95% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java index ffd1e90c..a2f80708 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication.listeners; +package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionFromForecasterMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java similarity index 94% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java index 6e5174e8..2801d7c5 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/ReceiveMetricsListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/ReceiveMetricsListener.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication.listeners; +package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage; import eu.morphemic.prediction_orchestrator.service.Coordinator; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java similarity index 93% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java index f0587aec..13563a00 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/RetrainedModelListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/RetrainedModelListener.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication.listeners; +package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; import eu.morphemic.prediction_orchestrator.service.Coordinator; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.RetrainedModelMessage; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java similarity index 92% rename from prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java rename to prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java index 75ca93e6..b8cf37fe 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/EnsemblerWebClient.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication; +package eu.morphemic.prediction_orchestrator.communication.ensembler; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionsEnsembledMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; @@ -14,13 +14,13 @@ import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @Slf4j -public class EnsemblerWebClient { +public class EnsemblerService { private String ensemblerUri; private final WebClient client; - public EnsemblerWebClient(String baseUrl, String ensemblerUri) { + public EnsemblerService(String baseUrl, String ensemblerUri) { //http client is only used for logging requests HttpClient httpClient = HttpClient.create() .wiretap(true); diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java new file mode 100644 index 00000000..d81de03e --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java @@ -0,0 +1,43 @@ +package eu.morphemic.prediction_orchestrator.communication.influx; + +import com.influxdb.client.write.Point; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxWebClient; +import eu.morphemic.prediction_orchestrator.properties.Properties; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.util.LinkedList; +import java.util.List; + +@Service +@Slf4j +@EnableAsync +public class InfluxService { + + private List pointsToSave = new LinkedList<>(); + + private InfluxWebClient influxWebClient; + + public InfluxService(Properties properties) { + this.influxWebClient = new InfluxWebClient(properties); + + } + + public synchronized void addPointToSaveToInflux(Point point) { + this.pointsToSave.add(point); + } + + @Scheduled(fixedRate = 30000) + public synchronized void savePointsToInflux() { + if(pointsToSave.size() == 0) { + log.info("Nothing to save to Influx"); + } else { + log.info("Saving {} points to influx", pointsToSave.size()); + influxWebClient.sendPoints(pointsToSave); + this.pointsToSave.clear(); + } + } + +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java new file mode 100644 index 00000000..15525e33 --- /dev/null +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java @@ -0,0 +1,37 @@ +package eu.morphemic.prediction_orchestrator.communication.influx; + +import com.influxdb.client.*; +import com.influxdb.client.write.Point; +import eu.morphemic.prediction_orchestrator.properties.Properties; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Configuration; + +import java.util.List; + +@Configuration +@Getter +@Slf4j +public class InfluxWebClient { + + + private InfluxDBClient influxDBClient; + + private WriteApi writeApi; + + public InfluxWebClient(Properties properties) { + this.influxDBClient = InfluxDBClientFactory.create(InfluxDBClientOptions.builder() + .connectionString(properties.getInfluxHostname() + properties.getInfluxPort()) + .bucket(properties.getInfluxDatabase()) + .authenticate(properties.getInfluxUsername(), properties.getInfluxPassword().toCharArray()) + .build()); + this.writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() + .batchSize(1000) + .build()); + } + + public void sendPoints(List points) { + writeApi.writePoints(points); + } + +} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java index b3b89329..b3a73d21 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java @@ -1,10 +1,13 @@ package eu.morphemic.prediction_orchestrator.model; +import com.influxdb.client.domain.WritePrecision; +import com.influxdb.client.write.Point; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionFromForecasterMessage; import lombok.AllArgsConstructor; import lombok.Data; +import java.lang.reflect.Type; import java.util.List; @Data @@ -45,4 +48,13 @@ public class Prediction { this.cloud = prediction.getCloud(); this.provider = prediction.getProvider(); } + + public Point getAsPoint(String metricName, String methodName) { + return Point.measurement(metricName + "Predictions") + .time(timestamp, WritePrecision.S) + .addField(methodName + "_value", metricValue) + .addField(methodName + "_probability", probability) + .addField(methodName + "_left_conf", confidence_interval.get(0)) + .addField(methodName + "_right_conf", confidence_interval.get(1)); + } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java index 6da46b6a..c94ee5ce 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/Properties.java @@ -54,4 +54,19 @@ public class Properties { @Value("${ensembler.uri}") private String ensemblerUri; + @Value("${influx.hostname}") + private String influxHostname; + + @Value("${influx.port}") + private String influxPort; + + @Value("${influx.database}") + private String influxDatabase; + + @Value("${influx.username}") + private String influxUsername; + + @Value("${influx.password}") + private String influxPassword; + } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index 443ff985..75e72532 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -1,6 +1,6 @@ package eu.morphemic.prediction_orchestrator.service; -import eu.morphemic.prediction_orchestrator.communication.CommunicationService; +import eu.morphemic.prediction_orchestrator.communication.activemq.ActiveMQService; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage; @@ -29,14 +29,14 @@ public class Coordinator { @Autowired private Properties properties; - private CommunicationService communicationService; + private ActiveMQService activeMQService; @PostConstruct public void start() throws JMSException { log.info(properties.toString()); - this.communicationService = new CommunicationService(properties); - communicationService.startReceivingMetricList(this); - communicationService.startReceivingModelRetrained(this); + this.activeMQService = new ActiveMQService(properties); + activeMQService.startReceivingMetricList(this); + activeMQService.startReceivingModelRetrained(this); } /** As we need to check multiple values at once and block them after publishing, we need to make sure that only one thread is using @@ -63,13 +63,13 @@ public class Coordinator { if (lastValueUpdate == PredictionRegistry.VALUE_ADDED) { metricHandlers.forEach((metricName, metricHandler) -> { Prediction prediction = metricHandler.getPooledPrediction(predictionTime); - communicationService.publishPooledPrediction(prediction, metricName); + activeMQService.publishPooledPrediction(prediction, metricName); }); } else { //we send only updated value as at this point the entire vector has been send earlier Prediction prediction = metricHandlers.get(callingMetricHandler) .getPooledPrediction(predictionTime); - communicationService.publishPooledPrediction(prediction, callingMetricHandler); + activeMQService.publishPooledPrediction(prediction, callingMetricHandler); } } } @@ -113,7 +113,7 @@ public class Coordinator { this, forecastingConfiguration, properties, - communicationService + activeMQService ); for (String methodName : properties.getStartingMethodsList()) { newMetricHandler.addMethodHandler(methodName, forecastingConfiguration); @@ -133,7 +133,7 @@ public class Coordinator { forwardPredictionNumber, predictionHorizon ); - communicationService.publishStartForecasting(startForecastingMessage, entry.getKey()); + activeMQService.publishStartForecasting(startForecastingMessage, entry.getKey()); } //Inform the Ensembler @@ -147,7 +147,7 @@ public class Coordinator { predictionHorizon, properties.getStartingMethodsList() ); - communicationService.publishStartEnsembling(startEnsemblingMessage); + activeMQService.publishStartEnsembling(startEnsemblingMessage); //ToDo Stop metrics not included in new configuration file } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index d4ff38eb..3bd087b9 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -1,6 +1,6 @@ package eu.morphemic.prediction_orchestrator.service; -import eu.morphemic.prediction_orchestrator.communication.CommunicationService; +import eu.morphemic.prediction_orchestrator.communication.activemq.ActiveMQService; import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.properties.Properties; import eu.morphemic.prediction_orchestrator.model.Prediction; @@ -33,10 +33,10 @@ public class MetricHandler { private PredictionRegistry pooledPredictionsRegistry; private EnsemblingMechanism ensemblingMechanism; - private CommunicationService communicationService; + private ActiveMQService activeMQService; public MetricHandler(String metricName, Coordinator coordinator, ForecastingConfiguration forecastingConfiguration, - Properties properties, CommunicationService communicationService) { + Properties properties, ActiveMQService activeMQService) { this.metricName = metricName; this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties); this.coordinator = coordinator; @@ -44,7 +44,7 @@ public class MetricHandler { forecastingConfiguration, PredictionRegistry.getPooledPredictionsRegistryName(metricName) ); - this.communicationService = communicationService; + this.activeMQService = activeMQService; } public void launchMethodsPoolingIfNecessary(long predictionTime) throws JMSException { @@ -87,12 +87,12 @@ public class MetricHandler { ); MethodHandler methodHandler = new MethodHandler(null,null, predictionRegistry); this.methodHandlers.put(methodName, methodHandler); - this.communicationService.startReceivingPredictions(metricName, methodName, predictionRegistry, this); + this.activeMQService.startReceivingPredictions(metricName, methodName, predictionRegistry, this); } public void removeMethodHandler(String methodName) throws JMSException { this.methodHandlers.remove(methodName); - this.communicationService.stopReceivingPredictions(metricName, methodName); + this.activeMQService.stopReceivingPredictions(metricName, methodName); } List getConnectedMethods() { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java index fe138188..038484ca 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanismFactory.java @@ -1,6 +1,6 @@ package eu.morphemic.prediction_orchestrator.service.ensembling; -import eu.morphemic.prediction_orchestrator.communication.EnsemblerWebClient; +import eu.morphemic.prediction_orchestrator.communication.ensembler.EnsemblerService; import eu.morphemic.prediction_orchestrator.properties.Properties; import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.AverageValuesEnsembler; import eu.morphemic.prediction_orchestrator.service.ensembling.ensembler.Ensembler; @@ -39,7 +39,7 @@ public class EnsemblingMechanismFactory { break; } case OUTER: { - ensembler = new OuterEnsembler(new EnsemblerWebClient( + ensembler = new OuterEnsembler(new EnsemblerService( properties.getEnsemblerBaseUrl(), properties.getEnsemblerUri() )); diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java index a15d9d63..8b84e9e3 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -1,6 +1,6 @@ package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; -import eu.morphemic.prediction_orchestrator.communication.EnsemblerWebClient; +import eu.morphemic.prediction_orchestrator.communication.ensembler.EnsemblerService; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; import lombok.AllArgsConstructor; @@ -11,7 +11,7 @@ import java.util.stream.Collectors; @AllArgsConstructor public class OuterEnsembler extends Ensembler { - private EnsemblerWebClient ensemblerWebClient; + private EnsemblerService ensemblerService; @Override public double ensembleValues(Map predictionsByMethod, String metricName) { @@ -25,6 +25,6 @@ public class OuterEnsembler extends Ensembler { predictionsByMethod.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getMetricValue())) ); - return ensemblerWebClient.ensemble(predictionsToEnsembleMessage).getEnsembledValue(); + return ensemblerService.ensemble(predictionsToEnsembleMessage).getEnsembledValue(); } } diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties index 509f5886..c0b6a3b1 100644 --- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties +++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties @@ -53,3 +53,10 @@ activemq.restartcount=20 ensembler.url= ensembler.uri= + + +influx.hostname=ui-influxdb +influx.port=8086 +influx.database=morphemic +influx.username=morphemic +influx.password=password diff --git a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java similarity index 99% rename from prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java rename to prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java index df388e5c..185315e2 100644 --- a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/PublishReceiveTest.java +++ b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java @@ -1,4 +1,4 @@ -package eu.morphemic.prediction_orchestrator.communication; +package eu.morphemic.prediction_orchestrator.communication.activemq; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; -- GitLab From bb060c1fb0d3886152fe3110edb32162d9df5d9c Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 27 Dec 2021 15:51:05 +0100 Subject: [PATCH 06/23] added influ saving invokes --- .../communication/activemq/ActiveMQService.java | 8 ++++++-- .../activemq/listeners/PredictionListener.java | 14 +++++++++++--- .../service/MetricHandler.java | 6 +++++- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java index 038194fd..b22cf72a 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.melodic.event.brokerclient.BrokerClient; import eu.melodic.event.brokerclient.BrokerPublisher; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PooledPredictionMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; @@ -51,13 +52,16 @@ public class ActiveMQService { public synchronized void startReceivingPredictions(String metricName, String methodName, PredictionRegistry predictionRegistry, - MetricHandler metricHandler) { + MetricHandler metricHandler, + InfluxService influxService) { log.info("Starting Receiving PredictionFromForecasterMessage from {} concerning metric {}", methodName, metricName); String topicName = TopicFactory.getReceivePredictionTopic(metricName, methodName); PredictionListener predictionListener = new PredictionListener( topicName, predictionRegistry, - metricHandler); + metricHandler, + methodName, + influxService); subscribeWithRetries(topicName, predictionListener); } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java index a2f80708..eaa01169 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java @@ -1,5 +1,6 @@ package eu.morphemic.prediction_orchestrator.communication.activemq.listeners; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionFromForecasterMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; @@ -14,12 +15,17 @@ public class PredictionListener extends ActiveMQListener { private PredictionRegistry predictionRegistry; private MetricHandler metricHandler; - public PredictionListener(String topicName, PredictionRegistry predictionRegistry, MetricHandler metricHandler) { + private InfluxService influxService; + private String methodName; + + public PredictionListener(String topicName, PredictionRegistry predictionRegistry, MetricHandler metricHandler, + String methodName, InfluxService influxService) { super(topicName); this.predictionRegistry = predictionRegistry; this.metricHandler = metricHandler; - + this.influxService = influxService; + this.methodName = methodName; log.debug("PredictionListener.: topic={}", topicName); } @@ -29,9 +35,11 @@ public class PredictionListener extends ActiveMQListener { log.debug("Listener of topic {}: Converting event payload to PredictionFromForecasterMessage instance...", topicName); PredictionFromForecasterMessage predictionFromForecasterMessage = gson.fromJson(payload, PredictionFromForecasterMessage.class); log.info("Listener of topic {}: PredictionFromForecasterMessage instance: {}", topicName, predictionFromForecasterMessage.toString()); - int valueResult = predictionRegistry.processPrediction(new Prediction(predictionFromForecasterMessage)); + Prediction prediction = new Prediction(predictionFromForecasterMessage); + int valueResult = predictionRegistry.processPrediction(prediction); if ((valueResult == PredictionRegistry.VALUE_ADDED) || (valueResult == PredictionRegistry.VALUE_UPDATED)) { + influxService.addPointToSaveToInflux(prediction.getAsPoint(metricHandler.getMetricName(), methodName)); metricHandler.launchMethodsPoolingIfNecessary(predictionFromForecasterMessage.getPredictionTime()); } } catch (JMSException e) { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index 3bd087b9..adbb3fa0 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -1,6 +1,7 @@ package eu.morphemic.prediction_orchestrator.service; import eu.morphemic.prediction_orchestrator.communication.activemq.ActiveMQService; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService; import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.properties.Properties; import eu.morphemic.prediction_orchestrator.model.Prediction; @@ -34,6 +35,7 @@ public class MetricHandler { private EnsemblingMechanism ensemblingMechanism; private ActiveMQService activeMQService; + private InfluxService influxService; public MetricHandler(String metricName, Coordinator coordinator, ForecastingConfiguration forecastingConfiguration, Properties properties, ActiveMQService activeMQService) { @@ -61,6 +63,7 @@ public class MetricHandler { int valueResult = pooledPredictionsRegistry.processPrediction(newPooledPrediction); if ((valueResult == PredictionRegistry.VALUE_UPDATED) || (valueResult == PredictionRegistry.VALUE_ADDED)) { + influxService.addPointToSaveToInflux(newPooledPrediction.getAsPoint(metricName, "ensembled")); coordinator.publishIfPooledValuesProvidedForAllMetrics(predictionTime, valueResult, metricName); } } else { @@ -87,7 +90,8 @@ public class MetricHandler { ); MethodHandler methodHandler = new MethodHandler(null,null, predictionRegistry); this.methodHandlers.put(methodName, methodHandler); - this.activeMQService.startReceivingPredictions(metricName, methodName, predictionRegistry, this); + this.activeMQService.startReceivingPredictions(metricName, methodName, predictionRegistry, + this, influxService); } public void removeMethodHandler(String methodName) throws JMSException { -- GitLab From f6980d5e8d3d1ebde19dd22c80bc49eb11233225 Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 27 Dec 2021 15:53:03 +0100 Subject: [PATCH 07/23] added influ saving invokes --- .../prediction_orchestrator/service/Coordinator.java | 6 +++++- .../prediction_orchestrator/service/MetricHandler.java | 3 ++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index 75e72532..a25cbf20 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -1,6 +1,7 @@ package eu.morphemic.prediction_orchestrator.service; import eu.morphemic.prediction_orchestrator.communication.activemq.ActiveMQService; +import eu.morphemic.prediction_orchestrator.communication.influx.InfluxService; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartEnsemblingMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage; @@ -30,6 +31,7 @@ public class Coordinator { private Properties properties; private ActiveMQService activeMQService; + private InfluxService influxService; @PostConstruct public void start() throws JMSException { @@ -37,6 +39,7 @@ public class Coordinator { this.activeMQService = new ActiveMQService(properties); activeMQService.startReceivingMetricList(this); activeMQService.startReceivingModelRetrained(this); + this.influxService = new InfluxService(properties); } /** As we need to check multiple values at once and block them after publishing, we need to make sure that only one thread is using @@ -113,7 +116,8 @@ public class Coordinator { this, forecastingConfiguration, properties, - activeMQService + activeMQService, + influxService ); for (String methodName : properties.getStartingMethodsList()) { newMetricHandler.addMethodHandler(methodName, forecastingConfiguration); diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index adbb3fa0..aaa3a35e 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -38,7 +38,7 @@ public class MetricHandler { private InfluxService influxService; public MetricHandler(String metricName, Coordinator coordinator, ForecastingConfiguration forecastingConfiguration, - Properties properties, ActiveMQService activeMQService) { + Properties properties, ActiveMQService activeMQService, InfluxService influxService) { this.metricName = metricName; this.ensemblingMechanism = EnsemblingMechanismFactory.getEnsemblingMechanism(properties); this.coordinator = coordinator; @@ -47,6 +47,7 @@ public class MetricHandler { PredictionRegistry.getPooledPredictionsRegistryName(metricName) ); this.activeMQService = activeMQService; + this.influxService = influxService; } public void launchMethodsPoolingIfNecessary(long predictionTime) throws JMSException { -- GitLab From 44cdfb6bc7cd1128111d08a6bf08778d72d74185 Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 27 Dec 2021 15:59:45 +0100 Subject: [PATCH 08/23] added influ saving invokes --- .gitlab-ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 292e47c3..6fbc502f 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -175,6 +175,7 @@ deploy:prediction_orchestrator: - master - morphemic-rc1.5 - morphemic-rc2.0 + - ensemblerIntegration services: - $DOCKER_DIND_SERVICE dependencies: -- GitLab From 0a33cc449b76559280920b8044876edf97cab517 Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 27 Dec 2021 17:19:31 +0100 Subject: [PATCH 09/23] fixed influx api usage --- .../communication/ensembler/EnsemblerService.java | 3 +-- .../communication/influx/InfluxWebClient.java | 12 +++++++----- .../eu.morphemic.predictionorchestrator.properties | 6 +++--- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java index b8cf37fe..4d2b72a5 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java @@ -3,11 +3,10 @@ package eu.morphemic.prediction_orchestrator.communication.ensembler; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionsEnsembledMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorClientHttpConnector; -import org.springframework.stereotype.Service; +; import org.springframework.web.reactive.function.client.UnknownHttpStatusCodeException; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java index 15525e33..37e319f9 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java @@ -20,11 +20,13 @@ public class InfluxWebClient { private WriteApi writeApi; public InfluxWebClient(Properties properties) { - this.influxDBClient = InfluxDBClientFactory.create(InfluxDBClientOptions.builder() - .connectionString(properties.getInfluxHostname() + properties.getInfluxPort()) - .bucket(properties.getInfluxDatabase()) - .authenticate(properties.getInfluxUsername(), properties.getInfluxPassword().toCharArray()) - .build()); + this.influxDBClient = InfluxDBClientFactory.createV1( + properties.getInfluxHostname() + ":" + properties.getInfluxPort(), + properties.getInfluxUsername(), + properties.getInfluxPassword().toCharArray(), + properties.getInfluxDatabase(), + "autogen" + ); this.writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() .batchSize(1000) .build()); diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties index c0b6a3b1..1b8211eb 100644 --- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties +++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties @@ -51,11 +51,11 @@ logging.zone_id=Europe/Warsaw activemq.restartinterval=10000 activemq.restartcount=20 -ensembler.url= -ensembler.uri= +ensembler.base-url=- +ensembler.uri=- -influx.hostname=ui-influxdb +influx.hostname=http://ui-influxdb influx.port=8086 influx.database=morphemic influx.username=morphemic -- GitLab From 1fb99e8ba3c48f7568ce5f71eb940e1bf8e75429 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 29 Dec 2021 10:55:30 +0100 Subject: [PATCH 10/23] changed influx y=usage to older api --- prediction_orchestrator/pom.xml | 6 +-- .../listeners/PredictionListener.java | 2 +- .../communication/influx/InfluxService.java | 52 +++++++++---------- .../communication/influx/InfluxWebClient.java | 39 -------------- .../PredictionFromForecasterMessage.java | 4 -- .../model/Prediction.java | 11 ++-- .../service/MetricHandler.java | 2 +- 7 files changed, 35 insertions(+), 81 deletions(-) delete mode 100644 prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java diff --git a/prediction_orchestrator/pom.xml b/prediction_orchestrator/pom.xml index 8f4b06a4..63dd52cc 100644 --- a/prediction_orchestrator/pom.xml +++ b/prediction_orchestrator/pom.xml @@ -50,9 +50,9 @@ spring-boot-starter-webflux - com.influxdb - influxdb-client-java - 3.4.0 + org.influxdb + influxdb-java + 2.15 org.springframework.boot diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java index eaa01169..f55da951 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/listeners/PredictionListener.java @@ -39,7 +39,7 @@ public class PredictionListener extends ActiveMQListener { int valueResult = predictionRegistry.processPrediction(prediction); if ((valueResult == PredictionRegistry.VALUE_ADDED) || (valueResult == PredictionRegistry.VALUE_UPDATED)) { - influxService.addPointToSaveToInflux(prediction.getAsPoint(metricHandler.getMetricName(), methodName)); + influxService.saveToInflux(prediction.getAsPoint(metricHandler.getMetricName(), methodName)); metricHandler.launchMethodsPoolingIfNecessary(predictionFromForecasterMessage.getPredictionTime()); } } catch (JMSException e) { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java index d81de03e..28f3d7b5 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java @@ -1,43 +1,41 @@ package eu.morphemic.prediction_orchestrator.communication.influx; -import com.influxdb.client.write.Point; -import eu.morphemic.prediction_orchestrator.communication.influx.InfluxWebClient; import eu.morphemic.prediction_orchestrator.properties.Properties; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; +import okhttp3.OkHttpClient; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Point; +import org.springframework.context.annotation.Configuration; -import java.util.LinkedList; -import java.util.List; +import java.util.concurrent.TimeUnit; -@Service +@Configuration +@Getter @Slf4j -@EnableAsync public class InfluxService { + public static final int DEFAULT_BATCH_ACTIONS_LIMIT = 50; + public static final int DEFAULT_BATCH_INTERVAL_DURATION = 50; - private List pointsToSave = new LinkedList<>(); - - private InfluxWebClient influxWebClient; + private InfluxDB influxDB; public InfluxService(Properties properties) { - this.influxWebClient = new InfluxWebClient(properties); - - } - - public synchronized void addPointToSaveToInflux(Point point) { - this.pointsToSave.add(point); + OkHttpClient.Builder okHttpbuilder = new OkHttpClient.Builder(); + + influxDB = InfluxDBFactory.connect(properties.getInfluxHostname() + + ":" + properties.getInfluxPort(), okHttpbuilder); + influxDB.enableBatch(DEFAULT_BATCH_ACTIONS_LIMIT, DEFAULT_BATCH_INTERVAL_DURATION, + TimeUnit.SECONDS, runnable -> { + Thread thread = new Thread(runnable); + thread.setDaemon(true); + return thread; + }); + Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close)); } - @Scheduled(fixedRate = 30000) - public synchronized void savePointsToInflux() { - if(pointsToSave.size() == 0) { - log.info("Nothing to save to Influx"); - } else { - log.info("Saving {} points to influx", pointsToSave.size()); - influxWebClient.sendPoints(pointsToSave); - this.pointsToSave.clear(); - } + public synchronized void saveToInflux(Point point) { + influxDB.write(point); } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java deleted file mode 100644 index 37e319f9..00000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxWebClient.java +++ /dev/null @@ -1,39 +0,0 @@ -package eu.morphemic.prediction_orchestrator.communication.influx; - -import com.influxdb.client.*; -import com.influxdb.client.write.Point; -import eu.morphemic.prediction_orchestrator.properties.Properties; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.annotation.Configuration; - -import java.util.List; - -@Configuration -@Getter -@Slf4j -public class InfluxWebClient { - - - private InfluxDBClient influxDBClient; - - private WriteApi writeApi; - - public InfluxWebClient(Properties properties) { - this.influxDBClient = InfluxDBClientFactory.createV1( - properties.getInfluxHostname() + ":" + properties.getInfluxPort(), - properties.getInfluxUsername(), - properties.getInfluxPassword().toCharArray(), - properties.getInfluxDatabase(), - "autogen" - ); - this.writeApi = influxDBClient.makeWriteApi(WriteOptions.builder() - .batchSize(1000) - .build()); - } - - public void sendPoints(List points) { - writeApi.writePoints(points); - } - -} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java index 21bbe79d..0242f0df 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionFromForecasterMessage.java @@ -15,7 +15,6 @@ import java.util.List; @ToString public class PredictionFromForecasterMessage { @JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.metric_value) - @NonNull @Min(0) private double metricValue; @@ -24,12 +23,10 @@ public class PredictionFromForecasterMessage { private int level; @JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.timestamp) - @NonNull @Min(1) private long timestamp; @JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.probability) - @NonNull @Min(0) @Max(1) private double probability; @@ -40,7 +37,6 @@ public class PredictionFromForecasterMessage { private List confidence_interval; @JsonProperty(EventFields.ForecastingMethodsToPredictionOrchestratorIntermediatePredictionsFields.prediction_time) - @NonNull @Min(1) private long predictionTime; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java index b3a73d21..ca69612e 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/model/Prediction.java @@ -1,14 +1,12 @@ package eu.morphemic.prediction_orchestrator.model; - -import com.influxdb.client.domain.WritePrecision; -import com.influxdb.client.write.Point; import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionFromForecasterMessage; import lombok.AllArgsConstructor; import lombok.Data; +import org.influxdb.dto.Point; -import java.lang.reflect.Type; import java.util.List; +import java.util.concurrent.TimeUnit; @Data @AllArgsConstructor @@ -51,10 +49,11 @@ public class Prediction { public Point getAsPoint(String metricName, String methodName) { return Point.measurement(metricName + "Predictions") - .time(timestamp, WritePrecision.S) + .time(timestamp, TimeUnit.SECONDS) .addField(methodName + "_value", metricValue) .addField(methodName + "_probability", probability) .addField(methodName + "_left_conf", confidence_interval.get(0)) - .addField(methodName + "_right_conf", confidence_interval.get(1)); + .addField(methodName + "_right_conf", confidence_interval.get(1)) + .build(); } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index aaa3a35e..56b1f923 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -64,7 +64,7 @@ public class MetricHandler { int valueResult = pooledPredictionsRegistry.processPrediction(newPooledPrediction); if ((valueResult == PredictionRegistry.VALUE_UPDATED) || (valueResult == PredictionRegistry.VALUE_ADDED)) { - influxService.addPointToSaveToInflux(newPooledPrediction.getAsPoint(metricName, "ensembled")); + influxService.saveToInflux(newPooledPrediction.getAsPoint(metricName, "ensembled")); coordinator.publishIfPooledValuesProvidedForAllMetrics(predictionTime, valueResult, metricName); } } else { -- GitLab From 42b948520165f5ad348b2fdedceea7f3ca0beab3 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 29 Dec 2021 13:37:47 +0100 Subject: [PATCH 11/23] troubleshooting --- .../communication/influx/InfluxService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java index 28f3d7b5..489b80c3 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java @@ -22,9 +22,13 @@ public class InfluxService { public InfluxService(Properties properties) { OkHttpClient.Builder okHttpbuilder = new OkHttpClient.Builder(); + okHttpbuilder.readTimeout(10, TimeUnit.SECONDS); + okHttpbuilder.writeTimeout(10, TimeUnit.SECONDS); - influxDB = InfluxDBFactory.connect(properties.getInfluxHostname() - + ":" + properties.getInfluxPort(), okHttpbuilder); + String url = properties.getInfluxHostname() + ":" + properties.getInfluxPort(); + log.info("Connecting to influx: {}", url); + influxDB = InfluxDBFactory.connect(url, okHttpbuilder); + log.info("Connected"); influxDB.enableBatch(DEFAULT_BATCH_ACTIONS_LIMIT, DEFAULT_BATCH_INTERVAL_DURATION, TimeUnit.SECONDS, runnable -> { Thread thread = new Thread(runnable); -- GitLab From 14535ac27346b7c6ddbcf8a16c9d68252e3d1aed Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 29 Dec 2021 15:39:44 +0100 Subject: [PATCH 12/23] fixed influx bugs --- .../communication/influx/InfluxService.java | 5 ++++- .../prediction_orchestrator/service/MetricHandler.java | 7 +++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java index 489b80c3..39a35640 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/influx/InfluxService.java @@ -20,7 +20,10 @@ public class InfluxService { private InfluxDB influxDB; + private String database; + public InfluxService(Properties properties) { + this.database = properties.getInfluxDatabase(); OkHttpClient.Builder okHttpbuilder = new OkHttpClient.Builder(); okHttpbuilder.readTimeout(10, TimeUnit.SECONDS); okHttpbuilder.writeTimeout(10, TimeUnit.SECONDS); @@ -39,7 +42,7 @@ public class InfluxService { } public synchronized void saveToInflux(Point point) { - influxDB.write(point); + influxDB.write(database,"",point); } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index 56b1f923..4a9e4237 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -52,10 +52,9 @@ public class MetricHandler { public void launchMethodsPoolingIfNecessary(long predictionTime) throws JMSException { Map predictionsByMethod = methodHandlers.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> e.getValue().getPredictionRegistry().getCopyPrediction(predictionTime) - )); + .collect(HashMap::new, + (m, e) -> m.put(e.getKey(), e.getValue().getPredictionRegistry().getCopyPrediction(predictionTime)), + HashMap::putAll); Prediction newPooledPrediction = ensemblingMechanism.poolPredictions(predictionsByMethod, metricName); if (newPooledPrediction != EnsemblingMechanism.POOLED_PREDICTION_NOT_CREATED) { -- GitLab From 30c5b86e2abf055ff214be4280d78492c49a76a4 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 29 Dec 2021 15:58:47 +0100 Subject: [PATCH 13/23] fixed ensembler bugs --- .../service/ensembling/EnsemblingMechanism.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java index fe7fe7d2..086f2ecf 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java @@ -41,7 +41,7 @@ public class EnsemblingMechanism { .get(); return new Prediction( - ensembler.ensembleValues(predictionsByMethod, metricName), + ensembler.ensembleValues(validPredictions, metricName), System.currentTimeMillis() / 1000, anyPrediction.getPredictionTime(), validPredictions.values().stream() -- GitLab From 28fcb7099e5b13fb4d8f7c04891943e567474f02 Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 10 Jan 2022 11:49:37 +0100 Subject: [PATCH 14/23] - changed map for ensembling interface to contain nulls - changed format of start ensembling message to contain more metric info --- .../activemq/ActiveMQService.java | 1 - .../PredictionsToEnsembleMessage.java | 2 +- .../StartEnsemblingMessage.java | 22 +++---------------- .../service/Coordinator.java | 8 +------ .../ensembling/EnsemblingMechanism.java | 12 +++++----- .../ensembler/AverageValuesEnsembler.java | 2 ++ 6 files changed, 13 insertions(+), 34 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java index b22cf72a..ccd792ac 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/activemq/ActiveMQService.java @@ -94,7 +94,6 @@ public class ActiveMQService { brokerPublishers.computeIfAbsent(topicName, key -> getNewPublisher(topicName) ).publish(result); - } public synchronized void publishPooledPrediction(Prediction prediction, String metricName) { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java index db518db7..492391ca 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/PredictionsToEnsembleMessage.java @@ -21,7 +21,7 @@ public class PredictionsToEnsembleMessage { @NonNull private Long predictionTime; - @JsonProperty("predictionToEnsemble") + @JsonProperty("predictionsToEnsemble") @NonNull private Map predictionsByForecaster; } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java index c5ea2aec..7c34964e 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartEnsemblingMessage.java @@ -1,7 +1,7 @@ package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages; import com.fasterxml.jackson.annotation.JsonProperty; -import eu.melodic.event.brokerclient.templates.EventFields; +import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.MetricNeedingPredictingMessage; import lombok.AllArgsConstructor; import lombok.NonNull; @@ -10,25 +10,9 @@ import java.util.List; @AllArgsConstructor public class StartEnsemblingMessage { - @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.metrics) + @JsonProperty("metrics") @NonNull - private List metrics; - - @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.timestamp) - @NonNull - private long timestamp; - - @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.epoch_start) - @NonNull - private long epoch_start; - - @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.number_of_forward_predictions) - @NonNull - private int number_of_forward_predictions; - - @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.prediction_horizon) - @NonNull - private int prediction_horizon; + private List metrics; @JsonProperty("models") @NonNull diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index a25cbf20..0aed9101 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -142,13 +142,7 @@ public class Coordinator { //Inform the Ensembler StartEnsemblingMessage startEnsemblingMessage = new StartEnsemblingMessage( - metricNeedingPredictingMessageList.stream() - .map(MetricNeedingPredictingMessage::getMetric) - .collect(Collectors.toList()), - System.currentTimeMillis() / 1000, - epochStartingForecast, - forwardPredictionNumber, - predictionHorizon, + metricNeedingPredictingMessageList, properties.getStartingMethodsList() ); activeMQService.publishStartEnsembling(startEnsemblingMessage); diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java index 086f2ecf..4fb91305 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java @@ -19,15 +19,15 @@ public class EnsemblingMechanism { public Prediction poolPredictions(Map predictionsByMethod, String metricName) { int expectedForecastersDataCount = predictionsByMethod.size(); - Map validPredictions = predictionsByMethod.entrySet() + Map nonNullPredictions = predictionsByMethod.entrySet() .stream() .filter(e -> Objects.nonNull(e.getValue())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - int numberOfValidData = validPredictions.size(); + int numberOfValidData = nonNullPredictions.size(); if (notEnoughDataToCreatePooledValue(numberOfValidData, expectedForecastersDataCount)) { return POOLED_PREDICTION_NOT_CREATED; } - Set confidence_interval_values = validPredictions.values().stream() + Set confidence_interval_values = nonNullPredictions.values().stream() .map(Prediction::getConfidence_interval) .flatMap(Collection::stream) .collect(Collectors.toSet()); @@ -36,15 +36,15 @@ public class EnsemblingMechanism { confidence_interval_values.stream().max(Double::compareTo).get() ); - Prediction anyPrediction = validPredictions.values().stream() + Prediction anyPrediction = nonNullPredictions.values().stream() .findAny() .get(); return new Prediction( - ensembler.ensembleValues(validPredictions, metricName), + ensembler.ensembleValues(predictionsByMethod, metricName), System.currentTimeMillis() / 1000, anyPrediction.getPredictionTime(), - validPredictions.values().stream() + nonNullPredictions.values().stream() .mapToDouble(Prediction::getProbability) .average() .orElse(Double.NaN), diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java index dd532c7e..e5bc5e7c 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/AverageValuesEnsembler.java @@ -3,6 +3,7 @@ package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; import eu.morphemic.prediction_orchestrator.model.Prediction; import java.util.Map; +import java.util.Objects; public class AverageValuesEnsembler extends Ensembler { @@ -10,6 +11,7 @@ public class AverageValuesEnsembler extends Ensembler { @Override public double ensembleValues(Map predictionsByMethod, String metricName) { return predictionsByMethod.values().stream() + .filter(Objects::nonNull) .mapToDouble(Prediction::getMetricValue) .average() .orElse(Double.NaN); -- GitLab From c6bca6a6d8c067aa0ea9e637b213d615a34ea823 Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 10 Jan 2022 15:45:06 +0100 Subject: [PATCH 15/23] fixed field names --- .../incoming_messages/PredictionsEnsembledMessage.java | 2 +- .../service/ensembling/ensembler/OuterEnsembler.java | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java index f855ed15..173f3f10 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java @@ -9,7 +9,7 @@ import lombok.NonNull; @Getter public class PredictionsEnsembledMessage { - @JsonProperty("metricValue") + @JsonProperty("ensembledValue") @NonNull private double ensembledValue; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java index 8b84e9e3..8640cb56 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -5,7 +5,9 @@ import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_mes import eu.morphemic.prediction_orchestrator.model.Prediction; import lombok.AllArgsConstructor; +import java.util.AbstractMap; import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; @AllArgsConstructor @@ -23,7 +25,12 @@ public class OuterEnsembler extends Ensembler { .get() .getPredictionTime(), predictionsByMethod.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getMetricValue())) + .collect(Collectors.toMap(Map.Entry::getKey, entry -> { + if (entry.getValue() == null) + return null; + else + return entry.getValue().getMetricValue(); + })) ); return ensemblerService.ensemble(predictionsToEnsembleMessage).getEnsembledValue(); } -- GitLab From c8524ba17bbc4df877d9cc7e2f2065b86d72b5fe Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 10 Jan 2022 16:14:45 +0100 Subject: [PATCH 16/23] fixed null values map in outer ensembling --- .../ensembling/ensembler/OuterEnsembler.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java index 8640cb56..568c831a 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -5,10 +5,8 @@ import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_mes import eu.morphemic.prediction_orchestrator.model.Prediction; import lombok.AllArgsConstructor; -import java.util.AbstractMap; +import java.util.HashMap; import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; @AllArgsConstructor public class OuterEnsembler extends Ensembler { @@ -25,12 +23,9 @@ public class OuterEnsembler extends Ensembler { .get() .getPredictionTime(), predictionsByMethod.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> { - if (entry.getValue() == null) - return null; - else - return entry.getValue().getMetricValue(); - })) + .collect(HashMap::new, + (m, v) -> m.put(v.getKey(), v.getValue() == null ? null : v.getValue().getMetricValue()), + HashMap::putAll) ); return ensemblerService.ensemble(predictionsToEnsembleMessage).getEnsembledValue(); } -- GitLab From f3af24a7e3fde3444109d24c8bfd3dd060a95eb7 Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 10 Jan 2022 16:48:41 +0100 Subject: [PATCH 17/23] set method --- .../service/ensembling/ensembler/OuterEnsembler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java index 568c831a..b0a106ae 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -16,7 +16,7 @@ public class OuterEnsembler extends Ensembler { @Override public double ensembleValues(Map predictionsByMethod, String metricName) { PredictionsToEnsembleMessage predictionsToEnsembleMessage = new PredictionsToEnsembleMessage( - null, + "BestSubset", metricName, predictionsByMethod.values().stream() .findFirst() -- GitLab From 228b8ed4a02ada241b266add026e4a5d115ec1b0 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 12 Jan 2022 09:39:49 +0100 Subject: [PATCH 18/23] changed publish rate format --- .../incoming_messages/MetricNeedingPredictingMessage.java | 2 +- .../resources/eu.morphemic.predictionorchestrator.properties | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java index 7622b533..34162b65 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java @@ -24,6 +24,6 @@ public class MetricNeedingPredictingMessage { @JsonProperty(EventFields.TranslatorToForecastingMethodsFieldsPerMetric.publish_rate) @NonNull @Min(1) - private String publish_rate; + private int publish_rate; } diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties index 1b8211eb..2a6dbe2a 100644 --- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties +++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties @@ -51,8 +51,8 @@ logging.zone_id=Europe/Warsaw activemq.restartinterval=10000 activemq.restartcount=20 -ensembler.base-url=- -ensembler.uri=- +ensembler.base-url=http://ensembler:8000 +ensembler.uri=/ensemble influx.hostname=http://ui-influxdb -- GitLab From 46db83ccb61dafdfa5eb4b9e4e0af425a422edba Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 12 Jan 2022 10:34:27 +0100 Subject: [PATCH 19/23] error handling in communication with ensembler --- .../ensembler/EnsemblerService.java | 2 +- .../ensembling/EnsemblingMechanism.java | 32 +++++++++++-------- .../ensembling/ensembler/OuterEnsembler.java | 16 +++++++++- ...orphemic.predictionorchestrator.properties | 3 ++ 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java index 4d2b72a5..c0f910ef 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/ensembler/EnsemblerService.java @@ -20,7 +20,7 @@ public class EnsemblerService { private final WebClient client; public EnsemblerService(String baseUrl, String ensemblerUri) { - //http client is only used for logging requests + //http client is only used for requests logging HttpClient httpClient = HttpClient.create() .wiretap(true); this.ensemblerUri = ensemblerUri; diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java index 4fb91305..d68ba512 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/EnsemblingMechanism.java @@ -39,21 +39,25 @@ public class EnsemblingMechanism { Prediction anyPrediction = nonNullPredictions.values().stream() .findAny() .get(); + try { - return new Prediction( - ensembler.ensembleValues(predictionsByMethod, metricName), - System.currentTimeMillis() / 1000, - anyPrediction.getPredictionTime(), - nonNullPredictions.values().stream() - .mapToDouble(Prediction::getProbability) - .average() - .orElse(Double.NaN), - confidence_interval, - anyPrediction.getLevel(), - anyPrediction.getRefersTo(), - anyPrediction.getCloud(), - anyPrediction.getProvider() - ); + return new Prediction( + ensembler.ensembleValues(predictionsByMethod, metricName), + System.currentTimeMillis() / 1000, + anyPrediction.getPredictionTime(), + nonNullPredictions.values().stream() + .mapToDouble(Prediction::getProbability) + .average() + .orElse(Double.NaN), + confidence_interval, + anyPrediction.getLevel(), + anyPrediction.getRefersTo(), + anyPrediction.getCloud(), + anyPrediction.getProvider() + ); + } catch (Exception e) { + return POOLED_PREDICTION_NOT_CREATED; + } } private boolean notEnoughDataToCreatePooledValue(int numberOfValidData, int expectedForecastersDataCount) { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java index b0a106ae..ade2bd02 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -1,14 +1,17 @@ package eu.morphemic.prediction_orchestrator.service.ensembling.ensembler; import eu.morphemic.prediction_orchestrator.communication.ensembler.EnsemblerService; +import eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages.PredictionsEnsembledMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PredictionsToEnsembleMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; import java.util.HashMap; import java.util.Map; @AllArgsConstructor +@Slf4j public class OuterEnsembler extends Ensembler { private EnsemblerService ensemblerService; @@ -27,6 +30,17 @@ public class OuterEnsembler extends Ensembler { (m, v) -> m.put(v.getKey(), v.getValue() == null ? null : v.getValue().getMetricValue()), HashMap::putAll) ); - return ensemblerService.ensemble(predictionsToEnsembleMessage).getEnsembledValue(); + try { + PredictionsEnsembledMessage ensembledMessage = ensemblerService.ensemble(predictionsToEnsembleMessage); + if (ensembledMessage != null) { + return ensembledMessage.getEnsembledValue(); + } else { + log.warn("Message from ensembler is null"); + throw new IllegalArgumentException("Message from the Ensembler is null"); + } + } catch (Exception e) { + log.info("Error when sending ensembling request: {}", e.getMessage()); + throw new IllegalArgumentException("Error while sending message to the Ensember"); + } } } diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties index 2a6dbe2a..6e93c62b 100644 --- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties +++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties @@ -60,3 +60,6 @@ influx.port=8086 influx.database=morphemic influx.username=morphemic influx.password=password + +#ensembler communication logging +logging.level.reactor.netty.http.client=false -- GitLab From 68b72d3c52d7242d67a375282963983bd97cc729 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 12 Jan 2022 11:08:16 +0100 Subject: [PATCH 20/23] minor bgs solved --- .../incoming_messages/PredictionsEnsembledMessage.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java index 173f3f10..12a668f5 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/PredictionsEnsembledMessage.java @@ -1,12 +1,11 @@ package eu.morphemic.prediction_orchestrator.communication.messages.incoming_messages; import com.fasterxml.jackson.annotation.JsonProperty; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NonNull; +import lombok.*; @AllArgsConstructor -@Getter +@NoArgsConstructor +@Data public class PredictionsEnsembledMessage { @JsonProperty("ensembledValue") -- GitLab From 08dd72ff734075df2ddbe997296de34d99fbc696 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 12 Jan 2022 11:50:03 +0100 Subject: [PATCH 21/23] minor bgs solved --- .../ensembling/ensembler/OuterEnsembler.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java index ade2bd02..4fd3d278 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -18,29 +18,35 @@ public class OuterEnsembler extends Ensembler { @Override public double ensembleValues(Map predictionsByMethod, String metricName) { + long predictionTime = predictionsByMethod.values().stream() + .findFirst() + .get() + .getPredictionTime(); PredictionsToEnsembleMessage predictionsToEnsembleMessage = new PredictionsToEnsembleMessage( "BestSubset", metricName, - predictionsByMethod.values().stream() - .findFirst() - .get() - .getPredictionTime(), + predictionTime, predictionsByMethod.entrySet().stream() .collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue() == null ? null : v.getValue().getMetricValue()), HashMap::putAll) ); + PredictionsEnsembledMessage ensembledMessage; try { - PredictionsEnsembledMessage ensembledMessage = ensemblerService.ensemble(predictionsToEnsembleMessage); - if (ensembledMessage != null) { - return ensembledMessage.getEnsembledValue(); - } else { - log.warn("Message from ensembler is null"); - throw new IllegalArgumentException("Message from the Ensembler is null"); - } + ensembledMessage = ensemblerService.ensemble(predictionsToEnsembleMessage); } catch (Exception e) { log.info("Error when sending ensembling request: {}", e.getMessage()); throw new IllegalArgumentException("Error while sending message to the Ensember"); } + + if (ensembledMessage == null) { + log.warn("Message from ensembler is null"); + throw new IllegalArgumentException("Message from the Ensembler is null"); + } + if (ensembledMessage.getPredictionTime() != predictionTime) { + log.warn("Message from the Ensembler has wrong predictionTime"); + throw new IllegalArgumentException("Message from the Ensembler has wrong predictionTime"); + } + return ensembledMessage.getEnsembledValue(); } } -- GitLab From e4d066c6755993e776e8cb17cecf4eb5a145bab2 Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 12 Jan 2022 14:04:56 +0100 Subject: [PATCH 22/23] minor bgs solved --- .../service/ensembling/ensembler/OuterEnsembler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java index 4fd3d278..9dfd0051 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/ensembling/ensembler/OuterEnsembler.java @@ -38,7 +38,7 @@ public class OuterEnsembler extends Ensembler { log.info("Error when sending ensembling request: {}", e.getMessage()); throw new IllegalArgumentException("Error while sending message to the Ensember"); } - + if (ensembledMessage == null) { log.warn("Message from ensembler is null"); throw new IllegalArgumentException("Message from the Ensembler is null"); -- GitLab From 09082976ec581b56fd1df0fd82442f7a2ad160aa Mon Sep 17 00:00:00 2001 From: mriedl Date: Wed, 12 Jan 2022 16:20:59 +0100 Subject: [PATCH 23/23] reverted yaml --- .gitlab-ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 6c29774e..9fad29f6 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -175,7 +175,6 @@ deploy:prediction_orchestrator: - master - morphemic-rc1.5 - morphemic-rc2.0 - - ensemblerIntegration services: - $DOCKER_DIND_SERVICE dependencies: -- GitLab