phone company's Customers data may have only tens of millions of records (each
record containing basic information for one customer), but its transaction log can
have billions of records containing detailed call history. When the smaller source can
fit in the memory of a mapper, we can achieve a tremendous gain in efficiency by copying
the smaller source to all mappers and performing the join in the map phase. This is
called a replicated join in the database literature, as one of the data tables is replicated
across all nodes in the cluster. (The next section will cover the case when the smaller
source doesn't fit in memory.)
Hadoop has a mechanism called distributed cache that's designed to distribute files to
all nodes in a cluster. It's normally used for distributing files containing “background”
data needed by all mappers. For example, if you're using Hadoop to classify documents,
you may have a list of keywords for each class. (Or better yet, a probabilistic model for
each class, but we digress…) You would use distributed cache to ensure all mappers
have access to the lists of keywords, the “background” data. To execute a replicated
join, we treat the smaller data source as background data.
Distributed cache is handled by the appropriately named class DistributedCache.
There are two steps to using this class. First, when configuring a job, you call the static
method DistributedCache.addCacheFile() to specify the files to be disseminated
to all nodes. These files are specified as URI objects, and they default to HDFS unless
a different filesystem is specified. The JobTracker will take this list of URIs and
create a local copy of the files in all the TaskTrackers when it starts the job. In the
second step, your mappers on each individual TaskTracker will call the static method
DistributedCache.getLocalCacheFiles() to get an array of local file Paths where
the local copies are located. At this point the mapper can use standard Java file I/O
techniques to read the local copies.
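As a brief illustration, the two steps might look like the following fragments. The HDFS path and variable names are placeholders for illustration only; the classes involved come from org.apache.hadoop.filecache, org.apache.hadoop.fs, and java.io.

    // Step 1: when configuring the job, register the file to be copied to every node.
    // conf is the job's Configuration object; the path here is purely illustrative.
    DistributedCache.addCacheFile(new Path("/data/customers.txt").toUri(), conf);

    // Step 2: inside the mapper (typically in configure()), find the local copies.
    // Note that getLocalCacheFiles() throws IOException.
    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
    BufferedReader reader =
        new BufferedReader(new FileReader(cacheFiles[0].toString()));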
Replicated joins using DistributedCache are simpler than reduce-side joins. Let's
begin with our standard Hadoop template.
public class DataJoinDC extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
        implements Mapper<Text, Text, Text, Text> {
        ...
    }

    public int run(String[] args) throws Exception {
        ...
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new DataJoinDC(),
                                 args);
        System.exit(res);
    }
}
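To make the template concrete, here is one possible sketch of how the elided mapper could be filled in for a replicated join: configure() reads the cached copy of the smaller source into an in-memory hash table, and map() probes that table for each incoming record. The comma-separated record format, the joinData field name, and the choice of Hashtable are assumptions for illustration, not the definitive listing.

    public static class MapClass extends MapReduceBase
        implements Mapper<Text, Text, Text, Text> {

        // In-memory copy of the smaller (replicated) data source,
        // keyed on the join key.
        private Hashtable<String, String> joinData =
            new Hashtable<String, String>();

        @Override
        public void configure(JobConf conf) {
            try {
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
                if (cacheFiles != null && cacheFiles.length > 0) {
                    BufferedReader joinReader = new BufferedReader(
                        new FileReader(cacheFiles[0].toString()));
                    String line;
                    while ((line = joinReader.readLine()) != null) {
                        // Assumed format: joinKey,rest-of-record
                        String[] tokens = line.split(",", 2);
                        joinData.put(tokens[0], tokens[1]);
                    }
                    joinReader.close();
                }
            } catch (IOException e) {
                System.err.println("Error reading distributed cache: " + e);
            }
        }

        public void map(Text key, Text value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {
            // Inner join: emit a record only when the key appears in both sources.
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
                output.collect(key, new Text(value.toString() + "," + joinValue));
            }
        }
    }

The driver's run() method would register the smaller source with DistributedCache.addCacheFile() before submitting the job, as sketched earlier.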
 