From 76cb2c8c1b7cbe05624a4468ae0b6c28e1c8fa0e Mon Sep 17 00:00:00 2001 From: mriedl Date: Tue, 1 Mar 2022 17:13:47 +0100 Subject: [PATCH 1/6] adapted coordinator for repeated metrics list with version field --- .../MetricNeedingPredictingMessage.java | 4 ++ .../StartForecastingMessage.java | 15 ++++- .../properties/ForecastingConfiguration.java | 4 ++ .../registries/PredictionRegistry.java | 21 ++++--- .../service/Coordinator.java | 55 ++++++++++++++----- .../service/MetricHandler.java | 4 ++ 6 files changed, 82 insertions(+), 21 deletions(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java index 34162b65..7402fd06 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java @@ -25,5 +25,9 @@ public class MetricNeedingPredictingMessage { @NonNull @Min(1) private int publish_rate; + + @JsonProperty("version") + @NonNull + private int version; } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartForecastingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartForecastingMessage.java index f5df1c76..3a8858c9 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartForecastingMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartForecastingMessage.java @@ -2,13 +2,13 @@ package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_me import com.fasterxml.jackson.annotation.JsonProperty; import eu.melodic.event.brokerclient.templates.EventFields; +import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; import java.util.List; -@AllArgsConstructor @Getter public class StartForecastingMessage { @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.metrics) @@ -30,4 +30,17 @@ public class StartForecastingMessage { @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.prediction_horizon) @NonNull private int prediction_horizon; + + @JsonProperty("version") + @NonNull + private int version; + + public StartForecastingMessage(List metrics, long timestamp, ForecastingConfiguration forecastingConfiguration) { + this.metrics = metrics; + this.timestamp = timestamp; + this.epoch_start = forecastingConfiguration.getFirst_epoch_starting_forecast(); + this.number_of_forward_predictions = (int) forecastingConfiguration.getForward_prediction_number(); + this.prediction_horizon = forecastingConfiguration.getPrediction_horizon(); + this.version = forecastingConfiguration.getVersion(); + } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/ForecastingConfiguration.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/ForecastingConfiguration.java index cc4622ee..53b1802c 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/ForecastingConfiguration.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/ForecastingConfiguration.java @@ -2,18 +2,22 @@ package eu.morphemic.prediction_orchestrator.properties; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.Setter; @AllArgsConstructor @Getter +@Setter public class ForecastingConfiguration { private long first_epoch_starting_forecast; private int prediction_horizon; private long forward_prediction_number; + private int version; public ForecastingConfiguration(ForecastingConfiguration forecastingConfiguration) { this.first_epoch_starting_forecast = forecastingConfiguration.getFirst_epoch_starting_forecast(); this.prediction_horizon = forecastingConfiguration.getPrediction_horizon(); this.forward_prediction_number = forecastingConfiguration.getForward_prediction_number(); + this.version = forecastingConfiguration.getVersion(); } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java index fe776e07..a097cbcb 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java @@ -30,11 +30,11 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j public class PredictionRegistry { - public static int VALUE_ADDED = 2; - public static int VALUE_UPDATED = 1; - public static int VALUE_REJECTED = 0; + public final static int VALUE_ADDED = 2; + public final static int VALUE_UPDATED = 1; + public final static int VALUE_REJECTED = 0; - String registryName; + private String registryName; private AtomicReference forecastingConfiguration; @@ -96,7 +96,12 @@ public class PredictionRegistry { return predictions.containsKey(predictionTime); } - int updateBufferRecord(Prediction prediction) throws JMSException { + public long geEarliestPredictionTime() { + return earliestPredictionTime.get(); + + } + + private int updateBufferRecord(Prediction prediction) throws JMSException { AtomicInteger valueResult = new AtomicInteger(VALUE_REJECTED); predictions.compute( prediction.getPredictionTime(), @@ -121,17 +126,19 @@ public class PredictionRegistry { return valueResult.get(); } - void removeBufferRecord(long keyToRemove) { + private void removeBufferRecord(long keyToRemove) { this.predictions.remove(keyToRemove); } - String formatPredictionTime(long predictionTime) { + private String formatPredictionTime(long predictionTime) { return PredictionTimeFormatter.horizonPatternCheckFormat(predictionTime, earliestPredictionTime.get(), forecastingConfiguration.get().getPrediction_horizon() ); } + + private void moveBuffer(long predictionTime) { long firstToRemove = earliestPredictionTime.get(); long shift = (predictionTime - latestPredictionTime.get()); diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index 0aed9101..baa4139d 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -18,7 +18,6 @@ import javax.annotation.PostConstruct; import javax.jms.JMSException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; @Slf4j @Component @@ -91,15 +90,25 @@ public class Coordinator { //Communication 2 public void notifyReceivedMetricList(List metricNeedingPredictingMessageList) throws JMSException { - int forwardPredictionNumber = properties.getInitial_forward_prediction_number(); - int predictionHorizon = properties.getInitial_prediction_horizon(); - long epochStartingForecast = System.currentTimeMillis() / 1000 + properties.getStarting_forecasting_delay(); - this.forecastingConfiguration = new ForecastingConfiguration( - epochStartingForecast, - predictionHorizon, - forwardPredictionNumber - ); + int receivedVersion = metricNeedingPredictingMessageList.stream() + .map(MetricNeedingPredictingMessage::getVersion) + .findAny() + .orElseThrow(IllegalArgumentException::new); + + boolean isNewerVersion = isNewerVersion(receivedVersion); + + if (isNewerVersion) { + initConfiguration(properties, receivedVersion); + } else { + this.forecastingConfiguration.setVersion(receivedVersion); + long currentTimestamp = metricHandlers.values().stream() + .mapToLong(MetricHandler::getEarliestPredictionTime) + .min() + .getAsLong(); + this.forecastingConfiguration.setFirst_epoch_starting_forecast( + currentTimestamp + properties.getInitial_prediction_horizon()); + } //we update list of metrics that needs predicting //Metric -> list of methods currently predicting this metric @@ -107,7 +116,9 @@ public class Coordinator { for (MetricNeedingPredictingMessage metricNeedingPredictingMessage : metricNeedingPredictingMessageList) { String metricNeedingPrediction = metricNeedingPredictingMessage.getMetric(); if (metricHandlers.containsKey(metricNeedingPrediction)) { - metricHandlers.get(metricNeedingPrediction).notifyConfigurationChanged(forecastingConfiguration); + if (isNewerVersion) { + metricHandlers.get(metricNeedingPrediction).notifyConfigurationChanged(forecastingConfiguration); + } metricHandlers.get(metricNeedingPrediction).getConnectedMethods().forEach( method -> metricsByMethod.computeIfAbsent(method, list -> new LinkedList<>()).add(metricNeedingPrediction)); } else { @@ -133,9 +144,7 @@ public class Coordinator { StartForecastingMessage startForecastingMessage = new StartForecastingMessage( metricsPredictedByMethod, System.currentTimeMillis() / 1000, - epochStartingForecast, - forwardPredictionNumber, - predictionHorizon + forecastingConfiguration ); activeMQService.publishStartForecasting(startForecastingMessage, entry.getKey()); } @@ -149,4 +158,24 @@ public class Coordinator { //ToDo Stop metrics not included in new configuration file } + + private void initConfiguration(Properties properties, int version) { + int forwardPredictionNumber = properties.getInitial_forward_prediction_number(); + int predictionHorizon = properties.getInitial_prediction_horizon(); + long epochStartingForecast = System.currentTimeMillis() / 1000 + properties.getStarting_forecasting_delay(); + + this.forecastingConfiguration = new ForecastingConfiguration( + epochStartingForecast, + predictionHorizon, + forwardPredictionNumber, + version + ); + } + + private boolean isNewerVersion(int receivedVersion) { + if (forecastingConfiguration == null) { + return true; + } + return receivedVersion > forecastingConfiguration.getVersion(); + } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index 4a9e4237..bc5370fe 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -111,4 +111,8 @@ public class MetricHandler { return pooledPredictionsRegistry.containsPrediction(predictionTime); } + long getEarliestPredictionTime() { + return pooledPredictionsRegistry.geEarliestPredictionTime(); + } + } -- GitLab From 8f35fe2c13955f3095596121429a9986073b3f60 Mon Sep 17 00:00:00 2001 From: mriedl Date: Fri, 4 Mar 2022 07:44:40 +0100 Subject: [PATCH 2/6] temporary yaml change --- .gitlab-ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b4992ec0..b137acc8 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -75,6 +75,7 @@ build:prediction_orchestrator: - master - morphemic-rc1.5 - morphemic-rc2.0 + - translatorAdaptationInPO script: - $PREDICTON_ORCHESTRATOR_CLI -Pwithout-docker clean install artifacts: -- GitLab From b88e245b2ac117e48cec961de64fdadb85c40567 Mon Sep 17 00:00:00 2001 From: mriedl Date: Fri, 4 Mar 2022 07:53:30 +0100 Subject: [PATCH 3/6] test bugs fixed --- .../communication/activemq/PublishReceiveTest.java | 10 +++++++--- .../registries/PredictionRegistryTest.java | 9 ++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java index 185315e2..04ed3976 100644 --- a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java +++ b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java @@ -11,6 +11,7 @@ import eu.morphemic.prediction_orchestrator.communication.messages.incoming_mess import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PooledPredictionMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; +import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; @@ -69,9 +70,12 @@ public class PublishReceiveTest { private StartForecastingMessage startForecastingMessage = new StartForecastingMessage( Arrays.asList(metrics), System.currentTimeMillis(), - System.currentTimeMillis() + 50000, - 8, - 600 + new ForecastingConfiguration( + System.currentTimeMillis() + 50000, + 8, + 600, + 1 + ) ); diff --git a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java index 768529a9..3699f539 100644 --- a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java +++ b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java @@ -57,7 +57,8 @@ class PredictionRegistryTest { ForecastingConfiguration forecastingConfiguration = new ForecastingConfiguration( 100000000, 1000, - 10 + 10, + 1 ); PredictionRegistry predictionRegistry = new PredictionRegistry( forecastingConfiguration, @@ -78,13 +79,15 @@ class PredictionRegistryTest { ForecastingConfiguration forecastingConfiguration = new ForecastingConfiguration( first_epoch_starting_forecast, prediction_horizon, - numberOfForwardPrediction + numberOfForwardPrediction, + 1 ); //we need to publish more than the buffer can handle ForecastingConfiguration forecastingConfigurationPublisher = new ForecastingConfiguration( first_epoch_starting_forecast, prediction_horizon, - 3 * numberOfForwardPrediction + 3 * numberOfForwardPrediction, + 1 ); PredictionRegistry predictionRegistry = new PredictionRegistry( forecastingConfiguration, -- GitLab From 5d8f8efd39daee707ec59009a82eb16c682920d5 Mon Sep 17 00:00:00 2001 From: mriedl Date: Fri, 4 Mar 2022 08:00:43 +0100 Subject: [PATCH 4/6] test bugs fixed --- .gitlab-ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b137acc8..f6436757 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -182,6 +182,7 @@ deploy:prediction_orchestrator: - master - morphemic-rc1.5 - morphemic-rc2.0 + - translatorAdaptationInPO services: - $DOCKER_DIND_SERVICE dependencies: -- GitLab From 3f6fdcffc7fc2437e14c2c6999f1684c2a9702bf Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 14 Mar 2022 12:29:26 +0100 Subject: [PATCH 5/6] added deleted setting version --- .../morphemic/prediction_orchestrator/service/Coordinator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index baa4139d..cca588b6 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -101,7 +101,6 @@ public class Coordinator { if (isNewerVersion) { initConfiguration(properties, receivedVersion); } else { - this.forecastingConfiguration.setVersion(receivedVersion); long currentTimestamp = metricHandlers.values().stream() .mapToLong(MetricHandler::getEarliestPredictionTime) .min() -- GitLab From 46f651331642c2f7ca3496942addc9a31201a13a Mon Sep 17 00:00:00 2001 From: mriedl Date: Mon, 14 Mar 2022 17:12:35 +0100 Subject: [PATCH 6/6] reverted yaml --- .gitlab-ci.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f6436757..b4992ec0 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -75,7 +75,6 @@ build:prediction_orchestrator: - master - morphemic-rc1.5 - morphemic-rc2.0 - - translatorAdaptationInPO script: - $PREDICTON_ORCHESTRATOR_CLI -Pwithout-docker clean install artifacts: @@ -182,7 +181,6 @@ deploy:prediction_orchestrator: - master - morphemic-rc1.5 - morphemic-rc2.0 - - translatorAdaptationInPO services: - $DOCKER_DIND_SERVICE dependencies: -- GitLab