KinesisMessageDrivenChannelAdapter is not exported by IntegrationMbeansExporter #3092

Open
tomek82 opened this issue Mar 7, 2025 · 2 comments


tomek82 commented Mar 7, 2025

From Stack Overflow: https://stackoverflow.com/questions/79489969/how-to-get-kinesismessagedrivenchanneladapter-exposed-on-jmx/79490248#79490248

Describe the issue
I'm trying to get KinesisMessageDrivenChannelAdapter exposed over JMX so that I can access the operations that manipulate checkpoints (resetCheckpointForShardToSequenceNumber etc.). However, all I can see are the channel beans (null, error, functionRouter and the one defined for StreamBridge).

I have added spring-integration-jmx and put @EnableIntegrationMBeanExport on my configuration, and I can see IntegrationMBeanExporter.afterSingletonsInstantiated() being called. At that point, though, the KinesisMessageDrivenChannelAdapter hasn't been created yet, so it is not exported. Is there a different way of handling this?

Version of the framework
Spring Cloud Stream 4.0.5
Spring Integration AWS 3.0.6

Additional context
Artem's solution is not possible because DefaultBinding.getEndpoint() is package-private. I've also tried using a ConsumerEndpointCustomizer, but since KinesisMessageDrivenChannelAdapter is an instance of MessageProducer, Lifecycle and AbstractEndpoint all at once, I'm not sure the correct path is chosen in postProcessAfterInitialization.

@artembilan
Member

Yeah... Sorry, I missed that DefaultBinding.getEndpoint() is not public.
Yes, a ConsumerEndpointCustomizer<KinesisMessageDrivenChannelAdapter> is a good choice instead.
The relevant logic in the IntegrationMBeanExporter looks like this:

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
		if (this.singletonsInstantiated) {
			try {
				if (bean instanceof MessageChannel) {
					MessageChannel monitor = (MessageChannel) extractTarget(bean);
					if (monitor instanceof IntegrationManagement) {
						this.channels.put(beanName, (IntegrationManagement) monitor);
						registerChannel((IntegrationManagement) monitor);
						this.runtimeBeans.add(bean);
					}
				}
				else if (bean instanceof MessageProducer && bean instanceof Lifecycle) {
					registerProducer((MessageProducer) bean);
					this.runtimeBeans.add(bean);
				}
				else if (bean instanceof AbstractEndpoint) {
					postProcessAbstractEndpoint(bean);
				}
			}
			catch (Exception e) {
				logger.error("Could not register an MBean for: " + beanName, e);
			}
		}
		return bean;
	}

So, even though KinesisMessageDrivenChannelAdapter is an AbstractEndpoint, it never reaches the last branch: it already matches the earlier bean instanceof MessageProducer && bean instanceof Lifecycle check, so it is handled by registerProducer().
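
To make the branch order concrete, here is a minimal, self-contained sketch. The nested types are hypothetical stand-ins that only borrow the names used in this thread (they are not the Spring classes and are deliberately simplified), and chooseBranch() mirrors just the order of the last two checks in the method quoted above.

```java
public class BranchOrderSketch {

    // Hypothetical stand-ins named after the types in the thread; NOT the real Spring APIs.
    interface MessageProducer {}
    interface Lifecycle {}
    static abstract class AbstractEndpoint {}

    // Like the adapter discussed here: an endpoint that is also a producer with a lifecycle.
    static class AdapterLikeEndpoint extends AbstractEndpoint implements MessageProducer, Lifecycle {}

    // An endpoint that is neither a producer nor lifecycle-aware (a simplification).
    static class PlainEndpoint extends AbstractEndpoint {}

    // Returns the name of the branch that would handle the bean.
    // The first matching condition wins, so later branches are never evaluated.
    static String chooseBranch(Object bean) {
        if (bean instanceof MessageProducer && bean instanceof Lifecycle) {
            return "registerProducer";
        }
        else if (bean instanceof AbstractEndpoint) {
            return "postProcessAbstractEndpoint";
        }
        return "ignored";
    }

    public static void main(String[] args) {
        System.out.println(chooseBranch(new AdapterLikeEndpoint()));
        System.out.println(chooseBranch(new PlainEndpoint()));
        System.out.println(chooseBranch("not an endpoint"));
    }
}
```

In this sketch the adapter-like object is routed to the producer branch even though it is also an AbstractEndpoint, which is the behaviour described above.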

It looks like spring-integration-jmx is an unconditional dependency of spring-cloud-stream, so it seems safe to inject an IntegrationMBeanExporter into the AbstractMessageChannelBinder and call postProcessAfterInitialization() for every binding endpoint.
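
A rough model of that idea, assuming nothing about the real binder internals: ExporterStandIn and BinderStandIn below are hypothetical classes invented for illustration. The stand-in exporter copies only the singletonsInstantiated guard from the method quoted earlier, and the stand-in binder hands each endpoint it creates at bind time to the exporter.

```java
import java.util.ArrayList;
import java.util.List;

public class LateEndpointExportSketch {

    // Hypothetical stand-in for the exporter: it registers a bean only when it is
    // passed in after the singleton phase has finished.
    static class ExporterStandIn {
        private boolean singletonsInstantiated;
        final List<String> exported = new ArrayList<>();

        void afterSingletonsInstantiated() {
            this.singletonsInstantiated = true;
        }

        Object postProcessAfterInitialization(Object bean, String beanName) {
            if (this.singletonsInstantiated) {
                this.exported.add(beanName);
            }
            return bean;
        }
    }

    // Hypothetical stand-in for a binder that creates its endpoint at bind time,
    // after the application context has finished creating singletons.
    static class BinderStandIn {
        private final ExporterStandIn exporter;

        BinderStandIn(ExporterStandIn exporter) {
            this.exporter = exporter;
        }

        Object bindConsumer(String endpointName) {
            Object endpoint = new Object(); // placeholder for the real adapter
            // Pass the late-created endpoint to the exporter explicitly.
            return this.exporter.postProcessAfterInitialization(endpoint, endpointName);
        }
    }

    public static void main(String[] args) {
        ExporterStandIn exporter = new ExporterStandIn();
        BinderStandIn binder = new BinderStandIn(exporter);
        exporter.afterSingletonsInstantiated(); // singleton phase ends first
        binder.bindConsumer("kinesisAdapter");  // endpoint is created later
        System.out.println(exporter.exported);
    }
}
```

The point of the model is only the ordering: an endpoint created after the singleton phase is registered as long as something passes it to the exporter explicitly.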

@artembilan
Member

I forgot to mention: this is not specific to the Kinesis binder.
All the other binders are also based on Spring Integration and should get the same JMX functionality.

But all of that makes sense only if Spring Cloud Stream has no other way to handle bindings that fulfills the JMX expectations.
