
Monday 10 April 2023

Parallelized Bloom Filter Creation Using MapReduce
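The job builds a Bloom filter in parallel: each mapper constructs a partial filter over its input split, and a single reducer folds the partial filters together with a bitwise OR. Since ORing two Bloom filters that share the same parameters (vector size, hash count, hash type) yields the filter of the union of their key sets, the combined result is identical to a filter built serially over the whole input.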

//BloomFilterExample.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.IOException;

public class BloomFilterExample {
    // The filter parameters (vector size, number of hash functions, hash type)
    // must be identical in the mapper and the reducer, or Filter.or() will fail.
    public static class BloomFilterMapper extends Mapper<Object, Text, NullWritable, BloomFilter> {
        // A 16-bit vector with 2 hash functions only suits a toy input; size the
        // filter to the expected number of keys in practice, e.g.:
        // private BloomFilter filter = new BloomFilter(1000000, 7, Hash.MURMUR_HASH);
        private BloomFilter filter = new BloomFilter(16, 2, Hash.JENKINS_HASH);
        private NullWritable nullKey = NullWritable.get();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Create a Bloom filter key from the input record
            Key bfKey = new Key(value.toString().getBytes());

            // Add the key to this mapper's partial filter
            filter.add(bfKey);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the partial filter once per mapper instead of once per record
            context.write(nullKey, filter);
        }
    }

    public static class BloomFilterReducer extends Reducer<NullWritable, BloomFilter, NullWritable, BloomFilter> {
        @Override
        public void reduce(NullWritable key, Iterable<BloomFilter> values, Context context) throws IOException, InterruptedException {
            // OR together the partial filters from all mappers; the parameters
            // must match the ones used in the mapper
            BloomFilter filter = new BloomFilter(16, 2, Hash.JENKINS_HASH);
            for (BloomFilter nextFilter : values) {
                filter.or(nextFilter);
            }

            // Write the final combined filter to output
            context.write(key, filter);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "BloomFilterExample");
        job.setJarByClass(BloomFilterExample.class);
        job.setMapperClass(BloomFilterMapper.class);
        job.setReducerClass(BloomFilterReducer.class);
        // A single reducer collects every partial filter (the map output key is
        // always NullWritable, so they all land in one reduce group anyway)
        job.setNumReduceTasks(1);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BloomFilter.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(BloomFilter.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
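Once the combined filter exists, membership queries go through the same Key type. Below is a minimal standalone sketch (plain Java against the same Hadoop classes, no MapReduce involved) of how membershipTest behaves with the toy parameters used above; the class name and sample keys are purely illustrative.

// BloomFilterQuery.java -- hypothetical helper, not part of the job above
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterQuery {
    public static void main(String[] args) {
        // Same parameters as the job: 16-bit vector, 2 hash functions, Jenkins hash
        BloomFilter filter = new BloomFilter(16, 2, Hash.JENKINS_HASH);
        filter.add(new Key("alice".getBytes()));
        filter.add(new Key("bob".getBytes()));

        // Added keys always test true; absent keys usually test false, though a
        // vector this small makes false positives very likely in practice
        System.out.println(filter.membershipTest(new Key("alice".getBytes()))); // true
        System.out.println(filter.membershipTest(new Key("carol".getBytes()))); // false, or a false positive
    }
}

Note that the job uses the default TextOutputFormat, so the part file holds only a textual rendering of the filter. If the filter needs to be reloaded later, one option would be switching the job to SequenceFileOutputFormat so the filter's Writable serialization is preserved and can be read back with SequenceFile.Reader.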
 
Step 1: Create a folder (bloom) and grant access permissions on it from a non-root terminal using the following commands:
hduser@ubuntu:~$ mkdir bloom
hduser@ubuntu:~$ chmod -R 777 bloom
Step 2: Create the input file (binput.txt) and the source file (BloomFilterExample.java) inside the created folder using the following commands:
hduser@ubuntu:~/bloom$ cat > binput.txt
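Type a few newline-separated records (the keys below are purely illustrative sample data), then press Ctrl+D to save:
apple
banana
cherry
Create BloomFilterExample.java in the same folder with the source shown above.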
Step 3: Generate the class files using the following commands (exporting CLASSPATH makes the Hadoop libraries visible to javac):
hduser@ubuntu:~/bloom$ export CLASSPATH=`hadoop classpath`
hduser@ubuntu:~/bloom$ echo $CLASSPATH
hduser@ubuntu:~/bloom$ javac -d . BloomFilterExample.java 
Step 4: Create a jar file using the following command:
hduser@ubuntu:~/bloom$ jar -cvf bloom.jar -C /home/hduser/bloom .
Step 5: Create an HDFS directory (bloomrk) and then copy the binput.txt file into it using the following commands:
hduser@ubuntu:~/bloom$ hadoop fs -mkdir /bloomrk
hduser@ubuntu:~/bloom$ hadoop fs -put binput.txt /bloomrk
hduser@ubuntu:~/bloom$ hadoop fs -ls -R /bloomrk
hduser@ubuntu:~/bloom$ hadoop fs -cat /bloomrk/binput.txt
Step 6: Run the job with the hadoop jar command:
hduser@ubuntu:~/bloom$ hadoop jar bloom.jar BloomFilterExample /bloomrk/binput.txt /bloomrk/out
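Note: Hadoop refuses to start a job whose output directory already exists, so remove /bloomrk/out first (hadoop fs -rm -r /bloomrk/out) when re-running the job.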
Step 7: Check the parallelized Bloom filter output for the given dataset using the following commands:
hduser@ubuntu:~/bloom$ hadoop fs -ls -R /bloomrk
hduser@ubuntu:~/bloom$ hadoop fs -cat /bloomrk/out/part-r-00000
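Because the job uses the default TextOutputFormat, the part file holds a textual rendering of the combined filter (Hadoop's BloomFilter prints the positions of its set bits) rather than its binary Writable form; see the note after the source code above for one way to persist a reloadable filter.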
 
