This is the second part of an article dedicated to Clojure transducers. In the first part, we discussed transducer fundamentals and the functional abstraction they represent. In this article, we are going to explore how they are used in practice, including:
- Composability
- Reuse across transports with core.async
- A logging stateless transducer example
- An `interleave` stateful transducer example
- Parallelisation
Composability
Transducers have the important property of isolating transforming reducing functions (like `(map inc)` or `(filter odd?)`) from the necessary sequential iteration. One interesting consequence of this design is that transducers can compose like any other function. Have a look at the following example:
```clojure
(def inc-and-filter (comp (map inc) (filter odd?)))
(def special+ (inc-and-filter +))

(special+ 1 1) ;; 1
(special+ 1 2) ;; 4
```
Each of `(map inc)` and `(filter odd?)` generates a function with the same interface: it takes a reducing function and returns a new reducing function. We can compose them with `comp` to form a new function, `inc-and-filter`, which is the composition of the two.
We can also provide an argument `+` to `inc-and-filter`, which returns a new function. We call the new function `special+` because it enhances normal `+` with two transformations. We can see the effect of incorporating `inc` and `odd?` into `special+` by calling it with two arguments: the second argument is incremented and then conditionally added to the first, depending on whether it is odd or even. We can use `reduce` with `special+` as usual and compare it with the same version using plain `+`:
```clojure
(reduce special+ 0 (range 10)) ;; 25
(reduce + 0 (filter odd? (map inc (range 10)))) ;; 25
```
The two `reduce` calls appear the same on the surface, but the mechanics are completely different. In the first case, our `special+` applies the transformations while iterating through the input sequence. The second case produces an intermediate sequence for each transformation, plus one for the final reduction. More importantly, we can now isolate the transformations from `reduce`. By using `transduce` we make that explicit in the arguments:
```clojure
(transduce (comp (map inc) (filter odd?)) + (range 10)) ;; 25
```
How many transducers (our transforming reducing functions) can be composed this way? The answer is … as many as you like:
```clojure
(def x-form
  (comp (map inc)
        (filter even?)
        (dedupe)
        (mapcat range)
        (partition-all 3)
        (partition-by #(< (apply + %) 7))
        (mapcat flatten)
        (random-sample 1.0)
        (take-nth 1)
        (keep #(when (odd? %) (* % %)))
        (keep-indexed #(when (even? %1) (* %1 %2)))
        (replace {2 "two" 6 "six" 18 "eighteen"})
        (take 11)
        (take-while #(not= 300 %))
        (drop 1)
        (drop-while string?)
        (remove string?)))

(transduce x-form + (vec (interleave (range 18) (range 20)))) ;; 246
```
The example above is a famous gist created by Rich Hickey (the inventor of Clojure) to show off the transducers available in the standard library.
But there’s more: composition can be applied on top of other compositions, allowing programmers to isolate and name concepts accordingly. For example, we could group the previous long chain of transducers into more manageable chunks, giving each chunk a specific name:
```clojure
(def x-clean
  (comp (map inc)
        (filter even?)
        (dedupe)
        (mapcat range)))

(def x-filter
  (comp (partition-all 3)
        (partition-by #(< (apply + %) 7))
        (mapcat flatten)
        (random-sample 1.0)))

(def x-additional-info
  (comp (take-nth 1)
        (keep #(when (odd? %) (* % %)))
        (keep-indexed #(when (even? %1) (* %1 %2)))
        (replace {2 "two" 6 "six" 18 "eighteen"})))

(def x-calculate
  (comp (take 11)
        (take-while #(not= 300 %))
        (drop 1)
        (drop-while string?)
        (remove string?)))

(def x-prepare (comp x-clean x-filter))
(def x-process (comp x-additional-info x-calculate))
(def x-form (comp x-prepare x-process))

;; the same input as in the previous example
(def data (vec (interleave (range 18) (range 20))))

(transduce x-form + data) ;; 246
```
As you can see, `comp` can be used several times to compose over already composed transducers, producing named computations that are easy to read and reuse.
Note that, although there are structural similarities, the thread-last macro `->>` doesn’t allow for a similar kind of composition. For example, the following `expand-and-group` composition is not possible:
```clojure
(def coll (range 10))

(def expand
  (->> coll
       (map inc)
       (filter even?)
       (dedupe)
       (mapcat range)))

(def group
  (->> coll
       (partition-all 3)
       (partition-by #(< (apply + %) 7))))

(def expand-and-group (comp expand group))

(expand-and-group)
;; Execution error (ClassCastException)
;; clojure.lang.LazySeq cannot be cast to clojure.lang.IFn
```
There are many things not working as expected in the example above. First of all, the `expand` and `group` definitions already contain the result of the computation. Their composition is not the composition of their computational recipes but of their results. `comp` itself works as expected, but invoking the composed function produces the nonsensical use of a lazy sequence as a function.
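One workaround, sketched below under the assumption that we turn each fragment into a function of its input collection, is to compose the functions themselves. This recovers composability, but unlike transducers every step still allocates its intermediate sequences:

```clojure
;; A sketch: wrap each ->> fragment in a function, then compose the functions.
;; Note that plain function comp applies right-to-left: expand runs first.
(defn expand [coll]
  (->> coll
       (map inc)
       (filter even?)
       (dedupe)
       (mapcat range)))

(defn group [coll]
  (->> coll
       (partition-all 3)
       (partition-by #(< (apply + %) 7))))

(def expand-and-group (comp group expand))

(expand-and-group (range 10)) ;; works, at the cost of intermediate sequences
```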
Reuse across transports
There is another important aspect of transducers that contributes to reuse, apart from composition: transducers compose without any knowledge of the input they will eventually be applied to. As a consequence, we can reuse them with other transports.
A transport is essentially the way a collection of items is iterated. The most common transport in the standard library is sequential iteration (that is, using sequence functions like `map`, `filter`, etc.), but there are others. The core.async library, for instance, implements iteration through an abstraction called a “channel”, which behaves similarly to a blocking queue.
The following example uses the same kind of transducer chain we have seen before to process incoming elements from a core.async channel:
```clojure
(require '[clojure.core.async :refer [>! <! <!!] :as a])

(def xform (comp (filter odd?) (map inc)))

(defn process [items]
  (let [out (a/chan 1 xform)
        in (a/to-chan items)]
    (a/go-loop []
      (if-some [item (<! in)]
        (do (>! out item)
            (recur))
        (a/close! out)))
    (<!! (a/reduce conj [] out))))

(process (range 10))
;; [2 4 6 8 10]
```
The transducer `xform` is now attached to the channel `out`, and a `go-loop` pumps items into it. Every item pushed down the channel passes through the transducer chain. The input here is a `range` of 10 numbers turned into a channel, but items could be streamed asynchronously (and often are, for example as server-side events on a web page).
The `go-loop` works sequentially in this case, but core.async also contains facilities to apply the same transducers in parallel. A `pipeline` is an abstraction that supports multi-threaded access: it sits between an input and an output channel, and it can be used with transducers:
```clojure
(defn process [items]
  (let [out (a/chan (a/buffer 100))]
    (a/pipeline 4 out xform (a/to-chan items))
    (<!! (a/reduce conj [] out))))

(process (range 10))
;; [2 4 6 8 10]
```
A `pipeline` accepts a maximum number of parallel threads (usually matching the number of physical cores available on the system). Each incoming item from the input channel is processed in parallel through the transducer chain, up to the maximum number of threads.
Custom transducers
The standard library provides transducer-enabled versions of many functions, which cover most of the common scenarios. When those are not enough, you can create your own. However, a custom transducer needs to obey a few rules to play well with other transducers:
- It must support zero-argument, one-argument and two-argument calls.
- The zero-argument call defines the initial value for the reduction. This arity is not currently used by transducers, but it is recommended to implement it anyway by invoking the reducing function (usually abbreviated “rf”) with no arguments. This is a conservative approach that plays well with the current behaviour of `transduce`, in case a future change to the standard library requires the zero-argument call.
- The single-argument call defines the “tear-down” behaviour. This is especially useful if the transducer contains any state that needs deallocation. It is called once with the final result of the reduction. After performing any custom logic (optional), the custom transducer is expected to call “rf” with the result, so the other transducers in the chain also get their chance to clean up.
- The two-argument call represents the standard reduction step and should contain the business logic of the custom transducer. This is the typical reduce operation: the first argument is the result so far, followed by the next item from the input. The reducing function “rf” is expected to be invoked after applying any transformation, propagating the call to the other transducers in the chain.
- A transducer can decide to terminate the reduction at any time by calling `reduced` on the result. Other transducers should pay attention to `reduced` elements and prevent unnecessary computation. This is, for example, what happens with transducers like `take-while`: when the predicate becomes `false`, no other items should be reduced and the computation should stop. If other computationally intensive transducers follow `take-while`, they should also stop processing and allow the result through without doing anything else.
This set of rules describes the protocol a chain of transducers follows during execution. The primary goal of the protocol is to give each transducer a fair chance to execute (or stop) the computation, initialise internal state, or perform cleanup logic.
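To make the contract concrete, here is a minimal sketch of a do-nothing custom transducer implementing the three arities described above (the name `passthrough-xform` is purely illustrative):

```clojure
(defn passthrough-xform []
  (fn [rf]
    (fn
      ([] (rf))              ;; init: delegate to the reducing function
      ([result] (rf result)) ;; completion: let rf run its own cleanup
      ([result item]         ;; step: transform the item here, then propagate
       (rf result item)))))

(transduce (passthrough-xform) + (range 5)) ;; 10
```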
Not surprisingly, transducers that maintain state are called “stateful”, to distinguish them from “stateless” transducers. Maintaining state in a transducer is quite common: roughly half of the transducers in the standard library do it.
It is so common that a new concurrency primitive called `volatile!` was created. A `volatile!` is a concurrency construct that allows multiple threads to promptly see a value as soon as it is updated. It is quite different from the other concurrency primitives (`var`, `atom`, `ref` and `agent`), which instead “protect” state from concurrent access.
If you are wondering why `volatile!` is necessary, the answer is that with core.async pipelines the same transducer instance could run on multiple threads. A `volatile!` guarantees that the state inside the transducer is visible to all threads as soon as possible, so if the same transducer happens to run on a different thread, that thread sees the most recent internal state of the transducer. The reason this might not be the case without a `volatile!` has to do with the aggressive use of CPU registers as caches and with instruction reordering, common JVM optimisations.
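For reference, the `volatile!` API mirrors the one for atoms, with `vswap!` and `vreset!` in place of `swap!` and `reset!`. Keep in mind that, unlike atoms, volatiles guarantee visibility but not atomic updates:

```clojure
(def state (volatile! 0)) ;; create a volatile holding 0

(vswap! state inc)  ;; 1  (like swap! for atoms, but not atomic)
(vreset! state 42)  ;; 42 (like reset!)
@state              ;; 42 (dereference as usual)
```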
A logging (stateless) transducer
So, let’s get practical and create our first stateless transducer. In this example, assume we have just created a complicated but nicely composed transducer chain for data processing (we simulate it here with a much shorter and simpler one).

How would you go about debugging it? You might need to understand at which step in the chain things are not working as expected. Here’s an idea for a logging transducer with a printing side effect. Other than printing on screen, the transducer does not alter the reduction:
```clojure
(defn log [& [idx]]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result el]
       (let [n-step (if idx (str "Step: " idx ". ") "")]
         (println (format "%sResult: %s, Item: %s" n-step result el)))
       (rf result el)))))

(sequence (log) [:a :b :c])
;; Result: null, Item: :a
;; Result: null, Item: :b
;; Result: null, Item: :c
;; (:a :b :c)
```
In this example, `log` is a transducer accepting an optional single argument “idx”. When “idx” is present, `log` additionally prints it, assuming “idx” is the position of the transducer in a composed chain (we’ll see in a second how that information can be used). Before being composed via `comp`, transducers are just a list of functions. The idea is to interleave that list with logging transducers ahead of `comp`, and to use a dynamic variable to control when to print to the console:
```clojure
(def ^:dynamic *dbg?* false)

(defn comp* [& xforms]
  (apply comp
    (if *dbg?*
      (->> (range)
           (map log)
           (interleave xforms))
      xforms)))

(transduce (comp* (filter odd?) (map inc)) + (range 5))
;; 6

(binding [*dbg?* true]
  (transduce (comp* (filter odd?) (map inc)) + (range 5)))
;; Step: 0. Result: 0, Item: 1
;; Step: 1. Result: 0, Item: 2
;; Step: 0. Result: 2, Item: 3
;; Step: 1. Result: 2, Item: 4
;; 6
```
Here, `comp*` is a thin wrapper around the normal `comp` function from the standard library. Its responsibility is to check the `*dbg?*` dynamic variable: when `*dbg?*` is `true`, we interleave our logging transducers into the existing chain of transducers passed as input; otherwise we do nothing.
The first example invocation of `transduce` shows that `comp*` behaves like normal `comp`. When we bind `*dbg?*` to `true`, though, the logging transducers start printing. Several logging transducer instances are created, as many as necessary to interleave the input chain. Each logging transducer carries the “idx” information about its position in the chain, so it can print it. By looking at the source code, we know that ‘Step: 1’ corresponds to the transducer at index 1 in the (zero-indexed) list passed to `comp*` (which is `(map inc)`). If we see some odd value of ‘Item’ or ‘Result’, we know which step is producing it and we can take action.
An interleave (stateful) transducer
Let’s now see an example of a stateful custom transducer. The following chain of sequential transformations shows a way to implement the Egyptian multiplication algorithm. The Egyptians didn’t use multiplication tables to multiply numbers; they worked out the operation by decomposing numbers into powers of two:
```clojure
(defn egypt-mult [x y]
  (->> (interleave
         (iterate #(quot % 2) x)
         (iterate #(* % 2) y))
       (partition-all 2)
       (take-while #(pos? (first %)))
       (filter #(odd? (first %)))
       (map second)
       (reduce +)))

(egypt-mult 640 10) ;; 6400
```
We would like to express this algorithm with transducers, but there is no interleave transducer in the standard library, just the normal `interleave` function. All the other operations inside the thread-last `->>` form are available as transducers.
How should we design the `interleave` transducer to work with `transduce`? First of all, `transduce` does not support multiple collections as input. So the idea is to use one sequence as the main input for `transduce` and the other as the sequence of interleaving elements. The sequence of interleaving elements lives inside the state of the `interleave` transducer, which takes the first item to interleave at each reducing step. The remaining elements have to be stored as state inside the transducer while waiting for the next call. Without further ado, here’s the interleave transducer:
```clojure
(defn interleave-xform                      ; <1>
  [coll]
  (fn [rf]
    (let [fillers (volatile! (seq coll))]   ; <2>
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if-let [[filler] @fillers]        ; <3>
           (let [step (rf result input)]
             (if (reduced? step)            ; <4>
               step
               (do
                 (vswap! fillers next)      ; <5>
                 (rf step filler))))
           (reduced result)))))))           ; <6>
```
1. `interleave-xform` is modeled on the same semantics as the `interleave` function in the standard library: it interleaves elements up to the end of the shortest sequence. It contains all the required arities: zero, one and two arguments.
2. `interleave-xform` assumes the interleaving elements come from a collection passed when the transducer is created. We need to keep track of the remaining items as we consume them, so the rest of the sequence is stored in a `volatile!` instance.
3. During the reducing step, we verify that there is at least one more element to interleave before allowing the reduction. Note the use of `if-let` and destructuring on the first element of the content of the `volatile!` instance.
4. We need to check whether another transducer along the chain has requested the end of the reduction. In that case, we obey without propagating any further reducing steps.
5. If, instead, we are not at the end of the reduction and we have more elements to interleave, we proceed to update the `volatile!` state and call the next transducer with the “filler” element coming from the internal state. Note that at this point, this is the second time we invoke “rf”: the first is the normal reducing step, the second is an additional reducing step for the interleaving.
6. In case we don’t have any more items to interleave, we end the reduction with `reduced`. This prevents `nil` elements from appearing in the final output, exactly like normal `interleave`.
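Assuming the definition above is loaded, a quick sanity check shows `interleave-xform` behaving like its sequential counterpart, stopping at the shortest sequence:

```clojure
(sequence (interleave-xform [:a :b]) (range 5))
;; (0 :a 1 :b)

(interleave (range 5) [:a :b])
;; (0 :a 1 :b)
```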
With `interleave-xform` in place, we can express the Egyptian multiplication method as follows:
```clojure
(defn egypt-mult [x y]
  (transduce
    (comp
      (interleave-xform (iterate #(* % 2) y))
      (partition-all 2)
      (take-while #(pos? (first %)))
      (filter #(odd? (first %)))
      (map second))
    +
    (iterate #(quot % 2) x)))

(egypt-mult 4 5) ;; 20
```
The second iteration, of increasingly doubled numbers, is now the interleaving sequence that we pass when creating the transducer. The other iteration, of increasingly halved numbers, is the normal input for `transduce`. The two sequences are interleaved together and partitioned into vectors as part of the transducing step. Apart from the interleaving part, the rest of the processing is a mechanical refactoring from a sequence operation into the related transducer version.
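To make the data flow concrete, here is what each step of the chain produces for `(egypt-mult 4 5)`:

```clojure
;; main input (halving):            4 2 1 0 ...
;; after interleave with doubling:  4 5 2 10 1 20 0 40 ...
;; (partition-all 2):               [4 5] [2 10] [1 20] [0 40] ...
;; (take-while #(pos? (first %))):  [4 5] [2 10] [1 20]
;; (filter #(odd? (first %))):      [1 20]
;; (map second):                    20
;; reduce with +:                   20
```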
Laziness
There are four main ways to apply transducers in the standard library: `transduce`, `sequence`, `eduction` and `into`. Up until now we’ve seen one of the most popular, `transduce`, which is designed to completely consume the input collection. Even when `transduce` is used to output a new sequence, it doesn’t work lazily, as we can quickly verify by using a counter to track the number of transformations happening on each item:
```clojure
(def cnt (atom 0))

(take 10 (transduce (map #(do (swap! cnt inc) %)) conj () (range 1000)))
;; (999 998 997 996 995 994 993 992 991 990)

@cnt ;; 1000
```
In the example above, you can see that `transduce` completely consumes the lazy sequence, despite the fact that we want just the first 10 elements (note that `conj` on a list prepends each element at the beginning of the current list). The counter shows that the transducer has been called on all the items, fully evaluating the range. `into` uses `transduce` underneath and has the same behaviour. If you are interested in applying a transducer chain lazily, gradually consuming the input, `sequence` or `eduction` will do that:
```clojure
(def cnt1 (atom 0))
(let [res (eduction (map #(do (swap! cnt1 inc) %)) (range 1000))]
  (doall (take 10 res))
  @cnt1)
;; 33

(def cnt2 (atom 0))
(let [res (sequence (map #(do (swap! cnt2 inc) %)) (range 1000))]
  (doall (take 10 res))
  @cnt2)
;; 33
```
The type of laziness in `eduction` and `sequence` is called “chunked”: they are lazy, but not extremely lazy, allowing some consumption of the input sequence ahead of the actual position in the iteration. New items are processed in chunks of 32 elements: once the 32nd element is reached, the elements up to the 64th are processed, and so on. In our example, we consume 10 elements but process the whole first chunk of 32.
So, what’s the difference between `eduction` and `sequence`? `eduction` allows a variable number of transducers to be passed as arguments without `comp`. More importantly, `eduction` does not cache results, which means it potentially runs all the transducers again when asked. Here’s a way to demonstrate this behaviour:
```clojure
(def cnt1 (atom 0))
(let [res (eduction (map #(do (swap! cnt1 inc) %)) (range 10))]
  (conj (rest res) (first res))
  @cnt1)
;; 20

(def cnt2 (atom 0))
(let [res (sequence (map #(do (swap! cnt2 inc) %)) (range 10))]
  (conj (rest res) (first res))
  @cnt2)
;; 10
```
In the new examples, we call both `first` and `rest` on the results of `eduction` and `sequence`. You can see that calling `first` and `rest` forces `eduction` to scan the input sequence twice, as demonstrated by the counter reaching 20, twice the number of items in the input sequence. `sequence`, on the other hand, caches results internally, so it doesn’t execute the transducers again. The `eduction` approach saves on memory allocation at the price of multiple evaluations. A rule of thumb for picking between the two:
- Use `sequence` when you plan to use the produced output multiple times, for example by assigning it to a local binding and then processing it further. Internal caching results in better performance overall. At the same time, it could consume more memory, as the entire sequence ends up in memory if the last element is requested.
- Use `eduction` when there is no plan to perform multiple scans of the output, saving on unnecessary caching. This is the case, for example, with transducer chains that depend on user-generated search parameters: the application needs to produce a one-off view of the system that is discarded as soon as the response is returned.
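As a reminder of the first difference mentioned above, `eduction` accepts multiple transducers directly, with no `comp` required:

```clojure
(into [] (eduction (map inc) (filter odd?) (range 10)))
;; [1 3 5 7 9]
```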
Parallelism
We have seen an elegant example of parallelism in transducers using core.async pipelines. There is also another option for parallelising transducible operations: `fold`, a function from the reducers namespace in the standard library. `fold` offers parallelism based on the “divide and conquer” model: chunks of work are created and computed in parallel while, at the same time, finished tasks are combined back into the final result.
The following example shows `ptransduce`, a function that works like a parallel `transduce`. Since reducers are based on the same functional abstraction, we can leverage `fold` without changing the transducer chain:
```clojure
(require '[clojure.core.reducers :refer [fold]])

(def xform (comp (map inc) (filter odd?)))

(defn ptransduce [xform rf combinef coll]
  (fold combinef (xform rf) (into [] coll)))

(ptransduce xform + + (range 1000000))
;; 250000000000
```
Note that the input collection needs to be a vector (or a map) for reducers to work in parallel. It also needs to be bigger than a certain size (512 items by default) to enable parallelism. Apart from this, the `xform` transducer chain is the same as before, but we need to call it as `(xform rf)` because `fold` expects a reducing function, which is exactly what `xform` returns when invoked with a basic reducing function like `+`.
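A detail worth noting: `fold` calls the combining function with no arguments to produce the seed for each chunk, which is why a plain `+` works here, since `(+)` returns 0. For a combining function without a useful zero-arity, the reducers namespace provides `monoid` to build one. A minimal sketch, reusing the same `xform`:

```clojure
(require '[clojure.core.reducers :as r])

;; combine partial results with +, seeding each chunk with 0
(r/fold (r/monoid + (constantly 0)) (xform +) (into [] (range 1000000)))
;; 250000000000
```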
Both pipeline and `fold` parallelism work, unless stateful transducers are involved. Observe the following:
```clojure
(require '[clojure.core.async :refer [<!!] :as a])
(require '[clojure.core.reducers :refer [fold]])

(def xform (comp (drop 100) (map inc) (filter odd?)))

(transduce xform + (range 10000))
;; 24997500

(let [out (a/chan (a/buffer 100))]
  (a/pipeline 4 out xform (a/to-chan (range 10000)))
  (<!! (a/reduce + 0 out)))
;; 0

(distinct
  (for [_ (range 100)]
    (fold + (xform +) (into [] (range 10000)))))
;; (24997500 24877997 24781126 24912310 ...)
```
`(drop 100)` is now part of the transducer chain. We see the expected result of 24997500 with a normal `transduce`, but both `pipeline` and `fold` produce unexpected results. This is because there are multiple instances of the same stateful transducer.
In the case of `pipeline`, one transducer chain is instantiated for each item (so `drop` is always dropping the single element available).
In the case of `fold`, there is one transducer chain per chunk, and the situation is further complicated by “work stealing”, a feature in which threads that are currently idle can “steal” items from other threads. When that happens, the stateful transducer is not initialised again, with the result that the same state is suddenly shared across threads. That’s why you need to run `fold` multiple times to see the inconsistency.
The problem of parallelism with stateful transducers is both technical and semantic. The `drop` operation, for instance, is well defined as a sequential operation: the requested number of elements is skipped at the beginning of the collection. But this definition needs to be reformulated when parallel chunks are involved: should `drop` remove the same number of elements from each chunk? In that case, sequential `drop` would diverge from parallel `drop`.
Since a solution that produces consistent results does not exist in the standard library, the problem of parallel stateful transducers remains unsolved, and they should be avoided with `fold` or `pipeline`. If you’re interested, I provided a solution to this problem in the parallel library. The proposed solution is opinionated, but it offers a consistent model for executing stateful transducers in a parallel context.
Conclusions
As we’ve seen in the previous part of this article, transducers are, at their core, a simple but powerful functional abstraction. `reduce` encapsulates a pattern for recursion that can be adapted to many situations and, at the same time, promotes designing code in terms of a standard reducing interface. Since transducers’ building blocks conform to the same contract, they are easy to compose and reuse. Transducers were introduced relatively late in Clojure’s life, which perhaps explains some of the struggle in their initial adoption.
There are still some rough edges and room for improvement in transducers as they are today. A few libraries have started to emerge that provide transducer versions of other functions (most notably, Christophe Grand’s xforms), hinting at the fact that more could be added to the standard library. Transducers are also amenable to parallel computation, but there is no solid semantics for parallel stateful transducers, so they can’t be used with `fold`. This somewhat discourages their parallel use as a whole.
On the positive side, transducers already cover a fair amount of common use cases, and you should consider reaching for them as the default for everyday programming.
Clojure The Essential Reference
Did you enjoy reading this article? You might find my book Clojure: The Essential Reference also interesting! The book has an entire chapter dedicated to all the functions related to both reducers and transducers. There you can find more examples and insights.
Resources
- The blog post by Rich Hickey introducing transducers.
- The official transducer reference.
- Christophe Grand’s extended transducer library.
- Timothy Baldridge’s screencast on transducers.