Tuesday, 11 April 2023

Chaining MapReduce jobs - Chaining preprocessing and postprocessing steps

So far you’ve been doing data processing tasks that a single MapReduce job can accomplish. As you get more comfortable writing MapReduce programs and take on more ambitious data processing tasks, you’ll find that many complex tasks need to be broken down into simpler subtasks, each accomplished by an individual MapReduce job.
For example, from the patent citation data set you may want to find the ten most-cited patents. A sequence of two MapReduce jobs can do this: the first creates the “inverted” citation data set and counts the number of citations for each patent, and the second finds the top ten in that “inverted” data.

Chaining MapReduce jobs in a sequence
Though you can execute the two jobs manually one after the other, it’s more convenient to automate the execution sequence. You can chain MapReduce jobs to run sequentially, with the output of one MapReduce job being the input to the next. 
Chaining MapReduce jobs is analogous to Unix pipes.
mapreduce-1 | mapreduce-2 | mapreduce-3 | ...
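
In the simplest form, the driver just runs the jobs back to back, pointing the second job's input at the first job's output directory. Here is a minimal sketch for the citation example using the old JobConf API; the job names, paths, and the omitted mapper/reducer settings are placeholders, not the actual code of that example.

// Sketch: two jobs run in sequence; job names and paths are hypothetical.
Configuration conf = getConf();

JobConf invertJob = new JobConf(conf);
invertJob.setJobName("InvertAndCountCitations");
FileInputFormat.setInputPaths(invertJob, new Path("citations"));
FileOutputFormat.setOutputPath(invertJob, new Path("citation-counts"));
// ... set mapper, reducer, and key/value classes for the first job ...
JobClient.runJob(invertJob);   // blocks until the first job finishes

JobConf topTenJob = new JobConf(conf);
topTenJob.setJobName("TopTenCitedPatents");
// the second job reads the first job's output directory
FileInputFormat.setInputPaths(topTenJob, new Path("citation-counts"));
FileOutputFormat.setOutputPath(topTenJob, new Path("top-ten"));
// ... set mapper, reducer, and key/value classes for the second job ...
JobClient.runJob(topTenJob);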

Chaining preprocessing and postprocessing steps
A lot of data processing tasks involve record-oriented preprocessing and postprocessing. You can write a separate MapReduce job for each of these pre- and postprocessing steps and chain them together, using the IdentityReducer for the steps that need no reduction, as sketched below.
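
For instance, a single record-oriented preprocessing step can be packaged as its own job in which IdentityReducer passes records through the reduce phase untouched. The mapper class and paths in this sketch are hypothetical.

JobConf preJob = new JobConf(conf);
preJob.setJobName("Preprocess");
preJob.setMapperClass(CleanupMapper.class);      // hypothetical preprocessing mapper
preJob.setReducerClass(IdentityReducer.class);   // org.apache.hadoop.mapred.lib.IdentityReducer
FileInputFormat.setInputPaths(preJob, new Path("raw-input"));
FileOutputFormat.setOutputPath(preJob, new Path("preprocessed"));
JobClient.runJob(preJob);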

Hadoop introduced the ChainMapper and the ChainReducer to simplify the composition of pre- and postprocessing.

You can think of chaining MapReduce jobs in terms of the pseudo-regular expression:
[MAP | REDUCE]+
where a reducer REDUCE comes after a mapper MAP, and this [MAP | REDUCE] sequence can repeat itself one or more times, one right after another. 

The analogous expression for a job using ChainMapper and ChainReducer would be
MAP+ | REDUCE | MAP*
The job runs multiple mappers in sequence to preprocess the data, and after the reducer it can optionally run multiple mappers in sequence to postprocess the data.

Consider the example where there are four mappers (Map1, Map2, Map3, and Map4) and one reducer (Reduce), and they’re chained into a single MapReduce job in this sequence:
Map1 | Map2 | Reduce | Map3 | Map4
In this setup, you should think of Map2 and Reduce as the core of the MapReduce job, with the standard partitioning and shuffling applied between the mapper and reducer. You should consider Map1 as a preprocessing step and Map3 and Map4 as postprocessing steps. The driver code below wires these five steps into a single job.

// The classes used below come from the old MapReduce API:
// JobConf, JobClient, TextInputFormat, TextOutputFormat, FileInputFormat and
// FileOutputFormat are in org.apache.hadoop.mapred; ChainMapper and
// ChainReducer are in org.apache.hadoop.mapred.lib; Configuration is in
// org.apache.hadoop.conf; LongWritable and Text are in org.apache.hadoop.io.

Configuration conf = getConf();
JobConf job = new JobConf(conf);

job.setJobName("ChainJob");
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);

// Add Map1 as a preprocessing step:
// input LongWritable/Text (from TextInputFormat), output Text/Text
JobConf map1Conf = new JobConf(false);
ChainMapper.addMapper(job,
                      Map1.class,
                      LongWritable.class,
                      Text.class,
                      Text.class,
                      Text.class,
                      true,
                      map1Conf);

// Add Map2 as the "core" mapper of the job:
// input Text/Text, output LongWritable/Text
JobConf map2Conf = new JobConf(false);
ChainMapper.addMapper(job,
                      Map2.class,
                      Text.class,
                      Text.class,
                      LongWritable.class,
                      Text.class,
                      true,
                      map2Conf);

// Add the Reduce step: input LongWritable/Text, output Text/Text
JobConf reduceConf = new JobConf(false);
ChainReducer.setReducer(job,
                        Reduce.class,
                        LongWritable.class,
                        Text.class,
                        Text.class,
                        Text.class,
                        true,
                        reduceConf);

// Add Map3 as a postprocessing step after the reducer:
// input Text/Text, output LongWritable/Text
JobConf map3Conf = new JobConf(false);
ChainReducer.addMapper(job,
                       Map3.class,
                       Text.class,
                       Text.class,
                       LongWritable.class,
                       Text.class,
                       true,
                       map3Conf);

// Add Map4 as the final postprocessing step:
// input LongWritable/Text, output LongWritable/Text
JobConf map4Conf = new JobConf(false);
ChainReducer.addMapper(job,
                       Map4.class,
                       LongWritable.class,
                       Text.class,
                       LongWritable.class,
                       Text.class,
                       true,
                       map4Conf);

JobClient.runJob(job);
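
For orientation, each chained step (Map1 through Map4, and Reduce) is an ordinary old-API Mapper or Reducer implementation. A hypothetical Map1 matching the LongWritable/Text input and Text/Text output declared in the first addMapper call above might look like this sketch; its body is a stand-in for real preprocessing logic.

public static class Map1 extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // record-oriented preprocessing on each input line goes here;
        // emit a Text key and Text value to match the declared types
        output.collect(new Text("placeholder-key"), value);
    }
}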

This method has eight arguments. The first and last are the global and local JobConf objects, respectively. The second argument is the Mapper class that will do the data processing. The four arguments inputKeyClass, inputValueClass, outputKeyClass, and outputValueClass are the input/output key and value class types of the Mapper class. The seventh argument, byValue, controls whether key/value pairs are passed to the next step in the chain by value; true is always safe, while false passes them by reference and avoids a copy when the mapper neither modifies nor holds on to them.

public static <K1,V1,K2,V2> void addMapper(JobConf job,
        Class<? extends Mapper<K1,V1,K2,V2>> klass,
        Class<? extends K1> inputKeyClass,
        Class<? extends V1> inputValueClass,
        Class<? extends K2> outputKeyClass,
        Class<? extends V2> outputValueClass,
        boolean byValue,
        JobConf mapperConf)
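
To connect the signature back to the listing, here is the Map1 call from above annotated with the formal parameter names (the same call, with comments added).

ChainMapper.addMapper(job,                 // job: the chain job's global JobConf
                      Map1.class,          // klass: the Mapper class for this step
                      LongWritable.class,  // inputKeyClass  (K1)
                      Text.class,          // inputValueClass (V1)
                      Text.class,          // outputKeyClass  (K2)
                      Text.class,          // outputValueClass (V2)
                      true,                // byValue
                      map1Conf);           // mapperConf: this step's local JobConf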
