Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for topics with multiple schemas #238

Closed
wants to merge 7 commits into from

Conversation

mkubala
Copy link
Contributor

@mkubala mkubala commented Jan 9, 2020

This contribution makes kafka-connect-bigquery compatible with topics that carry different types of messages, especially when Kafka is used for an event sourcing.

Approach of putting several event types in a single topic is already supported by Kafka and Schema Registry (see https://www.confluent.io/blog/put-several-event-types-kafka-topic/).

This PR has been derived from #219 and rebased on top of the recent changes made by @bingqinzhou

This contribution makes kafka-connect-bigquery compatible with topics that carry different types of messages, especially when Kafka is used for an event sourcing.
Approach of putting several event types in a single topic is already supported by Kafka and Schema Registry (see https://www.confluent.io/blog/put-several-event-types-kafka-topic/).
@jaroslawZawila
Copy link

@mtagle @criccomini @C0urante Any progress on this one?

@mkubala
Copy link
Contributor Author

mkubala commented Jan 23, 2020

@mtagle @criccomini @C0urante Is there any chance for this feature to be merged and included in the next release?

I do not want to exert pressure on anyone here, but it's been almost 3 months since I started work on this contribution after receiving a positive feedback in #175 and my team is blocked with their work on ML & reporting.

I'm between a rock and a hard place and if I should start looking for another solution (write my own custom service or connector?) I'd like to know it before the team get really mad.

@mtagle
Copy link
Contributor

mtagle commented Jan 23, 2020

hey @mkubala, I've asked @whynick1 to take a look at your PR. I'm eager to get this merged as well!

@skyzyx
Copy link
Contributor

skyzyx commented Jan 30, 2020

Codecov Report

Merging #238 into master will increase coverage by 0.25%.
The diff coverage is n/a.

@@             Coverage Diff              @@
##             master     #238      +/-   ##
============================================
+ Coverage     68.69%   68.95%   +0.25%     
- Complexity      267      273       +6     
============================================
  Files            32       32              
  Lines          1460     1498      +38     
  Branches        152      153       +1     
============================================
+ Hits           1003     1033      +30     
- Misses          409      417       +8     
  Partials         48       48              
Impacted Files Coverage Δ Complexity Δ
...a/connect/bigquery/utils/TopicToTableResolver.java 73.33% <0.00%> (-18.34%) 11.00% <0.00%> (-1.00%)

@mkubala mkubala requested a review from whynick1 January 30, 2020 12:19
@mkubala
Copy link
Contributor Author

mkubala commented Jan 30, 2020

Most of the comments have been addressed. I left some food for thoughts regarding moving the getSingleMatch method back from config to the TopicToTableResolver.

Also I'd be glad if you could take a look on #237 - it's another feature, essential to start using multi schema topics. When they finally get merged, I'll expose a third PR, which brings support for resolving datasets based on the topic and record names.

Copy link
Contributor

@whynick1 whynick1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, it looks really good! Added a few suggested changes.

* @param schemaType schema type used to resolve full subject when recordName is absent.
* @return corresponding schema registry subject.
*/
public String toSubject(KafkaSchemaRecordType schemaType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is leaking Confluent Schema Registry stuff into the API layer, which must be agnostic to specific schema implementations. The concept of a subject is specific to Confluent.

In fact, the entire concept of a record name is specific to the Avro/Confluent implementation. Protobufs will probably work with this since they also have record names, but what about JSON? IIRC, there is another PR that is about supporting JSON schemas.

I am concerned that this PR is far too tied to the Confluent Schema Registry, especially in the API layer.

I am open to changing the API interfaces to make it easier on the Confluent Schema Registry, but not in a way that excludes other potential implementations (especially ones like JSON, where there is active interest). Can you suggest how this can be altered to not leak Confluent Schema Registry assumptions into the API?

Copy link
Contributor Author

@mkubala mkubala Jan 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!
I'm working on decoupling this plugin from that particular schema implementation. The rough idea is to extract and encapsulate all the SR-related logic in a specialized classes (so that it should be easy to provide support for different schemas).

Copy link
Contributor Author

@mkubala mkubala Feb 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The responsibility for assembling Confluent Schema Registry subject names has been moved to SchemaRegistrySchemaRetriever.
Also the recordName field stays optional, so in case of an alternative implementations (like aforementioned JSON) developers won't have to bother with it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so @whynick1 and I have done some talking and investigating. I think we have a good idea about the best approach to achieve not only what you're trying to do, but to evolve KCBQ to support things in a more generic way in general. This amounts to:

  1. Adding a pluggable TableRouter that takes a SinkRecord and returns which table it should be written to. (Default: RegexTableRouter)
  2. Changing SchemaRetriever interface to have two methods: Schema getKeySchema(SinkRecord) and Schema getValueSchema(SinkRecord). (Default: IdentitySchemaRetriever, which just returns sinkRecord.keySchema() and sinkRecord.valueSchema())
  3. Changing the way that schema updates are handled in the AdaptiveBigQueryWriter.

This approach should give us a ton of flexibility including:

  1. It will allow you to pick which table to route each individual message based on all of the information in SinkRecord (topic, key schema, value schema, message payload, etc.)
  2. It will allow us to fix some known schema-evolution bugs in cases where one field is added and another is dropped.
  3. It will allow us to use SinkRecord's .keySchema and .valueSchema rather than talking to the schema registry for schemas.
  4. It will make it easy to support JSON messages, even those that return null for the .keySchema() and .valueSchema() methods--you can implement a custom retriever for this case.

I am going to write up a GH issue today that documents in detail what needs to be done to implement these changes, and we can go from there.

Does this sound good?

Copy link
Contributor

@whynick1 whynick1 Feb 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some note regarding @criccomini 's point 1, 2, 3.

  1. As an example to your case,
public class MultiSchemaRegexSTableRouter {
  Map<TopicAndRecordName, TableId> topicsToBaseTableIds;

 @override
 public TableId getTable(SinkRecord sinkRecord) {
    TopicAndRecordName key = new TopicAndRecordName(sinkRecord.topic(), sinkRecord.recordName());
    if (topicsToBaseTableIds.contains(key) {
      return topicsToBaseTableIds.get(key);
    } else {
      // return matched BQ table with topic & record name
    }
  }
}
  1. Splitting into getKeySchema, valueSchema also provide an option to load SinkRecord's key schema (if has) to BQ for various purpose including deduplication, debugging, etc.
  2. Automatic schema revolution now is default (should be tunable). If a new record uses new schema to insert (to BQ), KCBQ will try to update BQ schema with the latest one retrieved from Schema Registry. Now, we want to decouple from that, and instead derive new schema from Record, assuming json-like Record might not has a schema.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #245

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this idea!

Yesterday I was struggling with changing an existing code on my local branch, responsible for picking up datasets based on both topic and record name, so that it would keep the API part agnostic and completely unaware of different dataset routing strategies. At the end of the day I was a little bit frustrated, because I had to either add yet another, configurable class (similar to SchemaRetriever but for datasets) and letting plugin users to remember about combining the right schema retriever with the right dataset resolver or add responsibility for routing datasets to SchemaRetriever, thus breaking SRP rule...

The proposed TableRouter.getTable method solves all the problems!

@criccomini
Copy link
Contributor

@mkubala are you planning to update this with TableRouter?

@mkubala
Copy link
Contributor Author

mkubala commented Feb 24, 2020

Hi @criccomini!
Sorry for late response. I cannot tell you if and when I'll find time to update this PR.

I've been working on it as a part of my daily job and since we are behind the original schedule for BigQuery integration I found myself between a rock and a hard place.

My client, who paid for the contribution made so far, does not want to spend more money and see the working integration in our product ASAP.
On the other hand you guys want to keep as clean and maintainable codebase as possible.
Due to personal reasons I cannot sacrifice my spare time in order to introduce TableRouter to this PR in a reasonable time. Also I have yet another PR to expose (datasets based on both - topic & record names).

I wonder if there is any chance that you could approve & merge this PR "as it is" and TableRouter would be introduced as a separate PR, in the near future?

@criccomini
Copy link
Contributor

No, this PR adds too much complexity to the codebase, and isn't the right approach--the refactor is. I don't think it's much more work than this PR, itself. I am reaching out to @rhauch to see if you all can allocate time for refactor that's outlined in issue #245.

@mkubala
Copy link
Contributor Author

mkubala commented Feb 25, 2020

I understand. @rhauch please let me know if you start working on that.

@rhauch
Copy link

rhauch commented Feb 25, 2020

@mkubala sorry, I won’t have time to work on this.

@mkubala
Copy link
Contributor Author

mkubala commented Feb 26, 2020

There is light in the end of tunnel! I spoke to my client this morning and they will let me finish the feature as part of my daily job if I give them a kind of hard estimate / deadline.

How much time you will need to review this PR and release a new version of the plugin?
Can you guarantee that after this PR got merged (and optionally the other one with support for matching datasets by both topic and record name) a new version of the connector plugin will be released?

@mustosm
Copy link

mustosm commented Apr 15, 2020

Hello,
We are very interested in this feature. Can you tell us when you would consider adding @mkubala's contribution to your release?

@mkubala
Copy link
Contributor Author

mkubala commented Apr 15, 2020

@mustosm Maybe I'll find some spare time in the next couple of weeks to finish this feature.
Although I have no idea how much time it will take to re-review this PR and release a new version, so if you cannot wait then build connector from this branch on your own - that's what we did.

@mustosm
Copy link

mustosm commented Apr 15, 2020

@mkubala thank you for your answer. If the feature is released by wepay it would be better.
Are you using this feature in production today?

@mkubala
Copy link
Contributor Author

mkubala commented Apr 15, 2020

It's been used on the staging environment.

@OuesFa
Copy link

OuesFa commented Nov 17, 2021

Hi @mkubala 👋
Any update on this please?
Are you using this in prod env?
Did you make a release somewhere?

@OuesFa
Copy link

OuesFa commented Nov 29, 2021

👋
I'm sharing a solution for this using SMTs with the current version of the connector.
confluentinc#114 (comment)

@mkubala mkubala closed this Nov 28, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants