Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

on IdeaJ, Flink DatastreamAPI faced: Caused by: org.apache.kafka.common.errors.TimeoutException: Topic kafkaproducer not present in metadata after 60000 ms. #239

Open
1 of 15 tasks
cicicao opened this issue Mar 19, 2023 · 0 comments

Comments

@cicicao
Copy link

cicicao commented Mar 19, 2023

Description

on Flink 1.16.0,

KafkaSink sink = KafkaSink.builder()
.setProperty("bootstrap.servers","cicieventhub.servicebus.windows.net:9093")
.setProperty("transaction.timeout.ms","700000")
.setProperty("security.protocol", "PLAINTEXT")
.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://cicieventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=12345....=";")
.setDeliveryGuarantee(DeliveryGuarantee.NONE)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("kafkaproducer")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.build();

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic kafkaproducer not present in metadata after 60000 ms.

How to reproduce

100% happen

Has it worked previously?

never

Checklist

IMPORTANT: We will close issues where the checklist has not been completed or where adequate information has not been provided.

Please provide the relevant information for the following items:

  • SDK (include version info): <REPLACE with e.g., kafka-python/Java SDK/confluent-kafka-dotnet with version info>
    jdk 1.8
  • Sample you're having trouble with: <REPLACE with e.g., Java quickstart>

// 1. get stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(8);

    // 2. read kafka message as stream input
    String kafka_brokers = "wn0-xx:9092,wn1-xx:9092,wn2-xx:9092";
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(kafka_brokers)
            .setTopics("click_events")
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    stream.print();

    // 4. sink click into iot built-in eventhub
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setProperty("bootstrap.servers","cicieventhub.servicebus.windows.net:9093")
            .setProperty("transaction.timeout.ms","700000")
            .setProperty("security.protocol", "PLAINTEXT")
            .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://cicieventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=12345....=\";")
            .setDeliveryGuarantee(DeliveryGuarantee.NONE)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("kafkaproducer")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .build();

    stream.sinkTo(sink);

    // 5. execute the stream
    env.execute("kafka Sink to other topic");
}

}

  • If using Apache Kafka Java clients or a framework that uses Apache Kafka Java clients, version: <REPLACE with e.g., 1.1.0>

  • Kafka client configuration: <REPLACE with e.g., auto.reset.offset=earliest, ..> (do not include your connection string or SAS Key)

  • Namespace and EventHub/topic name
    cicieventhub/kafkaproducer

  • Consumer or producer failure <REPLACE with e.g., Consumer failure>

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka kafkaproducer--1@-1 with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic kafkaproducer not present in metadata after 60000 ms.

  • Timestamps in UTC <REPLACE with e.g., Nov 7 2018 - 17:15:01 UTC>
  • group.id or client.id <REPLACE with e.g., group.id=cg-name>
    not setting
  • Logs provided (with debug-level logging enabled if possible, e.g. log4j.rootLogger=DEBUG) or exception call stack
  • Standalone repro <REPLACE with e.g., Willing/able to send scenario to repro issue>
  • Operating system: <REPLACE with e.g., Ubuntu 16.04.5 (x64) LTS>

Win11, ideaJ develop

  • Critical issue

If this is a question on basic functionality, please verify the following:

  • Port 9093 should not be blocked by firewall ("broker cannot be found" errors)

on eventhub namespace level: allowed win11 and kafka vnet|subnet,
on win11, allow 9093 port and allow eventhub servicehub

  • Pinging FQDN should return cluster DNS resolution (e.g. $ ping namespace.servicebus.windows.net returns ~ ns-eh2-prod-am3-516.cloudapp.net [13.69.64.0])

C:\Users\cicivm>nslookup cicieventhub.servicebus.windows.net
Server: UnKnown
Address: 168.63.129.16

Non-authoritative answer:
Name: ns-eh2-prod-bn3-az502-s1.cloudapp.net
Address: 104.208.144.55
Aliases: cicieventhub.servicebus.windows.net
ns-eh2-prod-bn3-az502.trafficmanager.net

cannot ping

  • Namespace should be either Standard or Dedicated tier, not Basic (TopicAuthorization errors)

Standard

Other:
Flink 1.16.0
HDInsight Kafka 3.2.0
Azure eventhub

KAFKA SURFACE:
ENABLED

event hub:
kafkaproducer
kafkaconsumer
namespace: cicieventhub.servicebus.windows.net:443

eventhub networking setting:
Selected networks(include develop windows vm and source kafka source)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant