Monday, 8 March 2021

Conversion of Kafka topics from JSON to Avro with KSQL

 Here’s a dummy topic, in JSON:

$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}

In KSQL declare the source stream, specifying the schema (here a subset of the full schema, just for brevity):

ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Now create a derived stream, specifying the target serialization (Avro) and the target topic (this is optional; without it will just take the name of the stream):

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Check out the resulting Avro topic:

$ kafka-avro-console-consumer \
                   --bootstrap-server localhost:9092 \
                   --property schema.registry.url=http://localhost:8081 \
                   --topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}

Because KSQL is a continuous query, any new records arriving on the source JSON topic will be automatically converted to Avro on the derived topic.