
Thursday 30 March 2023

Chaining with MapReduce and MRJob - Program

Chaining is the process of combining multiple mappers and reducers to complete a job, which is better than putting everything into a single, overly complex MapReduce job.

Logic:
mapper1 > reducer1 > mapper2 > reducer2 > . . .

 
Explanation:
This is Java code for a Hadoop MapReduce job that analyzes employee salaries. The code consists of the following main components:

Mapper (SalaryMapper): This is the first step in the MapReduce job. It reads input records line by line and splits them into fields using a comma as the delimiter. It then extracts the designation and salary from each record and emits them as a key-value pair, where the designation is the key (of type Text) and the salary is the value (of type DoubleWritable).

Second mapper (AverageMapper): This mapper is used by the second, chained job. It parses the tab-separated designation/average-salary lines written by the first job and re-emits them as (Text, DoubleWritable) pairs so the same reducer can be applied again.

Reducer (SalaryReducer): This is the second step in the MapReduce job. It receives the key-value pairs emitted by the mapper, where the key is the designation and the value is the salary. It calculates the average salary for each designation by summing up all the salaries and dividing by the total count of salaries. It then emits the designation as the key and the average salary as the value.

Main driver (main method): This is the entry point of the MapReduce job. It configures and runs two jobs sequentially. The first job (job1) reads input from a specified path (args[0]) and writes intermediate output to a temporary path (args[1]). The second job (job2) reads the tab-separated intermediate output written by job1 (using AverageMapper) and writes the final output to a specified path (args[2]). The output of job2 contains the average salary for each designation.

The input format for this job is assumed to be plain text files, where each line represents an employee record with comma-separated fields: employee ID, name, designation, salary (e.g., 101,raju,prof,20000).
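For example, for the record 101,raju,prof,20000 the mapper emits the pair (prof, 20000.0); given the sample input file shown below, the reducer then averages the three prof salaries (20000, 15000 and 10000) and emits (prof, 15000.0).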
 
The output format is also plain text files, where each line contains the designation and its corresponding average salary separated by a tab (the default TextOutputFormat separator).
 
Note that this code uses TextInputFormat and TextOutputFormat as the input and output formats, respectively. You can modify these formats based on your specific requirements. Also, make sure to provide the correct input, intermediate, and output paths as command-line arguments when running this job.
 
Program:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SalaryAnalysis {

  public static class SalaryMapper extends
      Mapper<LongWritable, Text, Text, DoubleWritable> {
    
    private Text department = new Text();
    private DoubleWritable salary = new DoubleWritable();

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      
      String[] employeeRecord = value.toString().split(",");
      department.set(employeeRecord[2]);
      salary.set(Double.parseDouble(employeeRecord[3]));
      context.write(department, salary);
    }
  }

  // Mapper for the second (chained) job: parses the tab-separated
  // designation/average-salary lines written by job1 and re-emits them
  // so the same reducer can be applied again.
  public static class AverageMapper extends
      Mapper<LongWritable, Text, Text, DoubleWritable> {

    private Text designation = new Text();
    private DoubleWritable salary = new DoubleWritable();

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String[] fields = value.toString().split("\t");
      designation.set(fields[0]);
      salary.set(Double.parseDouble(fields[1]));
      context.write(designation, salary);
    }
  }

  public static class SalaryReducer extends
      Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private DoubleWritable result = new DoubleWritable();

    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException {
      double sum = 0;
      int count = 0;
      for (DoubleWritable val : values) {
        sum += val.get();
        count++;
      }
      double avgSalary = sum / count;
      result.set(avgSalary);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf1 = new Configuration();
    Job job1 = Job.getInstance(conf1, "Salary Analysis Job 1");
    job1.setJarByClass(SalaryAnalysis.class);
    job1.setMapperClass(SalaryMapper.class);
    job1.setReducerClass(SalaryReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(DoubleWritable.class);
    job1.setInputFormatClass(TextInputFormat.class);
    job1.setOutputFormatClass(TextOutputFormat.class);
    TextInputFormat.addInputPath(job1, new Path(args[0]));
    TextOutputFormat.setOutputPath(job1, new Path(args[1]));
    boolean success = job1.waitForCompletion(true);
    if (success) {
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "Salary Analysis Job 2");
        job2.setJarByClass(SalaryAnalysis.class);
        job2.setMapperClass(AverageMapper.class);
        job2.setReducerClass(SalaryReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(DoubleWritable.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job2, new Path(args[1]));
        TextOutputFormat.setOutputPath(job2, new Path(args[2]));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    } else {
        System.exit(1);
    }
  }
}
 
Input file:
101,raju,prof,20000
102,rani,prof,15000
103,vani,prof,10000
104,suresh,assoc prof,8000
105,mahesh,assoc prof,7000
106,kiran,asst,5000
107,nani,asst,4000
108,hari,asst,3000
109,ramya,asst,2000 

Step 1: create a folder cmr and grant permissions so it can be accessed by a non-root user
root@ubuntu:/home/hduser# chmod -R 777 cmr
Step 2: set the Hadoop classpath and compile the Java file into class files using the following commands
hduser@ubuntu:~/cmr$ export CLASSPATH=`hadoop classpath`
hduser@ubuntu:~/cmr$ echo $CLASSPATH
hduser@ubuntu:~/cmr$ javac -d . SalaryAnalysis.java
Step 3: create a jar file using the following command
hduser@ubuntu:~/cmr$ jar -cvf cm.jar -C /home/hduser/cmr .
Step 4: create a folder /crk in HDFS and copy the file input.txt into it using the following commands
hduser@ubuntu:~/cmr$ hadoop fs -mkdir /crk
hduser@ubuntu:~/cmr$ hadoop fs -put input.txt /crk
hduser@ubuntu:~/cmr$ hadoop fs -lsr /crk
 
 
 
Step 5: Now run the hadoop jar command
hduser@ubuntu:~/cmr$ hadoop jar cm.jar SalaryAnalysis /crk/input.txt /crk/temp /crk/joy
(the program expects three paths: input, an intermediate directory for job1's output — /crk/temp is used here as an example — and the final output directory of job2)
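For the sample input above, the final output in /crk/joy should contain the average salary per designation (values computed by hand; the exact formatting of the doubles may vary):
assoc prof	7500.0
asst	3500.0
prof	15000.0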
 

Joining data from different sources in a MapReduce environment can be challenging, but it can be done by chaining multiple MapReduce jobs together. Here are the general steps to join data from different sources in chaining MapReduce jobs:

Identify the data sources: Determine the data sources that need to be joined, and the common key that will be used to join them.

Map the data: Map the records from each source into key-value pairs, with the common join key as the key. Tag each value with an identifier for its source so the reducer can tell records from the different data sets apart.

Reduce the data: Reduce the data by joining the key-value pairs with the common key. This can be done in a single MapReduce job, but it may be more efficient to split it into multiple jobs. A minimal sketch of such a reduce-side join is shown after this list.

Chain the MapReduce jobs: If multiple jobs are used, chain them together by specifying the output of one job as the input of the next job.

Optimize the join: Optimize the join by considering the data size, skew, and distribution. For example, if one data source is much smaller than the other, consider shipping the smaller one to every mapper with the distributed cache and performing a map-side join to improve performance.

Test and iterate: Test the join and iterate as needed to improve performance and accuracy.

Overall, joining data from different sources in chaining MapReduce jobs requires careful planning and optimization. By mapping and reducing the data appropriately, and by chaining the MapReduce jobs together, you can join data from different sources efficiently and accurately.
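To make the mapping and reducing steps above concrete, here is a minimal reduce-side join sketch in the same style as the salary program. The input file names (employees.txt with fields empId,name,deptId and departments.txt with fields deptId,deptName), the field layout, and the E:/D: source tags are assumptions made only for this example; they are not part of the salary program above. The joined output of such a job can then be fed into the next job in the chain, exactly as job1's output feeds job2 above.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ReduceSideJoin {

  // Tags every record with its source (E: for employees, D: for departments)
  // and keys it by deptId so both data sets meet in the same reduce call.
  public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
      String[] fields = value.toString().split(",");
      if (fileName.startsWith("employees")) {
        // employee record: empId,name,deptId -> key = deptId, value = E:name
        context.write(new Text(fields[2]), new Text("E:" + fields[1]));
      } else {
        // department record: deptId,deptName -> key = deptId, value = D:deptName
        context.write(new Text(fields[0]), new Text("D:" + fields[1]));
      }
    }
  }

  // Joins all tagged values that share the same deptId.
  public static class JoinReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      String deptName = "";
      List<String> employees = new ArrayList<String>();
      for (Text val : values) {
        String v = val.toString();
        if (v.startsWith("D:")) {
          deptName = v.substring(2);
        } else {
          employees.add(v.substring(2));
        }
      }
      // Emit one joined record per employee in this department.
      for (String name : employees) {
        context.write(new Text(name), new Text(deptName));
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "Reduce Side Join");
    job.setJarByClass(ReduceSideJoin.class);
    job.setMapperClass(JoinMapper.class);
    job.setReducerClass(JoinReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // directory holding both input files
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Tagging each value with its source in the mapper is what lets a single reducer distinguish department records from employee records when both arrive under the same join key; buffering the employee names until the whole value list has been scanned keeps the join correct regardless of the order in which the tagged values arrive.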
