Skip to content

Commit 82df2f0

Browse files
authored
refactor(controller): add method ControllerServer#reconfigurables (#2344)
Signed-off-by: Ning Yu <[email protected]>
1 parent 7b4240a commit 82df2f0

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

core/src/main/scala/kafka/server/ControllerServer.scala

+8-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import com.automq.stream.s3.metadata.ObjectUtils
2222
import kafka.autobalancer.AutoBalancerManager
2323
import kafka.autobalancer.services.AutoBalancerService
2424
import kafka.controller.streamaspect.client.{Context, StreamClientFactoryProxy}
25-
2625
import kafka.migration.MigrationPropagator
2726
import kafka.network.{DataPlaneAcceptor, SocketServer}
2827
import kafka.raft.KafkaRaftManager
@@ -38,7 +37,7 @@ import org.apache.kafka.common.network.ListenerName
3837
import org.apache.kafka.common.security.scram.internals.ScramMechanism
3938
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
4039
import org.apache.kafka.common.utils.LogContext
41-
import org.apache.kafka.common.{ClusterResource, Endpoint, Uuid}
40+
import org.apache.kafka.common.{ClusterResource, Endpoint, Reconfigurable, Uuid}
4241
import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics}
4342
import org.apache.kafka.controller.{QuorumController, QuorumControllerExtension, QuorumFeatures}
4443
import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, MetadataPublisher}
@@ -584,5 +583,12 @@ class ControllerServer(
584583
protected def replicaPlacer(): ReplicaPlacer = {
585584
new StripedReplicaPlacer(new Random())
586585
}
586+
587+
// return a list of all reconfigurable objects
588+
def reconfigurables(): java.util.List[Reconfigurable] = {
589+
java.util.List.of(
590+
autoBalancerManager
591+
)
592+
}
587593
// AutoMQ for Kafka inject end
588594
}

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

+3-1
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
302302
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.nodeId, controller.config, controller.metrics, controller.clusterId))
303303
}
304304
addReconfigurable(new DynamicClientQuotaCallback(controller.quotaManagers, controller.config))
305-
addReconfigurable(controller.autoBalancerManager)
305+
// AutoMQ inject start
306+
controller.reconfigurables().asScala.foreach(addReconfigurable)
307+
// AutoMQ inject end
306308
addBrokerReconfigurable(new ControllerDynamicThreadPool(controller))
307309
// TODO: addBrokerReconfigurable(new DynamicListenerConfig(controller))
308310
addBrokerReconfigurable(controller.socketServer)

0 commit comments

Comments
 (0)