Saturday, 7 August 2021

Why Avro is the best choice for Kafka (Upto 91% compression - more SAVING!!)

 I will explain this with an example : 

Consider you have a data of 30 TB in JSON/ or any other format -

 If you directly store this data into Kafka Topic, you will need to pay according to the space you are consuming (if using the cloud)

Advice to use AVRO format in Kafka topic to compress the data 

Research shows that we can save up to 91% of space using Avro

Example :

Data : 30 TB

After Avro conversion : 2.7 TB (up to 91% )

For this, you might need to convert JSON data to Avro data, which is complex but doable. 





Tuesday, 27 July 2021

Avro Schema Evolution

When you work with data, schema evolution plays a very crucial role.


For instance, while working on Kafka with Avro data you might have observed many exceptions that the producer/consumer clients face wrt schema compatibility. It's important to understand the concept of schema evolution.

What is Avro?
~ In order to transfer data over the network you need data to be serialized (data conversion to binary format) and Avro is one such system or tool that helps you achieve it.
~ Avro depends on schema and you can also think of it as a JSON format with the schema attached to it. And that's the main reason why Avro is preferred over JSON because you can't enforce schema with JSON.
~ Avro is fast & compact.
~ Data is fully typed and named.

A simple Avro schema can look like this -
{
"type": "record",
"name": "Employee",
"fields" : [
{"name":"emp_name", "type":"string"},
{"name":"emp_id", "type":"long"},
{"name":"department", "type":"string"}
}

Schema Evolution -
In order to evolve an Avro schema, you need to keep some important things in mind in order to make the changes compatible.

Backward Compatibility -
Your producer application is producing messages/data using an old schema and your consumer application can read the data using a newly evolved schema.

Let's use the above schema to understand it.
Suppose the Producer app created a record using the following schema -

{
"type": "record",
"name": "Employee",
"fields" : [
{"name":"emp_name", "type":"string"},
{"name":"emp_id", "type":"long"},
{"name":"department", "type":"string"}
}

(so the record has emp_name, emp_id, and department)

Now, the Consumer app on the other side reads this record using a newly evolved schema that doesn't contain the field department.

{
"type": "record",
"name": "Employee",
"fields" : [
{"name":"emp_name", "type":"string"},
{"name":"emp_id", "type":"long"}
}

But the consumer is still able to read the record and the data would just have emp_name and emp_id (the department is silently ignored).

Forward Compatibility -
Your producer app uses a new schema to write messages and your consumer app can read the messages using an old schema.

~ There could exist a combination of both backward and forward compatible schema as well, a fully compatible schema.

Some of the rules that I personally found useful for creating compatible schemas -
- You can easily add a field with a default value in the new schema.
Now suppose the producer is writing using an old schema and the consumer uses this new schema, as we have a default value associated with the newly added field we don't need to worry about this field missing in the producer schema and the field would get default value on the consumer side.

- You can easily remove a field having a default value in the new schema.
- You can't rename a field but you can use aliases.
- You can't change the data type.

Credit goes to Mayank Ahuja

Friday, 16 April 2021

Spark issue : Spark Out of Memory Issue

 Out Of Memory is the most common type of error you might face in Spark

1. You can get this error at Driver 

2. or You can get this error at the Executor level.


1. Driver 

Collect Operation

If we have big files and we try to collect those in Driver. You will see Driver OOM(i.e Out Of Memory Error)

Broadcast Join

If we try to broadcast large files. Then again you will see OOM. 

2. Executor 

High Concurrency 

If we assign more cores to a single executor then it might possible for each partition we have fewer resources(memory) but need to process large files then we get OOM.

As a benchmark - use 3 or 5 cores to a single executor, not more than this

Large Partitions causes OOM

If you have a larger partition then if the executor tries to process that data then chances are it fails with OOM 

As a benchmark - Use appropriate partitions

Yarn Memory Overhead OOM

Executor container consists of Yarn Memory Overhead + Executor memory

Normally Yarn Memory Overhead(off-heap memory) should be 10% of Executor memory.

This Yarn Memory Overhead memory is used to store spark internal objects. Some time those need more space and throw OOM 

If this error you will get, try to increase the Yarn Memory Overhead parameter. 


I hope you like this blog. 

Please subscribe and follow this blog for more information on Big Data Journey


Monday, 8 March 2021

Conversion of Kafka topics from JSON to Avro with KSQL

 Here’s a dummy topic, in JSON:

$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic mysql_users
{"uid":1,"name":"Cliff","locale":"en_US","address_city":"St Louis","elite":"P"}
{"uid":2,"name":"Nick","locale":"en_US","address_city":"Palo Alto","elite":"G"}

In KSQL declare the source stream, specifying the schema (here a subset of the full schema, just for brevity):

ksql> CREATE STREAM source (uid INT, name VARCHAR) WITH (KAFKA_TOPIC='mysql_users', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

Now create a derived stream, specifying the target serialization (Avro) and the target topic (this is optional; without it will just take the name of the stream):

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> CREATE STREAM target_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='mysql_users_avro') AS SELECT * FROM source;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

Check out the resulting Avro topic:

$ kafka-avro-console-consumer \
                   --bootstrap-server localhost:9092 \
                   --property schema.registry.url=http://localhost:8081 \
                   --topic mysql_users_avro --from-beginning
{"UID":{"int":1},"NAME":{"string":"Cliff"}}
{"UID":{"int":2},"NAME":{"string":"Nick"}}

Because KSQL is a continuous query, any new records arriving on the source JSON topic will be automatically converted to Avro on the derived topic.