Commit 36cb056f authored by Stéphanie Monticelli's avatar Stéphanie Monticelli

[MqttInjector] Add check to avoid nullPointerException.

parent cfe8d3ea
Pipeline #7338 passed with stages
in 2 minutes and 49 seconds
......@@ -192,7 +192,7 @@ public class SessionObjectFuseImpl extends SessionObject {
try {
keepAlive_s = Short.parseShort(keepAlive_sStr);
} catch (NumberFormatException ex) {
throw new IsacRuntimeException("Invalid MQTT connection keepAlive: " + keepAlive_sStr);
throw new IsacRuntimeException("Invalid MQTT connection keep-alive interval: " + keepAlive_sStr);
}
}
if (id == null || id.trim().isEmpty()) {
......@@ -773,37 +773,41 @@ public class SessionObjectFuseImpl extends SessionObject {
* reached.
*/
public void messageArrived() {
Future<Message> futureR = connexion.receive();
futureR.then(new Callback<Message>() {
public void onSuccess(Message message) {
String topic = message.getTopic();
MessageQueue queue = messagesByTopic.get(topic);
if (queue == null) {
Iterator<Map.Entry<String, MessageQueue>> queueIter = messagesByTopic.entrySet().iterator();
while (queue == null && queueIter.hasNext()) {
Map.Entry<String, MessageQueue> entry = queueIter.next();
if (matchTopic(entry.getKey(), topic)) {
queue = entry.getValue();
if (connexion != null && connexion.isConnected()) {
Future<Message> futureR = connexion.receive();
futureR.then(new Callback<Message>() {
public void onSuccess(Message message) {
String topic = message.getTopic();
MessageQueue queue = messagesByTopic.get(topic);
if (queue == null) {
Iterator<Map.Entry<String, MessageQueue>> queueIter = messagesByTopic.entrySet().iterator();
while (queue == null && queueIter.hasNext()) {
Map.Entry<String, MessageQueue> entry = queueIter.next();
if (matchTopic(entry.getKey(), topic)) {
queue = entry.getValue();
}
}
}
}
if (queue != null) {
MessageWithTopic msg = new MessageWithTopic(topic, message);
queue.put(msg);
allMessages.put(msg);
}
message.ack();
messageArrived();
}
if (queue != null) {
MessageWithTopic msg = new MessageWithTopic(topic, message);
queue.put(msg);
allMessages.put(msg);
}
message.ack();
public void onFailure(Throwable value) {
// do nothing
}
});
messageArrived();
}
public void onFailure(Throwable value) {
// do nothing
}
});
} else {
throw new IsacRuntimeException("Attempt to receive a message from an unconnected MQTT client.");
}
}
/////////////////////////////////////////////////////////////////////
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment