Monday, 27 March 2017

Hadoop - Processing





Hadoop - Processing, i.e. MapReduce

In this blog we will discuss the latest MapReduce framework, MR2 (Hadoop versions 2.0 and above).

Brief on MapReduce :

Life cycle of data being processed through the MapReduce framework :

1. Creation of input split

2. Mapper

3. Combiner

4. Partitioner

5. Shuffle and Sort

6. Reducer

Let's explain the above cycle using an example:

1. Assume that we have 384 GB of data to process.


    File : Customer_Data.csv

    Fields : CustomerID,Name_of_the_customer,Product,Price,Marketplace,Date_of_Purchase

    Sample Data : 001,Tony Bridge,Apple 7,49990,Amazon,1/4/2017

                  002,Smita Arora Singh,Apple 7 Plus,60000,Ebay,1/4/2017


2. Input Split : At this stage the data is divided so that it can be processed in parallel.

An input split is nothing but a logical partition of the input data.

Example : As we said, we have 384 GB of data, and the default block size is 128 MB.
Physical partitions (HDFS blocks) : 384 GB / 128 MB = 3072 blocks
But a record may be cut across block boundaries: the first half of a record can sit at the end of block 1 and the second half at the start of block 2. This is where the input split concept comes into the picture; splits are drawn on record boundaries, so the record spanning block 1 and block 2 belongs entirely to InputSplit 1, and so on.
That means a block is a physical partition of the data, while an input split is a logical one.
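If you want to experiment with split sizes, the new MapReduce API exposes tuning knobs on FileInputFormat. A minimal sketch (SplitSizeDemo and the 256 MB cap are illustrative values, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split size demo");
        // Cap each input split at 256 MB; the record reader still keeps
        // whole records inside a single split.
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
    }
}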

Record Reader : Its behaviour depends on the InputFormat; the default is TextInputFormat. Let's assume we have not set any input format, so TextInputFormat is used.

Note : We have to understand that any input to the Mapper and Reducer must be in (Key, Value) format.

Record Reader, as the name specifies, reads a single record at a time.
Since the input format is TextInputFormat, it will produce pairs as below:

(Key, Value)
(0, 001,Tony Bridge,Apple 7,49990,Amazon,1/4/17)
(44, 002,Smita Arora Singh,Apple 7 Plus,60000,Ebay,1/4/17)

where the key is the byte offset at which the record starts and the value is the actual record. (The second key is 44 because the first record is 43 bytes long plus a newline.)
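A quick way to convince yourself about the offsets is plain Java string arithmetic (OffsetDemo is an illustrative, standalone check, not Hadoop code):

public class OffsetDemo {
    public static void main(String[] args) {
        String first = "001,Tony Bridge,Apple 7,49990,Amazon,1/4/17";
        // The first record starts at byte 0; the next record starts
        // right after the first record plus its trailing newline.
        System.out.println("record 1 key = 0");
        System.out.println("record 2 key = " + (first.length() + 1)); // prints 44
    }
}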

3. Mapper : This requires input as key-value pairs. One mapper task runs per input split, and its map() method is invoked once for each record in that split.

Syntax:

public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

extends Object

Assume the requirement is to find the total number of sales per product till date.

Example : 


Input of Mapper :
(0, 001,Tony Bridge,Apple 7,49990,Amazon,1/4/17)

Key : 0
Value : 001,Tony Bridge,Apple 7,49990,Amazon,1/4/17

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RetailMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text product = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split the CSV record on commas; Product is the third field.
        String[] parts = value.toString().split(",");
        product.set(parts[2]);
        // Emit (product, 1) for each sale record.
        context.write(product, one);
    }
}
Sample Mapper Output :

(Apple 7, 1)

(Apple 7 Plus, 1)

(Samsung S8, 1)

Description of Mapper Phase :

1. Split the record with the comma (,) delimiter.
2. Emit the product as the key and the constant one as the value.
Key : Product
Value : 1

As we can see, this does not yet give the required output; the remaining part of the computation happens in the Reducer phase.

4. Combiner : This phase is also called the Mini Reducer. The combiner combines key/value pairs with the same key on the map side. Each combiner may run zero, one, or multiple times, so it must not change the result of the job.

There may be situations where the mappers produce a large amount of intermediate output. In that case we can use a combiner, so that the reducer has less data to process.

Example : Let's assume there are 500 mappers and 1 reducer, and your task is to minimize the time required by the reducer to process the data, or to minimize the total job time.

To do this we can set a combiner in our MapReduce program, as shown in the sketch below.

This phase is also used to optimize MapReduce code.
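Since our sum-of-counts reducer is associative and commutative, it can safely double as the combiner. A minimal sketch (CombinerDemo is an illustrative name; IntSumReducer is the reducer defined later in this post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CombinerDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combiner demo");
        // Reuse the sum reducer as a combiner: partial sums are computed
        // on the map side, so much less data crosses the network.
        job.setCombinerClass(IntSumReducer.class);
    }
}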

5. Partitioner : The mechanism of deciding which mapper output goes to which reducer is called partitioning.

Default Partitioner uses a hashing on the keys to distribute them to the reduce tasks, but you can override it and use your own custom Partitioner.

Custom Partitioner Example :

For this we need to extend Partitioner and override "getPartition(Text key, Text value, int numPartitions)".

Let's say products costing 50,000 or less should go to Reducer 1 and products costing more than 50,000 should go to Reducer 2.

Then the method would be as follows (here we assume the value reaching the partitioner is the full CSV record, so Price is the fourth field):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class CostPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numPartitions) {
        // Assume the value is the full CSV record; Price is the fourth field.
        String[] parts = value.toString().split(",");
        int pCost = Integer.parseInt(parts[3]);
        if (pCost <= 50000)
            return 0; // products costing 50,000 or less go to Reducer 1
        else
            return 1; // more expensive products go to Reducer 2
    }
}
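To wire the custom partitioner into the job, a minimal sketch (PartitionerDemo is an illustrative name; CostPartitioner is the class from above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PartitionerDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "partitioner demo");
        // Route records to reducers using our custom rule, and make sure
        // there are actually two reducers to route between.
        job.setPartitionerClass(CostPartitioner.class);
        job.setNumReduceTasks(2);
    }
}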

6. Shuffle and Sort :

The shuffle and sort phase is done by the MapReduce framework. Data from all mappers is grouped by key, split among the reducers, and sorted by key. Each reducer obtains all values associated with the same key.

Shuffling and sorting are not performed at all if you specify zero reducers (setNumReduceTasks(0)). In that case the MapReduce job stops at the map phase, and the map phase does not include any kind of sorting; see the sketch below.
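For completeness, a minimal sketch of a map-only job (MapOnlyDemo is an illustrative name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class MapOnlyDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "map-only demo");
        // Zero reducers: mapper output is written straight to the output
        // directory, and shuffle and sort are skipped entirely.
        job.setNumReduceTasks(0);
    }
}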


7. Reducer :

The reducer obtains key/[value list] pairs, sorted by key. The value list contains all values with the same key produced by the mappers. Each reducer emits zero, one, or multiple output key/value pairs for each input key/value pair.

Syntax :

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

extends Object

Let's complete the part of our requirement that was left pending in the Mapper phase:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer<Key> extends Reducer<Key, IntWritable, Key, IntWritable> {
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Key key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the 1s emitted by the mappers (and combiner) for this product.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Reducer Output as (Key : Product, Value : number of sales till date) :

(Apple 7, 78909889)
(Apple 7 Plus, 67978979)
(Samsung S8, 7898789)

Now we have the total number of sales per product till date.
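To tie all the phases together, a minimal driver sketch (ProductSalesCount and the argument paths are illustrative; the combiner line is the optional optimization discussed above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ProductSalesCount {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "product sales count");
        job.setJarByClass(ProductSalesCount.class);
        job.setMapperClass(RetailMapper.class);
        job.setCombinerClass(IntSumReducer.class); // optional optimization
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. Customer_Data.csv
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output directory
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}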















