diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java index 91dc1efe6a7ace533c3e9bd14cc0952dc37f11a0..ffd1e90c906ad9cb3f6d410b2645de135c1db525 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/communication/listeners/PredictionListener.java @@ -29,8 +29,9 @@ public class PredictionListener extends ActiveMQListener { log.debug("Listener of topic {}: Converting event payload to PredictionFromForecasterMessage instance...", topicName); PredictionFromForecasterMessage predictionFromForecasterMessage = gson.fromJson(payload, PredictionFromForecasterMessage.class); log.info("Listener of topic {}: PredictionFromForecasterMessage instance: {}", topicName, predictionFromForecasterMessage.toString()); - boolean valueUpdated = predictionRegistry.processPrediction(new Prediction(predictionFromForecasterMessage)); - if (valueUpdated) { + int valueResult = predictionRegistry.processPrediction(new Prediction(predictionFromForecasterMessage)); + if ((valueResult == PredictionRegistry.VALUE_ADDED) || + (valueResult == PredictionRegistry.VALUE_UPDATED)) { metricHandler.launchMethodsPoolingIfNecessary(predictionFromForecasterMessage.getPredictionTime()); } } catch (JMSException e) { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/NoAccessWithoutUpdatePredictionRegistry.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/NoAccessWithoutUpdatePredictionRegistry.java deleted file mode 100644 index 517e53655067bb69ef3274f3933c1022ee730c1d..0000000000000000000000000000000000000000 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/NoAccessWithoutUpdatePredictionRegistry.java +++ /dev/null @@ -1,60 +0,0 @@ -package eu.morphemic.prediction_orchestrator.registries; - -import eu.morphemic.prediction_orchestrator.model.Prediction; -import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; -import lombok.extern.slf4j.Slf4j; - -import javax.jms.JMSException; -import java.util.concurrent.ConcurrentHashMap; - -@Slf4j -public class NoAccessWithoutUpdatePredictionRegistry extends PredictionRegistry { - public final static Prediction PREDICTION_ALREADY_ACCESSED = null; - - private ConcurrentHashMap availableToAccess; - - public NoAccessWithoutUpdatePredictionRegistry(ForecastingConfiguration forecastingConfiguration, String registryName) { - super(forecastingConfiguration, registryName); - } - - @Override - void removeBufferRecord(long keyToRemove) { - this.predictions.remove(keyToRemove); - this.availableToAccess.remove(keyToRemove); - } - - @Override - public synchronized void reconfigureBuffer(ForecastingConfiguration forecastingConfiguration) { - super.reconfigureBuffer(forecastingConfiguration); - this.availableToAccess = new ConcurrentHashMap<>(); - } - - /** - * We need to synchronise this block as we need to makes sure that we block next access just after checking if we - * can get the element ("if" condition and updating the "availableToAccessMap" operations need to be bounded into atomic block) - */ - @Override - public Prediction getCopyPrediction(long predictionTime) { - synchronized (this) { - if (availableToAccess.getOrDefault(predictionTime, false)) { - availableToAccess.put(predictionTime, false); - return super.getCopyPrediction(predictionTime); - } else { - log.info("Registry {}: Prediction on predictionTime: {} has already been accessed. Wait for its update", registryName, - formatPredictionTime(predictionTime)); - return PREDICTION_ALREADY_ACCESSED; - } - } - } - - @Override - boolean updateBufferRecord(Prediction prediction) throws JMSException { - log.debug("Processing pooled prediction of registry {} for {}", registryName, - formatPredictionTime(prediction.getPredictionTime())); - boolean valueChanged = super.updateBufferRecord(prediction); - if (valueChanged || !availableToAccess.containsKey(prediction.getPredictionTime())) { - this.availableToAccess.put(prediction.getPredictionTime(), true); - } - return valueChanged; - } -} diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java index 5b4e6fec5e5d4bd37b9f874ccfdd81f5e9b149da..fe776e071883824a22da8b9d8b3dd32e5b4977dd 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistry.java @@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j; import javax.jms.JMSException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -29,6 +30,10 @@ import java.util.concurrent.atomic.AtomicReference; @Slf4j public class PredictionRegistry { + public static int VALUE_ADDED = 2; + public static int VALUE_UPDATED = 1; + public static int VALUE_REJECTED = 0; + String registryName; private AtomicReference forecastingConfiguration; @@ -69,7 +74,7 @@ public class PredictionRegistry { earliestPredictionTime.get() + predictionHorizon * (forecastingConfiguration.getForward_prediction_number() - 1)); } - public synchronized boolean processPrediction(Prediction prediction) throws JMSException { + public synchronized int processPrediction(Prediction prediction) throws JMSException { if (prediction != null) { long predictionTime = prediction.getPredictionTime(); if (predictionTime < earliestPredictionTime.get()) { @@ -84,20 +89,24 @@ public class PredictionRegistry { return updateBufferRecord(prediction); } } - return false; + return VALUE_REJECTED; } public boolean containsPrediction(long predictionTime) { return predictions.containsKey(predictionTime); } - boolean updateBufferRecord(Prediction prediction) throws JMSException { - AtomicBoolean valueUpdated = new AtomicBoolean(false); + int updateBufferRecord(Prediction prediction) throws JMSException { + AtomicInteger valueResult = new AtomicInteger(VALUE_REJECTED); predictions.compute( prediction.getPredictionTime(), (k, v) -> { if ((v == null) || (v.getTimestamp() < prediction.getTimestamp())) { - valueUpdated.set(true); + if (v == null) { + valueResult.set(VALUE_ADDED); + } else { + valueResult.set(VALUE_UPDATED); + } log.debug("Updating value for {} inside registry: {} on predictionTime: {} and timestamp {}", prediction.getMetricValue(), registryName, prediction.getPredictionTime(), prediction.getTimestamp()); return prediction; @@ -109,7 +118,7 @@ public class PredictionRegistry { if (latestPredictionTime.get() < prediction.getPredictionTime()) { moveBuffer(prediction.getPredictionTime()); } - return valueUpdated.get(); + return valueResult.get(); } void removeBufferRecord(long keyToRemove) { diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java index 1d5c8369b76f3a1a7c05b982d462709a7d8b14a1..418d19b6aa9efd34424032d690ab9e6e10922582 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/Coordinator.java @@ -7,7 +7,7 @@ import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration; import eu.morphemic.prediction_orchestrator.properties.Properties; -import eu.morphemic.prediction_orchestrator.registries.NoAccessWithoutUpdatePredictionRegistry; +import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -44,9 +44,9 @@ public class Coordinator { * and before getting them. But this is not a problem as we want to publish as much as possible assuming we had a full metric vector * at any point. If the buffer would move in the meantime we just wouldn't publish this specific value */ - //Communication 4 and 1 - synchronized void publishIfPooledValuesProvidedForAllMetrics(long predictionTime) { - + //Communication 5 and 1 + synchronized void publishIfPooledValuesProvidedForAllMetrics(long predictionTime, + int lastValueUpdate, String callingMetricHandler) { //metricName -> predictionAvailability HashMap isPooledPredictionAvailable = new HashMap<>(); metricHandlers.forEach((metricName, metricHandler) -> @@ -57,12 +57,18 @@ public class Coordinator { if (entireVectorIsReady) { log.info("Starting publishing pooled predictions for metrics for {} ", PredictionTimeFormatter.rawDateFormat(predictionTime)); - metricHandlers.forEach((metricName, metricHandler) -> { - Prediction prediction = metricHandler.getPooledPrediction(predictionTime); - if (prediction != NoAccessWithoutUpdatePredictionRegistry.PREDICTION_ALREADY_ACCESSED) { + //We send the entire vector only once, when all values are present for the first time + if (lastValueUpdate == PredictionRegistry.VALUE_ADDED) { + metricHandlers.forEach((metricName, metricHandler) -> { + Prediction prediction = metricHandler.getPooledPrediction(predictionTime); communicationService.publishPooledPrediction(prediction, metricName); - } - }); + }); + } else { + //we send only updated value as at this point the entire vector has been send earlier + Prediction prediction = metricHandlers.get(callingMetricHandler) + .getPooledPrediction(predictionTime); + communicationService.publishPooledPrediction(prediction, callingMetricHandler); + } } } diff --git a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java index 32b5a35c481ffa7c010263e9a720c09940e02a26..cb4703d713552106bf11082871c2df72e2b40ad3 100644 --- a/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java +++ b/prediction_orchestrator/src/main/java/eu/morphemic/prediction_orchestrator/service/MetricHandler.java @@ -3,7 +3,6 @@ package eu.morphemic.prediction_orchestrator.service; import eu.morphemic.prediction_orchestrator.communication.CommunicationService; import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter; import eu.morphemic.prediction_orchestrator.properties.Properties; -import eu.morphemic.prediction_orchestrator.registries.NoAccessWithoutUpdatePredictionRegistry; import eu.morphemic.prediction_orchestrator.model.Prediction; import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry; import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategy; @@ -30,7 +29,7 @@ public class MetricHandler { //methodName -> MethodHandler private HashMap methodHandlers = new HashMap<>(); - private NoAccessWithoutUpdatePredictionRegistry pooledPredictionsRegistry; + private PredictionRegistry pooledPredictionsRegistry; private PoolingStrategy poolingStrategy; private CommunicationService communicationService; @@ -40,7 +39,7 @@ public class MetricHandler { this.metricName = metricName; this.poolingStrategy = PoolingStrategyFactory.getPoolingStrategy(properties); this.coordinator = coordinator; - this.pooledPredictionsRegistry = new NoAccessWithoutUpdatePredictionRegistry( + this.pooledPredictionsRegistry = new PredictionRegistry( forecastingConfiguration, PredictionRegistry.getPooledPredictionsRegistryName(metricName) ); @@ -57,9 +56,10 @@ public class MetricHandler { if (newPooledPrediction != PoolingStrategy.POOLED_PREDICTION_NOT_CREATED) { log.info("Pooling function has created pooled prediction for metric {} and predictionTime {}", metricName, PredictionTimeFormatter.rawDateFormat(predictionTime)); - boolean valueChanged = pooledPredictionsRegistry.processPrediction(newPooledPrediction); - if (valueChanged) { - coordinator.publishIfPooledValuesProvidedForAllMetrics(predictionTime); + int valueResult = pooledPredictionsRegistry.processPrediction(newPooledPrediction); + if ((valueResult == PredictionRegistry.VALUE_UPDATED) || + (valueResult == PredictionRegistry.VALUE_ADDED)) { + coordinator.publishIfPooledValuesProvidedForAllMetrics(predictionTime, valueResult, metricName); } } else { log.debug("Pooling function has not created pooled prediction for metric {} and predictionTime {}", diff --git a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties index 0c1ecc30704d7c0789e8f705c65218c480284603..14676ebc12db0223f4185dda5b8ea1048fd48072 100644 --- a/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties +++ b/prediction_orchestrator/src/main/resources/eu.morphemic.predictionorchestrator.properties @@ -1,14 +1,51 @@ -forecasting_configuration.initial_prediction_horizon=120000 +#forecasting_configuration are properties that are sent further to The forecasters +#They explain pattern for predictionTimes, predictions's count and what should be the predictionTime +#of the first message send + +#global pattern is: predictionTime = epochStart + k*initial_prediction_horizon where k is integer + +#unit: seconds +#This property indicates what is the time difference between following predictionTimes +# both inside the forecasters and the PO +# Depending on the forecaster it may also mean how often the Forecaster would publish predictions +# to the PO +forecasting_configuration.initial_prediction_horizon=120 + +#This property indicates how many forward prediction the forecaster should send +#at the moment of publishing predictions forecasting_configuration.initial_forward_prediction_number=8 -forecasting_configuration.starting_forecasting_delay=50000 +#This property indicates what is epochStart and what predictionTime should have the first message send by the forecaster +#It also explains time before the forecasters start publishing +#EpochStart = currentTime + starting_forecasting_delay +# predictionTime of first predictionSend = EpochStart + initial_prediction_horizon +forecasting_configuration.starting_forecasting_delay=200 +#In the most common scenario: +#PO counts epochStart based on starting_forecasting_delay and send the above properties to the Forecaster +#At moment epochStart the forecaster would publish initial_forward_prediction_number predictions +#with predictionTimes equal = epochStart + (1,2,3 .. initial_forward_prediction_number) * initial_prediction_horizon +# At moment (epochStart + 1*initial_prediction_horizon) the Forecaster would send initial_forward_prediction_number predictions +#with predictionTimes equal = epochStart + (2,3 .. (initial_forward_prediction_number+1)) * initial_prediction_horizon + + +#This property indicates method for checking how many forecasters' data is needed to be abe to pool/ensemble +# predictions and publish them to EMS +# It currntly may equal PERCENTAGE_FORECASTERS_COUNT_NEEDED or STATIC_FORECASTERS_COUNT_NEEDED +# With the first option threshold property is in percents +#With the second it is a static count +#For example with threshold threshold=2 at least 2 synchronised forecasters are needed to be able to merge predictions and send them forward pooling.poolingStrategy=STATIC_FORECASTERS_COUNT_NEEDED pooling.threshold=2 jms.client.broker_properties_configuration_file_location=${MELODIC_CONFIG_DIR}/eu.melodic.event.brokerclient.properties -startingMethodsList=nbeats,es_hybrid,arima,tsetlin_machines,exponential_smoothing,lstm,gluon_machines,prophet +#List of connected forecaster that PO expects to be working. PO would not coummuncate with the forecasters not on that list +startingMethodsList=es_hybrid,cnn,prophet,tft,nbeats,arima,tsetlin_machines,exponential_smoothing,lstm,gluon_machines -logging.config=file:${MELODIC_CONFIG_DIR}/logback-conf/logback-spring.xml +#this is only for logging, have no influence on working flow of The PO logging.zone_id=Europe/Warsaw + +#Details of restarting connection to amq upon failure +activemq.restartinterval=10000 +activemq.restartcount=20 diff --git a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java index ec33e9322f8334fa6d633f54ca3955e10f463fa4..768529a9d2e95390b3fa832d03714c4d4f6405bb 100644 --- a/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java +++ b/prediction_orchestrator/src/test/java/eu/morphemic/prediction_orchestrator/registries/PredictionRegistryTest.java @@ -98,58 +98,6 @@ class PredictionRegistryTest { predictionRegistry, first_epoch_starting_forecast, prediction_horizon); } - @Test - void multipleReadersBlockingRegistryTest() throws InterruptedException { - int numberOfForwardPrediction = 10; - long first_epoch_starting_forecast = 100000000; - int prediction_horizon = 1000; - ForecastingConfiguration forecastingConfiguration = new ForecastingConfiguration( - first_epoch_starting_forecast, - prediction_horizon, - numberOfForwardPrediction - ); - //we need to publish more than the buffer can handle - ForecastingConfiguration forecastingConfigurationPublisher = new ForecastingConfiguration( - first_epoch_starting_forecast, - prediction_horizon, - 2 * numberOfForwardPrediction - ); - PredictionRegistry predictionRegistry = new NoAccessWithoutUpdatePredictionRegistry( - forecastingConfiguration, - "populateMovingTest" - ); - - List publishingThreads = populateTestBed(forecastingConfigurationPublisher, predictionRegistry); - waitForThreads(publishingThreads); - //At this moment we should have a registry with boundaries: [10,19] - - - ConcurrentHashMap> results = new ConcurrentHashMap<>(); - - List readingThreads = new LinkedList<>(); - for (int threadNo = 0; threadNo < THREAD_COUNT; threadNo++) { - Thread reading_thread = new Thread(() -> { - for (int j = 10; j < 20; j++) { - long predictionTime = (j * prediction_horizon) + first_epoch_starting_forecast; - Prediction prediction = - predictionRegistry.getCopyPrediction(predictionTime); - results.computeIfAbsent(predictionTime, k -> new LinkedList<>()).add(prediction); - } - }); - readingThreads.add(reading_thread); - reading_thread.start(); - } - waitForThreads(publishingThreads); - waitForThreads(readingThreads); - - // We will be checking if no element has been pulled twice - for (List pulledPredictions : results.values()) { - List withoutNullsList = pulledPredictions.stream().filter(Objects::nonNull).collect(Collectors.toList());; - log.info(withoutNullsList.toString()); - assert( (new HashSet<>(withoutNullsList)).size() == withoutNullsList.size()); - } - } - /** * This procedure launches pool of publishing threads and wait until they are finished */