Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka producer with transaction enabled not able to publish event to event hub #209

Open
13 of 15 tasks
ravindra-dh opened this issue Sep 23, 2022 · 2 comments
Open
13 of 15 tasks

Comments

@ravindra-dh
Copy link

ravindra-dh commented Sep 23, 2022

Description

Kafka client without transaction able to publish the events to event hub. But when transaction enabled, it fails with error: an existing connection was forcibly closed by the remote host

How to reproduce

Sample code is provided as part of checklist

Has it worked previously?

First time trying this.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed or where adequate information has not been provided.

Please provide the relevant information for the following items:

  • SDK (include version info): Java SDK 11
  • Sample you're having trouble with: producer.initTransactions(); producer.beginTransaction(); producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.out.println(exception); System.exit(1); } } }); producer.commitTransaction();
  • If using Apache Kafka Java clients or a framework that uses Apache Kafka Java clients, version: kafka-client:2.3.0
  • Kafka client configuration: acks = 1 batch.size = 16384 bootstrap.servers = [*********.servicebus.windows.net:9093] buffer.memory = 33554432 client.dns.lookup = default client.id = KafkaExampleProducer compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.LongSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = [hidden] sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = PLAIN security.protocol = SASL_SSL send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = auto value.serializer = class org.apache.kafka.common.serialization.StringSerializer
  • Namespace and EventHub/topic name
  • Consumer or producer failure Producer failure
  • Timestamps in UTC Sept 22 2022 - 17:15:01 UTC
  • group.id or client.id KafkaExampleProducer
  • Logs provided 2022-09-23 11:25:06 INFO KafkaProducer:532 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Instantiated a transactional producer. 2022-09-23 11:25:06 DEBUG Metrics:416 - Added sensor with name bufferpool-wait-time 2022-09-23 11:25:06 DEBUG Metrics:416 - Added sensor with name buffer-exhausted-records 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name errors 2022-09-23 11:25:07 INFO AbstractLogin:61 - Successfully logged in. 2022-09-23 11:25:07 DEBUG SslEngineBuilder:157 - Created SSL context with keystore null, truststore null 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name produce-throttle-time 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name connections-closed: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name connections-created: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name successful-authentication: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name successful-reauthentication: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name successful-authentication-no-reauth: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name failed-authentication: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name failed-reauthentication: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name reauthentication-latency: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name bytes-sent-received: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name bytes-sent: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name bytes-received: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name select-time: 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name io-time: 2022-09-23 11:25:07 INFO KafkaProducer:548 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled. 2022-09-23 11:25:07 INFO KafkaProducer:574 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Overriding the default acks to all since idempotence is enabled. 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name batch-size 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name compression-rate 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name queue-time 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name request-time 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name records-per-request 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name record-retries 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name record-size 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name batch-split-rate 2022-09-23 11:25:07 WARN ProducerConfig:355 - The configuration 'producer.sasl.jaas.config' was supplied but isn't a known config. 2022-09-23 11:25:07 DEBUG Sender:233 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Starting Kafka producer I/O thread. 2022-09-23 11:25:07 WARN ProducerConfig:355 - The configuration 'producer.security.protocol' was supplied but isn't a known config. 2022-09-23 11:25:07 DEBUG NetworkClient:1113 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Initialize connection to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: -1 rack: null) for sending metadata request 2022-09-23 11:25:07 WARN ProducerConfig:355 - The configuration 'TLS.Config.InsecureSkipVerify' was supplied but isn't a known config. 2022-09-23 11:25:07 WARN ProducerConfig:355 - The configuration 'producer.sasl.mechanism' was supplied but isn't a known config. 2022-09-23 11:25:07 WARN ProducerConfig:355 - The configuration 'TLS.Config.ClientAuth' was supplied but isn't a known config. 2022-09-23 11:25:07 DEBUG NetworkClient:944 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Initiating connection to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: -1 rack: null) using address ame-nonprod-eventhubs.servicebus.windows.net/52.178.17.145 2022-09-23 11:25:07 INFO AppInfoParser:117 - Kafka version: 2.3.0 2022-09-23 11:25:07 INFO AppInfoParser:118 - Kafka commitId: fc1aaa116b661c8a 2022-09-23 11:25:07 INFO AppInfoParser:119 - Kafka startTimeMs: 1663912507820 2022-09-23 11:25:07 DEBUG KafkaProducer:428 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Kafka producer started 2022-09-23 11:25:07 DEBUG SaslClientAuthenticator:351 - Set SASL client state to SEND_APIVERSIONS_REQUEST 2022-09-23 11:25:07 DEBUG SaslClientAuthenticator:180 - Creating SaslClient: client=null;service=kafka;serviceHostname=ame-nonprod-eventhubs.servicebus.windows.net;mechs=[PLAIN] 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name node--1.bytes-sent 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name node--1.bytes-received 2022-09-23 11:25:07 DEBUG Metrics:416 - Added sensor with name node--1.latency 2022-09-23 11:25:07 DEBUG Selector:535 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 2022-09-23 11:25:08 DEBUG NetworkClient:902 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Completed connection to node -1. Fetching API versions. 2022-09-23 11:25:08 DEBUG SslTransportLayer:424 - [SslTransportLayer channelId=-1 key=channel=java.nio.channels.SocketChannel[connection-pending remote=ame-nonprod-eventhubs.servicebus.windows.net/52.178.17.145:9093], selector=sun.nio.ch.WindowsSelectorImpl@423749d, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'ame-nonprod-eventhubs.servicebus.windows.net' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net, O=Microsoft Corporation, L=Redmond, ST=WA, C=US' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' 2022-09-23 11:25:08 DEBUG SaslClientAuthenticator:351 - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE 2022-09-23 11:25:08 DEBUG SaslClientAuthenticator:351 - Set SASL client state to SEND_HANDSHAKE_REQUEST 2022-09-23 11:25:08 DEBUG SaslClientAuthenticator:351 - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 2022-09-23 11:25:08 DEBUG SaslClientAuthenticator:351 - Set SASL client state to INITIAL 2022-09-23 11:25:08 DEBUG SaslClientAuthenticator:351 - Set SASL client state to INTERMEDIATE 2022-09-23 11:25:09 DEBUG SaslClientAuthenticator:351 - Set SASL client state to COMPLETE 2022-09-23 11:25:09 DEBUG SaslClientAuthenticator:629 - Finished authentication with no session expiration and no session re-authentication 2022-09-23 11:25:09 DEBUG Selector:564 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Successfully authenticated with ame-nonprod-eventhubs.servicebus.windows.net/52.178.17.145 2022-09-23 11:25:09 DEBUG NetworkClient:916 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Initiating API versions fetch from node -1. 2022-09-23 11:25:09 DEBUG NetworkClient:916 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Initiating API versions fetch from node -1. 2022-09-23 11:25:09 DEBUG NetworkClient:499 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Using older server API v0 to send API_VERSIONS {} with correlation id 1 to node -1 2022-09-23 11:25:09 DEBUG NetworkClient:870 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Recorded API versions for node -1: (Produce(0): 3 to 7 [usable: 7], Fetch(1): 4 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): UNSUPPORTED, OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 2 [usable: 2], DeleteRecords(21): UNSUPPORTED, InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): UNSUPPORTED, AddOffsetsToTxn(25): UNSUPPORTED, EndTxn(26): UNSUPPORTED, WriteTxnMarkers(27): UNSUPPORTED, TxnOffsetCommit(28): UNSUPPORTED, DescribeAcls(29): UNSUPPORTED, CreateAcls(30): UNSUPPORTED, DeleteAcls(31): UNSUPPORTED, DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): UNSUPPORTED, AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): 0 [usable: 0], ElectPreferredLeaders(43): UNSUPPORTED, IncrementalAlterConfigs(44): UNSUPPORTED) 2022-09-23 11:25:09 DEBUG NetworkClient:1097 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Sending metadata request MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: -1 rack: null) 2022-09-23 11:25:09 DEBUG NetworkClient:499 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Using older server API v5 to send METADATA {topics=[],allow_auto_topic_creation=true} with correlation id 2 to node -1 2022-09-23 11:25:09 DEBUG Metadata:270 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Updated cluster metadata updateVersion 2 to MetadataCache{cluster=Cluster(id = null, nodes = [ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: 0 rack: null)], partitions = [], controller = ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: 0 rack: null))} Test Data #0 from thread #28 2022-09-23 11:25:12 DEBUG TransactionManager:906 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Transition from state UNINITIALIZED to INITIALIZING 2022-09-23 11:25:12 INFO TransactionManager:450 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] ProducerId set to -1 with epoch -1 2022-09-23 11:25:12 DEBUG TransactionManager:942 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Enqueuing transactional request InitProducerIdRequestData(transactionalId='auto', transactionTimeoutMs=60000) 2022-09-23 11:25:12 DEBUG TransactionManager:942 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Enqueuing transactional request FindCoordinatorRequestData(key='auto', keyType=1) 2022-09-23 11:25:12 DEBUG TransactionManager:942 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Enqueuing transactional request InitProducerIdRequestData(transactionalId='auto', transactionTimeoutMs=60000) 2022-09-23 11:25:13 DEBUG NetworkClient:944 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Initiating connection to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: 0 rack: null) using address ame-nonprod-eventhubs.servicebus.windows.net/52.178.17.145 2022-09-23 11:25:13 DEBUG SaslClientAuthenticator:351 - Set SASL client state to SEND_APIVERSIONS_REQUEST 2022-09-23 11:25:13 DEBUG SaslClientAuthenticator:180 - Creating SaslClient: client=null;service=kafka;serviceHostname=ame-nonprod-eventhubs.servicebus.windows.net;mechs=[PLAIN] 2022-09-23 11:25:13 DEBUG Metrics:416 - Added sensor with name node-0.bytes-sent 2022-09-23 11:25:13 DEBUG Metrics:416 - Added sensor with name node-0.bytes-received 2022-09-23 11:25:13 DEBUG Metrics:416 - Added sensor with name node-0.latency 2022-09-23 11:25:13 DEBUG Selector:535 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 0 2022-09-23 11:25:13 DEBUG NetworkClient:902 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Completed connection to node 0. Fetching API versions. 2022-09-23 11:25:13 DEBUG SslTransportLayer:424 - [SslTransportLayer channelId=0 key=channel=java.nio.channels.SocketChannel[connection-pending remote=ame-nonprod-eventhubs.servicebus.windows.net/52.178.17.145:9093], selector=sun.nio.ch.WindowsSelectorImpl@423749d, interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 'ame-nonprod-eventhubs.servicebus.windows.net' peerPort 9093 peerPrincipal 'CN=servicebus.windows.net, O=Microsoft Corporation, L=Redmond, ST=WA, C=US' cipherSuite 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' 2022-09-23 11:25:13 DEBUG SaslClientAuthenticator:351 - Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE 2022-09-23 11:25:14 DEBUG SaslClientAuthenticator:351 - Set SASL client state to SEND_HANDSHAKE_REQUEST 2022-09-23 11:25:14 DEBUG SaslClientAuthenticator:351 - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 2022-09-23 11:25:14 DEBUG SaslClientAuthenticator:351 - Set SASL client state to INITIAL 2022-09-23 11:25:14 DEBUG SaslClientAuthenticator:351 - Set SASL client state to INTERMEDIATE 2022-09-23 11:25:14 DEBUG SaslClientAuthenticator:351 - Set SASL client state to COMPLETE 2022-09-23 11:25:14 DEBUG SaslClientAuthenticator:629 - Finished authentication with no session expiration and no session re-authentication 2022-09-23 11:25:14 DEBUG Selector:564 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Successfully authenticated with ame-nonprod-eventhubs.servicebus.windows.net/52.178.17.145 2022-09-23 11:25:14 DEBUG NetworkClient:916 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Initiating API versions fetch from node 0. 2022-09-23 11:25:14 DEBUG NetworkClient:916 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Initiating API versions fetch from node 0. 2022-09-23 11:25:14 DEBUG NetworkClient:499 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Using older server API v0 to send API_VERSIONS {} with correlation id 4 to node 0 2022-09-23 11:25:15 DEBUG NetworkClient:870 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Recorded API versions for node 0: (Produce(0): 3 to 7 [usable: 7], Fetch(1): 4 to 6 [usable: 6], ListOffsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): UNSUPPORTED, StopReplica(5): UNSUPPORTED, UpdateMetadata(6): UNSUPPORTED, ControlledShutdown(7): UNSUPPORTED, OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 4 [usable: 4], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 2 [usable: 2], DeleteRecords(21): UNSUPPORTED, InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): UNSUPPORTED, AddOffsetsToTxn(25): UNSUPPORTED, EndTxn(26): UNSUPPORTED, WriteTxnMarkers(27): UNSUPPORTED, TxnOffsetCommit(28): UNSUPPORTED, DescribeAcls(29): UNSUPPORTED, CreateAcls(30): UNSUPPORTED, DeleteAcls(31): UNSUPPORTED, DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): UNSUPPORTED, AlterReplicaLogDirs(34): UNSUPPORTED, DescribeLogDirs(35): UNSUPPORTED, SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): UNSUPPORTED, DeleteGroups(42): 0 [usable: 0], ElectPreferredLeaders(43): UNSUPPORTED, IncrementalAlterConfigs(44): UNSUPPORTED) 2022-09-23 11:25:15 DEBUG Sender:455 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Sending transactional request FindCoordinatorRequestData(key='auto', keyType=1) to node ame-nonprod-eventhubs.servicebus.windows.net:9093 (id: 0 rack: null) 2022-09-23 11:25:15 DEBUG NetworkClient:499 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Using older server API v1 to send FIND_COORDINATOR {key=auto,key_type=1} with correlation id 5 to node 0 2022-09-23 11:25:15 DEBUG Selector:607 - [Producer clientId=KafkaExampleProducer, transactionalId=auto] Connection with ame-nonprod-eventhubs.servicebus.windows.net/52.178.17.145 disconnected java.io.IOException: An existing connection was forcibly closed by the remote host at java.base/sun.nio.ch.SocketDispatcher.read0(Native Method) at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:245) at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223) at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356) at org.apache.kafka.common.network.SslTransportLayer.readFromSocketChannel(SslTransportLayer.java:205) at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:528) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572) at org.apache.kafka.common.network.Selector.poll(Selector.java:483) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:307) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) at java.base/java.lang.Thread.run(Thread.java:866) 2022-09-23 11:25:15 DEBUG SslTransportLayer:181 - [SslTransportLayer channelId=0 key=channel=java.nio.channels.SocketChannel[connection-pending remote=ame-nonprod-eventhubs.servicebus.windows.net/52.178.17.145:9093], selector=sun.nio.ch.WindowsSelectorImpl@423749d, interestOps=8, readyOps=0] Failed to send SSL Close message java.io.IOException: An existing connection was forcibly closed by the remote host at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method) at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51) at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113) at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79) at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50) at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:462) at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:218) at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:178) at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:836) at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:151) at org.apache.kafka.common.network.Selector.doClose(Selector.java:878) at org.apache.kafka.common.network.Selector.close(Selector.java:862) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:626) at org.apache.kafka.common.network.Selector.poll(Selector.java:483) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:307) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) at java.base/java.lang.Thread.run(Thread.java:866)
  • Standalone repro <REPLACE with e.g., Willing/able to send scenario to repro issue>
  • Operating system: Windows 11
  • Critical issue

If this is a question on basic functionality, please verify the following:

  • Port 9093 should not be blocked by firewall ("broker cannot be found" errors)
  • Pinging FQDN should return cluster DNS resolution (e.g. $ ping namespace.servicebus.windows.net returns ~ ns-eh2-prod-am3-516.cloudapp.net [13.69.64.0])
  • Namespace should be either Standard or Dedicated tier, not Basic (TopicAuthorization errors)
@arkadius
Copy link

arkadius commented Feb 1, 2023

@ravindra-dh Are you sure that it is related with transactions? Have you tried with transactions disabled?
I'm trying to use Kafka transactions with Event Hub and I have other behavior. It fails on KafkaProducer.initTransactions() with such message:

2023-02-01 14:01:33.253 [kafka-producer-network-thread | azure-test] INFO o.apache.kafka.clients.NetworkClient - [Producer clientId=azure-test, transactionalId=azure-testfa6ac8d9-59f4-4dc8-98e4-96480aafee2d] Node 0 disconnected.
2023-02-01 14:01:33.253 [kafka-producer-network-thread | azure-test] INFO o.apache.kafka.clients.NetworkClient - [Producer clientId=azure-test, transactionalId=azure-testfa6ac8d9-59f4-4dc8-98e4-96480aafee2d] Cancelled in-flight FIND_COORDINATOR request with correlation id 245 due to node 0 being disconnected (elapsed time since creation: 49ms, elapsed time since send: 49ms, request timeout: 60000ms)

I would like to know if each tenant can behave differently in this area.

@mgvinuesa
Copy link

Hello @arkadius, same here, I have those logs too. And also I detect this error:


org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction
	at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:162) ~[spring-kafka-3.0.2.jar:3.0.2]
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400) ~[spring-tx-6.0.4.jar:6.0.4]
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373) ~[spring-tx-6.0.4.jar:6.0.4]
	at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:595) ~[spring-tx-6.0.4.jar:6.0.4]
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:382) ~[spring-tx-6.0.4.jar:6.0.4]
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-6.0.4.jar:6.0.4]
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.4.jar:6.0.4]
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:752) ~[spring-aop-6.0.4.jar:6.0.4]
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:703) ~[spring-aop-6.0.4.jar:6.0.4]
	at com.ferrovial.architecture.sample.kafka.controller.PutMessageIntoTopicController$$SpringCGLIB$$0.sendMultipleAvroMessage(<generated>) ~[classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:207) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:152) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:884) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1080) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:973) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1011) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:914) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:731) ~[tomcat-embed-core-10.1.5.jar:6.0]
	at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.0.4.jar:6.0.4]
	at org.springframework.test.web.servlet.TestDispatcherServlet.service(TestDispatcherServlet.java:72) ~[spring-test-6.0.4.jar:6.0.4]
	at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:814) ~[tomcat-embed-core-10.1.5.jar:6.0]
	at org.springframework.mock.web.MockFilterChain$ServletFilterProxy.doFilter(MockFilterChain.java:165) ~[spring-test-6.0.4.jar:6.0.4]
	at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
	at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
	at org.springframework.web.filter.ServerHttpObservationFilter.doFilterInternal(ServerHttpObservationFilter.java:109) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
	at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.0.4.jar:6.0.4]
	at org.springframework.mock.web.MockFilterChain.doFilter(MockFilterChain.java:132) ~[spring-test-6.0.4.jar:6.0.4]
	at org.springframework.test.web.servlet.MockMvc.perform(MockMvc.java:201) ~[spring-test-6.0.4.jar:6.0.4]
	at io.restassured.module.mockmvc.internal.MockMvcRequestSenderImpl.performRequest(MockMvcRequestSenderImpl.java:212) ~[spring-mock-mvc-5.2.1.jar:na]
	at io.restassured.module.mockmvc.internal.MockMvcRequestSenderImpl.sendRequest(MockMvcRequestSenderImpl.java:456) ~[spring-mock-mvc-5.2.1.jar:na]
	at io.restassured.module.mockmvc.internal.MockMvcRequestSenderImpl.post(MockMvcRequestSenderImpl.java:513) ~[spring-mock-mvc-5.2.1.jar:na]
	at io.restassured.module.mockmvc.internal.MockMvcRequestSenderImpl.post(MockMvcRequestSenderImpl.java:84) ~[spring-mock-mvc-5.2.1.jar:na]
	at com.ferrovial.architecture.sample.kafka.controller.it.KafkaProducerTransactionInEventHubTest.whenPutMultipleMessageInATransactionAndOneFail_ThenAnyOneAreConsumed(KafkaProducerTransactionInEventHubTest.java:130) ~[test-classes/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) ~[junit-platform-commons-1.9.2.jar:1.9.2]
	at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) ~[junit-jupiter-engine-5.9.2.jar:5.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) ~[na:na]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) ~[junit-platform-engine-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:95) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:91) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:60) ~[junit-platform-launcher-1.9.2.jar:1.9.2]
	at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98) ~[.cp/:na]
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40) ~[.cp/:na]
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529) ~[.cp/:na]
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:756) ~[.cp/:na]
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452) ~[.cp/:na]
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210) ~[.cp/:na]
Caused by: org.springframework.kafka.KafkaException: initTransactions() failed
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateTxProducer(DefaultKafkaProducerFactory.java:873) ~[spring-kafka-3.0.2.jar:3.0.2]
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.createTransactionalProducer(DefaultKafkaProducerFactory.java:824) ~[spring-kafka-3.0.2.jar:3.0.2]
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:737) ~[spring-kafka-3.0.2.jar:3.0.2]
	at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:727) ~[spring-kafka-3.0.2.jar:3.0.2]
	at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:96) ~[spring-kafka-3.0.2.jar:3.0.2]
	at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:146) ~[spring-kafka-3.0.2.jar:3.0.2]
	... 117 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000ms while awaiting InitProducerId

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants