  (let [cast-row
        (->> indices
             (mapcat #(vector
                       % (try->int (nth row %))))
             (apply assoc row))]
    (send sink conj cast-row)
    cast-row))
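The core of the listing above is the `mapcat` / `apply assoc` step, which can be tried on its own without any agents. The following is a minimal sketch: `cast-columns` is a hypothetical name, and this `try->int` is an assumed stand-in (parse to an integer, otherwise return the value unchanged) that may differ from the recipe's own helper.

```clojure
;; Assumed stand-in for the recipe's try->int: parse as an integer,
;; or return the original value if it cannot be parsed.
(defn try->int [v]
  (try
    (Integer/parseInt (str v))
    (catch NumberFormatException _ v)))

;; Hypothetical helper isolating the coercion step.
;; `row` must be a vector so that assoc can replace values by index.
(defn cast-columns [row indices]
  (->> indices
       ;; Build a flat sequence of index/value pairs: (0 1 3 42)
       (mapcat #(vector % (try->int (nth row %))))
       ;; Apply them all at once: (assoc row 0 1 3 42)
       (apply assoc row)))

(cast-columns ["1" "Alice" "x" "42"] [0 3])
;; => [1 "Alice" "x" 42]
```

Only the listed columns are touched, so the name column and the unparseable "x" pass through unchanged.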
2. The agent that reads the data will use the function read-row. This is mostly
similar to the read-row that we saw in the Maintaining data consistency with
validators recipe. The differences are highlighted here:
(defn read-row [rows caster sink done]
  (if-let [[item & items] (seq rows)]
    (do
      (send caster coerce-row-ints
            item row-ints sink)
      (send *agent* read-row caster sink done)
      items)
    (do
      (dosync (commute done (constantly true)))
      '())))
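The pattern in read-row, an agent action that hands one item downstream and then re-sends itself to `*agent*` until its input is empty, can be seen in a smaller setting. This is only a sketch under simplifying assumptions: `drain` is a hypothetical function, the items go straight to the sink with no caster in between, and the input is a plain vector rather than CSV rows.

```clojure
(def done (ref false))   ; flipped to true when the input is exhausted
(def sink (agent []))    ; collects the processed items

;; Hypothetical action: pass the first item on, then schedule itself
;; again on the same agent with the remaining items as the new state.
(defn drain [items sink done]
  (if-let [[item & more] (seq items)]
    (do
      (send sink conj item)
      (send *agent* drain sink done)
      more)
    (do
      (dosync (commute done (constantly true)))
      '())))

(def reader (agent [1 2 3]))
(send reader drain sink done)

;; Poll the flag, then let the sink finish its queued conj actions.
(while (not @done) (Thread/sleep 10))
(await sink)
@sink
;; => [1 2 3]
```

Sends made inside an agent action are held until that action completes, so each item reaches the sink's queue before the next `drain` call runs. When run as a standalone script, a final `(shutdown-agents)` lets the JVM exit.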
3. The function that watches the agent that coerces the data will just update a counter:
(defn watch-caster
  [counter watch-key watch-agent old-state new-state]
  (when-not (nil? new-state)
    (dosync (commute counter inc))))
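A watch function receives four arguments (key, reference, old state, new state), so the extra counter argument has to be bound before registration, for example with `partial`. The sketch below repeats the watch-caster definition so that it is self-contained; the `worker` agent, the `:count` key, and the two dummy actions are assumptions made for illustration.

```clojure
(def counter (ref 0))

;; Same shape as watch-caster above: count every non-nil new state.
(defn watch-caster
  [counter watch-key watch-agent old-state new-state]
  (when-not (nil? new-state)
    (dosync (commute counter inc))))

(def worker (agent nil))

;; partial binds counter, leaving the four-argument watch signature.
(add-watch worker :count (partial watch-caster counter))

(send worker (constantly :a))
(send worker (constantly :b))

;; Poll until both actions have been observed by the watch.
(while (< @counter 2) (Thread/sleep 10))
@counter
;; => 2
```

Registering the watch does not itself fire it, so the agent's initial nil state is not counted.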
4. We'll define a function that polls until processing is finished:
(defn wait-for-it [sleep-for ref-var]
  (loop []
    (when-not @ref-var
      (Thread/sleep sleep-for)
      (recur))))
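To see the polling loop in action without the rest of the pipeline, it can be pointed at a flag that some other thread sets later. In this sketch the `finished` ref and the `future` that flips it after 50 ms are assumptions standing in for the reader agent; wait-for-it is repeated so the example runs on its own.

```clojure
(defn wait-for-it [sleep-for ref-var]
  (loop []
    (when-not @ref-var
      (Thread/sleep sleep-for)
      (recur))))

(def finished (ref false))

;; Stand-in for the real work: set the flag from another thread.
(future
  (Thread/sleep 50)
  (dosync (ref-set finished true)))

;; Blocks the calling thread, checking the flag every 10 ms.
(wait-for-it 10 finished)
@finished
;; => true
```

The sleep interval trades responsiveness against wasted wake-ups: a shorter interval notices completion sooner but checks the ref more often.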
5. The last function creates all of the agents and references and dispatches their
functions. Finally, it blocks until they are all finished, when it returns the results.
Again, I've highlighted the differences from agent-ints in the Maintaining data
consistency with validators recipe, which is very similar:
(defn watch-processing [input-files]
  (let [reader (agent (seque
                       (mapcat
                        lazy-read-csv
                        input-files)))
        caster (agent nil)
 