// BloomFilterExample.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.IOException;

public class BloomFilterExample {

    // Filter parameters: bit-vector size, number of hash functions, hash type.
    // Small values keep this demo's output readable; for real data, size the
    // vector to the expected key count and target false-positive rate, e.g.
    // new BloomFilter(1000000, 7, Hash.MURMUR_HASH).
    private static final int VECTOR_SIZE = 16;
    private static final int NB_HASH = 2;
    private static final int HASH_TYPE = Hash.JENKINS_HASH;

    public static class BloomFilterMapper
            extends Mapper<Object, Text, NullWritable, BloomFilter> {

        private final BloomFilter filter = new BloomFilter(VECTOR_SIZE, NB_HASH, HASH_TYPE);

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Add each input record to this mapper's partial Bloom filter.
            filter.add(new Key(value.toString().getBytes()));
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Emit the partial filter once, after all records in the split
            // have been added (emitting it per record would be wasteful).
            context.write(NullWritable.get(), filter);
        }
    }

    public static class BloomFilterReducer
            extends Reducer<NullWritable, BloomFilter, NullWritable, BloomFilter> {

        @Override
        public void reduce(NullWritable key, Iterable<BloomFilter> values, Context context)
                throws IOException, InterruptedException {
            // OR together the partial filters from all mappers into one filter.
            BloomFilter filter = new BloomFilter(VECTOR_SIZE, NB_HASH, HASH_TYPE);
            for (BloomFilter nextFilter : values) {
                filter.or(nextFilter);
            }
            // Write the combined filter as the job's output.
            context.write(key, filter);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "BloomFilterExample");
        job.setJarByClass(BloomFilterExample.class);
        job.setMapperClass(BloomFilterMapper.class);
        job.setReducerClass(BloomFilterReducer.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BloomFilter.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(BloomFilter.class);
        job.setNumReduceTasks(1); // one reducer yields a single combined filter
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
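In this job, each mapper accumulates a partial Bloom filter over its input split and emits it once in cleanup(); the single reducer ORs all the partial filters into one combined filter. As a rule of thumb, for n expected keys and target false-positive rate p, the bit vector needs roughly m = -n ln p / (ln 2)^2 bits with k = (m/n) ln 2 hash functions. The standalone sketch below (the sample strings "alice", "bob", and "carol" are hypothetical) shows how the membershipTest API behaves:

// BloomFilterQuery.java — a minimal local sketch of the membership-test API,
// using the same org.apache.hadoop.util.bloom classes as the job above.
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomFilterQuery {
    public static void main(String[] args) {
        BloomFilter filter = new BloomFilter(16, 2, Hash.JENKINS_HASH);
        filter.add(new Key("alice".getBytes()));
        filter.add(new Key("bob".getBytes()));
        // membershipTest never yields a false negative; false positives are
        // possible, and likely with a vector as small as 16 bits.
        System.out.println(filter.membershipTest(new Key("alice".getBytes()))); // true
        System.out.println(filter.membershipTest(new Key("carol".getBytes()))); // false, unless a collision
    }
}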
Step 1: Create a folder (bloom) and give permission to access it from a non-root terminal using the following commands
hduser@ubuntu:~$ mkdir bloom
hduser@ubuntu:~$ chmod -R 777 bloom
Step 2: Create the input file (binput.txt) and the source file (BloomFilterExample.java) inside the created folder using the following commands
hduser@ubuntu:~/bloom$ cat > binput.txt
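For example, binput.txt might hold a few records, one per line (the values here are only illustrative); press Ctrl+D to finish:
alice
bob
charlie
dave
Then create the source file the same way, paste in the code above, and press Ctrl+D:
hduser@ubuntu:~/bloom$ cat > BloomFilterExample.java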
Step 3: Generate the class files using the following commands
hduser@ubuntu:~/bloom$ export CLASSPATH=`hadoop classpath`
hduser@ubuntu:~/bloom$ echo $CLASSPATH
hduser@ubuntu:~/bloom$ javac -d . BloomFilterExample.java
Step 4: Create a jar file using the following command
hduser@ubuntu:~/bloom$ jar -cvf bloom.jar -C /home/hduser/bloom .
Step 5: Create a folder (bloomrk) in HDFS and copy binput.txt into it using the following commands
hduser@ubuntu:~/bloom$ hadoop fs -mkdir /bloomrk
hduser@ubuntu:~/bloom$ hadoop fs -put binput.txt /bloomrk
hduser@ubuntu:~/bloom$ hadoop fs -lsr /bloomrk
hduser@ubuntu:~/bloom$ hadoop fs -cat /bloomrk/binput.txt
Step 6: Run the job with the hadoop jar command, giving the input file and an output directory as arguments
hduser@ubuntu:~/bloom$ hadoop jar bloom.jar BloomFilterExample /bloomrk/binput.txt /bloomrk/out
Step 7: Now we can check the parallelized Bloom filter output for the given dataset using the following commands
hduser@ubuntu:~/bloom$ hadoop fs -lsr /bloomrk
hduser@ubuntu:~/bloom$ hadoop fs -cat /bloomrk/out/part-r-00000
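With the default TextOutputFormat used above, part-r-00000 holds a text rendering of the filter, which is fine for inspection but awkward to reload. If you want to reuse the filter programmatically, one option (an assumption, not part of the original recipe) is to add job.setOutputFormatClass(SequenceFileOutputFormat.class) to the driver and read the filter back, as in this sketch:

// ReadBloomFilter.java — a sketch that assumes the driver was switched to
// SequenceFileOutputFormat, so part-r-00000 is a SequenceFile rather than text.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

public class ReadBloomFilter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path part = new Path("/bloomrk/out/part-r-00000");
        try (SequenceFile.Reader reader =
                 new SequenceFile.Reader(conf, SequenceFile.Reader.file(part))) {
            NullWritable key = NullWritable.get();
            BloomFilter filter = new BloomFilter(); // filled in by readFields
            if (reader.next(key, filter)) {
                // Test a record against the combined filter
                // ("alice" is a hypothetical sample value).
                System.out.println(filter.membershipTest(new Key("alice".getBytes())));
            }
        }
    }
}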