Add support for topics with multiple schemas #114

Closed
art-i-svsg opened this issue May 18, 2021 · 11 comments
@art-i-svsg

This functionality was developed in wepay#238, but it looks like it has been abandoned since then.

Is it possible to integrate this solution into the connector?

@C0urante

The connector doesn't support this natively, but it's possible to implement by using SMTs that redirect records to topics based on their schemas (or on some other predicate that has a direct correlation with their schema).

Just note that SMTs that change the topic/partition/offset information of a record cannot be used in combination with the upsertEnabled and/or deleteEnabled properties.
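A minimal sketch of the relevant connector settings, assuming the property names upsertEnabled and deleteEnabled (the names used later in this thread). Both are expected to default to false; setting them explicitly just documents the constraint next to any topic-rewriting SMT:

```properties
# Keep upsert/delete mode off while any SMT rewrites topic/partition/offset
upsertEnabled=false
deleteEnabled=false
```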

@OuesFa

OuesFa commented Nov 17, 2021

Hi @C0urante 👋
Can you please share an example of using SMTs this way?

@C0urante

Sure, there's one in the quickstart config:

# An example regex router SMT that strips (kcbq_) from the topic name.
# Replace with relevant regex to replace the topic of each sink record with
# destination dataset and table name in the format <dataset>:<tableName> or only the destination
# table name in the format <tableName>
transforms=RegexTransformation
transforms.RegexTransformation.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.RegexTransformation.regex=(kcbq_)(.*)
transforms.RegexTransformation.replacement=$2

@OuesFa

OuesFa commented Nov 17, 2021

Thanks @C0urante, I'm already using this config to route data to the table with the name I want, but I can't see how I could use it to redirect records from one topic to multiple BQ tables depending on their schema.

@OuesFa

OuesFa commented Nov 17, 2021

To give a little more context on my use case: I'm using io.confluent.kafka.serializers.subject.RecordNameStrategy to write data to my Kafka topics, so I'm wondering if there is a way to tell the connector to create a separate BQ table for each schema.
I can see in this comment that you suggested providing a way for users to specify that via a configuration somewhere, but I don't know whether that's possible one way or another.
I have to admit that the SMT approach is not clear to me yet.

@OuesFa

OuesFa commented Nov 17, 2021

Schemas are stored in the Confluent Schema Registry.

@C0urante

Ah sorry, I thought you wanted a general demonstration of how to configure an SMT.

For your specific use case I don't know if there is an already-implemented SMT available, but it shouldn't be too hard to implement one that redirects records to different topics based on, e.g., the name of the record's schema. You can see some examples of how to implement an SMT in the Kafka Connect codebase here, including the RegexRouter and ExtractField SMTs.

@OuesFa

OuesFa commented Nov 18, 2021

Thanks for the lead.
It seems there is no way to have the schema name queried from the schema registry using SMTs 🤔
At least I can't find any SMT that uses that information, nor any lead on that part anywhere.

@OuesFa

OuesFa commented Nov 18, 2021

I just found this one, which looks promising.
But the connector limitations doc says that it "does not currently support any Single Message Transformations (SMTs) that modify the topic name".

I also can't find the SMT mentioned above in the list of supported SMTs: https://docs.confluent.io/cloud/current/connectors/single-message-transforms.html#list-of-available-smts

@C0urante

C0urante commented Nov 18, 2021

> Seems that there is no way to have the schema name queried from the schema registry using SMTs 🤔

Doesn't the Confluent Avro converter automatically populate the schema name based on the name of the Avro record it was derived from?
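For reference, a minimal sketch of the sink-side converter settings this relies on, assuming record values are Avro-encoded with the Confluent serializer. The Schema Registry URL is a placeholder:

```properties
# Deserialize record values with the Confluent Avro converter; each record's
# writer schema is fetched from Schema Registry, and the resulting Connect
# schema is expected to carry the Avro record's full name (namespace + name)
value.converter=io.confluent.connect.avro.AvroConverter
# Placeholder URL: point this at your own Schema Registry
value.converter.schema.registry.url=http://localhost:8081
```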

> I just found this one that is promising.

Ah yeah! That's exactly what I was thinking of. You can probably combine this with the RegexRouter to get what you want; SchemaNameToTopic can be used to disambiguate records coming from the same topic based on their record name, and RegexRouter can be used as an optional follow-up in case you want to route to a table whose name doesn't exactly match the name of the record's schema.
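A minimal sketch of that chain in properties form, assuming the jcustenborder common-transforms plugin is installed on the worker and that Avro record names live under a hypothetical com.example namespace. The alias StripNamespace and its regex are illustrative only. Transforms run in the order listed, so RegexRouter sees the topic that SchemaNameToTopic produced:

```properties
transforms=SchemaNameToTopic,StripNamespace
# Step 1: set each record's topic to its value schema name, e.g. com.example.Order
transforms.SchemaNameToTopic.type=com.github.jcustenborder.kafka.connect.transform.common.SchemaNameToTopic$Value
# Step 2 (optional, hypothetical alias): keep only the part after the last dot, e.g. Order
transforms.StripNamespace.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.StripNamespace.regex=(.*)[.](.*)
transforms.StripNamespace.replacement=$2
```

With this config, a record whose value schema is named com.example.Order would be routed first to com.example.Order and then to Order. RegexRouter only rewrites topics that match the whole regex, so a topic without a dot passes through unchanged.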

> But I can see in the connector limitations doc that it
>
> > does not currently support any Single Message Transformations (SMTs) that modify the topic name

I'm sorry, those docs are incorrect and I've reached out to the team that owns them to see about publishing a correction. The connector works fine with SMTs that modify topic/partition/offset, unless you are running with upsert/delete turned on (i.e., upsertEnabled and/or deleteEnabled are true in the connector config).

@OuesFa

OuesFa commented Nov 18, 2021

Thanks for your answer!
It does work well with that SMT.
It wasn't easy to figure out the exact config to use, but here it is:

          transforms: "SchemaNameToTopic"
          transforms.SchemaNameToTopic.type: "com.github.jcustenborder.kafka.connect.transform.common.SchemaNameToTopic$Value"

> and RegexRouter can be used as an optional follow-up in case you want to route to a table whose name doesn't exactly match the name of the record's schema.

Good idea, I didn't think of that, thanks!
