Skip to content

Commit

Permalink
Merge pull request #541 from gathanase/MqttRetained
Browse files Browse the repository at this point in the history
Read retained messages
  • Loading branch information
timothyjward authored Feb 6, 2025
2 parents 3d624e0 + b7ec877 commit 68e28b2
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -32,6 +32,7 @@
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessage;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessageListener;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
Expand Down Expand Up @@ -74,11 +75,14 @@ public class MqttClientHandler implements MqttCallback {
*/
private Timer reconnectTimer;

private final Object lock = new Object();

/**
* Listener -> Topic handling predicate
*/
private Map<IMqttMessageListener, Predicate<String>> listeners = Collections
.synchronizedMap(new IdentityHashMap<>());
private Map<IMqttMessageListener, Predicate<String>> listeners = new IdentityHashMap<>();

private Map<String, IMqttMessage> topic2last = new HashMap<>();

private String[] topics;

Expand All @@ -92,14 +96,28 @@ public class MqttClientHandler implements MqttCallback {
@Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
public void addListener(IMqttMessageListener listener, Map<String, Object> svcProps) {
final String[] filters = getArrayProperty(svcProps.get(IMqttMessageListener.MQTT_TOPICS_FILTERS));
listeners.put(listener, (str) -> serviceMatchesTopic(str, filters));
Predicate<String> predicate = (str) -> serviceMatchesTopic(str, filters);
Map<String, IMqttMessage> topic2lastCopy;
synchronized(lock) {
listeners.put(listener, predicate);
topic2lastCopy = new HashMap<>(topic2last);
}
for (var lastEntry: topic2lastCopy.entrySet()) {
String topic = lastEntry.getKey();
IMqttMessage message = lastEntry.getValue();
if (predicate.test(topic) && message.getPayload().length > 0) {
listener.onMqttMessage(message.getHandlerId(), topic, message);
}
}
}

/**
* MQTT listener unregistered
*/
public void removeListener(IMqttMessageListener listener) {
listeners.remove(listener);
synchronized(lock) {
listeners.remove(listener);
}
}

/**
Expand Down Expand Up @@ -307,13 +325,14 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
final SensiNactMqttMessage snMessage = new SensiNactMqttMessage(handlerId, topic, message);
client.messageArrivedComplete(message.getId(), message.getQos());

// Notify matching listeners
Map<IMqttMessageListener, Predicate<String>> workListeners;
synchronized (listeners) {
workListeners = new IdentityHashMap<>(listeners);
Map<IMqttMessageListener, Predicate<String>> listenersCopy;
synchronized(lock) {
listenersCopy = new IdentityHashMap<>(listeners);
topic2last.put(snMessage.getTopic(), snMessage);
}

for (Entry<IMqttMessageListener, Predicate<String>> entry : workListeners.entrySet()) {
// Notify matching listeners
for (Entry<IMqttMessageListener, Predicate<String>> entry : listenersCopy.entrySet()) {
if (entry.getValue().test(topic)) {
try {
entry.getKey().onMqttMessage(handlerId, topic, snMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
**********************************************************************/
package org.eclipse.sensinact.gateway.southbound.mqtt.test;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -76,8 +76,8 @@ void startServerAndLocalClient() throws Exception {
IConfig config = new MemoryConfig(new Properties());
config.setProperty(IConfig.HOST_PROPERTY_NAME, "127.0.0.1");
config.setProperty(IConfig.PORT_PROPERTY_NAME, "2183");
config.setProperty(IConfig.PERSISTENCE_ENABLED_PROPERTY_NAME, "false");
server.startServer(config);

client = new MqttClient("tcp://127.0.0.1:2183", MqttClient.generateClientId());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
Expand Down Expand Up @@ -119,13 +119,13 @@ private void doTest(final BlockingQueue<IMqttMessage> receiveQueue)
IMqttMessage msg = null;
// Try a few times waiting for a reconnect
for (int i = 0; i < 5 && msg == null; i++) {
client.publish(topic, content.getBytes(StandardCharsets.UTF_8), 1, false);
client.publish(topic, content.getBytes(UTF_8), 1, false);
// Wait a bit
msg = receiveQueue.poll(1, TimeUnit.SECONDS);
}
assertNotNull(msg);
assertEquals(topic, msg.getTopic());
assertEquals(content, new String(msg.getPayload(), StandardCharsets.UTF_8));
assertEquals(content, new String(msg.getPayload(), UTF_8));
}

@Test
Expand Down Expand Up @@ -205,4 +205,28 @@ void testMqttEarlyServerStart() throws Exception {

doTest(messages1);
}

@Test
void testRetainedMessage() throws Exception {

// Start the server
startServerAndLocalClient();

client.publish("sensinact/mqtt/test1/fizz", "RetainedFizz".getBytes(UTF_8), 1, true);
client.publish("sensinact/mqtt/test1/fizz", "NotRetained".getBytes(UTF_8), 1, false);
client.publish("sensinact/mqtt/test1/buzz", "RetainedBuzz".getBytes(UTF_8), 1, true);

// Activate the handler after some retained messages have been published
handler.activate(getConfig());
final BlockingQueue<IMqttMessage> messages1 = new ArrayBlockingQueue<>(32);
final IMqttMessageListener listener1 = (handler, topic, msg) -> messages1.add(msg);
handler.addListener(listener1, Map.of(IMqttMessageListener.MQTT_TOPICS_FILTERS,
new String[] { "sensinact/mqtt/test1/fizz" }));

IMqttMessage msg = messages1.poll(1000, TimeUnit.MILLISECONDS);
assertEquals("RetainedFizz", new String(msg.getPayload(), UTF_8));

msg = messages1.poll(1000, TimeUnit.MILLISECONDS);
assertEquals(null, msg);
}
}

0 comments on commit 68e28b2

Please sign in to comment.