Pages

Thursday 30 March 2023

Chaining with MapReduce and MRJob -Program

The process of combining multiple mappers to complete a job  which is better then putting everything a single MapReduce job and making it very complex .

logic:
mapper1 > reducer1 > mapper2 > reducer2 > . . .

 
Explanation:
This is a Java code for a Hadoop MapReduce job that analyzes employee salaries. The code consists of three main components:

Mapper (SalaryMapper): This is the first step in the MapReduce job. It reads input records line by line and splits them into fields using comma as a delimiter. It then extracts the department and salary information from each record and emits them as key-value pairs, where the department is the key (of type Text) and the salary is the value (of type DoubleWritable).

Reducer (SalaryReducer): This is the second step in the MapReduce job. It receives the key-value pairs emitted by the mapper, where the key is the department and the value is the salary. It calculates the average salary for each department by summing up all the salaries and dividing by the total count of salaries. It then emits the designation as the key and the average salary as the value.

Main driver (main method): This is the entry point of the MapReduce job. It configures and runs two jobs sequentially. The first job (job1) reads input from a specified path (args[0]) and writes intermediate output to a temporary path (args[1]). The second job (job2) reads input from the temporary path written by job1 and writes the final output to a specified path (args[2]). The output of job2 contains the average salary for each department.

The input format for this job is assumed to be plain text files, where each line represents an employee record with comma-separated fields
(e.g., employee ID, name, designation, salary).
(e.g., 101, raju, prof, 20000).
 
The output format is also plain text files, where each line contains the designation and its corresponding average salary separated by a tab or space.
 
Note that this code uses TextInputFormat and TextOutputFormat as the input and output formats, respectively. You can modify these formats based on your specific requirements. Also, make sure to provide the correct input and output paths as command-line arguments when running this job. 
 
Program:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SalaryAnalysis {

  public static class SalaryMapper extends
      Mapper<LongWritable, Text, Text, DoubleWritable> {
    
    private Text department = new Text();
    private DoubleWritable salary = new DoubleWritable();

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      
      String[] employeeRecord = value.toString().split(",");
      department.set(employeeRecord[2]);
      salary.set(Double.parseDouble(employeeRecord[3]));
      context.write(department, salary);
    }
  }

  public static class SalaryReducer extends
      Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private DoubleWritable result = new DoubleWritable();

    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
        throws IOException, InterruptedException {
      double sum = 0;
      int count = 0;
      for (DoubleWritable val : values) {
        sum += val.get();
        count++;
      }
      double avgSalary = sum / count;
      result.set(avgSalary);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf1 = new Configuration();
    Job job1 = Job.getInstance(conf1, "Salary Analysis Job 1");
    job1.setJarByClass(SalaryAnalysis.class);
    job1.setMapperClass(SalaryMapper.class);
    job1.setReducerClass(SalaryReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(DoubleWritable.class);
    job1.setInputFormatClass(TextInputFormat.class);
    job1.setOutputFormatClass(TextOutputFormat.class);
    TextInputFormat.addInputPath(job1, new Path(args[0]));
    TextOutputFormat.setOutputPath(job1, new Path(args[1]));
    boolean success = job1.waitForCompletion(true);
    if (success) {
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "Salary Analysis Job 2");
        job2.setJarByClass(SalaryAnalysis.class);
        job2.setMapperClass(SalaryMapper.class);
        job2.setReducerClass(SalaryReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(DoubleWritable.class);
        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextInputFormat.addInputPath(job2, new Path(args[1]));
        TextOutputFormat.setOutputPath(job2, new Path(args[2]));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
  }
}
 
Input file:
101,raju,prof,20000
102,rani,prof,15000
103,vani,prof,10000
104,suresh,assoc prof,8000
105,mahesh,assoc prof,7000
106,kiran,asst,5000
107,nani,asst,4000
108,hari,asst,3000
109,ramya,asst,2000 

Step1:create a folder cmr and allot permission to access from non root
root@ubuntu:/home/hduser# chmod -R 777 cmr
Step 2:generate class files for each java file using the following commands
hduser@ubuntu:~/cmr$ export CLASSPATH=`hadoop classpath`
hduser@ubuntu:~/cmr$ echo $CLASSPATH
hduser@ubuntu:~/cmr$ javac -d . SalaryAnalysis.java
Step 3:Create a jar file using the following command
hduser@ubuntu:~/cmr$ jar -cvf cm.jar -C /home/hduser/cmr .
Step 4:create a folder chain and then copy the file input.txt under DFS using the following commands 
hduser@ubuntu:~/cmr$ hadoop fs -mkdir /crk
hduser@ubuntu:~/cmr$ hadoop fs -put input.txt /crk
hduser@ubuntu:~/cmr$ hadoop fs -lsr /crk
 
 
 
Step 5: Now run the hadoop jar command
hduser@ubuntu:~/cmr$ hadoop jar cm.jar SalaryAnalysis  /crk/input.txt /crk/joy
 

Joining data from different sources in a MapReduce environment can be challenging, but it can be done by chaining multiple MapReduce jobs together. Here are the general steps to join data from different sources in chaining MapReduce jobs:

Identify the data sources: Determine the data sources that need to be joined, and the common key that will be used to join them.

Map the data: Map the data from each source into key-value pairs, with the common key as the key. Each data source should be mapped to a different key space.

Reduce the data: Reduce the data by joining the key-value pairs with the common key. This can be done in a single MapReduce job, but it may be more efficient to split it into multiple jobs.

Chain the MapReduce jobs: If multiple jobs are used, chain them together by specifying the output of one job as the input of the next job.

Optimize the join: Optimize the join by considering the data size, skewness, and distribution. For example, if one data source is much larger than the other, consider using a distributed cache or a broadcast variable to improve performance.

Test and iterate: Test the join and iterate as needed to improve performance and accuracy.

Overall, joining data from different sources in chaining MapReduce jobs requires careful planning and optimization. By mapping and reducing the data appropriately, and by chaining the MapReduce jobs together, you can join data from different sources efficiently and accurately.

MapReduce workflow - API - Combiners

The MapReduce workflow is a data processing framework that allows developers to process large volumes of data in a distributed and parallel manner. The workflow consists of the following steps:
  1. Input Data: The first step in the workflow is to load the input data from the Hadoop Distributed File System (HDFS) or any other storage system that supports the Hadoop InputFormat API. The input data is split into small chunks called InputSplits that are processed by individual map tasks.
  2. Map: The Map step takes each InputSplit and applies a user-defined Map function to it. The Map function transforms the input data into a set of key-value pairs that are intermediate results. The Map function runs in parallel on each node in the Hadoop cluster that has a copy of the InputSplit.
  3. Shuffle and Sort: The Shuffle and Sort step collects the output from the Map function and sorts it by key. The intermediate results are partitioned based on their keys and sent to the nodes that will run the Reduce function. This step involves a network shuffle of data between the Map and Reduce nodes.
  4. Reduce: The Reduce step applies a user-defined Reduce function to the intermediate results. The Reduce function combines the intermediate results that have the same key into a set of final output key-value pairs. The Reduce function runs in parallel on each node in the Hadoop cluster that has a copy of the intermediate results.
  5. Output Data: The final step in the workflow is to write the output data to the Hadoop Distributed File System or any other storage system that supports the Hadoop OutputFormat API.

Hadoop is an evolving technology, and its APIs have undergone several changes over the years. Here are some notable changes in Hadoop APIs:
  1. Hadoop 2.x introduced the YARN API, which replaced the older MapReduce API as the system responsible for managing resources in a Hadoop cluster.
  2. Hadoop 3.x introduced several changes to the HDFS API, including support for erasure coding and improvements to the Namenode architecture.
  3. Hadoop 3.x also introduced several changes to the MapReduce API, including support for container reuse and improvements to the Job History Server.
  4. The HBase API has undergone several changes over the years, including the introduction of the HBase Thrift API and the HBase REST API.
  5. The Hive API has also undergone several changes over the years, including improvements to query optimization and support for ACID transactions.
  6. The Pig API has undergone several changes over the years, including improvements to the Pig Latin language and the introduction of the Pig Streaming API.
  7. Overall, Hadoop APIs have evolved to provide better performance, scalability, and functionality for managing and processing large-scale data on a Hadoop cluster. Developers should stay up-to-date with the latest changes in Hadoop APIs to take advantage of the latest features and capabilities.
Combiners are a useful feature in Hadoop MapReduce to improve the performance of data processing jobs. They are optional functions that can be applied on the output of the Map phase before it is sent to the Reduce phase. The purpose of combiners is to reduce the amount of data that needs to be transferred across the network to the reducer and thereby improving the performance of the job.

Here are some ways in which combiners can be used to improve performance:
  1. Reduce network I/O: Combiners can help reduce the amount of data that needs to be transferred across the network to the reducer. By applying a combiner function on the output of the Map phase, we can perform a local aggregation of the intermediate key-value pairs before sending them to the reducer. This can reduce network I/O and improve the performance of the job.
  2. Reduce computational workload: Combiners can also help reduce the computational workload on the reducer. By performing local aggregation of the intermediate key-value pairs, the combiner can reduce the number of key-value pairs that the reducer needs to process. This can improve the performance of the reducer by reducing its computational workload.
  3. Improve data locality: Combiners can help improve data locality by processing the intermediate key-value pairs on the same node where the Map task was executed. This can help reduce network I/O and improve the performance of the job.
  4. Compatibility with Reducer: Combiners can be written using the same code as the reducer function, making it easy to implement and test. In some cases, the combiner can even be the same as the reducer function.
  5. It's important to note that combiners are not always effective in improving the performance of a MapReduce job. Combiners are only useful when the key-value pairs produced by the Map function are suitable for aggregation, and when the combiner function is both associative and commutative. In some cases, using combiners can actually harm the performance of a MapReduce job if they increase the computational workload on the Map nodes. Therefore, it's important to test and evaluate the performance impact of using combiners before implementing them in a production environment.

Running locally on test data - on a cluster - Tuning a job

Running a Hadoop job locally allows developers to test their code quickly and efficiently before deploying it to a larger Hadoop cluster. Here are the general steps for running a Hadoop job locally on test data:
  1. Install Hadoop: To run a Hadoop job locally, you will need to install Hadoop on your local machine. You can download Hadoop from the Apache Hadoop website and follow the installation instructions for your operating system.
  2. Set up a local Hadoop cluster: Once Hadoop is installed, you can set up a local Hadoop cluster by configuring the Hadoop configuration files to point to your local filesystem instead of a Hadoop cluster. This allows you to run a Hadoop job on your local machine using the same Hadoop APIs and configuration as you would use on a Hadoop cluster.
  3. Write your Hadoop job: Once your local Hadoop cluster is set up, you can write your Hadoop job using the Hadoop APIs. Your Hadoop job should read input data from a file on your local filesystem and write output data to another file on your local filesystem.
  4. Run your Hadoop job: To run your Hadoop job locally, you can use the Hadoop command-line tools to submit your job to the local Hadoop cluster. The Hadoop command-line tools allow you to specify the input and output paths for your job and any other job configuration settings.
  5. Verify the results: Once your Hadoop job has finished running, you can verify the results by inspecting the output file on your local filesystem. You should compare the output to the expected output to ensure that your Hadoop job is working correctly.
  6. By following these steps, you can run a Hadoop job locally on test data and test your Hadoop application before deploying it to a larger Hadoop cluster.
Running a Hadoop job locally on a cluster can help you identify any issues that may arise when running the job on a larger dataset or on a production cluster. Here are the steps to run a Hadoop job locally on a cluster:
  1. Set up a Hadoop cluster: Set up a Hadoop cluster on your local machines by installing and configuring Hadoop on each machine. You can follow the official Hadoop documentation for instructions on how to set up a Hadoop cluster.
  2. Copy your data to the Hadoop cluster: Copy your data to the Hadoop cluster by using Hadoop File System (HDFS) commands. You can use the 'hdfs dfs -copyFromLocal' command to copy data from your local machine to the Hadoop cluster.
  3. Write a Hadoop job: Write a Hadoop job that operates on your data. You can use the same code that you plan to use for the production job.
  4. Build your Hadoop job: Build your Hadoop job by creating a JAR file that includes all the dependencies of your job.
  5. Submit your job to the Hadoop cluster: Submit your job to the Hadoop cluster by using the 'hadoop jar' command. This command takes the name of the JAR file and the name of the main class of your job.
  6. Monitor your job: Monitor your job by using the Hadoop Job Tracker web interface or by using the 'hadoop job -list' command. This will give you information on the progress of your job, including the status of the map and reduce tasks.
  7. Debug your job: If your job fails or produces incorrect results, you can debug it by looking at the logs generated by the Hadoop cluster. The logs will provide information on any errors or warnings that occurred during the execution of your job.
  8. By running a Hadoop job locally on a cluster, you can test your job with a realistic dataset and identify any issues that need to be addressed before running the job on a production cluster.
Tuning a Hadoop job is an essential step to optimize the performance of the job and achieve the best possible performance on a Hadoop cluster. Here are the steps to tune a Hadoop job:
  1. Identify the bottleneck: Identify the bottleneck of your job by analyzing the logs generated by the Hadoop cluster. This can help you understand which part of the job is taking the most time.
  2. Adjust the configuration settings: Adjust the configuration settings of your job to improve its performance. This includes changing parameters such as the number of map and reduce tasks, the amount of memory allocated to each task, the size of the input and output buffers, and the number of threads used by the job.
  3. Optimize data access patterns: Optimize data access patterns by using techniques such as data locality, data compression, and data serialization. This can help you reduce the amount of data that needs to be transferred over the network and improve the performance of your job.
  4. Use combiners: Use combiners to reduce the amount of data that needs to be shuffled over the network to the reducer. Combiners are functions that are applied to the output of the mapper before it is sent to the reducer.
  5. Use partitioning and sorting: Use partitioning and sorting techniques to reduce the amount of data that needs to be processed by the reducer. Partitioning divides the input data into smaller chunks that can be processed in parallel, while sorting organizes the data so that it can be processed more efficiently.
  6. Monitor the performance: Monitor the performance of your job by using the Hadoop Job Tracker web interface or by using the 'hadoop job -list' command. This will give you information on the progress of your job, including the status of the map and reduce tasks.
  7. Iterate: Iterate on the tuning process by adjusting the configuration settings and data access patterns until you achieve the desired level of performance.
  8. By following these steps, you can tune your Hadoop job to achieve the best possible performance on a Hadoop cluster. It is important to note that the tuning process is iterative and may require multiple adjustments to achieve the best results.

Monday 27 March 2023

Patent dataset count using map reduce program

PatentsView: PatentsView is a free, user-friendly platform that provides access to USPTO patent data in a variety of formats. The platform allows you to search and download patent data by keyword, patent number, inventor, assignee, and other criteria. You can also visualize the data and export it to CSV or JSON formats. To use PatentsView, simply go to their website (https://www.patentsview.org/) 
https://patentsview.org/download/data-download-tables

Example on the patent dataset:
 
Step 1:Based on the given data, it appears to be a list of records with two fields separated by a space character. The first field appears to be a unique identifier, and the second field appears to be a list of patent numbers associated with that identifier.
 
Now to count the total number of patents in the dataset, you would need to parse each line and count the number of patents associated with each identifier. Here's an example :
 
1 3964859,4647229
10000 4539112
100000 5031388
1000006 4714284
1000007 4766693
1000011 5033339
1000017 3908629
1000026 4043055
1000033 4190903,4975983
1000043 4091523
1000044 4082383,4055371
1000045 4290571
1000046 5918892,5525001
1000049 5996916
1000051 4541310
1000054 4946631
1000065 4748968
1000067 5312208,4944640,5071294
1000070 4928425,5009029
 
Step 2:Create a file with PatentCount.java with the following program:
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PatentCount {

  public static class PatentCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final Text identifier = new Text();
    private final IntWritable count = new IntWritable();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] fields = value.toString().split(" ");
      identifier.set(fields[0]);
      count.set(fields[1].split(",").length);
      context.write(identifier, count);
    }
  }

  public static class PatentCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "PatentCount");
    job.setJarByClass(PatentCount.class);
    job.setMapperClass(PatentCountMapper.class);
    job.setReducerClass(PatentCountReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Step 3:Generate class files for each java file using the following commands
hduser@ubuntu:~/patentcount$ export CLASSPATH=`hadoop classpath`
hduser@ubuntu:~/patentcount$ echo $CLASSPATH
hduser@ubuntu:~/patentcount$ javac -d . PatentCount.java
Step 4:Create a jar file using the following command
hduser@ubuntu:~/patentcount$ jar -cvf count.jar -C /home/hduser/patentcount .
Step 5:create a folder rkpd and then copy the patent dataset file dataset.txt under DFS using the following commands
hduser@ubuntu:~/patentcount$ hadoop fs -mkdir /rkpd
hduser@ubuntu:~/patentcount$ hadoop fs -put dataset.txt /rkpd
hduser@ubuntu:~/patentcount$ hadoop fs -lsr /rkpd

Step 6: Now run the hadoop jar command
hduser@ubuntu:~/patentcount$ hadoop jar count.jar PatentCount /rkpd/dataset.txt /rkpd/out


Step 7:Now we can check the patent dataset count for the given dataset.txt  in a folder rkpd/dataset.txt
hduser@ubuntu:~/patentcount$ hadoop fs -lsr /rkpd
hduser@ubuntu:~/patentcount$ hadoop fs -cat /rkpd/out/part-r-00000

Saturday 25 March 2023

Hadoop Pipes

Hadoop Pipes allows C++ code to use Hadoop DFS and map/reduce. In many ways, the approach will be similar to Hadoop streaming, but using Writable serialization to convert the types into bytes that are sent to the process via a socket.


The class org.apache.hadoop.mapred.pipes.Submitter has a public static method to submit a job as a JobConf and a main method that takes an application and optional configuration file, input directories, and output directory. 
bin/hadoop pipes \
 [-input inputDir] \
 [-output outputDir] \
 [-jar applicationJarFile] \
 [-inputformat class] \
 [-map class] \
 [-partitioner class] \
 [-reduce class] \
 [-writer class] \
 [-program program url] \ 
 [-conf configuration file] \
 [-D property=value] \
 [-fs local|namenode:port] \
 [-jt local|jobtracker:port] \
 [-files comma separated list of files] \ 
 [-libjars comma separated list of jars] \
 [-archives comma separated list of archives]

Hadoop Pipes has a generic Java class for handling the mapper and reducer (PipesMapRunner and PipesReducer). They fork off the application program and communicate with it over a socket. The communication is handled by the C++ wrapper library and the PipesMapRunner and PipesReducer

Friday 24 March 2023

Hadoop Streaming

  • Enable to create or run MapReduce in any language Java/Non Java(R,Python,Ruby,PERL,PhP,c++)
  • Allow to create and run MR Job with Script as Mapper/Reducer utility that comes with hadoop distribution
  • Uses unix streams as interface between hadoop and MR program
  • Allows developer to choose language
  • Both Mapper Reducer are Python Scripts that read input from STDIN and exit output to STDOUT utility create a M/R Jobs, submit Job to cluster, monitor the progress of job until complete
  • when script is specified for mapper - each mapper task will launches the script as a separate process 
  • Mapper tasks convert input (k,v) from source file into lines and feed to STDIN
  • Mapper collect the line accessed output from STDOUT and convert each line into (k,v) pair
  • when script is specified for reducer - each reducer task launches the script as separate process
  • reducer task convert input (k,v) pair into lines and feeds the lines to STDIN
  • reduce gather the line oriented output from STDOUT and convert each line into (k,v) pair 
Program on Hadoop Streaming:
Step 1:Create an input.txt file in a folder name (stream)
hduser@ubuntu:~/stream$ cat input.txt
ram wants
to eat cb
 
Step 2:Create a file mycmd.sh using the following script
hduser@ubuntu:~/stream$ cat mycmd.sh
#!/bin/bash
sed -r 's/[\t]+/\n/g' | sed "s/[^a-zA-Z0-9]//g" | tr "A-Z" "a-z"


Step 3: create a directory under hdfs and then copy the input.txt file into it using the following commands
hduser@ubuntu:~/stream$ hadoop fs -mkdir /dfs
hduser@ubuntu:~/stream$ hadoop fs -put input.txt /dfs
hduser@ubuntu:~/stream$ hadoop fs -cat /dfs/input.txt
 
Step 4:Run the hadoop jar streaming as follows
hduser@ubuntu:~/stream$ hadoop jar /home/hduser/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar -D mapred.reduce.tasks=0 -input /dfs/input.txt -output /dfs/out -mapper ./mycmd.sh -reducer 'uniq-c' -file mycmd.sh 
Step5:Output the hadoop streaming of the given input.txt file using the following commands
hduser@ubuntu:~/stream$ hadoop fs -lsr /dfs
hduser@ubuntu:~/stream$ hadoop fs -cat /dfs/out/part-00000

Monday 20 March 2023

MapReduce - Weather Dataset using Three files -Part III

 Program on MapReduce Weather Dataset :

Step 1: create a folder ncdc and give the permission to access any where from root
chmod -R 777 ncdc

step 2:Collect the weather datasets using NCDC domain and name this file as tempinput.txt and then place it in ncdc folder

0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999

Step 3: create 3 files namely
  1. MaxtempDriver.java 
 //package maxtemp;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class
MaxtempDriver 
{
    public static void main(String[] args) throws Exception 
    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "
MaxtempDriver");
        job.setJarByClass(
MaxtempDriver.class);
        // TODO: specify a mapper
        job.setMapperClass(MaxtempMapper.class);
        // TODO: specify a reducer
        job.setReducerClass(MaxtempReducer.class);

        // TODO: specify output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // TODO: specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        if (!job.waitForCompletion(true))
            return;
    }
}
 
 2.MaxtempMapper.java 
//package maxtemp;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
 
public class MaxtempMapper 
extends Mapper<LongWritable, Text, Text, IntWritable > 
{
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException 
    {
         String line=value.toString();
          String year=line.substring(15,19);
          int airtemp;
          if(line.charAt(87)== '+')
          {
           airtemp=Integer.parseInt(line.substring(88,92));
           }
         else
         airtemp=Integer.parseInt(line.substring(87,92));
         String q=line.substring(92,93);
         if(airtemp!=9999&&q.matches("[01459]"))
         {
          context.write(new Text(year),new IntWritable(airtemp));
         }
     }
 }
 
 3.MaxtempReducer.java 
 //package maxtemp;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class
MaxtempReducer
extends Reducer<Text, IntWritable, Text, IntWritable> 
{
   public void reduce(Text key, Iterable<IntWritable> values, Context  context)throws IOException, InterruptedException 
    {
        int maxvalue=Integer.MIN_VALUE;
        for (IntWritable value : values)
        {
         maxvalue=Math.max(maxvalue, value.get());
        }
        context.write(key, new IntWritable(maxvalue));
    }
}

Step 4: generate class files for each java file using the following commands
hduser@ubuntu:~/ncdc$ export CLASSPATH=`hadoop classpath`
hduser@ubuntu:~/ncdc$ echo $CLASSPATH
hduser@ubuntu:~/ncdc$ javac -d . MaxtempMapper.java MaxtempReducer.java MaxtempDriver.java   

Step 5:Create a jar by using the following command
hduser@ubuntu:~/ncdc$ jar -cvf max.jar -C /home/hduser/ncdc .
Step 6:create a folder rkmaxtemp and then copy the whether dataset file tempinput.txt under DFS using the following commands
hduser@ubuntu:~/ncdc$ hadoop fs -mkdir /rkmaxtemp
hduser@ubuntu:~/ncdc$ hadoop fs -put tempinput.txt /rkmaxtemp
hduser@ubuntu:~/ncdc$ hadoop fs -lsr /rkmaxtemp
hduser@ubuntu:~/ncdc$ hadoop fs -cat /rkmaxtemp/tempinput.txt
Step 7:Now run the command hadoop jar to MaxtempDriver file as shown
hduser@ubuntu:~/ncdc$ hadoop jar max.jar MaxtempDriver /rkmaxtemp/tempinput.txt /rkmaxtemp/out

Step 8:Now we can check the maximum temperature of the given dataset in a folder rkmaxtemp/out under DFS
hduser@ubuntu:~/ncdc$ hadoop fs -cat /rkmaxtemp/out/part-r-00000

Sunday 19 March 2023

MapReduce - Weather Dataset using single file -Part II

Program on MapReduce Weather Dataset :

step 1:Collect the weather datasets using NCDC domain and name this file as tempinput 

0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
 

Step 2: create a folder kk and  then create a file inside with Maxtemp.javaand allot permission to access from non root

root@ubuntu:/home/hduser# chmod -R 777 kk

//Maxtemp.java Program

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class Maxtemp
{
    public static class MaxtempMapper
    extends Mapper<LongWritable, Text, Text, IntWritable >
  {
    public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException
    {
         String line=value.toString();
          String year=line.substring(15,19);
          int airtemp;
          if(line.charAt(87)== '+')
          {
           airtemp=Integer.parseInt(line.substring(88,92));
           }
         else
         airtemp=Integer.parseInt(line.substring(87,92));
         String q=line.substring(92,93);
         if(airtemp!=9999&&q.matches("[01459]"))
         {
          context.write(new Text(year),new IntWritable(airtemp));
         }
     }
 }
 

public static class MaxtempReducer
extends Reducer<Text, IntWritable, Text, IntWritable>
 {
   public void reduce(Text key, Iterable<IntWritable> values, Context  context)throws IOException, InterruptedException
    {
        int maxvalue=Integer.MIN_VALUE;
        for (IntWritable value : values)
        {
         maxvalue=Math.max(maxvalue, value.get());
        }
        context.write(key, new IntWritable(maxvalue));
    }
 }

public static void main(String[] args) throws Exception

    {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Maxtemp");
        job.setJarByClass(Maxtemp.class);
        // TODO: specify a mapper
        job.setMapperClass(MaxtempMapper.class);
        // TODO: specify a reducer
        job.setReducerClass(MaxtempReducer.class);

        // TODO: specify output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // TODO: specify input and output DIRECTORIES (not files)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        if (!job.waitForCompletion(true))
            return;
    }

}

Step 3:generate class files for each java file using the following commands
hduser@ubuntu:~/kk$ export CLASSPATH=`hadoop classpath`
hduser@ubuntu:~/kk$ echo $CLASSPATH
hduser@ubuntu:~/kk$ javac -d . Maxtemp.java
Step 4:Create a jar file using the following command
hduser@ubuntu:~/kk$ jar -cvf max.jar -C /home/hduser/kk .  
Step 5:create a folder rk and then copy the whether dataset file tempinput.txt under DFS using the following commands
hduser@ubuntu:~/kk$ hadoop fs -mkdir /rk
hduser@ubuntu:~/kk$ ls
'Maxtemp$MaxtempMapper.class'  'Maxtemp$MaxtempReducer.class'   Maxtemp.class   Maxtemp.java   rrr.jar   tempinput.txt
hduser@ubuntu:~/kk$ hadoop fs -put /home/hduser/kk/tempinput.txt /rk
Step 6: Now run the hadoop jar command
hduser@ubuntu:~/kk$ hadoop jar max.jar Maxtemp /rk/tempinput.txt /rk/joy
Step 7:Now we can check the maximum temperature of the given dataset in a folder rk/tempinput.txt
hduser@ubuntu:~/kk$ hadoop fs -cat /rk/tempinput.txt
hduser@ubuntu:~/kk$ hadoop fs -cat /rk/joy/part-r-00000

Friends-of-friends-Map Reduce program

Program to illustrate FOF Map Reduce: import java.io.IOException; import java.util.*; import org.apache.hadoop.conf.Configuration; import or...