Commit bc6edd9f authored by mriedl's avatar mriedl
Browse files

retrying connection, movied to subscription part

parent 5f7919d5
......@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.melodic.event.brokerclient.BrokerClient;
import eu.melodic.event.brokerclient.BrokerPublisher;
import eu.morphemic.prediction_orchestrator.communication.listeners.ActiveMQListener;
import eu.morphemic.prediction_orchestrator.communication.messages.outcoming_messages.PooledPredictionMessage;
import eu.morphemic.prediction_orchestrator.registries.PredictionRegistry;
import eu.morphemic.prediction_orchestrator.service.Coordinator;
......@@ -18,7 +19,6 @@ import lombok.extern.slf4j.Slf4j;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
......@@ -36,31 +36,31 @@ public class CommunicationService {
this.properties = properties;
}
public synchronized void startReceivingMetricList(Coordinator coordinator) throws JMSException {
public synchronized void startReceivingMetricList(Coordinator coordinator) {
log.info("Starting Receiving Metrics List");
String topicName = TopicFactory.getReceiveMetricsListTopic();
ReceiveMetricsListener receiveMetricsListener =
new ReceiveMetricsListener(coordinator, topicName);
Objects.requireNonNull(brokerClients.computeIfAbsent(topicName, client -> getNewClient())).subscribe(null, topicName, receiveMetricsListener);
subscribeWithRetries(topicName, receiveMetricsListener);
}
public synchronized void startReceivingModelRetrained(Coordinator coordinator) throws JMSException {
public synchronized void startReceivingModelRetrained(Coordinator coordinator) {
log.info("Starting Receiving RetrainedModels messages");
String topicName = TopicFactory.getReceiveModelRetrained();
RetrainedModelListener retrainedModelListener = new RetrainedModelListener(coordinator, topicName);
Objects.requireNonNull(brokerClients.computeIfAbsent(topicName, client -> getNewClient())).subscribe(null, topicName, retrainedModelListener);
subscribeWithRetries(topicName, retrainedModelListener);
}
public synchronized void startReceivingPredictions(String metricName, String methodName,
PredictionRegistry predictionRegistry,
MetricHandler metricHandler) throws JMSException {
MetricHandler metricHandler) {
log.info("Starting Receiving PredictionFromForecasterMessage from {} concerning metric {}", methodName, metricName);
String topicName = TopicFactory.getReceivePredictionTopic(metricName, methodName);
PredictionListener predictionListener = new PredictionListener(
topicName,
predictionRegistry,
metricHandler);
Objects.requireNonNull(brokerClients.computeIfAbsent(topicName, client -> getNewClient())).subscribe(null, topicName, predictionListener);
subscribeWithRetries(topicName, predictionListener);
}
public synchronized void stopReceivingPredictions(String metricName, String methodName) throws JMSException {
......@@ -104,26 +104,38 @@ public class CommunicationService {
);
}
private BrokerClient getNewClient() {
BrokerClient brokerClient;
while (true) {
private void subscribeWithRetries(String topicName, ActiveMQListener mqListener) {
int retriesCount = properties.getActiveMqRestartCount();
while (retriesCount > 0) {
try {
brokerClient = BrokerClient.newClient(
null,
null,
properties.getBroker_properties_configuration_file_location()
);
break;
} catch (IOException | JMSException e) {
Objects.requireNonNull(brokerClients.computeIfAbsent(topicName, client -> getNewClient()))
.subscribe(null, topicName, mqListener);
break;
} catch (JMSException e) {
e.printStackTrace();
log.warn("Connection failed, retrying in: {}", properties.getActiveMqRestartInterval());
try {
Thread.sleep(properties.getActiveMqRestartInterval());
retriesCount--;
} catch (InterruptedException iex) {
iex.printStackTrace();
}
}
}
}
private BrokerClient getNewClient() {
BrokerClient brokerClient = null;
try {
brokerClient = BrokerClient.newClient(
null,
null,
properties.getBroker_properties_configuration_file_location()
);
} catch (Exception e) {
e.printStackTrace();
}
return brokerClient;
}
}
......@@ -41,4 +41,7 @@ public class Properties {
@Value("${activemq.restartinterval:10000}")
private long activeMqRestartInterval;
@Value("${activemq.restartcount:20}")
private int activeMqRestartCount;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment