
Rx operators deep dive Part 3: Re-grouping our thoughts

by Carmel Eve

After a brief foray into Azure AD, we’re back onto Rx!

(If you missed parts 1 and 2, it might be worth having a quick read, as I'm going to gloss over some of the stuff common to both.)

OnNext(The GroupBy operator)

This week we’re looking at the GroupBy operator. This one’s a bit more involved, so saddle up!

So we start out with our usual extension method, GroupBy, which extends IObservable<TSource> and takes a key selector function.

However, some of the types here need a bit of unpicking.

So, if you have an IObservable<TSource>, when you call the GroupBy operator on it, you will be returned an IObservable<IGroupedObservable<TKey, TSource>>. The IGroupedObservable interface looks like this:
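For reference, here is a sketch of the interface. It mirrors the shape of IGroupedObservable in Rx.NET's System.Reactive.Linq namespace, where the second type parameter is named TElement; TSource is used here to match the naming in this post.

```csharp
using System;

// A grouped observable is an ordinary observable of the source items,
// plus a read-only Key identifying which group it represents.
public interface IGroupedObservable<out TKey, out TSource> : IObservable<TSource>
{
    // The key value shared by every item raised by this group.
    TKey Key { get; }
}
```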

TKey is the type of the value that you want to group the source objects on. Say you had an IObservable of people and wanted to group them by age. The TSource here would be Person and the TKey would be int. The IGroupedObservable would just be an observable of people, but with an added property for retrieving the key value of that group. So, when you use the GroupBy operator, you receive an IObservable in which each item you are passed is itself an IObservable of people with a specific age.

For simplicity (and to avoid building a whole other class), let's just group our SourceNumbers (which is just the numbers from 1 to 25) by number % 3, so the key selector simply returns the remainder when each number is divided by 3.

Here, we use the GroupBy operator on the source numbers. We subscribe to the output with an anonymous observer. Each time the observer is passed a new group it will write out to the console. However, remember, each group is also an observable. So in order to get each new number which is passed out of that group, we also have to subscribe to the group itself.

So, each time a new group is raised, this will be passed to the first observer’s OnNext method. This will write out the new group to the console and subscribe a second observer to the group. Every time a new value is raised by the group, this value will be passed to the second observer’s OnNext method, which will write out that value to the console.

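In the output of the program, each group is announced the first time its key appears (1, then 2, then 0), and every number is written out as it is raised by the group it belongs to.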

Note: This only works because we subscribe to each new group as soon as it is created. Each group is a hot observable, so if we subscribed some time later, any values raised before that point would be lost.

How it works

Right, so that’s the easy bit 😉

Hopefully we’re all fairly familiar with the pattern by now: we have a GroupByOperator which has an internal wrapping observer, which is subscribed to the underlying source and passes values on to whatever observers subscribe.

Here, the KeySelector is the function which is used to group the source values.

The internal Grouper holds a dictionary of grouped observables, keyed by the group key. When a new value is passed to the Grouper, it passes it through the key selector function to find the key. The dictionary is then checked to see if the key already exists. If it doesn’t, a new group is created, added to the dictionary, and passed to the subscribed observer.

The GroupedObservable class has an internal subject, and when new observers subscribe to the group they are subscribed to that subject. This handles all of the subscriptions for us (see my previous post for details).

So, once the group has been created or retrieved from the dictionary, the source value is then passed to the OnNext method of that group’s subject, which then passes the value to any observers which are subscribed to that group.

In summary, when a source value is raised it is passed through the key selector. If a group doesn’t already exist for that key then a new one is created and this new group is passed to the observer which has subscribed to the GroupBy operator. The new source value for that group is then passed to the OnNext method of internal Subject of the group, which passes the value onto any observers which are subscribed to the group.

Finally, we have the OnCompleted and OnError methods for the Grouper.

When the original source calls OnCompleted on the Grouper because it has finished producing values, no more values will be added to any of the groups, so each group must also call OnCompleted on any listening observers. The same is true for OnError. Here the error is not propagated to each individual group, so that the error is only raised once; the individual groups are just completed. Once all of the groups have completed, OnCompleted or OnError is called on the observer subscribed to the overall GroupBy observable.
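Pulling the pieces above together, here is a minimal sketch of how the whole operator might fit together. It follows the description in this post rather than reproducing the original listings or the real Rx.NET source. SimpleSubject and ActionObserver are hypothetical stand-ins (for the Subject from the previous post and for the anonymous observers), there is no thread safety, and unsubscribing is deliberately a no-op to keep things short.

```csharp
using System;
using System.Collections.Generic;

public interface IGroupedObservable<out TKey, out TSource> : IObservable<TSource>
{
    TKey Key { get; }
}

// Hypothetical bare-bones subject: relays each notification to its subscribers.
public class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new NoOp(); // assumption: unsubscription is left out of this sketch
    }

    // Iterate over a copy so an observer can subscribe during a notification.
    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }

    private sealed class NoOp : IDisposable { public void Dispose() { } }
}

// Hypothetical callback-based observer, standing in for an anonymous observer.
public class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// A group: an observable of source values with a Key, backed by an internal subject.
public class GroupedObservable<TKey, TSource> : IGroupedObservable<TKey, TSource>
{
    private readonly SimpleSubject<TSource> subject = new SimpleSubject<TSource>();
    public GroupedObservable(TKey key) { Key = key; }
    public TKey Key { get; }
    public IDisposable Subscribe(IObserver<TSource> observer) => subject.Subscribe(observer);
    public void OnNext(TSource value) => subject.OnNext(value);
    public void OnCompleted() => subject.OnCompleted();
}

public class Grouper<TSource, TKey> : IObserver<TSource>
{
    private readonly Dictionary<TKey, GroupedObservable<TKey, TSource>> groups =
        new Dictionary<TKey, GroupedObservable<TKey, TSource>>();
    private readonly Func<TSource, TKey> keySelector;
    private readonly IObserver<IGroupedObservable<TKey, TSource>> observer;

    public Grouper(Func<TSource, TKey> keySelector, IObserver<IGroupedObservable<TKey, TSource>> observer)
    {
        this.keySelector = keySelector;
        this.observer = observer;
    }

    public void OnNext(TSource value)
    {
        TKey key = keySelector(value);
        if (!groups.TryGetValue(key, out var grouped))
        {
            grouped = new GroupedObservable<TKey, TSource>(key);
            groups.Add(key, grouped);
            observer.OnNext(grouped); // announce the new group before its first value
        }
        grouped.OnNext(value);
    }

    // Groups are completed in both cases; an error is raised once, on the outer observer.
    public void OnCompleted() { CompleteGroups(); observer.OnCompleted(); }
    public void OnError(Exception error) { CompleteGroups(); observer.OnError(error); }
    private void CompleteGroups() { foreach (var g in groups.Values) g.OnCompleted(); }
}

public class GroupByOperator<TSource, TKey> : IObservable<IGroupedObservable<TKey, TSource>>
{
    private readonly IObservable<TSource> source;
    private readonly Func<TSource, TKey> keySelector;

    public GroupByOperator(IObservable<TSource> source, Func<TSource, TKey> keySelector)
    { this.source = source; this.keySelector = keySelector; }

    // Each subscriber gets its own Grouper, subscribed to the underlying source.
    public IDisposable Subscribe(IObserver<IGroupedObservable<TKey, TSource>> observer) =>
        source.Subscribe(new Grouper<TSource, TKey>(keySelector, observer));
}

public static class GroupByExtensions
{
    public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
        this IObservable<TSource> source, Func<TSource, TKey> keySelector) =>
        new GroupByOperator<TSource, TKey>(source, keySelector);
}
```

To try it out, create a SimpleSubject<int> as the source, call GroupBy(n => n % 3) on it, subscribe an ActionObserver that in turn subscribes a second ActionObserver to each group it receives, and then push the numbers 1 to 25 through the source's OnNext. Because the new group is handed to the outer observer before the value is forwarded, subscribing straight away means the first value of each group is not missed.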

Now I realise this has been slightly involved, so if you want to take a look at the overall code, then you’re in luck. I’ve finally sorted out the source code for these blog posts and uploaded it here.

Otherwise, until the next OnNext call!

About the author

Carmel is a 2nd year apprentice software engineer focusing on Azure-based solutions for data handling. She has a master's degree in physics from the University of Manchester, which has given her a keen interest in problem solving in new and imaginative ways. You can follow Carmel on Twitter.