Since inner classes have an implicit reference to their enclosing instance, if the enclosing
class is not serializable, then the function will not be serializable and the pipeline will fail
with a CrunchRuntimeException. You can easily fix this by making the function a (named) static
nested class or a top-level class, or you can make the enclosing class implement Serializable.
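The difference between the two kinds of class can be reproduced with plain Java serialization, without Crunch on the classpath. The following is a minimal sketch under that assumption: Fn, Enclosing, InnerPrefixFn, and StaticPrefixFn are hypothetical names invented for illustration, and the check uses ObjectOutputStream directly rather than Crunch's own planner, which would report the failure as a CrunchRuntimeException.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class InnerClassSerializationDemo {

  // Hypothetical stand-in for a Crunch function; only Serializable matters here.
  interface Fn extends Serializable {
    String apply(String input);
  }

  // An enclosing class that does NOT implement Serializable.
  static class Enclosing {
    final String prefix;

    Enclosing(String prefix) {
      this.prefix = prefix;
    }

    // Inner (non-static) class: reads prefix through its hidden reference
    // to the Enclosing instance, so that instance would have to be serialized too.
    class InnerPrefixFn implements Fn {
      @Override
      public String apply(String input) {
        return prefix + input;
      }
    }

    Fn innerFn() {
      return new InnerPrefixFn();
    }
  }

  // Named static nested class: carries its own copy of the state it needs.
  static class StaticPrefixFn implements Fn {
    private final String prefix;

    StaticPrefixFn(String prefix) {
      this.prefix = prefix;
    }

    @Override
    public String apply(String input) {
      return prefix + input;
    }
  }

  // Returns true if Java serialization accepts the object, false if it is rejected.
  static boolean canSerialize(Object o) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(o);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Fn inner = new Enclosing("x-").innerFn();
    Fn nested = new StaticPrefixFn("x-");
    System.out.println("inner class serializable: " + canSerialize(inner));          // false
    System.out.println("static nested class serializable: " + canSerialize(nested)); // true
  }
}
```

The static nested version has to receive the prefix through its constructor, because it no longer has an enclosing instance to read it from.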
Another problem is when a function depends on nonserializable state in the form of an instance
variable whose class is not Serializable. In this case, you can mark the nonserializable
instance variable as transient so Java doesn't try to serialize it, then set it in the
initialize() method of DoFn. Crunch will call the initialize() method before the process()
method is invoked for the first time:
    public class CustomDoFn<S, T> extends DoFn<S, T> {

      transient NonSerializableHelper helper;

      @Override
      public void initialize() {
        helper = new NonSerializableHelper();
      }

      @Override
      public void process(S input, Emitter<T> emitter) {
        // use helper here
      }
    }
Although not shown here, it's possible to pass state to initialize the transient instance
variable using other, nontransient instance variables, such as strings.
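One way to picture this is a function that keeps a serializable string as configuration and rebuilds its transient helper from that string when it is initialized. The sketch below uses plain Java serialization so that it runs without Crunch; Helper and JoinFn are hypothetical stand-ins (JoinFn only mimics the initialize() and process() lifecycle of a DoFn and is not Crunch's class), and roundTrip() simulates shipping the function to a task.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientStateDemo {

  // Hypothetical helper that is NOT Serializable.
  static class Helper {
    private final String separator;

    Helper(String separator) {
      this.separator = separator;
    }

    String join(String a, String b) {
      return a + separator + b;
    }
  }

  // Hypothetical stand-in for a DoFn subclass; it is not Crunch's DoFn.
  static class JoinFn implements Serializable {
    private final String separator;   // nontransient state, serialized with the function
    private transient Helper helper;  // skipped by Java serialization

    JoinFn(String separator) {
      this.separator = separator;
    }

    // Plays the role of DoFn.initialize(): rebuild the helper from the string.
    void initialize() {
      helper = new Helper(separator);
    }

    // Plays the role of process(): uses the helper built in initialize().
    String process(String a, String b) {
      return helper.join(a, b);
    }

    boolean hasHelper() {
      return helper != null;
    }
  }

  // Serializes and deserializes the function, as if it were sent to a task.
  static JoinFn roundTrip(JoinFn fn) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(fn);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (JoinFn) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    JoinFn copy = roundTrip(new JoinFn("-"));
    System.out.println(copy.hasHelper());        // false: the transient field was not serialized
    copy.initialize();
    System.out.println(copy.process("a", "b"));  // a-b
  }
}
```

Only the separator string travels with the serialized function; the helper is reconstructed on the receiving side.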
Object reuse
In MapReduce, the objects in the reducer's values iterator are reused for efficiency (to
avoid the overhead of object allocation). Crunch has the same behavior for the iterators
used in the combineValues() and mapValues() methods on PGroupedTable. Therefore, if you
retain a reference to an object outside the body of the iterator, you should make a copy
to avoid object identity errors.
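The effect of object reuse can be demonstrated without Hadoop or Crunch. The following sketch assumes a hypothetical mutable Holder type and a hand-written iterable, reusing(), that returns the same Holder instance from every call to next(); neither is part of any Crunch API, and the copy() method stands in for whatever copying mechanism the real value type provides.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class ObjectReuseDemo {

  // Hypothetical mutable value type, similar in spirit to a Hadoop Writable.
  static class Holder {
    int value;

    Holder copy() {
      Holder h = new Holder();
      h.value = value;
      return h;
    }
  }

  // Iterable whose iterator returns the SAME Holder instance on every call
  // to next(), overwriting its contents each time.
  static Iterable<Holder> reusing(int... values) {
    return () -> new Iterator<Holder>() {
      private final Holder shared = new Holder();
      private int index = 0;

      @Override
      public boolean hasNext() {
        return index < values.length;
      }

      @Override
      public Holder next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        shared.value = values[index++];
        return shared;
      }
    };
  }

  // Retains references to the iterated objects without copying them.
  static List<Integer> retainWithoutCopy(Iterable<Holder> holders) {
    List<Holder> kept = new ArrayList<>();
    for (Holder h : holders) {
      kept.add(h);          // every element is the same shared object
    }
    return valuesOf(kept);
  }

  // Retains a copy of each iterated object.
  static List<Integer> retainWithCopy(Iterable<Holder> holders) {
    List<Holder> kept = new ArrayList<>();
    for (Holder h : holders) {
      kept.add(h.copy());   // each element is an independent object
    }
    return valuesOf(kept);
  }

  private static List<Integer> valuesOf(List<Holder> holders) {
    List<Integer> result = new ArrayList<>();
    for (Holder h : holders) {
      result.add(h.value);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(retainWithoutCopy(reusing(1, 2, 3)));  // [3, 3, 3]
    System.out.println(retainWithCopy(reusing(1, 2, 3)));     // [1, 2, 3]
  }
}
```

Without the copy, every retained reference points at the one shared object, so all of them show the last value written to it.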
We can see how to go about this by writing a general-purpose utility for finding the set of
unique values for each key in a PTable; see Example 18-2.
Example 18-2. Finding the set of unique values for each key in a PTable
    public static <K, V> PTable<K, Collection<V>> uniqueValues(PTable<K, V> table) {