Here’s a dummy topic, in JSON:
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}
In KSQL, declare the source stream and specify its schema (here just a subset of the full schema, for brevity):
ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
Now create a derived stream, specifying the target serialization (Avro) and the target topic (the topic is optional; without it, the topic just takes the name of the stream):
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;
Message
----------------------------
Stream created and running
----------------------------
ksql>
Check out the resulting Avro topic:
$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}
Because the CREATE STREAM ... AS SELECT statement runs as a continuous query, any new records arriving on the source JSON topic are automatically converted to Avro on the derived topic.
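One way to check this is to leave the Avro consumer running and send one more JSON record to the source topic. The sketch below is not from the original setup: the record (uid 3, "Ann") is made up, and `send_record` is just a helper name chosen here. Depending on your Kafka version, the producer takes `--broker-list` (older releases) or `--bootstrap-server` (newer ones).

```shell
# Hypothetical extra user record; the values are invented for illustration.
new_record='{"uid":3,"name":"Ann","locale":"en_GB","address_city":"Leeds","elite":"S"}'

# Hypothetical helper: write one line to the source JSON topic.
# Needs the broker from the examples above; swap --broker-list for
# --bootstrap-server if your Kafka version requires it.
send_record() {
  printf '%s\n' "$1" | kafka-console-producer --broker-list localhost:9092 --topic mysql_users
}
```

After running `send_record "$new_record"`, the kafka-avro-console-consumer from the previous step should print a new line along the lines of {"UID":{"int":3},"NAME":{"string":"Ann"}}.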