By Carmel Eve, Software Engineer I
Rx operators deep dive Part 3: Re-grouping our thoughts

After a brief foray into Azure AD, we're back onto Rx!

(If you missed parts 1 and 2, it might be worth having a quick read. I'm going to gloss over some of the stuff common to both.)

OnNext(The GroupBy operator)

This week we're looking at the GroupBy operator. This one's a bit more involved, so saddle up!

So we start out with our usual extension method:

static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
  this IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
    return new GroupByOperator<TSource, TKey>(source, keySelector);
}

However, some of the types here need a bit of unpicking.

So, if you have an IObservable<TSource>, when you call the GroupBy operator on it, you will be returned an IObservable<IGroupedObservable<TKey, TSource>>. The IGroupedObservable interface looks like this:

public interface IGroupedObservable<out TKey, out TSource> : IObservable<TSource>
{
    // Gets the common key.
    TKey Key { get; }
}

TKey is the type of value that you want to group the source objects on. So say you had an IObservable of people, and you wanted to group them on age.

The TSource here would be Person and the TKey would be int.

The IGroupedObservable would just be an Observable of people, but with the added property of being able to retrieve the key value for that group.

So, when you use the GroupBy operator, you receive an IObservable in which each item you are passed is itself an IObservable of people with a specific age.
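To make the people-by-age idea concrete, here's a minimal sketch. The Person record and the sample data are made up purely for illustration, and it assumes the System.Reactive NuGet package is referenced (it uses the library's own GroupBy, not the one we build later in this post).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical type for illustration only; it is not part of Rx.
public record Person(string Name, int Age);

public static class GroupByAgeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Made-up sample data, turned into an IObservable<Person>.
        IObservable<Person> people = new[]
        {
            new Person("Ann", 30),
            new Person("Bob", 25),
            new Person("Cat", 30),
        }.ToObservable();

        // TSource is Person, TKey is int (the age).
        people.GroupBy(person => person.Age).Subscribe(group =>
        {
            // Each group is an IGroupedObservable<int, Person>,
            // so we can read the shared key as well as subscribe to it.
            group.Subscribe(person => log.Add($"{group.Key}: {person.Name}"));
        });

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Ann and Cat end up in the same group (key 30), and Bob gets a group of his own (key 25).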

For simplicity (and to avoid building a whole other class) let's just group our sourceNumbers (which is just the numbers from 1 to 25) by the number % 3. In order to get the groups we do:

IObservable<int> sourceNumbers = System.Reactive.Linq.Observable.Range(1, 25);

sourceNumbers.GroupBy(number => number % 3).Subscribe(group =>
{
    Console.WriteLine($"New group:{group.Key}");
});

Here, we use the GroupBy operator on the source numbers. We subscribe to the output with an anonymous observer.

Each time the observer is passed a new group it will write out to the console. However, remember, each group is also an observable. So in order to get each new number which is passed out of that group, we have to also subscribe to the group itself:

sourceNumbers.GroupBy(number => number % 3).Subscribe(group =>
{
    Console.WriteLine($"New group:{group.Key}");
    group.Subscribe(value => Console.WriteLine($"New value in group {group.Key}: {value}"));
});

So, each time a new group is raised, this will be passed to the first observer's OnNext method.

This will write out the new group to the console and subscribe a second observer to the group.

Every time a new value is raised by the group, this value will be passed to the second observer's OnNext method, which will write out that value to the console.

The output of the program will be:

(Image: output showing the interleaved group outputs.)

Note: This only works because we subscribe to each new group as soon as it is created. Each group is a hot observable, so if we subscribed some time later, any values raised in the meantime would be lost.
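If you want to see that for yourself, here's a small sketch of what happens when the subscription to a group is delayed. It assumes the System.Reactive package and uses a Subject as the source so that we control exactly when each value is raised. The class and variable names are just mine for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class LateSubscriptionDemo
{
    public static List<int> Run()
    {
        var source = new Subject<int>();
        var groups = new List<IGroupedObservable<int, int>>();
        var received = new List<int>();

        // Remember each new group, but do not subscribe to it yet.
        source.GroupBy(number => number % 3).Subscribe(group => groups.Add(group));

        source.OnNext(1); // creates the group with key 1; nobody is listening to it
        source.OnNext(4); // same group, still nobody listening

        // Subscribe late: the group does not replay 1 and 4.
        groups[0].Subscribe(value => received.Add(value));

        source.OnNext(7); // delivered to the late subscriber

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Only the 7 reaches the late subscriber. The 1 and the 4 were raised before anyone was listening to the group, so they are gone.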

How it works

Right, so that's the easy bit 😉

Hopefully we're all fairly familiar with the pattern by now – we have a GroupByOperator which has an internal wrapping Observer which is subscribed to the underlying source and passes values onto whatever observers subscribe.

internal class GroupByOperator<TSource, TKey> : IObservable<IGroupedObservable<TKey, TSource>>
{
    private IObservable<TSource> source;
    private Func<TSource, TKey> keySelector;

    public GroupByOperator(IObservable<TSource> source, Func<TSource, TKey> keySelector)
    {
        this.source = source;
        this.keySelector = keySelector;
    }

    public IDisposable Subscribe(IObserver<IGroupedObservable<TKey, TSource>> observer)
    {
        var grouper = new Grouper(observer, this.keySelector);
        return source.Subscribe(grouper);
    }

    // The nested Grouper and GroupedObservable classes shown below go here.
}

Here, the keySelector is the function which is used to work out which group each source value belongs to.

The internal Grouper then looks like this:

private class Grouper : IObserver<TSource>
{
    private readonly IObserver<IGroupedObservable<TKey, TSource>> observer;
    private readonly Func<TSource, TKey> keySelector;
    private readonly Dictionary<TKey, GroupedObservable> groupedObservables;

    internal Grouper(IObserver<IGroupedObservable<TKey, TSource>> observer, Func<TSource, TKey> keySelector)
    {
        this.observer = observer;
        this.keySelector = keySelector;
        this.groupedObservables = new Dictionary<TKey, GroupedObservable>();
    }
    
    public void OnNext(TSource value)
    {
        var key = this.keySelector(value);

        if (!this.groupedObservables.TryGetValue(key, out var groupedObservable))
        {
            groupedObservable = new GroupedObservable(key);
            this.groupedObservables.Add(key, groupedObservable);
            this.observer.OnNext(groupedObservable);
        }

        groupedObservable.Subject.OnNext(value);
    }

    // OnCompleted and OnError are covered further down.
}

This has an internal dictionary of grouped observables. When a new value is passed to the Grouper, it passes it through the key selector function to find the key.

The dictionary is then checked to see if the key already exists. If it doesn't then a new group is created and passed to the subscribed observer. The GroupedObservable class is as follows:

private class GroupedObservable : IGroupedObservable<TKey, TSource>
{
    internal GroupedObservable(TKey key)
    {
        this.Key = key;
        this.Subject = new Subject<TSource>();
    }

    public TKey Key { get; }

    public ISubject<TSource> Subject { get; }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        return this.Subject.Subscribe(observer);
    }
}

This has an internal subject, and when new observers subscribe to the group they are subscribed to the subject. This handles all of the subscriptions for us (see my previous post for details).
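As a quick refresher on what the subject is doing for us, here's a minimal sketch (assuming the System.Reactive package) of a Subject forwarding values to whoever is currently subscribed, and dropping an observer once its subscription is disposed. The names are only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubjectDemo
{
    public static (List<int> First, List<int> Second) Run()
    {
        var subject = new Subject<int>();
        var first = new List<int>();
        var second = new List<int>();

        // Two independent observers on the same subject.
        subject.Subscribe(value => first.Add(value));
        IDisposable secondSubscription = subject.Subscribe(value => second.Add(value));

        subject.OnNext(1); // both observers receive 1

        // Disposing the subscription removes the second observer.
        secondSubscription.Dispose();

        subject.OnNext(2); // only the first observer receives 2

        return (first, second);
    }

    public static void Main()
    {
        var (first, second) = Run();
        Console.WriteLine($"first: {string.Join(", ", first)}");
        Console.WriteLine($"second: {string.Join(", ", second)}");
    }
}
```

This is the bookkeeping that GroupedObservable gets for free by handing Subscribe calls straight to its subject.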

So, once the group has been created or retrieved from the dictionary, the source value is then passed to the OnNext method of that group's subject, which then passes the value to any observers which are subscribed to that group.

In summary, when a source value is raised it is passed through the key selector.

If a group doesn't already exist for that key then a new one is created and this new group is passed to the observer which has subscribed to the GroupBy operator.

The new source value for that group is then passed to the OnNext method of the group's internal Subject, which passes the value on to any observers which are subscribed to the group.
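The get-or-create step at the heart of that summary can be tried on its own, without any Rx types. This is only a sketch of the pattern: a plain List stands in for each group's Subject, and the Group method is a name I've made up for the example.

```csharp
using System;
using System.Collections.Generic;

public static class GetOrCreateDemo
{
    // Uses the same TryGetValue/Add pattern as Grouper.OnNext.
    // A List<TSource> stands in for the group's Subject.
    public static Dictionary<TKey, List<TSource>> Group<TSource, TKey>(
        IEnumerable<TSource> source, Func<TSource, TKey> keySelector)
    {
        var groups = new Dictionary<TKey, List<TSource>>();

        foreach (var value in source)
        {
            var key = keySelector(value);

            // Create the group the first time its key is seen.
            if (!groups.TryGetValue(key, out var group))
            {
                group = new List<TSource>();
                groups.Add(key, group);
            }

            // New or existing, the group always receives the value.
            group.Add(value);
        }

        return groups;
    }

    public static void Main()
    {
        var groups = Group(new[] { 1, 2, 3, 4, 5, 6, 7 }, number => number % 3);

        foreach (var pair in groups)
        {
            Console.WriteLine($"{pair.Key}: {string.Join(", ", pair.Value)}");
        }
    }
}
```

The important difference in the real operator is that values are pushed to observers as they arrive rather than collected, but the dictionary lookup works in exactly the same way.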

Finally, we have the OnCompleted and OnError methods for the Grouper:

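private class Grouper : IObserver<TSource>
{
    // Fields, constructor and OnNext are as shown earlier.

    public void OnCompleted()
    {
        CompleteGroupedObservables();
        this.observer.OnCompleted();
    }

    public void OnError(Exception error)
    {
        // The groups are completed rather than errored, so the error is only raised once.
        CompleteGroupedObservables();
        this.observer.OnError(error);
    }

    private void CompleteGroupedObservables()
    {
        // Tell the observers of every group that no more values are coming.
        foreach (var groupedObservable in this.groupedObservables)
        {
            groupedObservable.Value.Subject.OnCompleted();
        }
    }
}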

When the original source calls OnCompleted on the Grouper because it has finished producing values, no more values will be added to any of the groups, so each group must also call OnCompleted on any listening observers.

The same is true for OnError. Here the error is not propagated to each individual group, so the error is only raised once; the individual groups are just completed.

Once all of the groups have completed, OnCompleted/OnError is called on the observer subscribed to the overall GroupBy observable.
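To watch the completion path in action, here's a rough sketch. Note that it uses the GroupBy that ships with System.Reactive rather than our hand-rolled GroupByOperator, so treat it as an illustration of the idea and not a test of the code above. I've left out the error path, because the library's behaviour there is not necessarily the same as in our simplified version.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CompletionDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        source.GroupBy(number => number % 2).Subscribe(
            group =>
            {
                // Ignore the values; we only care about completion here.
                group.Subscribe(
                    value => { },
                    () => log.Add($"group {group.Key} completed"));
            },
            () => log.Add("outer completed"));

        source.OnNext(1); // creates the group with key 1
        source.OnNext(2); // creates the group with key 0

        // Completing the source should complete both groups and the outer observable.
        source.OnCompleted();

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Both groups and the outer observable should each report completion once the source has completed.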

Now I realise this has been slightly involved, so if you want to take a look at the overall code, then you're in luck. I've finally sorted out the source code for these blog posts and uploaded it here.

Otherwise, until the next OnNext call!

Carmel Eve

Carmel is a software engineer, LinkedIn Learning instructor and STEM ambassador.

Over the past four years she has been focused on delivering cloud-first solutions to a variety of problems. These have ranged from highly-performant serverless architectures, to web applications, to reporting and insight pipelines and data analytics engines.

In her time at endjin, she has written many blog posts covering a huge range of topics, including deconstructing Rx operators and mental well-being and managing remote working.

Carmel's first LinkedIn Learning course on how to prepare for the Az-204 exam - developing solutions for Microsoft Azure - was released in April 2021. Over the last couple of years she has also spoken at NDC, APISpecs and SQLBits. These talks covered a range of topics, from reactive big-data processing to secure Azure architectures.

She is also passionate about diversity and inclusivity in tech. She is a STEM ambassador in her local community and is taking part in a local mentorship scheme. Through this work she hopes to be a part of positive change in the industry.

Carmel won "Apprentice Engineer of the Year" at the Computing Rising Star Awards 2019.

Carmel worked at endjin from 2016 to 2021.