Debezium Demo Doesn't Work with 2.9.1 and Aurora 5.x #20101
Replies: 3 comments
-
Here's the output from running `pulsar standalone`. I stopped it towards the end:
`2022-02-15T20:07:57,250+0000 [main] INFO org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble - Running 1 bookie(s) and advertised them at 127.0.0.1.`
`2022-02-15T20:08:13,743+0000 [Curator-Framework-0] INFO org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting`
-
The issue had no activity for 30 days, mark with Stale label.
-
The issue had no activity for 30 days, mark with Stale label.
-
On both amd64 and arm64 Ubuntu images, the tutorial at https://debezium.io/blog/2019/05/23/tutorial-using-debezium-connectors-with-apache-pulsar/ doesn't work with the Pulsar 2.9.1 binary download.
Checked before running: the pod can connect to the database on AWS with the mysql client.
Steps to reproduce:
Download the Pulsar binaries and extract them: https://dlcdn.apache.org/pulsar/pulsar-2.9.1/apache-pulsar-2.9.1-bin.tar.gz
Download the connector dependencies per the tutorial instructions.
apt --assume-yes install wget unzip net-tools scala openjdk-8-jre-headless libcommons-io-java libcommons-lang3-java libcommons-math3-java libcommons-dbcp2-java libavro-java libjackson2-databind-java libcommons-codec-java python3 python3-pip
Export the environment variables JAVA_HOME and PULSAR_HOME.
Start Pulsar: ./bin/pulsar standalone. It appears to be running.
Use the example debezium-mysql-source-config.yaml and change the hostname and credentials.
The server has multiple MySQL databases that all need to be streamed. Defining all of them in the whitelist, or defining none, makes no difference.
Open a new terminal and run: bin/pulsar-admin source localrun --source-config-file /setup/debezium-mysql-source.yaml
It takes a minute to start logging. Then it looks like the source just keeps restarting after crashing:
componentType: SOURCE , maxBufferedTuples=1024, functionAuthenticationSpec=null, port=0, clusterName=local, maxPendingAsyncRequests=1000, exposePulsarAdminClientEnabled=false, metricsPort=0) 2022-02-15T19:53:45,246+0000 [function-timer-thread-6-1] INFO org.apache.pulsar.common.nar.FileUtils - Jar file connectors/pulsar-io-kafka-connect-adaptor-2.9.1.nar contains META-INF/bundled-dependencies, it may be a NAR file 2022-02-15T19:53:45,246+0000 [function-timer-thread-6-1] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - Trying Loading file as NAR file: connectors/pulsar-io-kafka-connect-adaptor-2.9.1.nar 2022-02-15T19:53:47,043+0000 [function-timer-thread-6-1] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - Initialize function class loader for function debezium-kafka-source at function cache manager, functionClassLoader: org.apache.pulsar.common.nar.NarClassLoader[/tmp/pulsar_localrunner_nars_7426220787174598080/pulsar-nar/pulsar-io-kafka-connect-adaptor-2.9.1.nar-unpacked/90ickkj44GhweghdXLZZsw] 2022-02-15T19:53:47,044+0000 [function-timer-thread-6-1] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instance config InstanceConfig(instanceId=0, functionId=f01479ec-ab28-42eb-9d83-4b1517f72c3d, functionVersion=9879029e-4d46-480f-8a72-3481f5a91771, functionDetails=tenant: "public" namespace: "mysql8pipeline" name: "debezium-kafka-source" className: "org.apache.pulsar.functions.api.utils.IdentityFunction" autoAck: true parallelism: 1 source { className: "org.apache.pulsar.io.kafka.connect.KafkaConnectSource" configs: "{}" typeClassName: "org.apache.pulsar.common.schema.KeyValue" } sink { topic: "all-the-databases" typeClassName: "org.apache.pulsar.common.schema.KeyValue" forwardSourceMessageProperty: true } resources { cpu: 1.0 ram: 1073741824 disk: 10737418240 } componentType: SOURCE , maxBufferedTuples=1024, functionAuthenticationSpec=null, port=0, clusterName=local, 
maxPendingAsyncRequests=1000, exposePulsarAdminClientEnabled=false, metricsPort=0) 2022-02-15T19:53:47,056+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: {"topicName":"all-the-databases","producerName":null,"sendTimeoutMs":0,"blockIfQueueFull":true,"maxPendingMessages":1000,"maxPendingMessagesAcrossPartitions":50000,"messageRoutingMode":"CustomPartition","hashingScheme":"Murmur3_32Hash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":10000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"compressionType":"LZ4","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{"application":"pulsar-source","id":"public/mysql8pipeline/debezium-kafka-source","instance_hostname":"pulsar-ftw-0.pulsar-ftw.pipeline.svc.cluster.local","instance_id":"0"}} 2022-02-15T19:53:47,141+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: 
{"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null} 2022-02-15T19:53:47,143+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [all-the-databases] [null] Creating producer on cnx [id: 0x144d34b8, L:/127.0.0.1:53656 - R:localhost/127.0.0.1:6650] 2022-02-15T19:53:47,151+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [all-the-databases] [standalone-0-22] Created producer on cnx [id: 0x144d34b8, L:/127.0.0.1:53656 - R:localhost/127.0.0.1:6650] 2022-02-15T19:53:47,241+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [all-the-databases] [standalone-0-22] Closed Producer 2022-02-15T19:54:15,243+0000 [function-timer-thread-6-1] ERROR org.apache.pulsar.functions.runtime.RuntimeSpawner - public/mysql8pipeline/debezium-kafka-source Function Container is dead with following exception. Restarting. 
java.lang.NullPointerException: null at java.lang.Class.forName0(Native Method) ~[?:1.8.0_312] at java.lang.Class.forName(Class.java:264) ~[?:1.8.0_312] at org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource.open(AbstractKafkaConnectSource.java:88) ~[?:?] at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.open(KafkaConnectSource.java:62) ~[?:?] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:757) ~[org.apache.pulsar-pulsar-functions-instance-2.9.1.jar:2.9.1] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:232) ~[org.apache.pulsar-pulsar-functions-instance-2.9.1.jar:2.9.1] at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:260) ~[org.apache.pulsar-pulsar-functions-instance-2.9.1.jar:2.9.1] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312] 2022-02-15T19:54:15,245+0000 [function-timer-thread-6-1] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - Unloading JAR files for function InstanceConfig(instanceId=0, functionId=f01479ec-ab28-42eb-9d83-4b1517f72c3d, functionVersion=9879029e-4d46-480f-8a72-3481f5a91771, functionDetails=tenant: "public" namespace: "mysql8pipeline" name: "debezium-kafka-source" className: "org.apache.pulsar.functions.api.utils.IdentityFunction" autoAck: true parallelism: 1 source { className: "org.apache.pulsar.io.kafka.connect.KafkaConnectSource" configs: "{}" typeClassName: "org.apache.pulsar.common.schema.KeyValue" } sink { topic: "all-the-databases" typeClassName: "org.apache.pulsar.common.schema.KeyValue" forwardSourceMessageProperty: true } resources { cpu: 1.0 ram: 1073741824 disk: 10737418240 } componentType: SOURCE , maxBufferedTuples=1024, functionAuthenticationSpec=null, port=0, clusterName=local, maxPendingAsyncRequests=1000, exposePulsarAdminClientEnabled=false, metricsPort=0) 2022-02-15T19:54:15,246+0000 [function-timer-thread-6-1] INFO 
org.apache.pulsar.common.nar.FileUtils - Jar file connectors/pulsar-io-kafka-connect-adaptor-2.9.1.nar contains META-INF/bundled-dependencies, it may be a NAR file 2022-02-15T19:54:15,247+0000 [function-timer-thread-6-1] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - Trying Loading file as NAR file: connectors/pulsar-io-kafka-connect-adaptor-2.9.1.nar 2022-02-15T19:54:17,053+0000 [function-timer-thread-6-1] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - Initialize function class loader for function debezium-kafka-source at function cache manager, functionClassLoader: org.apache.pulsar.common.nar.NarClassLoader[/tmp/pulsar_localrunner_nars_7426220787174598080/pulsar-nar/pulsar-io-kafka-connect-adaptor-2.9.1.nar-unpacked/90ickkj44GhweghdXLZZsw] 2022-02-15T19:54:17,054+0000 [function-timer-thread-6-1] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - ThreadContainer starting function with instance config InstanceConfig(instanceId=0, functionId=f01479ec-ab28-42eb-9d83-4b1517f72c3d, functionVersion=9879029e-4d46-480f-8a72-3481f5a91771, functionDetails=tenant: "public" namespace: "mysql8pipeline" name: "debezium-kafka-source" className: "org.apache.pulsar.functions.api.utils.IdentityFunction" autoAck: true parallelism: 1 source { className: "org.apache.pulsar.io.kafka.connect.KafkaConnectSource" configs: "{}" typeClassName: "org.apache.pulsar.common.schema.KeyValue" } sink { topic: "all-the-databases" typeClassName: "org.apache.pulsar.common.schema.KeyValue" forwardSourceMessageProperty: true } resources { cpu: 1.0 ram: 1073741824 disk: 10737418240 } componentType: SOURCE , maxBufferedTuples=1024, functionAuthenticationSpec=null, port=0, clusterName=local, maxPendingAsyncRequests=1000, exposePulsarAdminClientEnabled=false, metricsPort=0) 2022-02-15T19:54:17,149+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Starting Pulsar producer perf with config: 
{"topicName":"all-the-databases","producerName":null,"sendTimeoutMs":0,"blockIfQueueFull":true,"maxPendingMessages":1000,"maxPendingMessagesAcrossPartitions":50000,"messageRoutingMode":"CustomPartition","hashingScheme":"Murmur3_32Hash","cryptoFailureAction":"FAIL","batchingMaxPublishDelayMicros":10000,"batchingPartitionSwitchFrequencyByPublishDelay":10,"batchingMaxMessages":1000,"batchingMaxBytes":131072,"batchingEnabled":true,"chunkingEnabled":false,"compressionType":"LZ4","initialSequenceId":null,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"multiSchema":true,"accessMode":"Shared","lazyStartPartitionedProducers":false,"properties":{"application":"pulsar-source","id":"public/mysql8pipeline/debezium-kafka-source","instance_hostname":"pulsar-ftw-0.pulsar-ftw.pipeline.svc.cluster.local","instance_id":"0"}} 2022-02-15T19:54:17,151+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":null,"tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":null,"tlsTrustStorePassword":null,"tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":0,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5Pr
oxyPassword":null} 2022-02-15T19:54:17,240+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [all-the-databases] [null] Creating producer on cnx [id: 0x144d34b8, L:/127.0.0.1:53656 - R:localhost/127.0.0.1:6650] 2022-02-15T19:54:17,247+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [all-the-databases] [standalone-0-23] Created producer on cnx [id: 0x144d34b8, L:/127.0.0.1:53656 - R:localhost/127.0.0.1:6650] 2022-02-15T19:54:17,345+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [all-the-databases] [standalone-0-23] Closed Producer ^C2022-02-15T19:54:28,778+0000 [Thread-1] INFO org.apache.pulsar.functions.runtime.thread.ThreadRuntime - Unloading JAR files for function InstanceConfig(instanceId=0, functionId=f01479ec-ab28-42eb-9d83-4b1517f72c3d, functionVersion=9879029e-4d46-480f-8a72-3481f5a91771, functionDetails=tenant: "public" namespace: "mysql8pipeline" name: "debezium-kafka-source" className: "org.apache.pulsar.functions.api.utils.IdentityFunction" autoAck: true parallelism: 1 source { className: "org.apache.pulsar.io.kafka.connect.KafkaConnectSource" configs: "{}" typeClassName: "org.apache.pulsar.common.schema.KeyValue" } sink { topic: "all-the-databases" typeClassName: "org.apache.pulsar.common.schema.KeyValue" forwardSourceMessageProperty: true } resources { cpu: 1.0 ram: 1073741824 disk: 10737418240 }
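Two things stand out in the log above: every instance config shows the source with configs: "{}", and the crash is a NullPointerException raised from Class.forName inside AbstractKafkaConnectSource.open. One possible reading, which is an assumption and not confirmed anywhere in this thread, is that the connector settings from the YAML file never reached the source, so the class name handed to Class.forName was null. Below is a minimal Python sketch for checking a saved copy of the localrun output for these markers. The helper name summarize_localrun_log and the result keys are hypothetical, chosen only for illustration.

```python
import re

# Markers copied from the log above; the helper name and result keys are illustrative.
EMPTY_CONFIGS = re.compile(r'configs:\s*"\{\}"')
# Matches an NPE whose next "at" frame is Class.forName (same line or next line).
NPE_IN_FORNAME = re.compile(
    r"java\.lang\.NullPointerException[^\n]*\s+at java\.lang\.Class\.forName"
)
RESTART_MARKER = "Function Container is dead"


def summarize_localrun_log(text: str) -> dict:
    """Count the markers that stand out in the localrun output."""
    return {
        "restarts": text.count(RESTART_MARKER),
        "empty_source_configs": len(EMPTY_CONFIGS.findall(text)),
        "npe_in_class_forname": NPE_IN_FORNAME.search(text) is not None,
    }


# Two short excerpts from the log above, joined for a quick self-check.
sample = (
    'source { className: "org.apache.pulsar.io.kafka.connect.KafkaConnectSource" '
    'configs: "{}" typeClassName: "org.apache.pulsar.common.schema.KeyValue" } '
    "public/mysql8pipeline/debezium-kafka-source Function Container is dead with "
    "following exception. Restarting. java.lang.NullPointerException: null "
    "at java.lang.Class.forName0(Native Method)"
)

print(summarize_localrun_log(sample))
```

To run it against the full output, redirect the localrun output to a file (localrun.log is a hypothetical name) and call summarize_localrun_log(open("localrun.log").read()). A non-zero empty_source_configs count together with npe_in_class_forname being true would be consistent with the reading above, but does not prove it.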