Database Reference
In-Depth Information
(let [cast-row
(->> indices
(mapcat #(vector
% (try->int (nth row %))))
(apply assoc row))]
(send sink conj cast-row)
cast-row))
2.
The agent that reads the data will use the function
read-row
. This is mostly
similar to the
read-row
that we saw in the
Maintaining data consistency with
validators
recipe. The differences are highlighted here:
(defn read-row [rows caster sink
done
]
(
if-let
[[item & items] (seq rows)]
(do
(send caster coerce-row-ints
item row-ints sink)
(send *agent* read-row caster sink
done
)
items)
(
do
(dosync (commute done (constantly true)
))
'())))
3.
The function that watches the agent that coerces the data will just update a counter:
(defn watch-caster
[counter watch-key watch-agent old-state new-state]
(when-not (nil? new-state)
(dosync (commute counter inc))))
4. We'll deine a function that polls until processing is inished:
(defn wait-for-it [sleep-for ref-var]
(loop []
(when-not @ref-var
(Thread/sleep sleep-for)
(recur))))
5.
The last function creates all of the agents and references and dispatches their
functions. Finally, it blocks until they are all inished, when it returns the results.
Again, I've highlighted the differences from
agent-ints
in the
Maintaining Data
consistency with validators
recipe, which is very similar:
(defn watch-processing [input-files]
(let [reader (agent (seque
(mapcat
lazy-read-csv
input-files)))
caster (agent nil)