        for (int j = 0; j < 8; j++) {
            if (((int) nextByte & (1 << j)) != 0) {
                bf.set(8 * i + j);  // bit j of byte i maps to filter position 8*i + j
            }
        }
    }
}
}
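The loop above unpacks each byte of a serialized filter into individual bits. A minimal self-contained sketch of the same unpacking, with a plain byte array and java.util.BitSet standing in for the book's BloomFilter internals:

```java
import java.util.BitSet;

public class UnpackBits {
    // Unpack a byte array into a BitSet: bit j of byte i maps to position 8*i + j,
    // exactly as in the deserialization loop above.
    static BitSet fromBytes(byte[] bytes) {
        BitSet bf = new BitSet(bytes.length * 8);
        for (int i = 0; i < bytes.length; i++) {
            byte nextByte = bytes[i];
            for (int j = 0; j < 8; j++) {
                if (((int) nextByte & (1 << j)) != 0) {
                    bf.set(8 * i + j);
                }
            }
        }
        return bf;
    }

    public static void main(String[] args) {
        // 0x05 sets bits 0 and 2 of byte 0; 0x80 sets bit 7 of byte 1 (position 15)
        BitSet bf = fromBytes(new byte[] { 0x05, (byte) 0x80 });
        System.out.println(bf.get(0) && bf.get(2) && bf.get(15) && !bf.get(1));  // prints true
    }
}
```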
Next we'll create the MapReduce program to make a Bloom filter using Hadoop. As we said earlier, each mapper will instantiate a BloomFilter object and add the key of each record in its split into its BloomFilter instance. (We're using the key of the record to follow our data joining example.) We'll create a union of the BloomFilters by collecting them into a single reducer.
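The union works because a Bloom filter is just a bit vector: given filters built with the same size and hash functions, their union is the bitwise OR of their vectors, and any key added to one filter remains a member of the union. A self-contained sketch of this idea, with a BitSet-backed toy filter (the two-position "hash" here is purely illustrative, not the book's hash functions):

```java
import java.util.BitSet;

public class BloomUnion {
    static final int SIZE = 64;  // all filters must share size and hash functions

    // Toy "hash": derive two bit positions from the key (illustration only).
    static void add(BitSet filter, String key) {
        int h = key.hashCode();
        filter.set(Math.abs(h % SIZE));
        filter.set(Math.abs((h / SIZE) % SIZE));
    }

    static boolean mightContain(BitSet filter, String key) {
        int h = key.hashCode();
        return filter.get(Math.abs(h % SIZE))
            && filter.get(Math.abs((h / SIZE) % SIZE));
    }

    public static void main(String[] args) {
        BitSet split1 = new BitSet(SIZE);  // one filter per mapper/split
        BitSet split2 = new BitSet(SIZE);
        add(split1, "alice");
        add(split2, "bob");

        BitSet union = new BitSet(SIZE);   // what the single reducer computes
        union.or(split1);                  // union = bitwise OR of the bit vectors
        union.or(split2);

        // No false negatives: keys from either split are members of the union.
        System.out.println(mightContain(union, "alice") && mightContain(union, "bob"));  // prints true
    }
}
```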
The driver for the MapReduce program is straightforward. Our mappers will output a key/value pair where the value is a BloomFilter instance.
job.setOutputValueClass(BloomFilter.class);
The output key will not matter in terms of partitioning because we only have a single
reducer.
job.setNumReduceTasks(1);
We want our reducer to output the final BloomFilter as a binary file. Hadoop's OutputFormats either output text files or assume a key/value pair. Our reducer, therefore, won't use Hadoop's MapReduce output mechanism; instead, we'll write the result out to a file ourselves.
job.setOutputFormat(NullOutputFormat.class);
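The three settings can be seen together in a driver sketch. This is a configuration fragment only, written against the old org.apache.hadoop.mapred API that these snippets use; the class names BloomFilterDriver and Reduce, and the Text output key type, are assumptions for illustration:

```java
// Configuration fragment -- requires the Hadoop jars and the job's own
// classes (BloomFilterDriver, MapClass, Reduce are assumed names).
JobConf job = new JobConf(BloomFilterDriver.class);
job.setJobName("Bloom filter builder");

job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);

job.setOutputKeyClass(Text.class);           // key is irrelevant with one reducer
job.setOutputValueClass(BloomFilter.class);  // mappers emit BloomFilter values
job.setNumReduceTasks(1);                    // a single reducer unions all filters
job.setOutputFormat(NullOutputFormat.class); // we write the final file ourselves

JobClient.runJob(job);
```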
WARNING
In general, life gets a little more dangerous when you deviate from MapReduce's input/output framework and start working with your own files. Your tasks are no longer guaranteed to be idempotent, and you'll need to understand how various failure scenarios can affect your tasks. For example, your files may only be partially written when some tasks are restarted. Our example here is safe(r) because all the file operations take place together only once in the close() method and in only one reducer. A more careful/paranoid implementation would check each individual file operation more closely.
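The "write everything once, in close()" pattern can be sketched in a self-contained way. Here java.io and a BitSet stand in for Hadoop's FileSystem/FSDataOutputStream and the book's BloomFilter, which the real reducer would use to write to HDFS; the length-header format is an assumption for illustration:

```java
import java.io.*;
import java.util.BitSet;

public class WriteFilter {
    // Serialize the whole filter in one shot, as close() would: a length
    // header followed by the raw bit-vector bytes.
    static void writeFilter(BitSet bf, File out) throws IOException {
        byte[] bytes = bf.toByteArray();
        try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(out))) {
            dos.writeInt(bytes.length);  // lets a reader know how much to consume
            dos.write(bytes);
        }
    }

    static BitSet readFilter(File in) throws IOException {
        try (DataInputStream dis = new DataInputStream(new FileInputStream(in))) {
            byte[] bytes = new byte[dis.readInt()];
            dis.readFully(bytes);
            return BitSet.valueOf(bytes);
        }
    }

    public static void main(String[] args) throws IOException {
        BitSet bf = new BitSet(64);
        bf.set(3);
        bf.set(42);
        File f = File.createTempFile("bloom", ".bin");
        f.deleteOnExit();
        writeFilter(bf, f);
        System.out.println(readFilter(f).equals(bf));  // round-trip check: prints true
    }
}
```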
Recall that our strategy for the mapper is to build a single Bloom filter on the entire split and output it at the end of the split to the reducer. Given that the map() method of the MapClass has no state information about which record in the split it's processing, we should output the BloomFilter in the close() method to ensure that all the records in the split have been read. Although the map() method is passed an OutputCollector to collect the mapper's outputs, the close() method is not given one. The standard pattern