Commit 6e8df039 authored by Marta Różańska's avatar Marta Różańska
Browse files

Merge branch 'predictionOrchestrator' into 'morphemic-rc1.5'

Prediction orchestrator

See merge request !163
parents 7ac51b5c e6ff94b0
Pipeline #16233 passed with stages
in 16 minutes and 20 seconds
......@@ -29,8 +29,9 @@ public class PredictionListener extends ActiveMQListener {
log.debug("Listener of topic {}: Converting event payload to PredictionFromForecasterMessage instance...", topicName);
PredictionFromForecasterMessage predictionFromForecasterMessage = gson.fromJson(payload, PredictionFromForecasterMessage.class);
log.info("Listener of topic {}: PredictionFromForecasterMessage instance: {}", topicName, predictionFromForecasterMessage.toString());
boolean valueUpdated = predictionRegistry.processPrediction(new Prediction(predictionFromForecasterMessage));
if (valueUpdated) {
int valueResult = predictionRegistry.processPrediction(new Prediction(predictionFromForecasterMessage));
if ((valueResult == PredictionRegistry.VALUE_ADDED) ||
(valueResult == PredictionRegistry.VALUE_UPDATED)) {
metricHandler.launchMethodsPoolingIfNecessary(predictionFromForecasterMessage.getPredictionTime());
}
} catch (JMSException e) {
......
package eu.morphemic.prediction_orchestrator.registries;
import eu.morphemic.prediction_orchestrator.model.Prediction;
import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration;
import lombok.extern.slf4j.Slf4j;
import javax.jms.JMSException;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class NoAccessWithoutUpdatePredictionRegistry extends PredictionRegistry {
public final static Prediction PREDICTION_ALREADY_ACCESSED = null;
private ConcurrentHashMap<Long, Boolean> availableToAccess;
public NoAccessWithoutUpdatePredictionRegistry(ForecastingConfiguration forecastingConfiguration, String registryName) {
super(forecastingConfiguration, registryName);
}
@Override
void removeBufferRecord(long keyToRemove) {
this.predictions.remove(keyToRemove);
this.availableToAccess.remove(keyToRemove);
}
@Override
public synchronized void reconfigureBuffer(ForecastingConfiguration forecastingConfiguration) {
super.reconfigureBuffer(forecastingConfiguration);
this.availableToAccess = new ConcurrentHashMap<>();
}
/**
* We need to synchronise this block as we need to makes sure that we block next access just after checking if we
* can get the element ("if" condition and updating the "availableToAccessMap" operations need to be bounded into atomic block)
*/
@Override
public Prediction getCopyPrediction(long predictionTime) {
synchronized (this) {
if (availableToAccess.getOrDefault(predictionTime, false)) {
availableToAccess.put(predictionTime, false);
return super.getCopyPrediction(predictionTime);
} else {
log.info("Registry {}: Prediction on predictionTime: {} has already been accessed. Wait for its update", registryName,
formatPredictionTime(predictionTime));
return PREDICTION_ALREADY_ACCESSED;
}
}
}
@Override
boolean updateBufferRecord(Prediction prediction) throws JMSException {
log.debug("Processing pooled prediction of registry {} for {}", registryName,
formatPredictionTime(prediction.getPredictionTime()));
boolean valueChanged = super.updateBufferRecord(prediction);
if (valueChanged || !availableToAccess.containsKey(prediction.getPredictionTime())) {
this.availableToAccess.put(prediction.getPredictionTime(), true);
}
return valueChanged;
}
}
......@@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j;
import javax.jms.JMSException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
......@@ -29,6 +30,10 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class PredictionRegistry {
public static int VALUE_ADDED = 2;
public static int VALUE_UPDATED = 1;
public static int VALUE_REJECTED = 0;
String registryName;
private AtomicReference<ForecastingConfiguration> forecastingConfiguration;
......@@ -69,7 +74,7 @@ public class PredictionRegistry {
earliestPredictionTime.get() + predictionHorizon * (forecastingConfiguration.getForward_prediction_number() - 1));
}
public synchronized boolean processPrediction(Prediction prediction) throws JMSException {
public synchronized int processPrediction(Prediction prediction) throws JMSException {
if (prediction != null) {
long predictionTime = prediction.getPredictionTime();
if (predictionTime < earliestPredictionTime.get()) {
......@@ -84,20 +89,24 @@ public class PredictionRegistry {
return updateBufferRecord(prediction);
}
}
return false;
return VALUE_REJECTED;
}
public boolean containsPrediction(long predictionTime) {
return predictions.containsKey(predictionTime);
}
boolean updateBufferRecord(Prediction prediction) throws JMSException {
AtomicBoolean valueUpdated = new AtomicBoolean(false);
int updateBufferRecord(Prediction prediction) throws JMSException {
AtomicInteger valueResult = new AtomicInteger(VALUE_REJECTED);
predictions.compute(
prediction.getPredictionTime(),
(k, v) -> {
if ((v == null) || (v.getTimestamp() < prediction.getTimestamp())) {
valueUpdated.set(true);
if (v == null) {
valueResult.set(VALUE_ADDED);
} else {
valueResult.set(VALUE_UPDATED);
}
log.debug("Updating value for {} inside registry: {} on predictionTime: {} and timestamp {}",
prediction.getMetricValue(), registryName, prediction.getPredictionTime(), prediction.getTimestamp());
return prediction;
......@@ -109,7 +118,7 @@ public class PredictionRegistry {
if (latestPredictionTime.get() < prediction.getPredictionTime()) {
moveBuffer(prediction.getPredictionTime());
}
return valueUpdated.get();
return valueResult.get();
}
void removeBufferRecord(long keyToRemove) {
......
......@@ -7,7 +7,7 @@ import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter;
import eu.morphemic.prediction_orchestrator.model.Prediction;
import eu.morphemic.prediction_orchestrator.properties.ForecastingConfiguration;
import eu.morphemic.prediction_orchestrator.properties.Properties;
import eu.morphemic.prediction_orchestrator.registries.NoAccessWithoutUpdatePredictionRegistry;
import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -44,9 +44,9 @@ public class Coordinator {
* and before getting them. But this is not a problem as we want to publish as much as possible assuming we had a full metric vector
* at any point. If the buffer would move in the meantime we just wouldn't publish this specific value
*/
//Communication 4 and 1
synchronized void publishIfPooledValuesProvidedForAllMetrics(long predictionTime) {
//Communication 5 and 1
synchronized void publishIfPooledValuesProvidedForAllMetrics(long predictionTime,
int lastValueUpdate, String callingMetricHandler) {
//metricName -> predictionAvailability
HashMap<String, Boolean> isPooledPredictionAvailable = new HashMap<>();
metricHandlers.forEach((metricName, metricHandler) ->
......@@ -57,12 +57,18 @@ public class Coordinator {
if (entireVectorIsReady) {
log.info("Starting publishing pooled predictions for metrics for {} ",
PredictionTimeFormatter.rawDateFormat(predictionTime));
metricHandlers.forEach((metricName, metricHandler) -> {
Prediction prediction = metricHandler.getPooledPrediction(predictionTime);
if (prediction != NoAccessWithoutUpdatePredictionRegistry.PREDICTION_ALREADY_ACCESSED) {
//We send the entire vector only once, when all values are present for the first time
if (lastValueUpdate == PredictionRegistry.VALUE_ADDED) {
metricHandlers.forEach((metricName, metricHandler) -> {
Prediction prediction = metricHandler.getPooledPrediction(predictionTime);
communicationService.publishPooledPrediction(prediction, metricName);
}
});
});
} else {
//we send only updated value as at this point the entire vector has been send earlier
Prediction prediction = metricHandlers.get(callingMetricHandler)
.getPooledPrediction(predictionTime);
communicationService.publishPooledPrediction(prediction, callingMetricHandler);
}
}
}
......
......@@ -3,7 +3,6 @@ package eu.morphemic.prediction_orchestrator.service;
import eu.morphemic.prediction_orchestrator.communication.CommunicationService;
import eu.morphemic.prediction_orchestrator.log_utils.PredictionTimeFormatter;
import eu.morphemic.prediction_orchestrator.properties.Properties;
import eu.morphemic.prediction_orchestrator.registries.NoAccessWithoutUpdatePredictionRegistry;
import eu.morphemic.prediction_orchestrator.model.Prediction;
import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry;
import eu.morphemic.prediction_orchestrator.service.pooling_strategy.PoolingStrategy;
......@@ -30,7 +29,7 @@ public class MetricHandler {
//methodName -> MethodHandler
private HashMap<String, MethodHandler> methodHandlers = new HashMap<>();
private NoAccessWithoutUpdatePredictionRegistry pooledPredictionsRegistry;
private PredictionRegistry pooledPredictionsRegistry;
private PoolingStrategy poolingStrategy;
private CommunicationService communicationService;
......@@ -40,7 +39,7 @@ public class MetricHandler {
this.metricName = metricName;
this.poolingStrategy = PoolingStrategyFactory.getPoolingStrategy(properties);
this.coordinator = coordinator;
this.pooledPredictionsRegistry = new NoAccessWithoutUpdatePredictionRegistry(
this.pooledPredictionsRegistry = new PredictionRegistry(
forecastingConfiguration,
PredictionRegistry.getPooledPredictionsRegistryName(metricName)
);
......@@ -57,9 +56,10 @@ public class MetricHandler {
if (newPooledPrediction != PoolingStrategy.POOLED_PREDICTION_NOT_CREATED) {
log.info("Pooling function has created pooled prediction for metric {} and predictionTime {}",
metricName, PredictionTimeFormatter.rawDateFormat(predictionTime));
boolean valueChanged = pooledPredictionsRegistry.processPrediction(newPooledPrediction);
if (valueChanged) {
coordinator.publishIfPooledValuesProvidedForAllMetrics(predictionTime);
int valueResult = pooledPredictionsRegistry.processPrediction(newPooledPrediction);
if ((valueResult == PredictionRegistry.VALUE_UPDATED) ||
(valueResult == PredictionRegistry.VALUE_ADDED)) {
coordinator.publishIfPooledValuesProvidedForAllMetrics(predictionTime, valueResult, metricName);
}
} else {
log.debug("Pooling function has not created pooled prediction for metric {} and predictionTime {}",
......
forecasting_configuration.initial_prediction_horizon=120000
#forecasting_configuration are properties that are sent further to The forecasters
#They explain pattern for predictionTimes, predictions's count and what should be the predictionTime
#of the first message send
#global pattern is: predictionTime = epochStart + k*initial_prediction_horizon where k is integer
#unit: seconds
#This property indicates what is the time difference between following predictionTimes
# both inside the forecasters and the PO
# Depending on the forecaster it may also mean how often the Forecaster would publish predictions
# to the PO
forecasting_configuration.initial_prediction_horizon=120
#This property indicates how many forward prediction the forecaster should send
#at the moment of publishing predictions
forecasting_configuration.initial_forward_prediction_number=8
forecasting_configuration.starting_forecasting_delay=50000
#This property indicates what is epochStart and what predictionTime should have the first message send by the forecaster
#It also explains time before the forecasters start publishing
#EpochStart = currentTime + starting_forecasting_delay
# predictionTime of first predictionSend = EpochStart + initial_prediction_horizon
forecasting_configuration.starting_forecasting_delay=200
#In the most common scenario:
#PO counts epochStart based on starting_forecasting_delay and send the above properties to the Forecaster
#At moment epochStart the forecaster would publish initial_forward_prediction_number predictions
#with predictionTimes equal = epochStart + (1,2,3 .. initial_forward_prediction_number) * initial_prediction_horizon
# At moment (epochStart + 1*initial_prediction_horizon) the Forecaster would send initial_forward_prediction_number predictions
#with predictionTimes equal = epochStart + (2,3 .. (initial_forward_prediction_number+1)) * initial_prediction_horizon
#This property indicates method for checking how many forecasters' data is needed to be abe to pool/ensemble
# predictions and publish them to EMS
# It currntly may equal PERCENTAGE_FORECASTERS_COUNT_NEEDED or STATIC_FORECASTERS_COUNT_NEEDED
# With the first option threshold property is in percents
#With the second it is a static count
#For example with threshold threshold=2 at least 2 synchronised forecasters are needed to be able to merge predictions and send them forward
pooling.poolingStrategy=STATIC_FORECASTERS_COUNT_NEEDED
pooling.threshold=2
jms.client.broker_properties_configuration_file_location=${MELODIC_CONFIG_DIR}/eu.melodic.event.brokerclient.properties
startingMethodsList=nbeats,es_hybrid,arima,tsetlin_machines,exponential_smoothing,lstm,gluon_machines,prophet
#List of connected forecaster that PO expects to be working. PO would not coummuncate with the forecasters not on that list
startingMethodsList=es_hybrid,cnn,prophet,tft,nbeats,arima,tsetlin_machines,exponential_smoothing,lstm,gluon_machines
logging.config=file:${MELODIC_CONFIG_DIR}/logback-conf/logback-spring.xml
#this is only for logging, have no influence on working flow of The PO
logging.zone_id=Europe/Warsaw
#Details of restarting connection to amq upon failure
activemq.restartinterval=10000
activemq.restartcount=20
......@@ -98,58 +98,6 @@ class PredictionRegistryTest {
predictionRegistry, first_epoch_starting_forecast, prediction_horizon);
}
@Test
void multipleReadersBlockingRegistryTest() throws InterruptedException {
int numberOfForwardPrediction = 10;
long first_epoch_starting_forecast = 100000000;
int prediction_horizon = 1000;
ForecastingConfiguration forecastingConfiguration = new ForecastingConfiguration(
first_epoch_starting_forecast,
prediction_horizon,
numberOfForwardPrediction
);
//we need to publish more than the buffer can handle
ForecastingConfiguration forecastingConfigurationPublisher = new ForecastingConfiguration(
first_epoch_starting_forecast,
prediction_horizon,
2 * numberOfForwardPrediction
);
PredictionRegistry predictionRegistry = new NoAccessWithoutUpdatePredictionRegistry(
forecastingConfiguration,
"populateMovingTest"
);
List<Thread> publishingThreads = populateTestBed(forecastingConfigurationPublisher, predictionRegistry);
waitForThreads(publishingThreads);
//At this moment we should have a registry with boundaries: [10,19]
ConcurrentHashMap<Long, List<Prediction>> results = new ConcurrentHashMap<>();
List<Thread> readingThreads = new LinkedList<>();
for (int threadNo = 0; threadNo < THREAD_COUNT; threadNo++) {
Thread reading_thread = new Thread(() -> {
for (int j = 10; j < 20; j++) {
long predictionTime = (j * prediction_horizon) + first_epoch_starting_forecast;
Prediction prediction =
predictionRegistry.getCopyPrediction(predictionTime);
results.computeIfAbsent(predictionTime, k -> new LinkedList<>()).add(prediction);
}
});
readingThreads.add(reading_thread);
reading_thread.start();
}
waitForThreads(publishingThreads);
waitForThreads(readingThreads);
// We will be checking if no element has been pulled twice
for (List<Prediction> pulledPredictions : results.values()) {
List<Prediction> withoutNullsList = pulledPredictions.stream().filter(Objects::nonNull).collect(Collectors.toList());;
log.info(withoutNullsList.toString());
assert( (new HashSet<>(withoutNullsList)).size() == withoutNullsList.size());
}
}
/**
* This procedure launches pool of publishing threads and wait until they are finished
*/
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment