system.reactive

Getting started with system.reactive

Remarks

This section provides an overview of what system.reactive is, and why a developer might want to use it.


Using Rx in your project

Install the NuGet package System.Reactive, then add this using directive to access the Rx extension methods:

using System.Reactive.Linq;

Filtering the values of an observable
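
A minimal sketch of filtering with Where, assuming a synthetic source built with Observable.Range. The names numbers, evenNumbers and evens are illustrative only.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Illustrative source: Observable.Range emits 1..10 and then completes.
IObservable<int> numbers = Observable.Range(1, 10);

// Where passes on only the values that satisfy the predicate.
IObservable<int> evenNumbers = numbers.Where(n => n % 2 == 0);

var evens = new List<int>();
evenNumbers.Subscribe(n => evens.Add(n));

Console.WriteLine(string.Join(", ", evens)); // prints "2, 4, 6, 8, 10"
```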

Selecting a new value for each value in an observable
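
A minimal sketch of projecting each value with Select, assuming a synthetic source built with Observable.Range. The names numbers and squares are illustrative only.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Illustrative source: emits 1..5 and then completes.
IObservable<int> numbers = Observable.Range(1, 5);

// Select maps every incoming value to a new value; here, its square.
var squares = new List<int>();
numbers
    .Select(n => n * n)
    .Subscribe(n => squares.Add(n));

Console.WriteLine(string.Join(", ", squares)); // prints "1, 4, 9, 16, 25"
```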

Subscribing/unsubscribing to an observable (IDisposable)
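
A minimal sketch of subscribing and then unsubscribing by disposing the returned IDisposable. It assumes a Subject<int> as a hand-driven source so the timing is explicit; the names source and received are illustrative only.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Illustrative hot source that we push values into by hand.
var source = new Subject<int>();
var received = new List<int>();

// Subscribe returns an IDisposable that represents the subscription.
IDisposable subscription = source.Subscribe(n => received.Add(n));

source.OnNext(1);
source.OnNext(2);

// Disposing the subscription unsubscribes the observer.
subscription.Dispose();

source.OnNext(3); // not delivered: nobody is subscribed any more

Console.WriteLine(string.Join(", ", received)); // prints "1, 2"
```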

Subscribing to an observable (CancellationToken)
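
A minimal sketch of the Subscribe overload that takes a CancellationToken instead of returning an IDisposable. It assumes a Subject<int> as a hand-driven source; the names source, received and cts are illustrative only.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading;

var source = new Subject<int>();
var received = new List<int>();
var cts = new CancellationTokenSource();

// This overload returns void; the subscription ends when the token is cancelled.
source.Subscribe(n => received.Add(n), cts.Token);

source.OnNext(1);

// Cancelling the token unsubscribes the observer.
cts.Cancel();

source.OnNext(2); // not delivered

Console.WriteLine(string.Join(", ", received)); // prints "1"
```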

Wrapping an async method as an observable
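
A minimal sketch of wrapping an async method with Observable.FromAsync. ComputeAnswerAsync is a hypothetical method standing in for real asynchronous work, and the calls counter exists only to show that the method runs once per subscription.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

int calls = 0;

// Hypothetical async method standing in for real work such as an HTTP request.
async Task<int> ComputeAnswerAsync()
{
    calls++;
    await Task.Delay(10);
    return 42;
}

// FromAsync defers the call: the method is invoked each time someone subscribes.
IObservable<int> answer = Observable.FromAsync(() => ComputeAnswerAsync());

// Awaiting an observable subscribes to it and yields its last value.
int first = await answer;
int second = await answer;

Console.WriteLine($"{first}, {second}, calls = {calls}"); // prints "42, 42, calls = 2"
```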

Sharing a single subscription (Publish)
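
A minimal sketch of sharing one underlying subscription with Publish and Connect. The cold source built with Observable.Create and the subscriptions counter are assumptions made for illustration.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

int subscriptions = 0;

// Illustrative cold source that counts how often it is subscribed to.
IObservable<int> source = Observable.Create<int>(observer =>
{
    subscriptions++;
    observer.OnNext(1);
    observer.OnNext(2);
    observer.OnCompleted();
    return Disposable.Empty;
});

// Publish returns a connectable observable; the source is not subscribed to yet.
var published = source.Publish();

var first = new List<int>();
var second = new List<int>();
published.Subscribe(n => first.Add(n));
published.Subscribe(n => second.Add(n));

// Connect subscribes to the source once and shares its values with both observers.
IDisposable connection = published.Connect();

Console.WriteLine(string.Join(", ", first));  // prints "1, 2"
Console.WriteLine(string.Join(", ", second)); // prints "1, 2"
Console.WriteLine(subscriptions);             // prints "1"
```

Disposing the value returned by Connect disconnects the shared subscription from the source.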

Sharing a single subscription (Publish + RefCount)
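
A minimal sketch of Publish followed by RefCount, which connects when the first observer subscribes and disconnects when the last one unsubscribes. The Subject named ticks and the subscriptions counter are assumptions made for illustration.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

int subscriptions = 0;
var ticks = new Subject<int>(); // illustrative hand-driven source

// Wrap the subject so we can count subscriptions to the underlying source.
IObservable<int> source = Observable.Create<int>(observer =>
{
    subscriptions++;
    return ticks.Subscribe(observer);
});

// RefCount manages Connect and disconnect based on the number of observers.
IObservable<int> shared = source.Publish().RefCount();

var first = new List<int>();
var second = new List<int>();
IDisposable firstSubscription = shared.Subscribe(n => first.Add(n));   // connects
IDisposable secondSubscription = shared.Subscribe(n => second.Add(n)); // reuses the connection

ticks.OnNext(1);
firstSubscription.Dispose();
ticks.OnNext(2);              // only the second observer is left
secondSubscription.Dispose(); // last observer gone: the source is unsubscribed
ticks.OnNext(3);              // nobody receives this

Console.WriteLine(string.Join(", ", first));  // prints "1"
Console.WriteLine(string.Join(", ", second)); // prints "1, 2"
Console.WriteLine(subscriptions);             // prints "1"
```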

Installation or Setup

Reactive Extensions are published on both NuGet and MyGet.

Installing and using them is therefore the same as any other NuGet package:

 Install-Package System.Reactive

Note: package names changed between v2 and v3. See the README on GitHub for more information.

Breaking changes

The NuGet packages were renamed in the move from v2.x.x to v3.0.0 and later:

Rx-Main is now System.Reactive
Rx-Core is now System.Reactive.Core
Rx-Interfaces is now System.Reactive.Interfaces
Rx-Linq is now System.Reactive.Linq
Rx-PlatformServices is now System.Reactive.PlatformServices
Rx-Testing is now Microsoft.Reactive.Testing

Throttling a stream
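
A minimal sketch of Throttle, which forwards a value only after the source has been quiet for the given duration. To keep the example deterministic it assumes a HistoricalScheduler (virtual time) rather than real timers; the keystrokes subject and the 200 ms window are illustrative choices.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// HistoricalScheduler provides virtual time, so no real waiting is needed.
var scheduler = new HistoricalScheduler();
var keystrokes = new Subject<string>(); // hypothetical input stream
var results = new List<string>();

keystrokes
    .Throttle(TimeSpan.FromMilliseconds(200), scheduler)
    .Subscribe(text => results.Add(text));

// Three values arrive in quick succession, each within 200 ms of the previous one.
keystrokes.OnNext("r");
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));
keystrokes.OnNext("rx");
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(100));
keystrokes.OnNext("rx.net");

// After 200 ms of silence, only the most recent value is forwarded.
scheduler.AdvanceBy(TimeSpan.FromMilliseconds(300));

Console.WriteLine(string.Join(", ", results)); // prints "rx.net"
```

In application code the scheduler argument is usually omitted, in which case Throttle uses a default timer-based scheduler.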

Ignoring repeated values
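
A minimal sketch of ignoring consecutive repeats, assuming DistinctUntilChanged is the operator this heading refers to. The input array is made up for illustration.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Illustrative source with runs of repeated values.
IObservable<int> readings = new[] { 1, 1, 2, 2, 2, 1, 3, 3 }.ToObservable();

// DistinctUntilChanged drops a value when it equals the one immediately before it.
var changes = new List<int>();
readings
    .DistinctUntilChanged()
    .Subscribe(n => changes.Add(n));

Console.WriteLine(string.Join(", ", changes)); // prints "1, 2, 1, 3"
```

Note that the second 1 is kept, because only adjacent duplicates are suppressed; Distinct would instead drop every value that has been seen before.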

Get a running aggregation

Suppose you have a hot observable and want to keep a running aggregate of its values. For example, given an IObservable<StockTick>, you might want to track the average trade volume as ticks arrive. You can use Scan for that.

// Scan keeps a running sum; the indexed Select divides it by the number of ticks so far.
var tradeVolume = stockTicks.Select(e => e.Price)
    .Scan(0.0m, (aggregated, newtick) => aggregated + newtick)
    .Select((aggregated, index) => aggregated / (index + 1));

Now you can subscribe to tradeVolume, which emits an updated value each time a new tick is received.

var subscription = tradeVolume.Subscribe(vol => Console.WriteLine("New trade volume is {0}", vol));
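
The snippet above can be tried end to end with a hand-driven source. This is a sketch under stated assumptions: StockTick is replaced by a hypothetical (Symbol, Price) tuple, and a Subject plays the role of the hot observable.

```csharp
// Requires the System.Reactive NuGet package; written as a top-level program (C# 9+).
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for IObservable<StockTick>.
var stockTicks = new Subject<(string Symbol, decimal Price)>();

// Running sum via Scan, then divide by the count of ticks seen so far.
IObservable<decimal> runningAverage = stockTicks
    .Select(tick => tick.Price)
    .Scan(0.0m, (sum, price) => sum + price)
    .Select((sum, index) => sum / (index + 1));

var averages = new List<decimal>();
IDisposable subscription = runningAverage.Subscribe(avg => averages.Add(avg));

stockTicks.OnNext(("MSFT", 10m)); // average so far: 10
stockTicks.OnNext(("MSFT", 20m)); // average so far: 15
stockTicks.OnNext(("MSFT", 30m)); // average so far: 20

subscription.Dispose();
Console.WriteLine(string.Join(", ", averages));
```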

This modified text is an extract of the original Stack Overflow Documentation created by the contributors and released under CC BY-SA 3.0.