This is Java code for a Hadoop MapReduce job that analyzes employee salaries by computing the average salary per designation. The code consists of three main components:
Mapper (SalaryMapper): This is the first stage of the job. It reads input records line by line and splits each one into fields using a comma as the delimiter. It then extracts the designation and salary fields from each record and emits them as a key-value pair, where the designation is the key (of type Text) and the salary is the value (of type DoubleWritable). For example, the record 104,suresh,assoc prof,8000 produces the pair (assoc prof, 8000.0).
Reducer (SalaryReducer): This is the second stage of the job. For each designation key, it receives all of the salaries emitted by the mappers, sums them, and divides by their count to compute the average. It then emits the designation as the key and the average salary as the value. For the sample input below, the asst group averages (5000 + 4000 + 3000 + 2000) / 4 = 3500.0.
Main driver (main method): This is the entry point of the MapReduce job. It configures a single job that reads input from the path given in args[0], runs the mapper and reducer above, and writes the per-designation average salaries to the output path given in args[1].
The input format for this job is assumed to be plain text files, where each line represents an employee record with comma-separated fields
(e.g., employee ID, name, designation, salary).
The output format is also plain text files, where each line contains a designation and its corresponding average salary separated by a tab (the TextOutputFormat default); see the sample output after the input file below.
Note that this code uses TextInputFormat and TextOutputFormat as the input and output formats, respectively; you can swap these for other formats based on your specific requirements. Also make sure to pass the correct input and output paths as command-line arguments when running this job, and that the output path does not already exist, since Hadoop fails the job if it does.
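One way to compile and run the job (a sketch assuming a configured Hadoop installation and an input directory already on HDFS; the jar name and paths here are placeholders):

export HADOOP_CLASSPATH=$(hadoop classpath)
javac -classpath "$HADOOP_CLASSPATH" SalaryAnalysis.java
jar cf salary-analysis.jar SalaryAnalysis*.class
hadoop jar salary-analysis.jar SalaryAnalysis /user/hadoop/salaries/input /user/hadoop/salaries/output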
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SalaryAnalysis {

    public static class SalaryMapper extends
            Mapper<LongWritable, Text, Text, DoubleWritable> {

        private final Text designation = new Text();
        private final DoubleWritable salary = new DoubleWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each record: employee ID, name, designation, salary
            String[] employeeRecord = value.toString().split(",");
            if (employeeRecord.length < 4) {
                return; // skip blank or malformed lines
            }
            designation.set(employeeRecord[2].trim());
            salary.set(Double.parseDouble(employeeRecord[3].trim()));
            context.write(designation, salary);
        }
    }

    public static class SalaryReducer extends
            Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        private final DoubleWritable result = new DoubleWritable();

        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all salaries for this designation, then divide by the count.
            double sum = 0;
            int count = 0;
            for (DoubleWritable val : values) {
                sum += val.get();
                count++;
            }
            result.set(sum / count);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Salary Analysis");
        job.setJarByClass(SalaryAnalysis.class);
        job.setMapperClass(SalaryMapper.class);
        // No combiner is set: averaging partial averages would not give the true average.
        job.setReducerClass(SalaryReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Input file:
101,raju,prof,20000
102,rani,prof,15000
103,vani,prof,10000
104,suresh,assoc prof,8000
105,mahesh,assoc prof,7000
106,kiran,asst,5000
107,nani,asst,4000
108,hari,asst,3000
109,ramya,asst,2000
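With a single reducer, the output file (part-r-00000) for this input should look like the following, with keys sorted; for example, prof averages (20000 + 15000 + 10000) / 3 = 15000.0:

assoc prof	7500.0
asst	3500.0
prof	15000.0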