diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java index 34162b6529e817e3c7c0bce4c2f0d67a4bcf2d11..7402fd06eae5ddce42e93bc008c3c597c07be7b7 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/incoming_messages/MetricNeedingPredictingMessage.java @@ -25,5 +25,9 @@ public class MetricNeedingPredictingMessage { @NonNull @Min(1) private int publish_rate; + + @JsonProperty("version") + @NonNull + private int version; } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartForecastingMessage.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartForecastingMessage.java index f5df1c768ab6d7dd1dfdc3b67a6965dede3e4788..3a8858c971c279515a5573b9706cabac4e3e3c47 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartForecastingMessage.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/messages/outcoming_messages/StartForecastingMessage.java @@ -2,13 +2,13 @@ package eu.morphemic.prediction_orchestrator.communication.messages.outcoming_me import com.fasterxml.jackson.annotation.JsonProperty; import eu.melodic.event.brokerclient.templates.EventFields; +import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; import java.util.List; -@AllArgsConstructor @Getter public class StartForecastingMessage { @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.metrics) @@ -30,4 +30,17 @@ public class StartForecastingMessage { @JsonProperty(EventFields.PredictionOrchestratorToForecastingMethodsStartForecastingEventFields.prediction_horizon) @NonNull private int prediction_horizon; + + @JsonProperty("version") + @NonNull + private int version; + + public StartForecastingMessage(List metrics, long timestamp, ForecastingConfiguration forecastingConfiguration) { + this.metrics = metrics; + this.timestamp = timestamp; + this.epoch_start = forecastingConfiguration.getFirst_epoch_starting_forecast(); + this.number_of_forward_predictions = (int) forecastingConfiguration.getForward_prediction_number(); + this.prediction_horizon = forecastingConfiguration.getPrediction_horizon(); + this.version = forecastingConfiguration.getVersion(); + } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/ForecastingConfiguration.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/ForecastingConfiguration.java index cc4622ee89700cff532cdb63d8b3637dd8b81f1b..53b1802c334753490aea9e9af4f0dbf867030f7e 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/ForecastingConfiguration.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/properties/ForecastingConfiguration.java @@ -2,18 +2,22 @@ package eu.morphemic.prediction_orchestrator.properties; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.Setter; @AllArgsConstructor @Getter +@Setter public class ForecastingConfiguration { private long first_epoch_starting_forecast; private int prediction_horizon; private long forward_prediction_number; + private int version; public ForecastingConfiguration(ForecastingConfiguration forecastingConfiguration) { this.first_epoch_starting_forecast = forecastingConfiguration.getFirst_epoch_starting_forecast(); this.prediction_horizon = forecastingConfiguration.getPrediction_horizon(); this.forward_prediction_number = forecastingConfiguration.getForward_prediction_number(); + this.version = forecastingConfiguration.getVersion(); } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java index fe776e071883824a22da8b9d8b3dd32e5b4977dd..a097cbcb9c5238022e0349a6fd44b185228b9155 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java @@ -30,11 +30,11 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j public class PredictionRegistry { - public static int VALUE_ADDED = 2; - public static int VALUE_UPDATED = 1; - public static int VALUE_REJECTED = 0; + public final static int VALUE_ADDED = 2; + public final static int VALUE_UPDATED = 1; + public final static int VALUE_REJECTED = 0; - String registryName; + private String registryName; private AtomicReference forecastingConfiguration; @@ -96,7 +96,12 @@ public class PredictionRegistry { return predictions.containsKey(predictionTime); } - int updateBufferRecord(Prediction prediction) throws JMSException { + public long geEarliestPredictionTime() { + return earliestPredictionTime.get(); + + } + + private int updateBufferRecord(Prediction prediction) throws JMSException { AtomicInteger valueResult = new AtomicInteger(VALUE_REJECTED); predictions.compute( prediction.getPredictionTime(), @@ -121,17 +126,19 @@ public class PredictionRegistry { return valueResult.get(); } - void removeBufferRecord(long keyToRemove) { + private void removeBufferRecord(long keyToRemove) { this.predictions.remove(keyToRemove); } - String formatPredictionTime(long predictionTime) { + private String formatPredictionTime(long predictionTime) { return PredictionTimeFormatter.horizonPatternCheckFormat(predictionTime, earliestPredictionTime.get(), forecastingConfiguration.get().getPrediction_horizon() ); } + + private void moveBuffer(long predictionTime) { long firstToRemove = earliestPredictionTime.get(); long shift = (predictionTime - latestPredictionTime.get()); diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index 0aed9101a122738cd88ddbbe13194783c4eb8467..cca588b61f201d485fdf83af254491bf745e7f60 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -18,7 +18,6 @@ import javax.annotation.PostConstruct; import javax.jms.JMSException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; @Slf4j @Component @@ -91,15 +90,24 @@ public class Coordinator { //Communication 2 public void notifyReceivedMetricList(List metricNeedingPredictingMessageList) throws JMSException { - int forwardPredictionNumber = properties.getInitial_forward_prediction_number(); - int predictionHorizon = properties.getInitial_prediction_horizon(); - long epochStartingForecast = System.currentTimeMillis() / 1000 + properties.getStarting_forecasting_delay(); - this.forecastingConfiguration = new ForecastingConfiguration( - epochStartingForecast, - predictionHorizon, - forwardPredictionNumber - ); + int receivedVersion = metricNeedingPredictingMessageList.stream() + .map(MetricNeedingPredictingMessage::getVersion) + .findAny() + .orElseThrow(IllegalArgumentException::new); + + boolean isNewerVersion = isNewerVersion(receivedVersion); + + if (isNewerVersion) { + initConfiguration(properties, receivedVersion); + } else { + long currentTimestamp = metricHandlers.values().stream() + .mapToLong(MetricHandler::getEarliestPredictionTime) + .min() + .getAsLong(); + this.forecastingConfiguration.setFirst_epoch_starting_forecast( + currentTimestamp + properties.getInitial_prediction_horizon()); + } //we update list of metrics that needs predicting //Metric -> list of methods currently predicting this metric @@ -107,7 +115,9 @@ public class Coordinator { for (MetricNeedingPredictingMessage metricNeedingPredictingMessage : metricNeedingPredictingMessageList) { String metricNeedingPrediction = metricNeedingPredictingMessage.getMetric(); if (metricHandlers.containsKey(metricNeedingPrediction)) { - metricHandlers.get(metricNeedingPrediction).notifyConfigurationChanged(forecastingConfiguration); + if (isNewerVersion) { + metricHandlers.get(metricNeedingPrediction).notifyConfigurationChanged(forecastingConfiguration); + } metricHandlers.get(metricNeedingPrediction).getConnectedMethods().forEach( method -> metricsByMethod.computeIfAbsent(method, list -> new LinkedList<>()).add(metricNeedingPrediction)); } else { @@ -133,9 +143,7 @@ public class Coordinator { StartForecastingMessage startForecastingMessage = new StartForecastingMessage( metricsPredictedByMethod, System.currentTimeMillis() / 1000, - epochStartingForecast, - forwardPredictionNumber, - predictionHorizon + forecastingConfiguration ); activeMQService.publishStartForecasting(startForecastingMessage, entry.getKey()); } @@ -149,4 +157,24 @@ public class Coordinator { //ToDo Stop metrics not included in new configuration file } + + private void initConfiguration(Properties properties, int version) { + int forwardPredictionNumber = properties.getInitial_forward_prediction_number(); + int predictionHorizon = properties.getInitial_prediction_horizon(); + long epochStartingForecast = System.currentTimeMillis() / 1000 + properties.getStarting_forecasting_delay(); + + this.forecastingConfiguration = new ForecastingConfiguration( + epochStartingForecast, + predictionHorizon, + forwardPredictionNumber, + version + ); + } + + private boolean isNewerVersion(int receivedVersion) { + if (forecastingConfiguration == null) { + return true; + } + return receivedVersion > forecastingConfiguration.getVersion(); + } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index 4a9e42376c6c4dcdc062b18aec20870918022f38..bc5370fe1278c0000dbdae377a33eb12e5dc8314 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -111,4 +111,8 @@ public class MetricHandler { return pooledPredictionsRegistry.containsPrediction(predictionTime); } + long getEarliestPredictionTime() { + return pooledPredictionsRegistry.geEarliestPredictionTime(); + } + } diff --git a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java index 185315e2443ae972288230d7452d52805c3eeefa..04ed39761c7783293aadb72b01ee7231de86f31d 100644 --- a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java +++ b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/communication/activemq/PublishReceiveTest.java @@ -11,6 +11,7 @@ import eu.morphemic.prediction_orchestrator.communication.messages.incoming_mess import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PooledPredictionMessage; import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.StartForecastingMessage; import eu.morphemic.prediction_orchestrator.model.Prediction; +import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; @@ -69,9 +70,12 @@ public class PublishReceiveTest { private StartForecastingMessage startForecastingMessage = new StartForecastingMessage( Arrays.asList(metrics), System.currentTimeMillis(), - System.currentTimeMillis() + 50000, - 8, - 600 + new ForecastingConfiguration( + System.currentTimeMillis() + 50000, + 8, + 600, + 1 + ) ); diff --git a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java index 768529a9d2e95390b3fa832d03714c4d4f6405bb..3699f539420a961ceb7e0020fca45c455d55560d 100644 --- a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java +++ b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java @@ -57,7 +57,8 @@ class PredictionRegistryTest { ForecastingConfiguration forecastingConfiguration = new ForecastingConfiguration( 100000000, 1000, - 10 + 10, + 1 ); PredictionRegistry predictionRegistry = new PredictionRegistry( forecastingConfiguration, @@ -78,13 +79,15 @@ class PredictionRegistryTest { ForecastingConfiguration forecastingConfiguration = new ForecastingConfiguration( first_epoch_starting_forecast, prediction_horizon, - numberOfForwardPrediction + numberOfForwardPrediction, + 1 ); //we need to publish more than the buffer can handle ForecastingConfiguration forecastingConfigurationPublisher = new ForecastingConfiguration( first_epoch_starting_forecast, prediction_horizon, - 3 * numberOfForwardPrediction + 3 * numberOfForwardPrediction, + 1 ); PredictionRegistry predictionRegistry = new PredictionRegistry( forecastingConfiguration,