By Carmel Eve, Software Engineer I
Rx operators deep dive part 2: Slowly aggregating knowledge

There's been a bit of a gap since my last Rx blog. I've been busy keeping up with Advent of Code in any spare time (and I'm sure there will be a blog along those lines at some point in the near future).

But, for now, it's time for a deep dive into a few more of the operators in the world of Rx!

First up, the Select operator!

OnNext(Understanding of how the Select operator works)

If you've read my last blog, you'll soon see that building your own Select operator is fairly similar to building a Where operator. You set up an extension method on IObservable:

static IObservable<TOutput> Select<TSource, TOutput>(this IObservable<TSource> source, Func<TSource, TOutput> selection)
{
    return new SelectOperator<TSource, TOutput>(source, selection);
}

This time, however, instead of a predicate you pass in a Func<TSource, TOutput>. TSource is the type which the underlying source produces, and TOutput is the type of the values you want to select.

As before, when an observer subscribes, an internal observer (Selector) is constructed using the subscribing observer and the selection function. This Selector is then subscribed to the underlying source.

public class SelectOperator<TSource, TOutput> : IObservable<TOutput>
{
    private IObservable<TSource> source;
    private Func<TSource, TOutput> selection;

    public SelectOperator(IObservable<TSource> source, Func<TSource, TOutput> selection)
    {
        this.source = source;
        this.selection = selection;
    }

    public IDisposable Subscribe(IObserver<TOutput> observer)
    {
        var selector = new Selector(observer, selection);
        return this.source.Subscribe(selector);
    }
    
    ...

When the underlying source passes a value to the OnNext method of the Selector, the value is then passed through the selection function.

The return value of this function is then passed to the original observer.

The Selector is an observer of TSource, as it will accept values of the type TSource from the underlying source.

The subscribing observer takes values of type TOutput, which is the type which is returned by the selection function.

private class Selector : IObserver<TSource>
{
    private IObserver<TOutput> observer;
    private Func<TSource, TOutput> selection;

    public Selector(IObserver<TOutput> observer, Func<TSource, TOutput> selection)
    {
        this.observer = observer;
        this.selection = selection;
    }

    public void OnNext(TSource value)
    {
        TOutput selectedValue = this.selection(value);
        this.observer.OnNext(selectedValue);
    }
...

In this way, the observer is passed the selected value rather than the one originally offered up by the observable.
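
As a rough end-to-end sketch, the pieces above can be assembled into something that runs without the Rx package. The parts elided in the snippets are filled in here under an assumption: that the Selector simply forwards OnError and OnCompleted unchanged. RangeSource, RecordingObserver and SelectSketch are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Observable.Range, so the sketch needs no Rx package.
public sealed class RangeSource : IObservable<int>
{
    private readonly int start;
    private readonly int count;

    public RangeSource(int start, int count)
    {
        this.start = start;
        this.count = count;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Push every value synchronously, then signal completion.
        for (int i = 0; i < this.count; i++)
        {
            observer.OnNext(this.start + i);
        }
        observer.OnCompleted();
        return new NoOpDisposable();
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

public sealed class SelectOperator<TSource, TOutput> : IObservable<TOutput>
{
    private readonly IObservable<TSource> source;
    private readonly Func<TSource, TOutput> selection;

    public SelectOperator(IObservable<TSource> source, Func<TSource, TOutput> selection)
    {
        this.source = source;
        this.selection = selection;
    }

    public IDisposable Subscribe(IObserver<TOutput> observer)
    {
        return this.source.Subscribe(new Selector(observer, this.selection));
    }

    private sealed class Selector : IObserver<TSource>
    {
        private readonly IObserver<TOutput> observer;
        private readonly Func<TSource, TOutput> selection;

        public Selector(IObserver<TOutput> observer, Func<TSource, TOutput> selection)
        {
            this.observer = observer;
            this.selection = selection;
        }

        // Transform each value before handing it downstream.
        public void OnNext(TSource value) => this.observer.OnNext(this.selection(value));

        // Assumption: errors and completion are forwarded unchanged.
        public void OnError(Exception error) => this.observer.OnError(error);
        public void OnCompleted() => this.observer.OnCompleted();
    }
}

// Hypothetical observer that records everything it is told.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Events { get; } = new List<string>();
    public void OnNext(T value) => this.Events.Add($"OnNext({value})");
    public void OnError(Exception error) => this.Events.Add("OnError");
    public void OnCompleted() => this.Events.Add("OnCompleted");
}

public static class SelectSketch
{
    public static List<string> Run()
    {
        var observer = new RecordingObserver<string>();
        var selected = new SelectOperator<int, string>(new RangeSource(1, 3), n => $"OUTPUT: {n}");
        selected.Subscribe(observer);
        return observer.Events;
    }

    public static void Main()
    {
        foreach (string e in Run()) Console.WriteLine(e);
    }
}
```

Running this records three OnNext events carrying the formatted strings, followed by a single OnCompleted, which is the ordering the subscribing observer would see.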

You can then pass in a transform which will operate on each of the produced values and subscribe to the observable produced by the select operator:

IObservable<int> sourceNumbers = System.Reactive.Linq.Observable.Range(1, 10);
IObservable<string> formattedOutputs = sourceNumbers.Select(n => $"OUTPUT: {n}");

formattedOutputs.Subscribe(n => Console.WriteLine(n));

The output of the above code will be as follows:

OUTPUT: 1
OUTPUT: 2
OUTPUT: 3
OUTPUT: 4
OUTPUT: 5
OUTPUT: 6
OUTPUT: 7
OUTPUT: 8
OUTPUT: 9
OUTPUT: 10

Each number in the range 1 to 10 is passed through the selection function and transformed into a string.

These strings are then offered up to our listening anonymous observer and printed out to the console.

In the last blog, I talked about using a multicast observable so as not to produce values multiple times for multiple subscribers.

You can also do this here, but for simplicity I've just used the one subscriber.

Now onto something a little more interesting…

OnNext(The aggregate operator)

The aggregate operator is the underlying mechanism for all of the LINQ operators that take many inputs and produce one (e.g. Count, Sum, Max, etc.).

In the world of Rx these operators still produce one value, but that value is in the form of an IObservable which offers up that value. This is to keep everything in our new stream-based world in sync.

Sum

Let's try implementing our own Sum, and then generalise that for any aggregation. Keep in mind that aggregate operators can't produce their result until all of the elements have been seen, so they have to wait until the source has called OnCompleted!

So with Sum, the overall setup is pretty similar to the Where/Select operators. A "filtering" observer is placed in between the source and the subscribing observer:

internal class SumOperator<TSource> : IObservable<TSource>
{
    private IObservable<TSource> source;

    public SumOperator(IObservable<TSource> source)
    {
        this.source = source;
    }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        var summer = new Summer<TSource>(observer);
        return source.Subscribe(summer);
    }
    
    ...

The Summer looks something like this:

private class Summer<TSource> : IObserver<TSource>
{
    private IObserver<TSource> observer;
    private TSource sum;

    public Summer(IObserver<TSource> observer)
    {
        this.observer = observer;
    }

    public void OnCompleted()
    {
        this.observer.OnNext(sum);
        this.observer.OnCompleted();
    }

    public void OnError(Exception error)
    {
        this.observer.OnError(error);
    }

    public void OnNext(TSource value)
    {
        sum += (dynamic)value;
    }
}

With each OnNext call, the value is added to the running total. Here dynamic is used because we need to be able to add values of an unknown type. If the type didn't implement the + operator, an exception would be thrown at runtime.

When OnCompleted is called by the source, we know that no more items are going to be passed along, and the Summer can pass the sum on to the listening observer.

It then calls OnCompleted on the subscribed observer, as it has produced the sum and won't be producing anything further.
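
To make the timing concrete, here is a small sketch that drives a Summer by hand rather than through a real source. It uses the same logic as the Summer above, made public so it can be called directly. LoggingObserver, NoPlus and SumSketch are hypothetical names used only for this illustration, and the second method shows the runtime failure you would expect from dynamic when the type has no + operator.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.CSharp.RuntimeBinder;

// Same logic as the Summer above, made public so it can be driven by hand.
public sealed class Summer<TSource> : IObserver<TSource>
{
    private readonly IObserver<TSource> observer;
    private TSource sum; // starts at default(TSource), e.g. 0 for int

    public Summer(IObserver<TSource> observer)
    {
        this.observer = observer;
    }

    // The addition is bound at runtime, so it throws if TSource has no + operator.
    public void OnNext(TSource value) => this.sum += (dynamic)value;

    public void OnError(Exception error) => this.observer.OnError(error);

    public void OnCompleted()
    {
        // Only now does anything reach the downstream observer.
        this.observer.OnNext(this.sum);
        this.observer.OnCompleted();
    }
}

// Hypothetical observer that records what it receives.
public sealed class LoggingObserver<T> : IObserver<T>
{
    public List<string> Events { get; } = new List<string>();
    public void OnNext(T value) => this.Events.Add($"OnNext({value})");
    public void OnError(Exception error) => this.Events.Add("OnError");
    public void OnCompleted() => this.Events.Add("OnCompleted");
}

// Hypothetical type with no + operator defined.
public struct NoPlus { }

public static class SumSketch
{
    public static List<string> Run()
    {
        var log = new LoggingObserver<int>();
        var summer = new Summer<int>(log);

        summer.OnNext(1);
        summer.OnNext(2);
        summer.OnNext(3);
        log.Events.Add("source still running"); // the Summer has emitted nothing so far

        summer.OnCompleted();
        return log.Events;
    }

    public static bool FailsWithoutPlusOperator()
    {
        var summer = new Summer<NoPlus>(new LoggingObserver<NoPlus>());
        try
        {
            summer.OnNext(new NoPlus());
            return false;
        }
        catch (RuntimeBinderException)
        {
            return true;
        }
    }

    public static void Main()
    {
        foreach (string e in Run()) Console.WriteLine(e);
        Console.WriteLine(FailsWithoutPlusOperator());
    }
}
```

The log shows the "source still running" marker first, then OnNext(6), then OnCompleted, so the downstream observer hears nothing until the source completes.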

IObservable<int> sourceNumbers = System.Reactive.Linq.Observable.Range(1, 10);
IObservable<int> sum = sourceNumbers.Sum();

sum.Subscribe(n => Console.WriteLine(n));

The output in this case would be 55.

Aggregate

The aggregate operator is very similar, but instead of a running total it stores a list of all the values which have been passed to it. When OnCompleted is called, this list is run through the provided aggregator function, which produces a single value.

private class Aggregator : IObserver<TSource>
{
    private IObserver<TOutput> observer;
    private Func<IList<TSource>, TOutput> aggregation;
    IList<TSource> values = new List<TSource>();

    public Aggregator(IObserver<TOutput> observer, Func<IList<TSource>, TOutput> aggregation)
    {
        this.observer = observer;
        this.aggregation = aggregation;
    }

    public void OnCompleted()
    {
        observer.OnNext(aggregation(values));
        observer.OnCompleted();
    }

    public void OnError(Exception error)
    {
        this.observer.OnError(error);
    }

    public void OnNext(TSource value)
    {
        values.Add(value);
    }
}
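
The Aggregator above is only the inner observer. The surrounding operator class and extension method are not shown, so the sketch below assumes they follow the same pattern as the Select operator. AggregateOperator, AggregateSketchExtensions, ArraySource, Collector and AggregateSketch are hypothetical names chosen for this illustration.

```csharp
using System;
using System.Collections.Generic;

public sealed class AggregateOperator<TSource, TOutput> : IObservable<TOutput>
{
    private readonly IObservable<TSource> source;
    private readonly Func<IList<TSource>, TOutput> aggregation;

    public AggregateOperator(IObservable<TSource> source, Func<IList<TSource>, TOutput> aggregation)
    {
        this.source = source;
        this.aggregation = aggregation;
    }

    public IDisposable Subscribe(IObserver<TOutput> observer)
    {
        // A fresh Aggregator (and so a fresh list) for every subscription.
        return this.source.Subscribe(new Aggregator(observer, this.aggregation));
    }

    private sealed class Aggregator : IObserver<TSource>
    {
        private readonly IObserver<TOutput> observer;
        private readonly Func<IList<TSource>, TOutput> aggregation;
        private readonly IList<TSource> values = new List<TSource>();

        public Aggregator(IObserver<TOutput> observer, Func<IList<TSource>, TOutput> aggregation)
        {
            this.observer = observer;
            this.aggregation = aggregation;
        }

        public void OnNext(TSource value) => this.values.Add(value);
        public void OnError(Exception error) => this.observer.OnError(error);

        public void OnCompleted()
        {
            // Aggregate the buffered values, emit the single result, then complete.
            this.observer.OnNext(this.aggregation(this.values));
            this.observer.OnCompleted();
        }
    }
}

public static class AggregateSketchExtensions
{
    public static IObservable<TOutput> Aggregate<TSource, TOutput>(
        this IObservable<TSource> source, Func<IList<TSource>, TOutput> aggregation)
    {
        return new AggregateOperator<TSource, TOutput>(source, aggregation);
    }
}

// Hypothetical source that pushes a fixed set of items and then completes.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] items;
    public ArraySource(params T[] items) { this.items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (T item in this.items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoOpDisposable();
    }

    private sealed class NoOpDisposable : IDisposable { public void Dispose() { } }
}

// Hypothetical observer that collects the values it receives.
public sealed class Collector<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) => this.Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class AggregateSketch
{
    public static List<int> Run()
    {
        var collector = new Collector<int>();
        IObservable<string> words = new ArraySource<string>("a", "b", "c");
        // The aggregation here just counts the buffered values.
        words.Aggregate(values => values.Count).Subscribe(collector);
        return collector.Values;
    }

    public static void Main() => Console.WriteLine(string.Join(",", Run()));
}
```

With three items in the source, the collector ends up holding a single value, 3, delivered only after the source has completed.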

In this example, we use an aggregator function which just concatenates strings:

static string CombineStrings(IList<string> values)
{
    string listedStrings = "";

    foreach (var str in values)
    {
        listedStrings += str + ",";
    }

    return listedStrings;
}

And then we can use this aggregator on the formatted output from the select operator.

IObservable<string> combinedOutputs = sourceNumbers.Select(n => $"OUTPUT: {n}").Aggregate(CombineStrings);
combinedOutputs.Subscribe(n => Console.WriteLine(n));

Here we can see that operators can be chained together, as is possible with LINQ to objects. When this program runs, you subscribe to the aggregate operator, which encapsulates your observer in an Aggregator which will only pass it a value once OnCompleted is called.

This Aggregator is then encapsulated in a Selector which will only pass the formatted values on to the Aggregator.

So, when the numbers from range are passed to the Select operator, that offers up the formatted strings to the Aggregator, which then offers up the single aggregated string to the observer. The output will be as follows:

OUTPUT: 1,OUTPUT: 2,OUTPUT: 3,OUTPUT: 4,OUTPUT: 5,OUTPUT: 6,OUTPUT: 7,OUTPUT: 8,OUTPUT: 9,OUTPUT: 10,
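
One design note: buffering every value in a list is simple, but it holds the whole sequence in memory until completion. An alternative, which is a different technique from the list-based Aggregator above, is to keep only a running accumulator, in the style of the seed-plus-accumulator overload of Aggregate in LINQ to objects. A minimal sketch, with FoldingAggregator, ResultObserver and FoldSketch as hypothetical names, and driven by hand rather than by a real source:

```csharp
using System;
using System.Collections.Generic;

// Fold-style aggregator: keeps one accumulated value instead of a list.
public sealed class FoldingAggregator<TSource, TAccumulate> : IObserver<TSource>
{
    private readonly IObserver<TAccumulate> observer;
    private readonly Func<TAccumulate, TSource, TAccumulate> accumulator;
    private TAccumulate current;

    public FoldingAggregator(
        IObserver<TAccumulate> observer,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> accumulator)
    {
        this.observer = observer;
        this.current = seed;
        this.accumulator = accumulator;
    }

    // Combine each incoming value with the running result straight away.
    public void OnNext(TSource value) => this.current = this.accumulator(this.current, value);

    public void OnError(Exception error) => this.observer.OnError(error);

    public void OnCompleted()
    {
        // As before, the single result is only emitted on completion.
        this.observer.OnNext(this.current);
        this.observer.OnCompleted();
    }
}

// Hypothetical observer that collects the strings it receives.
public sealed class ResultObserver : IObserver<string>
{
    public List<string> Values { get; } = new List<string>();
    public void OnNext(string value) => this.Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class FoldSketch
{
    public static List<string> Run()
    {
        var results = new ResultObserver();
        var fold = new FoldingAggregator<string, string>(results, "", (acc, s) => acc + s + ",");

        fold.OnNext("OUTPUT: 1");
        fold.OnNext("OUTPUT: 2");
        fold.OnNext("OUTPUT: 3");
        fold.OnCompleted();

        return results.Values;
    }

    public static void Main() => Console.WriteLine(string.Join(Environment.NewLine, Run()));
}
```

For the three inputs shown, this yields the single string "OUTPUT: 1,OUTPUT: 2,OUTPUT: 3,", the same shape as the list-based result but without storing the individual values.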

And there we have it, a quick look under the hood of the Select and Aggregate operators. Watch out for the next OnNext(), where I'll tackle another of the many operators still to explore!

Carmel Eve

Carmel is a software engineer, LinkedIn Learning instructor and STEM ambassador.

Over the past four years she has been focused on delivering cloud-first solutions to a variety of problems. These have ranged from highly-performant serverless architectures, to web applications, to reporting and insight pipelines and data analytics engines.

In her time at endjin, she has written many blog posts covering a huge range of topics, including deconstructing Rx operators and mental well-being and managing remote working.

Carmel's first LinkedIn Learning course on how to prepare for the Az-204 exam - developing solutions for Microsoft Azure - was released in April 2021. Over the last couple of years she has also spoken at NDC, APISpecs and SQLBits. These talks covered a range of topics, from reactive big-data processing to secure Azure architectures.

She is also passionate about diversity and inclusivity in tech. She is a STEM ambassador in her local community and is taking part in a local mentorship scheme. Through this work she hopes to be a part of positive change in the industry.

Carmel won "Apprentice Engineer of the Year" at the Computing Rising Star Awards 2019.

Carmel worked at endjin from 2016 to 2021.