The concept of Reactive Programming is basically to react to events. This is nothing new and can be done with a normal event-based architecture in C#. Reacting to a single event is not a problem, but whenever we need to include additional events and make them depend on each other, we start getting spaghetti code. That's why I started to use Reactive Extensions (Rx) in C#. Its biggest strength is its ability to combine and aggregate any events into simple declarative code.
When I started investigating Reactive Programming, the concept was only just gaining traction, and I could find only basic examples that didn't solve the issues I had in my production code. Since then I have learned much more about Reactive Programming and Rx, and I hope that other people can benefit from this experience.
Ok, enough babbling. I've promised you guys reactive programming in practice so let’s do it.
Reactive Programming in action
My C# projects consist of an MVVM architecture, a dependency injection framework, unit tests, cancellation support for every async method, view-models bound to XAML, and no untestable code-behind in UI elements.
I needed a way to nicely integrate Rx into such setups.
Let’s look at an example of some basic Rx code. If I wanted to write a number to the debug output every second, the code would look something like this:
Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(number => System.Diagnostics.Debug.WriteLine(number));
This code doesn't look so bad, but since my view-model is bound to XAML, I would like to display it using a property that is bound to a TextBlock. The resulting code doesn't look very clean:
Normal property
First we need to define a property inside the view-model:
public int Title
{
get { return _title; }
private set
{
if (_title != value)
{
_title = value;
OnPropertyChanged("Title");
}
}
}
private int _title;
Next we need to bind it to XAML:
<TextBlock Text="{Binding Title}"/>
And we need to update it inside the view-model's constructor using Rx:
Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(number => Title = (int)number); // Interval emits long values, Title is an int
Finally, we would need to switch to the UI thread somewhere before subscribing to the observable (for example with ObserveOn()), because UI elements cannot be manipulated from other threads.
This is a lot of code for a really simple example. That's why you shouldn't be content with this way of doing reactive code.
Reactive property
Thankfully, I found a really nice solution by Mr Yoshifumi Kawai who introduced ReactiveProperty. This is a property that can be bound directly to XAML and also works perfectly with Rx code.
Here's an example of defining a ReactiveProperty inside a view-model:
public ReactiveProperty<int> Title {get; private set;}
Next we need to bind to it from XAML:
<TextBlock Text="{Binding Title.Value}"/>
And update it in the view-model's constructor:
Title = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(number => (int)number) // Interval emits long values, the property holds an int
.ToReactiveProperty();
Additionally, we don’t need to switch to the UI thread because it's done automatically inside the ReactiveProperty. The simplicity of changing an observable into a ReactiveProperty is great, but what’s even better is that a ReactiveProperty is also an observable. What this means is that you can now use properties as observables and, in turn, use them to create other reactive properties or observables.
ReactiveProperty was a milestone in my project, as it made it easier to use Rx in the view-models and services.
Reactive Extensions
As I mentioned before, the biggest strength in Reactive Programming is the ability to operate on the streams (observables). When I found a simple way to connect observables with properties, the next step was to get to know all the extensions that the Rx library supplies.
It was a challenge, because I had to change the way I think about programming. I used to operate on variables, but reactive programming is about operating on streams. It was a struggle, but an enjoyable one.
First I learned about the most common and useful extensions:
CombineLatest() - the most commonly used way of combining multiple observables. Use it whenever you need to combine several observables and operate on the latest values they emitted
DistinctUntilChanged() - a simple but very useful extension that reduces noise by dropping consecutive repeated values
Throttle() - I usually use it with an observable that triggers often, when I am only interested in the last value once the source has been quiet for some time
FromEventPattern() - very useful for turning those annoying C# events into nice and clean observables
Select(...).Switch() - one of the most powerful combinations of extensions, which I'll give an example of in a second. I could write an entire article about it, so if you don’t know what it is, check it out!
Do() - really handy for logging and debugging issues, but I wouldn't use it to change the state of the application, as this could easily introduce side effects, which is not the reactive way
FromAsync() - used when you have an async method that returns a Task and you would like to include it in your observable chain
ToObservable() - when used on an IEnumerable, it allows you to handle lists in a reactive way
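To make two of these extensions concrete, here is a minimal sketch that turns a classic C# event into an observable with FromEventPattern() and filters repeated values with DistinctUntilChanged(). The Thermometer class, its event and the demo class are hypothetical names made up for this illustration; only the Rx calls come from the library.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event source, used only for this illustration.
public sealed class TemperatureChangedEventArgs : EventArgs
{
    public TemperatureChangedEventArgs(int degrees) { Degrees = degrees; }
    public int Degrees { get; }
}

public sealed class Thermometer
{
    public event EventHandler<TemperatureChangedEventArgs> TemperatureChanged;

    public void Report(int degrees)
    {
        TemperatureChanged?.Invoke(this, new TemperatureChangedEventArgs(degrees));
    }
}

public static class EventToObservableDemo
{
    public static List<int> Run()
    {
        var thermometer = new Thermometer();
        var seen = new List<int>();

        // Wrap the classic event, keep only the payload, drop consecutive repeats.
        var readings = Observable
            .FromEventPattern<TemperatureChangedEventArgs>(
                handler => thermometer.TemperatureChanged += handler,
                handler => thermometer.TemperatureChanged -= handler)
            .Select(pattern => pattern.EventArgs.Degrees)
            .DistinctUntilChanged();

        using (readings.Subscribe(degrees => seen.Add(degrees)))
        {
            thermometer.Report(20);
            thermometer.Report(20); // repeated value, filtered out
            thermometer.Report(21);
        }

        thermometer.Report(22); // subscription disposed, so the handler is detached
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "20, 21"
    }
}
```

Disposing the subscription removes the event handler again, which is one of the nicer side benefits of wrapping events this way.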
During my time with Rx I have also learned plenty of other really useful things that you may find interesting:
Cancellation:
I cannot emphasize enough how important it is to properly cancel an asynchronous task. Every time you don’t cancel, for example, a web request that the user is no longer interested in, it's a big waste of resources. Thankfully C# has a really nice way of canceling a long-running operation by using a CancellationToken.
Every asynchronous method that I write looks something like this:
Task<string> GetDefinitionForTextAsync(string text, CancellationToken token)
This way a developer has the possibility to cancel an outgoing request that the user is no longer interested in. In normal C# programming this could be a bit messy with all those CancellationTokenSources.
Thankfully, when it comes to Rx things get really simple. Let’s imagine that the user is typing a word in a text field and wants to get a definition of that word, and that we would like to download the definition and assign the result to a ReactiveProperty.
First let’s define two properties: one for the text, which is bound to the TextBox where the user can input text, and another for displaying the definition, which is bound to a TextBlock:
public ReactiveProperty<string> Text {get; private set;}
public ReactiveProperty<string> Definition {get; private set;}
I will skip the XAML, as it looks the same in most cases, but here is what the constructor in the view-model looks like:
Text = new ReactiveProperty<string>();
Definition = Text.Select(text => Observable.FromAsync(token => GetDefinitionForTextAsync(text, token)))
.Switch()
.ToReactiveProperty();
And that's it!
Whenever the user enters a new letter, Select creates a new observable from the Task returned from GetDefinitionForTextAsync, and Switch() unsubscribes from the previous one, which cancels the operation that was still running through the token supplied by FromAsync.
To make it even nicer I would add Throttle() so we wouldn't start a search after every single letter entered by the user.
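A minimal sketch of the throttled version, under a few assumptions: a plain Subject<string> stands in for the Text property so the example does not depend on the ReactiveProperty library, GetDefinitionForTextAsync is a fake that only simulates latency, and the 300 ms quiet period is an arbitrary choice.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledSearchDemo
{
    // Hypothetical stand-in for the real lookup; it only simulates latency.
    private static async Task<string> GetDefinitionForTextAsync(string text, CancellationToken token)
    {
        await Task.Delay(50, token);
        return "definition of " + text;
    }

    public static async Task<string> RunAsync()
    {
        var text = new Subject<string>(); // stands in for the Text property

        var definitions = text
            .Throttle(TimeSpan.FromMilliseconds(300)) // wait until typing pauses
            .Select(t => Observable.FromAsync(token => GetDefinitionForTextAsync(t, token)))
            .Switch();                                // drop any lookup still in flight

        // Subscribe before pushing values, because a Subject does not replay.
        var firstResult = definitions.FirstAsync().ToTask();

        text.OnNext("r");  // superseded before the quiet period ends
        text.OnNext("rx"); // the only value that survives Throttle

        return await firstResult;
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync()); // prints "definition of rx"
    }
}
```

Only one lookup is started here, because "r" never makes it past Throttle().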
CombineLatest:
Remember that although observables really look like properties, they are not: they are like events. This is even more important when using CombineLatest(). If any of the given observables never pushes a value, CombineLatest() won't trigger.
I usually like to add StartWith(...) to observables so that CombineLatest() works as expected.
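A small sketch of that behaviour, with hypothetical stream names: without StartWith(true) the first query would produce nothing until isOnline had emitted at least once.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CombineLatestDemo
{
    public static List<string> Run()
    {
        var query = new Subject<string>();
        var isOnline = new Subject<bool>();
        var output = new List<string>();

        using (query
            .CombineLatest(
                isOnline.StartWith(true), // seed value so the combination can fire
                (q, online) => q + " (online: " + online + ")")
            .Subscribe(line => output.Add(line)))
        {
            query.OnNext("rx");     // emits right away thanks to the seed value
            isOnline.OnNext(false); // emits again with the latest query
        }

        return output;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
        // prints:
        // rx (online: True)
        // rx (online: False)
    }
}
```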
Issues with Select().Switch():
As I mentioned before, the combination Select(...).Switch() is extremely powerful, but it has also been the part where I’ve had plenty of confusing bugs.
Here is an example: let’s say we have a ClickCommand which is triggered every time I press a button:
ClickCommand.Do(_ => Debug.WriteLine("Command"))
.Select(_ => Observable.Return<object>(null))
.Switch()
.Do(_ => Debug.WriteLine("Last"))
.Subscribe(_ => Debug.WriteLine("Subscription"));
On the first line I just log whenever the button is clicked, and then I create a Select().Switch() combination that returns null. After that I log "Last" whenever I receive the next element in the stream. Finally, I log "Subscription" whenever OnNext triggers.
Whenever I press the button I get this console output:
Command
Last
Subscription
This is the expected behaviour.
Now let’s say we want to take the observable that we get from the Select().Switch() chain and subscribe to it one additional time:
var clickObservable = ClickCommand.Do(_ => Debug.WriteLine("Command"))
.Select(_ => Observable.Return<object>(null))
.Switch();
clickObservable.Do(_ => Debug.WriteLine("Last"))
.Subscribe(_ => Debug.WriteLine("Subscription"));
clickObservable.Subscribe(_ => { });
The only difference is that I subscribe after Switch() twice, so I would expect the same console output. However, that's not the case:
Command
Last
Subscription
Command
Everything before Switch() is executed an additional time, because each subscription runs the whole chain again. Imagine that instead of logging "Command", you would make an HTTP request. Suddenly you are performing a completely unnecessary and pricey operation.
The solution is to always use this combination: Select().Switch().Publish().RefCount(). That's why I would recommend adding Publish().RefCount() to every Select().Switch(), even if you are not subscribing multiple times.
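The difference can be checked with a short sketch. A Subject<Unit> stands in for ClickCommand and a counter replaces the "Command" log line; both are assumptions made for this example.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedSwitchDemo
{
    // Returns how many times the work before Switch() ran for a single click
    // while two subscribers were attached.
    public static int CountSideEffects(bool share)
    {
        var clicks = new Subject<Unit>(); // stands in for ClickCommand
        var sideEffects = 0;

        var source = clicks
            .Do(_ => sideEffects++) // imagine an HTTP request here
            .Select(_ => Observable.Return<object>(null))
            .Switch();

        // Publish().RefCount() lets all subscribers share one upstream subscription.
        var clickObservable = share ? source.Publish().RefCount() : source;

        using (clickObservable.Subscribe(_ => { }))
        using (clickObservable.Subscribe(_ => { }))
        {
            clicks.OnNext(Unit.Default);
        }

        return sideEffects;
    }

    public static void Main()
    {
        Console.WriteLine(CountSideEffects(share: false)); // prints 2
        Console.WriteLine(CountSideEffects(share: true));  // prints 1
    }
}
```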
Exceptions:
When creating chains of observables, I usually didn't think about things that could go wrong. I quickly learned better. Reactive code is still code where something can go wrong, so don’t forget to handle exceptions (the best way to do it is by using Catch()).
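One way this can look, as a sketch rather than a recipe: the Catch() is placed on the inner observable inside Select(), so a failed lookup is replaced with a fallback value and the outer stream keeps running. The Lookup method and its "bad" input are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CatchDemo
{
    // Hypothetical lookup that fails for one specific input.
    private static IObservable<string> Lookup(string text)
    {
        return text == "bad"
            ? Observable.Throw<string>(new InvalidOperationException("lookup failed"))
            : Observable.Return("definition of " + text);
    }

    public static List<string> Run()
    {
        var text = new Subject<string>();
        var output = new List<string>();

        using (text
            .Select(t => Lookup(t)
                // Handle the error per request so the outer stream stays alive.
                .Catch<string, InvalidOperationException>(
                    ex => Observable.Return("error: " + ex.Message)))
            .Switch()
            .Subscribe(line => output.Add(line)))
        {
            text.OnNext("rx");
            text.OnNext("bad");
            text.OnNext("linq"); // still handled after the earlier failure
        }

        return output;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
        // prints:
        // definition of rx
        // error: lookup failed
        // definition of linq
    }
}
```

If the Catch() sat after Switch() instead, the first error would end the whole chain and "linq" would never be looked up.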
When in doubt, use Do():
Do() is a really good tool for logging any observable that doesn’t behave the way you intended.
Memory leaks:
Don’t forget to clean up after you subscribe to any observable (every Subscribe returns an IDisposable). Also, ReactiveProperties create a subscription internally, so don’t forget to dispose them.
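A common way to keep track of subscriptions is a CompositeDisposable held by the object that owns them. The sketch below uses a hypothetical CounterViewModel and a plain property so that it depends only on Rx itself.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

// Hypothetical view-model that owns its subscriptions and releases them on Dispose.
public sealed class CounterViewModel : IDisposable
{
    private readonly CompositeDisposable _subscriptions = new CompositeDisposable();

    public int Count { get; private set; }

    public CounterViewModel(IObservable<int> increments)
    {
        // Keep the IDisposable returned by Subscribe so it can be released later.
        _subscriptions.Add(increments.Subscribe(step => Count += step));
    }

    public void Dispose()
    {
        _subscriptions.Dispose(); // disposes every subscription added above
    }
}

public static class DisposalDemo
{
    public static int Run()
    {
        var increments = new Subject<int>();
        var viewModel = new CounterViewModel(increments);

        increments.OnNext(1);
        increments.OnNext(2);

        viewModel.Dispose();
        increments.OnNext(5); // ignored, the view-model no longer listens

        return viewModel.Count;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 3
    }
}
```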
Don’t stress:
When learning how to write reactive code, you don’t need to write everything in a reactive way. I don’t see a problem in having normal code together with reactive. If you try to write reactive code that you don't feel comfortable with, it’s better to write it as you used to and come back to it when you have more experience.
Use Subject<T>:
Whenever you need to change normal code into an observable, use Subject<T>. I would not advise using Observable.Create(), as Subject<T> is much easier to use and good enough 99% of the time.
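As a rough sketch of what that bridging looks like, the hypothetical DownloadService below pushes progress values from ordinary imperative code into a Subject<int> and exposes only the observable side to its consumers.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical service that reports progress from plain imperative code.
public sealed class DownloadService : IDisposable
{
    private readonly Subject<int> _progress = new Subject<int>();

    // Expose only the read side so callers cannot push values themselves.
    public IObservable<int> Progress
    {
        get { return _progress.AsObservable(); }
    }

    public void Download()
    {
        for (var percent = 0; percent <= 100; percent += 50)
        {
            _progress.OnNext(percent); // normal code feeding the stream
        }
    }

    public void Dispose()
    {
        _progress.Dispose();
    }
}

public static class SubjectDemo
{
    public static List<int> Run()
    {
        var seen = new List<int>();
        using (var service = new DownloadService())
        using (service.Progress.Subscribe(percent => seen.Add(percent)))
        {
            service.Download();
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "0, 50, 100"
    }
}
```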
Write your own extensions:
Don’t be afraid to create your own extensions. It’s really simple to write, for example, a WhereIsNotNull(), which should be self-explanatory.
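One possible shape for such an extension, as a sketch: the class names and the restriction to reference types are my own choices here, not something Rx prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExtensions
{
    // Passes through only non-null items; restricted to reference types in this sketch.
    public static IObservable<T> WhereIsNotNull<T>(this IObservable<T> source) where T : class
    {
        return source.Where(item => item != null);
    }
}

public static class WhereIsNotNullDemo
{
    public static List<string> Run()
    {
        var words = new Subject<string>();
        var seen = new List<string>();

        using (words.WhereIsNotNull().Subscribe(word => seen.Add(word)))
        {
            words.OnNext("rx");
            words.OnNext(null); // filtered out
            words.OnNext("linq");
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "rx, linq"
    }
}
```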
Testing:
Testing time-based or timer-based code was a really big pain for me. So how can you test an observable that should trigger every hour? With ease: just include the “rx-testing” NuGet package (from the same guys that develop Rx) in your test project and use the TestScheduler. It is a virtual time scheduler and makes you a Time Lord.
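A minimal sketch of the idea, assuming the testing helpers are available under the Microsoft.Reactive.Testing namespace (the name they ship under in current Rx releases). The hourly timer gets the TestScheduler passed in, and the test moves virtual time forward by three and a half hours without any real waiting.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class HourlyTimerTest
{
    public static List<long> Run()
    {
        var scheduler = new TestScheduler();
        var ticks = new List<long>();

        // The timer runs on the virtual clock instead of the wall clock.
        using (Observable.Interval(TimeSpan.FromHours(1), scheduler)
            .Subscribe(tick => ticks.Add(tick)))
        {
            // Jump 3.5 hours ahead; this returns immediately.
            scheduler.AdvanceBy(TimeSpan.FromHours(3.5).Ticks);
        }

        return ticks;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "0, 1, 2"
    }
}
```

For this to work, the code under test has to accept a scheduler as a parameter, which is a good reason to inject schedulers rather than rely on the defaults.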
Be mindful how you subscribe:
When subscribing, always attach the IDisposable object to a member variable. If you don't, you have no way of disposing the subscription later, and in my experience, once the method leaves the scope, the subscription can end up disposed and the observable most likely will never be triggered.
If you’re interested in checking out some reactive C# code, I have a little pet project where I'm testing out all the reactive/architectural approaches that I am keen on. Additionally I have a lightweight ReactiveProperties library that I use in that project, so you will need it as well.
I really enjoyed my journey with Reactive Programming and hope it gains traction in the development world.
If you haven't got the Reactive bug yet, try it out!
- Tomek Polański, Software Developer