
[feature request] Improvements for Avro messages producing #195

Closed
eshepelyuk opened this issue Apr 27, 2022 · 11 comments
@eshepelyuk commented Apr 27, 2022

Hello, this is a follow-up to #181.

Since the basic functionality is now working, let me start from the beginning and provide more complicated examples.
Once again I am comparing with the similar functionality in the AKHQ UI.

Use case with errors

  1. The schema
{
  "name": "CreateUserProfileWallet",
  "namespace": "Messaging.Contracts.WalletManager.Commands",
  "type": "record",
  "fields": [
    { "name": "CurrencyCode", "type": "string" },
    { "name": "ExpiresOn", "type": ["null", "string"] }
  ]
}
  2. First JSON, where ExpiresOn == null
    { 
      "CurrencyCode": "EUR" 
    }
    kaf invocation and the output
    cat create.null.json | kaf produce --avro-schema-id 29 --input-mode full create_user_profile_wallet
    Failed to encode avro
    %!(EXTRA *errors.errorString=cannot decode textual record 
    "Messaging.Contracts.WalletManager.Commands.CreateUserProfileWallet": only found 1 of 2 fields)
    
  3. Second JSON, where ExpiresOn != null
    {
      "CurrencyCode": "EUR",
      "ExpiresOn": "2022-12-12"
    }
    kaf invocation and the output
    cat create.json | kaf produce --avro-schema-id 29 --input-mode full create_user_profile_wallet
    Failed to encode avro
    %!(EXTRA *errors.errorString=cannot decode textual record 
    "Messaging.Contracts.WalletManager.Commands.CreateUserProfileWallet": cannot decode textual union: expected: '{'; actual: '"')
    

So these messages do not work with kaf, even though the files are valid JSON payloads for this schema and can be sent to the topic using other tools, e.g. AKHQ.

Existing solution

To work around the problem, the JSON files must be rewritten in a rather unfriendly format.

  1. First JSON, where ExpiresOn == null
    {
      "CurrencyCode": "EUR",
      "ExpiresOn": {
        "null": null
      }
    }
  2. Second JSON, where ExpiresOn != null
    {
      "CurrencyCode": "EUR",
      "ExpiresOn": {
       "string": "2022-12-12"
     }
    }    
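For what it's worth, the conversion from plain JSON to this wrapped form is mechanical once the schema is known. Below is a minimal, stdlib-only Python sketch; the `to_avro_json` helper is hypothetical (not part of kaf or any library here) and only handles top-level record fields with primitive unions:

```python
import json

def to_avro_json(schema, datum):
    """Wrap union-typed record fields into Avro's JSON encoding,
    i.e. {"<type>": value}, with {"null": null} for absent/null values."""
    out = {}
    for field in schema["fields"]:
        name, ftype = field["name"], field["type"]
        if isinstance(ftype, list):  # a union such as ["null", "string"]
            value = datum.get(name)
            if value is None:
                out[name] = {"null": None}
            else:
                # use the first non-null branch as the wrapper key
                branch = next(t for t in ftype if t != "null")
                out[name] = {branch: value}
        else:
            out[name] = datum[name]
    return out

schema = {
    "name": "CreateUserProfileWallet",
    "type": "record",
    "fields": [
        {"name": "CurrencyCode", "type": "string"},
        {"name": "ExpiresOn", "type": ["null", "string"]},
    ],
}

print(json.dumps(to_avro_json(schema, {"CurrencyCode": "EUR"})))
# {"CurrencyCode": "EUR", "ExpiresOn": {"null": null}}
print(json.dumps(to_avro_json(schema, {"CurrencyCode": "EUR", "ExpiresOn": "2022-12-12"})))
# {"CurrencyCode": "EUR", "ExpiresOn": {"string": "2022-12-12"}}
```

This reproduces both workaround payloads above from the plain inputs.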

Improvement request

As a user, I'd like kaf to handle plain JSON (as described in the use case section) when producing Avro-encoded messages, without the need to convert my plain JSON data to that unfriendly format.

This already works in some other tools, e.g. the AKHQ UI, but is missing from CLI-based tools.

@birdayz (Owner) commented Apr 27, 2022

Hm, this looks a bit like we're using the avro library incorrectly. Maybe I can write a reproducer test this evening.
@fabiojmendes, can you assist and have a look? I'm not an avro user myself, and I'm not sure I'll find the time to set everything up and test.

@birdayz (Owner) commented Apr 28, 2022

@eshepelyuk do you have some instructions at hand to set up the schema registry, i.e. a docker image and a curl command to install the schema?
I could give it a shot then.

@eshepelyuk (Author)

Let me provide a docker compose file later with Kafka, ZK and schema registry set up.
OK?

@birdayz (Owner) commented Apr 28, 2022

We don't need Kafka/ZK; the schema registry is enough (plus the command to upload a schema). Thanks!

@fabiojmendes (Contributor)

I had a look at this yesterday; here are my findings.

Using the official avro-tools CLI, I got the same error when omitting the ExpiresOn field:

cat wallet.json
{
  "name": "CreateUserProfileWallet",
  "namespace": "Messaging.Contracts.WalletManager.Commands",
  "type": "record",
  "fields": [
    { "name": "CurrencyCode", "type": "string" },
    { "name": "ExpiresOn", "type": ["null", "string"] }
  ]
}

cat input.json
{
  "CurrencyCode": "EUR"
}

java -jar avro-tools-1.11.0.jar jsontofrag --schema-file wallet.json input.json
22/04/28 10:23:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" org.apache.avro.AvroTypeException: Expected start-union. Got END_OBJECT
        at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:511)
        at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:430)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.avro.tool.JsonToBinaryFragmentTool.run(JsonToBinaryFragmentTool.java:77)
        at org.apache.avro.tool.Main.run(Main.java:67)
        at org.apache.avro.tool.Main.main(Main.java:56)

I also got the same behaviour when using the kafka-avro-console-producer from Confluent:

 echo '{ "CurrencyCode": "EUR" }' | kafka-avro-console-producer \
 --broker-list localhost:9092 \
 --topic avro6  \
 --property schema.registry.url=http://localhost:8081 \
 --property value.schema.id=8
org.apache.kafka.common.errors.SerializationException: Error deserializing json { "CurrencyCode": "EUR" } to Avro of schema {"type":"record","name":"CreateUserProfileWallet","namespace":"Messaging.Contracts.WalletManager.Commands","fields":[{"name":"CurrencyCode","type":"string"},{"name":"ExpiresOn","type":["null","string"]}]}
        at io.confluent.kafka.formatter.AvroMessageReader.readFrom(AvroMessageReader.java:134)
        at io.confluent.kafka.formatter.SchemaMessageReader.readMessage(SchemaMessageReader.java:325)
        at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:51)
        at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
Caused by: org.apache.avro.AvroTypeException: Expected start-union. Got END_OBJECT
        at org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:511)
        at org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:430)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.toObject(AvroSchemaUtils.java:214)
        at io.confluent.kafka.formatter.AvroMessageReader.readFrom(AvroMessageReader.java:124)

goavro is a little more lenient if you add a default to the field, as in:

cat wallet.json
{
  "name": "CreateUserProfileWallet",
  "namespace": "Messaging.Contracts.WalletManager.Commands",
  "type": "record",
  "fields": [
    { "name": "CurrencyCode", "type": "string" },
    { "name": "ExpiresOn", "type": ["null", "string"], "default": null }
  ]
}

echo '{ "CurrencyCode": "EUR" }' | kaf produce --avro-schema-id 9 avro1
Sent record to partition 0 at offset 201.

For the second issue, I found the same behaviour on the goavro side, and it looks like avro-tools behaves the same way as well. I had to add the "string" wrapper to the union:

cat input.json
{
  "CurrencyCode": "EUR",
  "ExpiresOn": {
    "string": "2022-04-28"
  }
}

java -jar avro-tools-1.11.0.jar jsontofrag --schema-file wallet.json input.json
22/04/28 10:38:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
EUR2022-04-28%

I'm not sure what AKHQ is doing, but it seems like they're going an extra step to fill those gaps.

@eshepelyuk (Author)

> We don't need Kafka/ZK; the schema registry is enough (plus the command to upload a schema). Thanks!

Hello @birdayz

Attaching an archive with a script to set up schema-registry and install a schema there.
Before running the script, edit it to point at your Kafka instance.
Kafka should have the topic create_user_profile_wallet created.

The archive contains the examples from my initial post:
kaf.zip

@eshepelyuk (Author) commented Apr 29, 2022

Hi all,

I've been able to make progress on the issue using a small Python program that leverages the fastavro library.

My script creates Avro JSON (i.e. the "weird" JSON with type wrappers) from plain JSON. The resulting Avro JSON can then be sent to Kafka using kaf.

Attaching a new archive containing the script; please install fastavro with pip to use it.
json2avro.zip

The script can be used like this:

./json2avro.py create-null.json | kaf produce --avro-schema-id 2 create_user_profile_wallet

Could you take a look and check whether your libraries support the same functionality, so it can eventually be implemented in kaf?

@birdayz (Owner) commented Apr 29, 2022

Thanks so much for putting this together. I have a lead.

Since v2, the avro library we use, linkedin/goavro, supports NewCodecForStandardJSON.
So far we use NewCodec.

I assume there are effectively two kinds of avro codecs: standard and JSON.
I see two possibilities:
a) try to be smarter than the user: try standard avro first and, if it fails, try JSON avro; then give up.
b) add a flag --avro-codec standard|json.

I did a test and with the JSON codec this works:

$ echo '{ "CurrencyCode": "EUR", "ExpiresOn": null}' | kaf produce --avro-schema-id 1 avro1 --schema-registry localhost:8084 --input-mode full
ZZ
Sent record to partition 0 at offset 4.

However, it requires ExpiresOn to be present, either as null or with a value. I think this is correct.

What do you think? I tend towards the flag, defaulting to JSON actually, since proto input is also in JSON format.
But I would like to hear other opinions as well: which is more intuitive as the default, Avro JSON or standard JSON?
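Option (a) can be sketched as a simple fallback chain. The decoders below are toy, stdlib-only stand-ins for goavro's strict (NewCodec) and lenient (NewCodecForStandardJSON) textual decoders; all names here are hypothetical, not kaf or goavro code:

```python
import json

def decode_avro_json(text, schema):
    """Toy stand-in for the strict codec: union-typed fields must be
    present and wrapped as {"<type>": value} (or {"null": null})."""
    datum = json.loads(text)
    for field in schema["fields"]:
        if isinstance(field["type"], list):
            v = datum.get(field["name"])
            if not isinstance(v, dict):
                raise ValueError(f"field {field['name']}: expected wrapped union, got {v!r}")
    return datum

def decode_standard_json(text, schema):
    """Toy stand-in for the lenient, plain-JSON codec."""
    return json.loads(text)

def decode_with_fallback(text, schema, decoders):
    """Try each decoder in order; return (codec name, datum) on first success."""
    errors = []
    for name, decode in decoders:
        try:
            return name, decode(text, schema)
        except ValueError as e:  # json.JSONDecodeError is a ValueError too
            errors.append(f"{name}: {e}")
    raise ValueError("all codecs failed: " + "; ".join(errors))

schema = {
    "type": "record",
    "name": "CreateUserProfileWallet",
    "fields": [
        {"name": "CurrencyCode", "type": "string"},
        {"name": "ExpiresOn", "type": ["null", "string"]},
    ],
}

codecs = [("standard", decode_avro_json), ("json", decode_standard_json)]
# plain JSON fails the strict codec and falls through to the lenient one
print(decode_with_fallback('{"CurrencyCode": "EUR", "ExpiresOn": "2022-12-12"}', schema, codecs))
```

One downside of the silent fallback, as the discussion suggests, is that it can mask genuine schema errors, which is an argument for the explicit --avro-codec flag instead.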

@eshepelyuk (Author)

Well, as a user I consider Avro JSON counterintuitive for daily usage, though it may still be useful for some edge cases.
So I would rather introduce a flag for this, with plain JSON as the default format.

@fabiojmendes (Contributor)

Yeah, I agree. Having the flag but defaulting to plain JSON would be ideal. I can make this change, since it touches the encoding interface I've been working on in #194. Did you have a chance to take a look at that design, @birdayz?

@birdayz (Owner) commented Apr 29, 2022

@fabiojmendes thank you. I've taken a look. If you want, you can make the changes there.


3 participants