Getting started with system.reactive
Using Rx in your project
Install the NuGet package System.Reactive, then add this using statement to access the Rx extension methods:
using System.Reactive.Linq;
Filtering the values of an observable
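As a minimal sketch, assuming the Where operator is what this heading refers to, the example below keeps only the even numbers from a sequence. The source built with Observable.Range and the names numbers, evens and received are illustrative choices, not part of the original text.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Illustrative source: emits 1..10 synchronously when subscribed to.
IObservable<int> numbers = Observable.Range(1, 10);

// Where forwards only the values that satisfy the predicate.
IObservable<int> evens = numbers.Where(n => n % 2 == 0);

var received = new List<int>();
evens.Subscribe(n => received.Add(n));

Console.WriteLine(string.Join(", ", received)); // 2, 4, 6, 8, 10
```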
Selecting a new value for each value in an observable
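A short sketch, assuming the Select operator is intended here: each incoming value is projected to a new value, possibly of a different type. The sample source and the label format are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Illustrative source: emits 1, 2, 3.
IObservable<int> numbers = Observable.Range(1, 3);

// Select projects every value; here an int becomes a string.
IObservable<string> labels = numbers.Select(n => $"Item {n * 10}");

var received = new List<string>();
labels.Subscribe(label => received.Add(label));

Console.WriteLine(string.Join(", ", received)); // Item 10, Item 20, Item 30
```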
Subscribing/unsubscribing to an observable (IDisposable)
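Subscribe returns an IDisposable, and disposing it ends the subscription. The sketch below uses a Subject<int> as a hand-driven source purely for illustration; any IObservable<T> behaves the same way from the subscriber's side.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Illustrative source that we can push values into by hand.
var source = new Subject<int>();
var received = new List<int>();

// The returned IDisposable represents this subscription.
IDisposable subscription = source.Subscribe(value => received.Add(value));

source.OnNext(1);
source.OnNext(2);

// Disposing unsubscribes; later values are no longer delivered.
subscription.Dispose();
source.OnNext(3);

Console.WriteLine(string.Join(", ", received)); // 1, 2
```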
Subscribing to an observable (CancellationToken)
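Subscribe also has overloads that take a CancellationToken instead of returning an IDisposable; cancelling the token ends the subscription. The following is a sketch with an illustrative Subject<int> source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading;

// Illustrative hand-driven source.
var source = new Subject<int>();
var received = new List<int>();
var cts = new CancellationTokenSource();

// This overload returns void; the token controls the subscription lifetime.
source.Subscribe(value => received.Add(value), cts.Token);

source.OnNext(1);

// Cancelling the token unsubscribes the observer.
cts.Cancel();
source.OnNext(2);

Console.WriteLine(string.Join(", ", received)); // 1
```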
Wrapping an async method as an observable
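One way to do this, sketched here under the assumption that Observable.FromAsync is the intended operator, is to pass a factory that starts the task. GetAnswerAsync is a hypothetical method invented for the example. The async method is not invoked until something subscribes, and it is invoked again for each new subscription.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical async method standing in for real asynchronous work.
async Task<int> GetAnswerAsync()
{
    await Task.Delay(10);
    return 42;
}

// FromAsync defers the call: nothing runs until a subscription is made.
IObservable<int> answer = Observable.FromAsync(() => GetAnswerAsync());

// Awaiting an observable subscribes to it and yields its last value.
int result = await answer;

Console.WriteLine(result); // 42
```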
Sharing a single subscription (Publish)
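Publish turns a source into an IConnectableObservable<T>: observers can attach first, and a single subscription to the underlying source is made when Connect is called. In this sketch the Observable.Defer wrapper and the subscriptionCount counter exist only to make the number of source subscriptions visible.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

int subscriptionCount = 0;

// Illustrative cold source that counts how often it is subscribed to.
IObservable<int> source = Observable.Defer(() =>
{
    subscriptionCount++;
    return Observable.Range(1, 3);
});

IConnectableObservable<int> shared = source.Publish();

var first = new List<int>();
var second = new List<int>();
shared.Subscribe(v => first.Add(v));
shared.Subscribe(v => second.Add(v));

// Nothing flows until Connect, which subscribes to the source exactly once.
IDisposable connection = shared.Connect();

Console.WriteLine(subscriptionCount);         // 1
Console.WriteLine(string.Join(", ", first));  // 1, 2, 3
Console.WriteLine(string.Join(", ", second)); // 1, 2, 3

connection.Dispose();
```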
Sharing a single subscription (Publish + RefCount)
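Adding RefCount removes the need to call Connect by hand: the shared subscription is made when the first observer subscribes and released when the last one unsubscribes. The sketch below uses an illustrative Subject<int> plus a counter to show that the source is subscribed to only once.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

int subscriptionCount = 0;
var ticks = new Subject<int>();

// Illustrative source that counts how often it is subscribed to.
IObservable<int> source = Observable.Defer(() =>
{
    subscriptionCount++;
    return ticks.AsObservable();
});

IObservable<int> shared = source.Publish().RefCount();

var first = new List<int>();
var second = new List<int>();

IDisposable sub1 = shared.Subscribe(v => first.Add(v));  // connects to the source
IDisposable sub2 = shared.Subscribe(v => second.Add(v)); // reuses the connection

ticks.OnNext(1);
sub1.Dispose();
ticks.OnNext(2);
sub2.Dispose(); // last observer gone: the connection is released
ticks.OnNext(3);

Console.WriteLine(subscriptionCount);         // 1
Console.WriteLine(string.Join(", ", first));  // 1
Console.WriteLine(string.Join(", ", second)); // 1, 2
```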
Installation or Setup
Reactive Extensions are published on both NuGet and MyGet.
Installing and using them is therefore the same as any other NuGet package:
Install-Package System.Reactive
Note: package names changed between v2 and v3. See the README on GitHub for more information.
Breaking changes
The NuGet package names changed in the move from v2.x.x to v3.0.0 and later:
Rx-Main is now System.Reactive
Rx-Core is now System.Reactive.Core
Rx-Interfaces is now System.Reactive.Interfaces
Rx-Linq is now System.Reactive.Linq
Rx-PlatformServices is now System.Reactive.PlatformServices
Rx-Testing is now Microsoft.Reactive.Testing
Throttling a stream
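In Rx.NET, Throttle forwards a value only if no other value follows it within the given time span (often called debouncing elsewhere). The sketch below assumes that operator is what the heading means. It uses a HistoricalScheduler to control virtual time so the example does not depend on real timers; the 500 ms window and the sample values are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Virtual-time scheduler: time moves only when AdvanceBy is called.
var scheduler = new HistoricalScheduler();
var source = new Subject<int>();
var received = new List<int>();

source
    .Throttle(TimeSpan.FromMilliseconds(500), scheduler)
    .Subscribe(v => received.Add(v));

source.OnNext(1);
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));
source.OnNext(2); // arrives within 500 ms of 1, so 1 is dropped
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(600)); // quiet period: 2 is emitted
source.OnNext(3);
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(600)); // quiet period: 3 is emitted

Console.WriteLine(string.Join(", ", received)); // 2, 3
```

In application code the scheduler argument can be omitted, in which case Throttle uses a default time-based scheduler.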
Ignoring repeated values
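Assuming "repeated" means consecutive duplicates, DistinctUntilChanged is a natural fit: it drops a value when it equals the one immediately before it. The sample sequence is made up for illustration. Note that a value can reappear later, as 1 does here; Distinct would suppress it for the whole sequence instead.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Illustrative source with runs of repeated values.
IObservable<int> source = new[] { 1, 1, 2, 2, 2, 1, 3, 3 }.ToObservable();

// Only changes relative to the previous value are forwarded.
IObservable<int> changes = source.DistinctUntilChanged();

var received = new List<int>();
changes.Subscribe(v => received.Add(v));

Console.WriteLine(string.Join(", ", received)); // 1, 2, 1, 3
```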
Get a running aggregation
Suppose you have a hot observable and want to maintain a running aggregate over its values. For example, given an IObservable<StockTick>, you might want to track the average trade volume as ticks arrive. You can use Scan for that.
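var tradeVolume = stockTicks.Select(e => e.Price)
    // Running sum; the seed itself is not emitted.
    .Scan(0.0m, (aggregated, newtick) => aggregated + newtick)
    // Divide each running sum by the number of ticks seen so far.
    .Select((aggregated, index) => aggregated / (index + 1));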
Now you can subscribe to tradeVolume, which is updated live each time a new tick arrives.
var subscription = tradeVolume.Subscribe(vol => Console.WriteLine("New trade volume is {0}", vol));
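To see the running average on concrete numbers, here is a self-contained sketch. The StockTick type is not defined in the text above, so a tuple with Symbol and Price fields stands in for it, and the three sample ticks are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Stand-in for IObservable<StockTick>: a tuple with hypothetical fields.
IObservable<(string Symbol, decimal Price)> stockTicks = new[]
{
    ("ABC", 10m),
    ("ABC", 20m),
    ("ABC", 30m),
}.ToObservable();

var runningAverage = stockTicks
    .Select(tick => tick.Price)
    .Scan(0.0m, (sum, price) => sum + price)     // running sums: 10, 30, 60
    .Select((sum, index) => sum / (index + 1));  // divided by 1, 2, 3

var received = new List<decimal>();
runningAverage.Subscribe(avg => received.Add(avg));

// The averages after each tick are 10, 15 and 20.
foreach (var avg in received)
{
    Console.WriteLine(avg);
}
```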