Skip to content

Commit

Permalink
Controller.Api.MQTT: reconnect on connection failure (#2602)
Browse files Browse the repository at this point in the history
# Add Reconnection Logic to MQTT Controller

This PR introduces reconnection logic to the MQTT Controller, ensuring robustness in the face of connection losses with the MQTT broker. The implementation schedules reconnection attempts with an exponential backoff strategy, aiming to minimize the impact on both the broker and the controller while trying to re-establish a lost connection.

# Key Changes:

1. **Scheduled Reconnection Attempts**: Utilizes a single-threaded `ScheduledExecutorService` to manage reconnection attempts, allowing for delayed execution with exponential backoff.

2. **Adaptive Delay Calculation**: Implements an adaptive delay calculation using a base delay, a maximum delay cap, and a multiplier to manage the delay between reconnection attempts, preventing aggressive reconnection attempts.

3. **Integration with Component Lifecycle**: Hooks the reconnection logic into the component's activation process, ensuring attempts to connect or reconnect are made upon activation. Similarly, ensures proper shutdown of the executor service upon deactivation to clean up resources.

## Integration:
This implementation ensures that the MQTT Controller attempts to reconnect to the broker following a connection loss, employing an exponential backoff strategy to balance between prompt reconnection and avoiding overwhelming the broker or the network.
  • Loading branch information
Sn0w3y authored Jun 16, 2024
1 parent d7cf502 commit 625122c
Showing 1 changed file with 73 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package io.openems.edge.controller.api.mqtt;

import static io.openems.common.utils.ThreadPoolUtils.shutdownAndAwaitTermination;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.paho.mqttv5.client.IMqttClient;
import org.eclipse.paho.mqttv5.common.MqttException;
Expand Down Expand Up @@ -46,21 +53,28 @@ public class ControllerApiMqttImpl extends AbstractOpenemsComponent

protected static final String COMPONENT_NAME = "Controller.Api.MQTT";

private static final long INITIAL_RECONNECT_DELAY_SECONDS = 5;
private static final long MAX_RECONNECT_DELAY_SECONDS = 300; // 5 minutes maximum delay.
private static final double RECONNECT_DELAY_MULTIPLIER = 1.5;

private final Logger log = LoggerFactory.getLogger(ControllerApiMqttImpl.class);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final AtomicInteger reconnectionAttempt = new AtomicInteger(0);
private final SendChannelValuesWorker sendChannelValuesWorker = new SendChannelValuesWorker(this);
private final MqttConnector mqttConnector = new MqttConnector();

protected Config config;

private volatile ScheduledFuture<?> reconnectFuture = null;
private String topicPrefix;
private IMqttClient mqttClient = null;

@Reference(policy = ReferencePolicy.DYNAMIC, policyOption = ReferencePolicyOption.GREEDY, cardinality = ReferenceCardinality.OPTIONAL)
private volatile Timedata timedata = null;

@Reference
protected ComponentManager componentManager;

protected Config config;

private String topicPrefix;
private IMqttClient mqttClient = null;

public ControllerApiMqttImpl() {
super(//
OpenemsComponent.ChannelId.values(), //
Expand All @@ -82,6 +96,7 @@ private void activate(ComponentContext context, Config config) throws Exception
this.mqttClient = client;
this.logInfo(this.log, "Connected to MQTT Broker [" + config.uri() + "]");
});
this.scheduleReconnect();
}

/**
Expand Down Expand Up @@ -113,13 +128,13 @@ protected static String createTopicPrefix(Config config) {
@Deactivate
protected void deactivate() {
super.deactivate();
this.mqttConnector.deactivate();
this.sendChannelValuesWorker.deactivate();
shutdownAndAwaitTermination(this.scheduledExecutorService, 0);

if (this.mqttClient != null) {
try {
this.mqttClient.close();
} catch (MqttException e) {
this.logWarn(this.log, "Unable to close connection to MQTT brokwer: " + e.getMessage());
this.logWarn(this.log, "Unable to close connection to MQTT broker: " + e.getMessage());
e.printStackTrace();
}
}
Expand Down Expand Up @@ -159,6 +174,7 @@ public void handleEvent(Event event) {
// Trigger sending of all channel values, because a Component might have
// disappeared
this.sendChannelValuesWorker.sendValuesOfAllChannelsOnce();
break;
}
}

Expand Down Expand Up @@ -199,4 +215,53 @@ protected boolean publish(String subTopic, String message, int qos, boolean reta
var msg = new MqttMessage(message.getBytes(StandardCharsets.UTF_8), qos, retained, properties);
return this.publish(subTopic, msg);
}

private synchronized void scheduleReconnect() {
if (this.reconnectFuture != null && !this.reconnectFuture.isDone()) {
this.reconnectFuture.cancel(false);
}

this.attemptConnect();
}

private void attemptConnect() {
if (this.mqttClient != null && this.mqttClient.isConnected()) {
return; // Already connected
}
try {
this.mqttConnector
.connect(this.config.uri(), this.config.clientId(), this.config.username(), this.config.password(),
this.config.certPem(), this.config.privateKeyPem(), this.config.trustStorePem())
.thenAccept(client -> {
this.mqttClient = client;
this.logInfo(this.log, "Connected to MQTT Broker [" + this.config.uri() + "]");
this.reconnectionAttempt.set(0); // Reset on successful connection.
}) //
.exceptionally(ex -> {
this.log.error("Failed to connect to MQTT broker: " + ex.getMessage(), ex);
this.scheduleNextAttempt(); // Schedule the next attempt with an increased delay.
return null;
});
} catch (Exception e) {
this.log.error("Error attempting to connect to MQTT broker", e);
this.scheduleNextAttempt(); // Schedule the next attempt with an increased delay.
}
}

private void scheduleNextAttempt() {
long delay = this.calculateNextDelay();
// Ensure the executor service is not shut down
if (!this.scheduledExecutorService.isShutdown()) {
this.reconnectFuture = this.scheduledExecutorService.schedule(this::attemptConnect, delay,
TimeUnit.SECONDS);
}
}

private long calculateNextDelay() {
long delay = (long) (INITIAL_RECONNECT_DELAY_SECONDS
* Math.pow(RECONNECT_DELAY_MULTIPLIER, this.reconnectionAttempt.getAndIncrement()));
delay = Math.min(delay, MAX_RECONNECT_DELAY_SECONDS); // Ensure delay does not exceed maximum
return delay;
}

}

0 comments on commit 625122c

Please sign in to comment.