Endjin - Home

Rx operators deep dive part 2: Slowly aggregating knowledge

by Carmel Eve

There’s been a little bit of a gap since my last Rx blog, I’ve been pretty busy with keeping up with Advent of Code in any spare time (and I’m sure there will be a blog along those lines at some point in the near future).

But, for now, it’s time for a deep dive into a few more of the operators in the world of Rx!

First up, the Select operator!

OnNext(Understanding of how the Select operator works)

If you’ve read my last blog, you’ll soon see that building your own Select operator is fairly similar to building a Where operator. You set up an extension method on IObservable:

This time however, instead of a predicate you pass in a Func<TSource, TOutput>. TSource will be the type which the underlying source produces, and TOutput will be the type of the values you want to select.

As before, when an observer subscribes, an internal observer (Selector) is then constructed using the subscribing observer and the selection function. This Selector is then subscribed to the observable.

When the underlying source passes a value to the OnNext method of the Selector, the value is then passed through the selection function. The return value of this function is then passed to the original observer.  The Selector is an observer of TSource, as it will accept values of the type TSource from the underlying source. The subscribing observer takes values of type TOutput, which is the type which is returned by the selection function.

In this way, the observer is passed the selected value rather than the one originally offered up by the observable.

You can then pass in a transform which will operate on each of the produced values and subscribe to the observable produced by the select operator:

The output of the above code will be as follows:

OUTPUT: 1
OUTPUT: 2
OUTPUT: 3
OUTPUT: 4
OUTPUT: 5
OUTPUT: 6
OUTPUT: 7
OUTPUT: 8
OUTPUT: 9
OUTPUT: 10

Each number in the range 1 to 10 is passed through the selection function and transformed into a string. These strings are then offered up to our listening anonymous observer and printed out to the console.

In the last blog, I talked about using a multicast observable as to not produce values multiple times for multiple subscribers. You can also do this here, but for simplicity I’ve just used the one subscriber.

Now onto something a little more interesting…

OnNext(The aggregate operator)

The aggregate operator is the underlying mechanism for all of the LINQ operators that take many inputs, and produce one (e.g. Count, Sum, Max etc. etc.). In the world of Rx these operators still produce one value, but that value is in the form of an IObservable which offers up that value. This is to keep everything in our new stream-based world in sync.

Sum

Let’s try implementing our own Sum, and then generalise that for any aggregation. Keep in mind, aggregate operators won’t return until all of the elements have been produced, so they have to wait until the source has called OnComplete!

So with Sum, the overall setup is pretty similar to the Where/Select operators. A “filtering” observer is places in between the source and the subscribing observer:

The Summer looks something like this:

With each OnNext call, the value is added to the running total. Here dynamic is used as we need to be able to add values of an unknown type, if the type didn’t implement the + operator an exception would be thrown at runtime. When OnComplete is called by the source, we then know that there are not going to be any more items passed along and the Summer can pass the sum on to the listening observer. It then calls OnComplete on the subscribed observer, as it has produced the sum and won’t be producing anything further.

The output for in this case would be 55.

Aggregate

The aggregate operator is very similar, but instead of a running total it stores a list of all the values which have been passed to it. When OnComplete is called, this list is run through the provided aggregator function which produces a single value.

In this example, if we use an aggregator function which just concatenates strings:

And then we can use this aggregator on the formatted output from the select operator.

Here we can see that operators can be chained together, as is possible with LINQ to objects. When this program runs, you subscribe to the aggregate operator, which encapsulates your observer in an Aggregator which will only pass it values once OnComplete is called. This Aggregator is then encapsulated in a Selector which will only pass the formatted values on to the Aggregator. So, when the numbers from range are passed to the Select operator, that offers up the formatted strings to the Aggregator, which then offers up the single aggregated string to the observer. The output will be as follows:

OUTPUT: 1,OUTPUT: 2,OUTPUT: 3,OUTPUT: 4,OUTPUT: 5,OUTPUT: 6,OUTPUT: 7,OUTPUT: 8,OUTPUT: 9,OUTPUT: 10,

And there we have it, a quick look under the hoods of the Select and Aggregate operators, watch out for the next OnNext() where I’ll tackle another of the many operators still to explore!

About the author

Carmel is a 2nd year apprentice software engineer focusing on Azure based solutions for data handling. She has a masters degree in physics from the University of Manchester which has given her a keen interest in problem solving in new and imaginative ways. You can follow Carmel on Twitter.