Saturday, May 29, 2010

Rx part 4 – Flow control

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

In the previous post we looked at the lifecycle of a subscription to a stream. A subscription can be terminated by unsubscribing/disposing, and the stream (IObservable<T>) can terminate either naturally via OnCompleted or erroneously via OnError. OnError creates an interesting problem: it publishes the Exception, it does not throw it. This means that you cannot use the standard Structured Exception Handling approach of try/catch with observables. Well, that is not entirely true. In this example the stream raises an OnError that we can catch with a try/catch block, thanks to the Subscribe extension method overload I use: because no OnError handler is supplied, it takes the published exception and throws it.
try
{
    var subject = new Subject<int>();
    subject.Subscribe(Console.WriteLine);
    //Or explicitly rethrow as per this line.
    //subject.Subscribe(Console.WriteLine, ex => { throw ex.PrepareForRethrow(); });
    subject.OnNext(1);
    subject.OnError(new IOException("test exception"));
}
catch (IOException ioEx)
{
    Console.WriteLine("Caught IO exception: {0}", ioEx.Message);
}
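The behaviour above can be checked with a small sketch. It assumes the current System.Reactive (Rx.NET) NuGet package rather than the 2010 build this post was written against, and `RethrowDemo` is a hypothetical name. The first method relies on the onNext-only Subscribe overload rethrowing the published error; the second supplies an onError handler, so nothing escapes to the caller.

```csharp
// Sketch assuming the System.Reactive package; RethrowDemo is a hypothetical name.
using System;
using System.IO;
using System.Reactive.Subjects;

public static class RethrowDemo
{
    // Only an onNext handler is supplied, so the default onError handler
    // rethrows the published exception on the thread that called OnError.
    public static string CatchPublishedError()
    {
        var subject = new Subject<int>();
        subject.Subscribe(_ => { });
        try
        {
            subject.OnError(new IOException("test exception"));
        }
        catch (IOException ioEx)
        {
            return ioEx.Message;
        }
        return "nothing thrown";
    }

    // With an explicit onError handler the observer consumes the error,
    // so nothing is thrown back to the code that called OnError.
    public static string HandleInObserver()
    {
        var subject = new Subject<int>();
        var handled = "not handled";
        subject.Subscribe(_ => { }, ex => handled = ex.Message);
        subject.OnError(new IOException("test exception"));
        return handled;
    }
}
```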
Rethrowing published OnError exceptions does not allow for a very composable style of coding. It may be useful in cases like the example above, but the power of Rx, as you will see in future posts, is the ability to compose streams together. Once we are running compositions of streams, SEH is not very helpful or useful. Rx provides several methods that offer a composition-friendly way to handle errors and exceptions.

Visualising streams

Before I cover the flow control methods that Rx offers, I want to take a quick detour and talk about a visual tool we will use to help communicate the concepts relating to streams. Marble diagrams are great for communicating Rx streams, and you may find them useful for describing any stream except the most basic. When using marble diagrams to communicate Rx streams there are only a few things you need to know:
  1. a stream is represented by a horizontal line
  2. time moves to the right (i.e. things on the left happened before things on the right)
  3. we only need 3 symbols to represent an event
    1. “0” for OnNext
    2. “X” for an OnError
    3. “|” for OnCompleted
This is a sample of a stream that publishes 3 values and then completes
--0--0--0--|
This is a sample of a stream that publishes 4 values then errors.
--0--0--0--0--X
While these examples may seem too simple to warrant a visual representation, the simplicity of marble diagrams really pays off once we are using multiple streams.
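To make the notation concrete, here is a sketch that renders a finite sequence as a marble string. It assumes the current System.Reactive package and uses Materialize, which is described later in this post; `Marbles.Render` is a hypothetical helper, and it ignores timing, so every notification gets the same `--` spacing.

```csharp
// Sketch assuming the System.Reactive package; Marbles.Render is a hypothetical helper.
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class Marbles
{
    // "0" for OnNext, "X" for OnError and "|" for OnCompleted.
    // Timing is ignored: notifications are evenly spaced with "--".
    public static string Render<T>(IObservable<T> source)
    {
        var symbols = source
            .Materialize()   // each OnNext/OnError/OnCompleted becomes a Notification<T> value
            .Select(n => n.Kind == NotificationKind.OnNext ? "0"
                       : n.Kind == NotificationKind.OnError ? "X"
                       : "|")
            .ToList()        // a single IList<string> once the source terminates
            .Wait();         // block until that list is available
        return "--" + string.Join("--", symbols);
    }
}
```

For example, `Marbles.Render(Observable.Range(1, 3))` produces the first diagram above, and appending `Observable.Throw<int>(...)` to a four-value range produces the second.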

Flow control constructs

Sometimes when dealing with an observable, errors may occur that are acceptable, and we should simply try again. Imagine that we want the following effect, where the error in stream 1 (S1) is acceptable and we try again on stream 2 (S2). The last line is the composition of the two streams; this is the result we want to expose (R).
S1--0--0--X
S2-----------0--0--0--0
R --0--0-----0--0--0--0
There are several methods we could use to recreate the example above.
Retry is the simplest method available to us. Retry will re-subscribe to the IObservable<T> on any failure. In this example we just use the simple overload that will always retry on any exception.
public static void RetrySample<T>(IObservable<T> stream)
{
    stream.Retry().Subscribe(t=>Console.WriteLine(t)); //Will always retry the stream
    Console.ReadKey();
}
/*
Given stream that will produce 0,1,2 then error; the output would be
0
1
2
0
1
2
0
1
2
.....
*/
which would look like this as a marble diagram
S--0--0--0--X--0--0--0--X--0--0--0--
R--0--0--0-----0--0--0-----0--0--0--
Alternatively we can specify the maximum number of times to retry. In this example we only retry once, so the error that gets published on the second subscription will be passed up to the final subscription. Note that to retry once you pass a value of 2, because the count is the total number of subscription attempts, including the first one. Maybe the method should be called Try?
public static void RetryOnceSample<T>(IObservable<T> stream)
{
    stream.Retry(2)
        .Subscribe(t => Console.WriteLine(t),
                   ex => Console.WriteLine("Gave up on 2nd Error"));
    Console.ReadKey();
}
/*Output:
0
1
2
0
1
2
Gave up on 2nd Error
*/
As a marble diagram this would look like
S--0--0--0--X--0--0--0--X
R--0--0--0-----0--0--0--X
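The "pass 2 to retry once" rule can be verified by counting subscriptions. This sketch assumes the current System.Reactive package; `RetryDemo` is a hypothetical name, and Observable.Defer is used only so that each subscription attempt can be counted.

```csharp
// Sketch assuming the System.Reactive package; RetryDemo is a hypothetical name.
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class RetryDemo
{
    public static string RetryTwice()
    {
        var subscriptions = 0;
        // Defer runs its factory once per subscription, so we can count attempts.
        // Every attempt publishes 0, 1, 2 and then errors.
        var stream = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(0, 3)
                .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));
        });

        var published = stream
            .Retry(2)        // at most two subscriptions in total
            .Materialize()   // turn the final OnError into a value so Wait() does not throw
            .Select(n => n.Kind == NotificationKind.OnNext ? n.Value.ToString()
                       : n.Kind == NotificationKind.OnError ? "error"
                       : "completed")
            .ToList()
            .Wait();

        return string.Join(",", published) + "; subscriptions=" + subscriptions;
    }
}
```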
OnErrorResumeNext may cause some old VB developers to shudder, but it offers a different route from Retry. While Retry will always try to re-subscribe to the same stream, OnErrorResumeNext takes another IObservable<T> as a parameter and moves on to it when the original stream terminates, whether it publishes an error or completes normally. In this example, when stream publishes an error we subscribe to failover.
public static void FailoverSample<T>(IObservable<T> stream, IObservable<T> failover)
{
    stream
        .OnErrorResumeNext(failover)
        .Subscribe(t=>Console.WriteLine(t), ex => Console.WriteLine(ex));
}
/*
stream  --"1"--"2"--"3"--X
failover--------------------"a"--"b"--"c"--X
result  --"1"--"2"--"3"-----"a"--"b"--"c"--|
*/
An important thing to note here is that when the second stream publishes an error the result stream just completes and ignores the error.
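The diagram above, and the fact that OnErrorResumeNext also moves on after a normal completion, can be checked with this sketch. It assumes the current System.Reactive package; `ResumeDemo` and its `Describe` helper are hypothetical names.

```csharp
// Sketch assuming the System.Reactive package; ResumeDemo and Describe are hypothetical.
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class ResumeDemo
{
    // Lists values, "X" for OnError and "|" for OnCompleted.
    static string Describe<T>(IObservable<T> source) =>
        string.Join(",", source
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnNext ? n.Value.ToString()
                       : n.Kind == NotificationKind.OnError ? "X"
                       : "|")
            .ToList()
            .Wait());

    // Both sources error, yet the result simply completes.
    public static string AfterErrors()
    {
        var stream = new[] { "1", "2", "3" }.ToObservable()
            .Concat(Observable.Throw<string>(new Exception("first")));
        var failover = new[] { "a", "b", "c" }.ToObservable()
            .Concat(Observable.Throw<string>(new Exception("second")));
        return Describe(stream.OnErrorResumeNext(failover));
    }

    // The second source is also used when the first completes normally.
    public static string AfterCompletion() =>
        Describe(Observable.Return("1").OnErrorResumeNext(Observable.Return("a")));
}
```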
Catch is probably the most useful of these methods, as the previous two react the same way regardless of the type of exception published. Catch, however, allows you to specify which exceptions it can catch, just like a normal catch block. This example is similar to the last one, but we explicitly state the exception type that should cause us to fail over to the next stream.
public static void CatchSample<T>(IObservable<T> stream, IObservable<T> failover)
{
    stream
        .Catch((InvalidOperationException ex) => failover)
        .Subscribe(t => Console.WriteLine(t), 
                   ex => Console.WriteLine(ex), 
                   () => Console.WriteLine("Completed"));
}
Catch has other overloads that allow you to pass a params array of IObservable<T> or an IEnumerable<IObservable<T>> instead of just specifying an initial and a failover stream. This means you can effectively have a long list of streams to try when a previous one fails. Of course, if one of them completes successfully, the remaining streams will not be used. If the last one publishes an OnError, then that error will be published to the IObservable<T> that the Catch method returned.
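Both the typed Catch and the params overload can be exercised in a short sketch. It assumes the current System.Reactive package, where the params overload is the static `Observable.Catch(params IObservable<T>[])`; `CatchDemo` and `Describe` are hypothetical names.

```csharp
// Sketch assuming the System.Reactive package; CatchDemo and Describe are hypothetical.
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class CatchDemo
{
    // Lists values, "X:<exception type>" for OnError and "|" for OnCompleted.
    static string Describe<T>(IObservable<T> source) =>
        string.Join(",", source
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnNext ? n.Value.ToString()
                       : n.Kind == NotificationKind.OnError ? "X:" + n.Exception.GetType().Name
                       : "|")
            .ToList()
            .Wait());

    // Only an InvalidOperationException switches to the failover stream;
    // any other exception type passes straight through.
    public static string Typed(Exception error)
    {
        var stream = Observable.Return(1).Concat(Observable.Throw<int>(error));
        var failover = Observable.Return(99);
        return Describe(stream.Catch((InvalidOperationException ex) => failover));
    }

    // Each source is tried in turn; once one completes successfully,
    // the remaining sources are never subscribed to.
    public static string Chain()
    {
        var fails = Observable.Throw<int>(new Exception("s1"));
        var succeeds = Observable.Return(2);
        var neverUsed = Observable.Return(3);
        return Describe(Observable.Catch(fails, succeeds, neverUsed));
    }
}
```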
Another interesting pair of methods is Materialize and Dematerialize. Materialize will flatten an observable's three different publication types (OnNext, OnError & OnCompleted) into wrapped publications of a Notification<T> type. Notification<T> is an abstract class that exposes four properties and an overloaded Accept method:
public abstract class Notification<T> : IEquatable<Notification<T>>
{
    // Properties
    public abstract Exception Exception { get; }
    public abstract bool HasValue { get; }
    public abstract NotificationKind Kind { get; }
    public abstract T Value { get; }

    // Methods
    public abstract TResult Accept<TResult>(IObserver<T, TResult> observer);
    public abstract void Accept(IObserver<T> observer);
    public abstract void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted);
    public abstract TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted);

    // ...
}
By inspecting the Kind property you can identify which type of publication you have received. If the Kind is OnError you can access its published Exception via the Exception property. Accessing the Value property will either
  • return the OnNext value,
  • throw the OnError exception,
  • or throw an InvalidOperationException if the Kind is OnCompleted.
The HasValue property just provides the convenience to check if the Kind is OnNext so you can safely get the value without having an exception thrown.
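Here is a sketch of reading those properties from materialized notifications. It assumes the current System.Reactive package, where Notification<T> and NotificationKind live in the System.Reactive namespace and `Notification.CreateOnError<T>` builds an OnError notification directly; `NotificationDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; NotificationDemo is a hypothetical name.
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class NotificationDemo
{
    public static string Inspect()
    {
        var source = Observable.Return(42)
            .Concat(Observable.Throw<int>(new InvalidOperationException("oops")));
        IList<Notification<int>> notifications = source.Materialize().ToList().Wait();

        var parts = new List<string>();
        foreach (var n in notifications)
        {
            if (n.HasValue)                               // only true for OnNext, so Value is safe here
                parts.Add("OnNext " + n.Value);
            else if (n.Kind == NotificationKind.OnError)  // the exception is exposed, not thrown
                parts.Add("OnError " + n.Exception.Message);
            else
                parts.Add("OnCompleted");
        }
        return string.Join(";", parts);
    }

    // Reading Value on an OnError notification rethrows the published exception.
    public static string ReadValueOfError()
    {
        var n = Notification.CreateOnError<int>(new InvalidOperationException("oops"));
        try
        {
            _ = n.Value;
            return "no exception";
        }
        catch (InvalidOperationException ex)
        {
            return "thrown: " + ex.Message;
        }
    }
}
```

Note that the errored source produces no OnCompleted notification: OnError is its last publication.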
As you can imagine, the Materialize method can be useful for logging the content of a stream. In this example we create an extension method that logs all notifications to the console.
public static class ExampleExtensions
{
    /// <summary>
    /// Logs implicit notifications to console.
    /// </summary>
    /// <example>
    /// <code>myStream.Log().Subscribe(....);</code>
    /// </example>
    public static IObservable<T> Log<T>(this IObservable<T> stream)
    {
        return stream.Materialize()
        .Do(n => Console.WriteLine(n))
        .Dematerialize();
    }
}
Note that here we use Dematerialize to take our stream of Notification<T> and transform it back to our original stream. You could also use Materialize to create your own more powerful Catch methods, but we will look at applications of Rx later.

Wire tapping a stream

The Do extension method was used in the last example, and it would not be fair to continue without explaining what it does. The Do method is used to add side effects to a stream. In the example above the side effect is that we wrote to the console. This is different from Subscribe because the Do method returns an IObservable<T>, which can be thought of as the same IObservable<T> that was passed to it, whereas Subscribe returns an IDisposable. I like to think of the Do method as a wire tap on a stream. ;-)
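One consequence of Do returning an IObservable<T> is that the tap only runs while something is subscribed, and once per subscription. This sketch assumes the current System.Reactive package; `WireTapDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; WireTapDemo is a hypothetical name.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WireTapDemo
{
    public static string Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // Do only describes the side effect; nothing is tapped until someone subscribes.
        var tapped = subject.Do(i => log.Add("tap " + i));
        subject.OnNext(1);                       // no subscribers yet, so Do never sees 1

        tapped.Subscribe(i => log.Add("first " + i));
        subject.OnNext(2);

        // Each subscription gets its own tap, so the side effect now runs twice per value.
        tapped.Subscribe(i => log.Add("second " + i));
        subject.OnNext(3);

        return string.Join(",", log);
    }
}
```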
It would be unfair to mention Do and leave out Run. The Run method is very similar to the Do method except for the following:
  1. Run returns void
  2. Run is a blocking call
  3. Run can be called without any parameters. This effectively is a call to block until OnCompleted or OnError is published.
  4. Run has no overload that takes an Action for the OnCompleted publication. It doesn't make sense to provide one, as the method simply stops blocking when OnCompleted is published, so you can invoke the action you would otherwise pass for OnCompleted immediately after the Run method returns.
What the Run and Do methods have in common is that they both provide overloads that:
  • takes an Action<T> to be performed on each OnNext publication.
  • takes an Action<T> for OnNext and an Action<Exception> for OnError publications
  • takes an IObserver<T> that will be used to handle the publications explicitly
Now that we have had a brief look at Flow control and aggregating streams together, in the next post we will uncover the other aggregation and composition methods that Rx exposes.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 3 - Lifetime management – Completing and Unsubscribing
Forward to next post; Part 5 - Combining multiple IObservable streams

Thursday, May 27, 2010

Rx Part 3 – Lifetime management – Completing and Unsubscribing

So far we have discovered the basics of the Reactive Framework, which allows us to create, subscribe to, and perform some basic aggregations, buffering and time shifting over implementations of IObservable<T>. We have yet to look at how we could unsubscribe from a subscription. If you were to look for an Unsubscribe method in the Rx public API you would not find one. Instead of supplying an Unsubscribe method, Rx returns an IDisposable whenever a subscription is made. This disposable can be thought of as the subscription itself, so disposing it will dispose the subscription and effectively unsubscribe. Note that calling Dispose on the result of a Subscribe call will not affect the underlying IObservable<T>, just that particular subscription to the IObservable<T>. This allows us to call Subscribe many times on a single IObservable<T>, with subscriptions coming and going without affecting each other. In this example we initially have two subscriptions, then we dispose of one subscription early, which still allows the other to continue to receive publications from the underlying IObservable<T>.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");

Console.ReadKey();
/*Outputs:
1st subscription received 0
2nd subscription received 0
1st subscription received 1
2nd subscription received 1
1st subscription received 2
2nd subscription received 2
1st subscription received 3
2nd subscription received 3
2nd subscription received 4
Disposed of 1st subscription
2nd subscription received 5
2nd subscription received 6
2nd subscription received 7

etc....
*/
In the above example, it looks as though both subscriptions receive values from a single OnNext call made by the interval observable; however, each subscription is independent and gets its own sequence, similar to how an Observable.Create<T> delegate runs once per subscription. In this sample we just pause a bit before making our second subscription. Note that the output is different from the above example: the second subscription starts again from 0.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));

var firstSubscription =
    interval.Subscribe(value => Console.WriteLine("1st subscription received {0}", value));
Thread.Sleep(500);
var secondSubscription =
    interval.Subscribe(value => Console.WriteLine("2nd subscription received {0}", value));

Thread.Sleep(500);
firstSubscription.Dispose();
Console.WriteLine("Disposed of 1st subscription");
/*
Output:
1st subscription received 0
1st subscription received 1
1st subscription received 2
1st subscription received 3
1st subscription received 4
2nd subscription received 0
1st subscription received 5
2nd subscription received 1
1st subscription received 6
1st subscription received 7
2nd subscription received 2
1st subscription received 8
2nd subscription received 3
Disposed of 1st subscription
2nd subscription received 4
2nd subscription received 5
2nd subscription received 6
etc...

*/
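The per-subscription independence can be seen without timers by using Observable.Create directly. This sketch assumes the current System.Reactive package; `IndependentSubscriptionsDemo` is a hypothetical name, and the counter exists only to show that the delegate runs once for each subscriber.

```csharp
// Sketch assuming the System.Reactive package; IndependentSubscriptionsDemo is hypothetical.
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class IndependentSubscriptionsDemo
{
    public static string Run()
    {
        var started = 0;
        // Observable.Create runs this delegate once for every subscriber,
        // so each subscription gets its own, independent sequence.
        var source = Observable.Create<int>(observer =>
        {
            var id = ++started;
            observer.OnNext(id);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        var seen = new List<string>();
        source.Subscribe(v => seen.Add("1st received " + v));
        source.Subscribe(v => seen.Add("2nd received " + v));
        return string.Join(",", seen);
    }
}
```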
The benefits of using the IDisposable type, instead of creating a new ISubscription/IUnsubscription interface or amending the IObservable<T> interface to have an Unsubscribe method, are that you get all of these things for free:
  • The type already exists
  • people understand the type
  • IDisposable has standard usages and patterns
  • Language support via the Using keyword
  • Static analysis tools like FxCop can help you with its usage.
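As an illustration of the language support point, a subscription can be scoped with a C# using statement while another subscription to the same subject carries on. This sketch assumes the current System.Reactive package; `UsingDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; UsingDemo is a hypothetical name.
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class UsingDemo
{
    public static string Run()
    {
        var subject = new Subject<int>();
        var scoped = new List<int>();
        var longLived = new List<int>();

        var keep = subject.Subscribe(longLived.Add);   // disposed explicitly further down
        using (subject.Subscribe(scoped.Add))          // the using statement disposes this one
        {
            subject.OnNext(1);
            subject.OnNext(2);
        }
        subject.OnNext(3);                             // only the long-lived subscription sees 3
        keep.Dispose();

        return string.Join(",", scoped) + " | " + string.Join(",", longLived);
    }
}
```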

OnError and OnCompleted()

Both OnError and OnCompleted signify the completion of a stream. If your stream publishes an OnError or OnCompleted, it will be the last publication, and no further calls to OnNext will be delivered. In this example we try to publish an OnNext call after an OnCompleted, and the OnNext is ignored.
var subject = new Subject<int>();
subject.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
subject.OnCompleted();
subject.OnNext(2);
Of course you could implement your own IObservable<T> that did allow publishing after an OnCompleted or an OnError; however, it would not follow the precedent set by the existing Subject types and would be a non-standard implementation. I think it is safe to say that such inconsistent behaviour would cause unpredictable behaviour in the applications that consumed your code.
An interesting thing to consider is that when a stream completes or errors, you should still dispose of your subscription. This can make for messy code, but we will discuss best practices in a later post.
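A testable version of the OnNext-after-OnCompleted example, which also disposes the subscription afterwards as the post recommends. It assumes the current System.Reactive package; `CompletionDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; CompletionDemo is a hypothetical name.
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class CompletionDemo
{
    public static string Run()
    {
        var subject = new Subject<int>();
        var log = new List<string>();
        var subscription = subject.Subscribe(
            i => log.Add(i.ToString()),
            () => log.Add("Completed"));

        subject.OnNext(1);
        subject.OnCompleted();
        subject.OnNext(2);        // ignored: the subject has already completed
        subscription.Dispose();   // still dispose the subscription once finished with it

        return string.Join(",", log);
    }
}
```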
Back to the previous post; Part 2 - Static and extension methods
Forward to next post; Part 4 - Flow control

Sunday, May 23, 2010

Rx part 2 – Static and Extension methods

In the previous post we looked at the core types of the Rx library and how we could get up and running with “Observables”. The key types we discussed were the IObservable<T> interface, IObserver<T>, Subject<T> and its siblings ReplaySubject<T>, BehaviorSubject<T> and AsyncSubject<T>. In this post we are going to explore some of the static methods on the Observable type and some extension methods for the IObservable<T> interface.
We have already seen one extension method in the last post: the overload of Subscribe that allowed us to pass just an Action<T> to be performed when OnNext was invoked, instead of passing an IObserver<T>. The other overloads of Subscribe are also very useful:
public static class ObservableExtensions
{
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action onCompleted);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted);
} 
Each of these overloads allows you to pass various combinations of delegates that you want executed for each publication event an IObservable<T> could raise. You will use these overloads a lot but there are plenty of other static methods that are very helpful on the static Observable class.

Creation and Generation of Observables

The Observable class has several methods for conveniently creating common types of observables. The simplest are shown below, each with an example of how to use it and what you would have to write to emulate the method.
Observable.Empty<T>() returns an IObservable<T> that just publishes an OnCompleted.
var subject = new ReplaySubject<string>();
subject.OnCompleted();
//Or
var empty = Observable.Empty<string>();
Observable.Return<T>(T). This method will return an Observable that publishes the supplied value of T and then publishes OnCompleted.
var subject = new ReplaySubject<string>();
subject.OnNext("Value");
subject.OnCompleted();
//Or
var obReturn = Observable.Return("Value");
Observable.Never<T>(). This method will return an IObservable<T> but will not publish any events.
var subject = new Subject<string>();
//Or
var never = Observable.Never<string>();
Observable.Throw<T>(Exception). This method just publishes the exception passed.
var subject = new ReplaySubject<string>();
subject.OnError(new Exception());
//Or
var throws = Observable.Throw<string>(new Exception());
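A quick way to compare these factory methods is to materialize them and list the kinds of notification each publishes. This sketch assumes the current System.Reactive package; `FactoryDemo` and its methods are hypothetical names. Observable.Never is left out on purpose, because it never terminates and Wait() would block forever.

```csharp
// Sketch assuming the System.Reactive package; FactoryDemo is a hypothetical name.
using System;
using System.Reactive.Linq;

public static class FactoryDemo
{
    // Lists the NotificationKind of everything a finite sequence publishes.
    static string Kinds<T>(IObservable<T> source) =>
        string.Join(",", source.Materialize().Select(n => n.Kind.ToString()).ToList().Wait());

    public static string OfEmpty() => Kinds(Observable.Empty<string>());
    public static string OfReturn() => Kinds(Observable.Return("Value"));
    public static string OfThrow() => Kinds(Observable.Throw<string>(new Exception()));
}
```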
Observable.Create<T>(Func<IObserver<T>,IDisposable>) is a little different from the above creation methods. The method signature itself is a bit nasty, but once you get to know him, he is not that bad. Essentially this method allows you to specify a delegate that will be executed any time a subscription is made. The IObserver<T> that made the subscription will be passed to your delegate so that you can call the OnNext/OnError/OnCompleted methods as you need. Your delegate is a Func that returns an IDisposable. This IDisposable will have its Dispose method called when the subscriber disposes of their subscription. We will cover unsubscribing/disposing in a future post.
The big benefit of using the Create method is that your method becomes a non-blocking call, which is the whole point of the Rx framework: the work inside the delegate is deferred until someone subscribes. In this example we show how we might first return an observable via a standard blocking call, and then the correct way to return an observable without blocking.
private IObservable<string> BlockingMethod()
{
  var subject = new ReplaySubject<string>();
  subject.OnNext("a");
  subject.OnNext("b");
  subject.OnCompleted();
  Thread.Sleep(1000);
  return subject;
}
private IObservable<string> NonBlocking()
{
  return Observable.Create<string>(
      observer =>
      {
        observer.OnNext("a");
        observer.OnNext("b");
        observer.OnCompleted();
        Thread.Sleep(1000);
        return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
        //or can return an Action like
        //return () => Console.WriteLine("Observer has unsubscribed");
      });
}
I would imagine that you will use the Observable.Create<T> method a lot once you start using Rx. An obvious application of this method is when you need to perform a slow request to the network (web service or SQL call). There is also an Observable.Create<T>(Func<IObserver<T>,Action>) overload that, instead of calling Dispose on a returned IDisposable, will just invoke the returned action when the subscriber disposes of their subscription.
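The laziness of Create and the Action-returning overload can both be observed with a small log. This sketch assumes the current System.Reactive package, which still offers `Observable.Create<T>(Func<IObserver<T>, Action>)`; `CreateDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; CreateDemo is a hypothetical name.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CreateDemo
{
    public static string Run()
    {
        var log = new List<string>();
        var source = Observable.Create<string>(observer =>
        {
            log.Add("subscribed");
            observer.OnNext("a");
            // Returning an Action selects the Func<IObserver<T>, Action> overload;
            // the action is invoked when the subscription is disposed.
            return () => log.Add("unsubscribed");
        });

        log.Add("created");   // the delegate has not run yet: nothing has subscribed
        var subscription = source.Subscribe(v => log.Add("got " + v));
        subscription.Dispose();
        return string.Join(",", log);
    }
}
```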
Observable.Range(int, int) returns just a range of integers. The first integer is the initial value and the second is the number of values to yield. This example will write the values “10” through to “24” and then “Completed”.
var range = Observable.Range(10, 15);
range.Subscribe(Console.WriteLine, ()=>Console.WriteLine("Completed"));
Observable.Interval(TimeSpan) will publish incrementing values, starting from 0, every period that you supply. This example publishes values every 250 milliseconds.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250));
interval.Subscribe(Console.WriteLine);
In this example I use the Observable.Interval method to fake ticking prices
var rnd = new Random();
var lastPrice = 100.0;
var interval = Observable.Interval(TimeSpan.FromMilliseconds(250))
    .Select(i =>
      {
          var variation =
              rnd.NextDouble() - 0.5;
          lastPrice += variation;
          return lastPrice;
      });

interval.Subscribe(Console.WriteLine);
Observable.Start method allows you to turn a long running Func<T> or Action into an Observable. If you use the overload that takes an Action then the returned Observable will be of type IObservable<Unit>. Unit is analogous to void and is a Functional Programming construct. In this case it is just used to publish an acknowledgement that the Action is complete, however this is rather inconsequential as OnCompleted will be published straight after Unit anyway. If the overload you use is a Func<T> then when the Func returns its value it will be published and then OnCompleted. Below is an example of using both overloads.
static void StartAction()
{
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
    });
    start.Subscribe(unit => Console.WriteLine("Unit published"), () => Console.WriteLine("Action completed"));
}
static void StartFunc()
{
    var start = Observable.Start(() =>
    {
        Console.Write("Working away");
        for (int i = 0; i < 10; i++)
        {
            Thread.Sleep(100);
            Console.Write(".");
        }
        return "Published value";
    });
    start.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));
}

ToObservable<T>(this IEnumerable<T>) is an extension method for IEnumerable<T> that will translate any IEnumerable<T> to an IObservable<T>. It is a simple and convenient method for switching from one paradigm to another.
var enumT =  new List<string>();
enumT.Add("a");
enumT.Add("b");
enumT.Add("c");
var fromEnum = enumT.ToObservable();

fromEnum.Subscribe(Console.WriteLine);
ToEnumerable<T>(this IObservable<T>) effectively is the opposite of the above method. It will take an IObservable<T> and yield its results as an IEnumerable. Again useful for switching paradigms.
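A round trip from a list to an observable and back shows the two methods side by side. This sketch assumes the current System.Reactive package; `RoundTripDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; RoundTripDemo is a hypothetical name.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RoundTripDemo
{
    public static string Run()
    {
        var letters = new List<string> { "a", "b", "c" };
        IObservable<string> pushed = letters.ToObservable();  // push: values are published to subscribers
        IEnumerable<string> pulled = pushed.ToEnumerable();    // pull: MoveNext blocks until the next value arrives
        return string.Join(",", pulled);
    }
}
```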
Observable.Generate provides a more elaborate way to construct Observables than Range. In this example I keep it simple and have the Generate method construct me a sequence starting from 5 and incrementing by 3 while the value is less than 15. I also have the values published as strings to show that the type of the State and Result can be different.
var generated = Observable.Generate(5, i => i < 15, i => i + 3, i => i.ToString());
generated.Subscribe(Console.WriteLine, ()=>Console.WriteLine("Completed"));
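The four Generate arguments map directly onto the parts of a for loop, which this sketch makes explicit by producing the same values both ways. It assumes the current System.Reactive package; `GenerateDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; GenerateDemo is a hypothetical name.
using System.Collections.Generic;
using System.Reactive.Linq;

public static class GenerateDemo
{
    public static string WithGenerate() =>
        string.Join(",", Observable.Generate(5, i => i < 15, i => i + 3, i => i.ToString())
            .ToList()
            .Wait());

    // The same sequence as a for loop: initial state, condition, iterate, result selector.
    public static string WithLoop()
    {
        var results = new List<string>();
        for (var i = 5; i < 15; i += 3)
            results.Add(i.ToString());
        return string.Join(",", results);
    }
}
```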

Checking for existence

The methods below all provide some sort of check to confirm the existence of a published value.
Any<T>(this IObservable<T>) is an extension method that will either check that the IObservable will publish any value at all or, if you use the overload that takes a Func<T,bool>, check whether any values satisfy your predicate. The interesting thing about Any is that it is non-blocking and so returns an IObservable<bool>. This will only yield one value.
var range = Observable.Range(10, 15);
range.Any().Subscribe(Console.WriteLine);
range.Any(i => i > 100).Subscribe(Console.WriteLine);
Observable.Empty<int>().Any().Subscribe(Console.WriteLine);
Contains<T>(this IObservable<T>, T) is an extension method like Any but will only accept a value of T not a predicate of T. Like Any the result is an IObservable<bool>.
var range = Observable.Range(10, 15);
range.Contains(15).Subscribe(Console.WriteLine);
Observable.Empty<int>().Contains(5).Subscribe(Console.WriteLine);
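The single boolean each of the examples above publishes can be collected in one place. This sketch assumes the current System.Reactive package and uses Wait() to block for each result; `ExistenceDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; ExistenceDemo is a hypothetical name.
using System.Reactive.Linq;

public static class ExistenceDemo
{
    public static string Run()
    {
        var range = Observable.Range(10, 15);   // 10 through 24
        bool[] answers =
        {
            range.Any().Wait(),
            range.Any(i => i > 100).Wait(),
            Observable.Empty<int>().Any().Wait(),
            range.Contains(15).Wait(),
            Observable.Empty<int>().Contains(5).Wait()
        };
        return string.Join(",", answers);
    }
}
```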

Filtering and Aggregating

These next few extension methods provide some sort of filter over the underlying stream. They are very similar to the extension methods available for IEnumerable<T>.
First<T>(this IObservable<T>) simply returns the first value of an Observable, or throws InvalidOperationException if the stream completes without yielding a value. This is a blocking call that won't return until a value or OnCompleted is published.
var range = Observable.Range(10, 15);
Console.WriteLine(range.First());
//InvalidOperationException("Sequence contains no elements.")
Console.WriteLine(Observable.Empty<int>().First());
There are further extension methods that probably don't warrant further explanation if I assume you are familiar with the existing IEnumerable<T> extension methods. These include
  • FirstOrDefault
  • Last
  • LastOrDefault
  • Single
  • Count
  • Min
  • Max
  • Sum
  • Where
  • GroupBy
While I think that most of these are self-explanatory, what may be worth reinforcing is that some of the scalar methods (those that return bool or T rather than an IObservable) are blocking calls. This can be very handy for transforming your non-blocking Rx code into sequential blocking code.
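In the current System.Reactive package the blocking First() style of call is marked obsolete, so this sketch uses the non-blocking operators (FirstAsync, FirstOrDefaultAsync, Count, Min, Max, Sum) and makes the blocking step explicit with Wait(). That mapping to the 2010 API is an assumption on my part; `ScalarDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; ScalarDemo is a hypothetical name.
using System.Reactive.Linq;

public static class ScalarDemo
{
    public static string Run()
    {
        var range = Observable.Range(10, 15);                                       // 10 through 24
        var first = range.FirstAsync().Wait();                                      // 10
        var emptyOrDefault = Observable.Empty<int>().FirstOrDefaultAsync().Wait(); // 0
        var count = range.Count().Wait();                                           // 15
        var min = range.Min().Wait();                                               // 10
        var max = range.Max().Wait();                                               // 24
        var sum = range.Sum().Wait();                                               // 255
        return $"{first},{emptyOrDefault},{count},{min},{max},{sum}";
    }
}
```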
Aggregate & Scan extension method overloads allow you to create your own aggregations. Both take similar arguments for their overloads. One of Scan’s overloads takes a Func<T,T,T>, which is the simpler overload. Effectively, given an accumulator and the current value, you must return the next accumulator. The other overload can take another type for the accumulator and allows you to specify the seed value for the accumulator.
Aggregate has the same overloads. The difference is that Scan will OnNext the result of every accumulation, whereas Aggregate will only OnNext the last value. Therefore Aggregate is like an AsyncSubject<T> version of Scan.
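In this example Scan will publish the running sum of the input sequence, and Aggregate will only publish the final total. WriteStreamToConsole appears to be a helper from the accompanying sample code (it is not part of Rx) that subscribes to a stream and writes its publications to the console.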
public void Aggregate_can_be_used_to_create_your_own_aggregates()
{
  var interval = Observable.Interval(TimeSpan.FromMilliseconds(150))
                          .Take(10);  //Must complete for Aggregate to publish a value.
  var aggregate = interval.Aggregate(0L, (acc, i) => acc + i);
  WriteStreamToConsole(aggregate, "aggregate");
}

public void Scan_allows_custom_rolling_aggregation()
{
  var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
  var scan = interval.Scan(0L, (acc, i) => acc + i);
  WriteStreamToConsole(scan, "scan");
}
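The difference between Scan and Aggregate is easiest to see on a short, finite input. This sketch assumes the current System.Reactive package and uses Range instead of Interval so that the result is deterministic; `RollingDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; RollingDemo is a hypothetical name.
using System.Reactive.Linq;

public static class RollingDemo
{
    public static string Run()
    {
        var source = Observable.Range(0, 5);                                            // 0,1,2,3,4
        var scan = source.Scan(0, (acc, i) => acc + i).ToList().Wait();                // every running total
        var aggregate = source.Aggregate(0, (acc, i) => acc + i).ToList().Wait();      // only the final total
        return string.Join(",", scan) + " | " + string.Join(",", aggregate);
    }
}
```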
Take<T>(this IObservable<T>, int) will return the first n publications, where n is the integer value provided. Take(1) is like First() except that it returns an IObservable, so it is not blocking. This example yields just the values 10 and 11.
var range = Observable.Range(10, 15);
range.Take(2).Subscribe(Console.WriteLine);
Skip<T>(this IObservable<T>, int) will ignore the first number of publications specified by the integer value provided. So while Take(2) above returned 10 and 11, Skip(2) will ignore 10 and 11 and return the rest of the stream.
var range = Observable.Range(10, 15);
range.Skip(2).Subscribe(Console.WriteLine);
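Collecting both results shows that Take and Skip split the same range between them. This sketch assumes the current System.Reactive package; `TakeSkipDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; TakeSkipDemo is a hypothetical name.
using System.Reactive.Linq;

public static class TakeSkipDemo
{
    public static string Run()
    {
        var range = Observable.Range(10, 15);          // 10 through 24
        var taken = range.Take(2).ToList().Wait();     // the first two values
        var skipped = range.Skip(2).ToList().Wait();   // everything after the first two
        return string.Join(",", taken) + " | " + string.Join(",", skipped);
    }
}
```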
DistinctUntilChanged<T>(this IObservable<T>) formerly HoldUntilChanged, will ignore any value that is the same as the previous value. In this example the values a, b and c are only written to the console once each.
var subject = new Subject<string>();
subject.DistinctUntilChanged().Subscribe(Console.WriteLine);
subject.OnNext("a");
subject.OnNext("a");
subject.OnNext("a");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("b");
subject.OnNext("c");
subject.OnNext("c");
subject.OnNext("c");
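DistinctUntilChanged only compares each value with the one immediately before it, so a value can reappear later in the stream. This sketch contrasts it with Distinct, assuming the current System.Reactive package; `DistinctDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; DistinctDemo is a hypothetical name.
using System.Reactive.Linq;

public static class DistinctDemo
{
    public static string Run()
    {
        var input = new[] { "a", "a", "b", "b", "a", "c", "c" }.ToObservable();
        var untilChanged = input.DistinctUntilChanged().ToList().Wait(); // drops only consecutive repeats
        var distinct = input.Distinct().ToList().Wait();                 // drops any value already seen
        return string.Join(",", untilChanged) + " | " + string.Join(",", distinct);
    }
}
```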

Buffering and time shifting

In some scenarios your application or user may not be able to keep up with the amount of data that a stream is providing. Alternatively, your user or application may just not need all of the data that is being published. Rx offers some great extension methods that allow you to control the rate at which values are published to your stream.
Buffer allows you to buffer a range of values and then re-publish them as a list once the buffer is full. You can buffer a specified number of elements or buffer all the values per timespan. Buffer also offers more advanced overloads.
static void Main(string[] args)
{
    var range = Observable.Range(10, 15);
    range.Buffer(4).Subscribe(
        enumerable =>
           {
               Console.WriteLine("--Buffered values");
               enumerable.ToList().ForEach(Console.WriteLine);
           }, () => Console.WriteLine("Completed"));

    var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
    interval.Buffer(TimeSpan.FromSeconds(1)).Subscribe(
        enumerable =>
        {
            Console.WriteLine("--Buffered values");
            enumerable.ToList().ForEach(Console.WriteLine);
        }, () => Console.WriteLine("Completed"));

    Console.ReadKey();
}
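The count-based example above does not show what happens to the leftover values when the source completes. This sketch, assuming the current System.Reactive package, collects the buffer sizes: the final, partially filled buffer is still published on completion. `BufferDemo` is a hypothetical name.

```csharp
// Sketch assuming the System.Reactive package; BufferDemo is a hypothetical name.
using System.Reactive.Linq;

public static class BufferDemo
{
    public static string Run()
    {
        var sizes = Observable.Range(10, 15)   // 15 values
            .Buffer(4)                         // lists of up to 4 values
            .Select(b => b.Count)
            .ToList()
            .Wait();
        return string.Join(",", sizes);
    }
}
```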
Delay is an extension method whose overloads time-shift the entire observable by the TimeSpan specified, or until the DateTime specified. This means that if you get a stream with 5 values in the first second and then 10 values in the 3rd second, using Delay with a TimeSpan of 2 seconds will yield results in the 3rd and 5th seconds. If you use the DateTime overload, the values will be yielded within the first second after the DateTime and then in the 3rd second after the DateTime. This example delays one subscription by 2 seconds and another until one minute from now.
var inOneMinute = DateTime.Now.AddMinutes(1);
var range = Observable.Range(10, 15);
range.Delay(TimeSpan.FromSeconds(2)).Subscribe(Console.WriteLine);
range.Delay(inOneMinute).Subscribe(Console.WriteLine);
Sample<T>(TimeSpan) will take the latest value published during each specified TimeSpan (if there is one) and publish that value. Quite simple, quite nice.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
interval.Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
Throttle is similar to the Sample and Buffer methods in that it is a time-based filter. It is more like Sample in that it returns a single value (not a list of values like Buffer). Unlike Sample, the window that the values are sampled over is not fixed; it slides every time the underlying stream publishes a value. The throttled stream will only produce a value when the underlying stream does not publish a value for the given period, in which case the last value will be published. Each time a value is published the timer restarts: if another value arrives before the period has elapsed, the earlier value is dropped and the new value becomes the candidate. This means that if the underlying stream always produces values more frequently than the throttle period, Throttle will never publish any results.
The most obvious usage of this, to me, is holding back a "Google suggest" request to the server while the user is still typing. We could throttle the keystrokes so that we only send the search string when the user pauses for a given period.
In this example all values are ignored, as the publisher produces values faster than the throttle period allows.
var interval = Observable.Interval(TimeSpan.FromMilliseconds(100));
interval.Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(Console.WriteLine);
This example varies the rate at which it produces values (emulating sporadic keystrokes), so some values are written to the console: those followed by a pause longer than the throttle period.
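var interval = Observable.Create<int>(
    o =>
    {
        for (int i = 0; i < 100; i++)
        {
            o.OnNext(i);
            // i is incremented only by the for statement. Pause 100ms after values where
            // i % 10 < 5, and 300ms (longer than the 200ms throttle) after the rest.
            Thread.Sleep(i % 10 < 5 ? 100 : 300);
        }
        o.OnCompleted();
        return () => { };
    });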
interval.Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(Console.WriteLine);
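The rule Throttle follows can be checked without real timers. This plain C# sketch (no Rx) works in virtual time over hypothetical (TimeMs, Value) pairs: a value survives only if the next value arrives more than the throttle window after it. As a simplifying assumption, the pending last value is published when the source completes. It replays both examples above: the steady 100ms producer yields nothing, and the sporadic producer (modelled with i incremented once per loop) yields 5 to 9, 15 to 19, and so on.

```csharp
using System;
using System.Collections.Generic;

public static class ThrottleSketch
{
    // arrivals must be in time order. Returns the values the throttled stream would publish.
    public static List<int> Throttle(IReadOnlyList<(int TimeMs, int Value)> arrivals, int windowMs, bool sourceCompletes)
    {
        var published = new List<int>();
        for (int i = 0; i < arrivals.Count; i++)
        {
            if (i == arrivals.Count - 1)
            {
                // Assumption: a completing source still flushes the pending last value.
                if (sourceCompletes) published.Add(arrivals[i].Value);
            }
            else if (arrivals[i + 1].TimeMs - arrivals[i].TimeMs > windowMs)
            {
                // The source stayed quiet for longer than the window, so this value survives.
                published.Add(arrivals[i].Value);
            }
            // Otherwise a newer value arrived within the window and this one is dropped.
        }
        return published;
    }
}

public static class ThrottleDemo
{
    public static void Main()
    {
        // Like Interval(100ms): a value every 100ms, never completing (first 50 values modelled).
        var steady = new List<(int TimeMs, int Value)>();
        for (int i = 0; i < 50; i++) steady.Add(((i + 1) * 100, i));

        // Like the Create example: 100ms after values where i % 10 < 5,
        // 300ms after the others, then completion.
        var sporadic = new List<(int TimeMs, int Value)>();
        int t = 0;
        for (int i = 0; i < 100; i++)
        {
            sporadic.Add((t, i));
            t += i % 10 < 5 ? 100 : 300;
        }

        Console.WriteLine($"steady:   [{string.Join(", ", ThrottleSketch.Throttle(steady, 200, false))}]");
        Console.WriteLine($"sporadic: [{string.Join(", ", ThrottleSketch.Throttle(sporadic, 200, true))}]");
    }
}
```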
In later posts, Buffer is covered in more detail and we expand on more complex time based operators. Also when you introduce concurrency (which these time based operators do) you need to start to understand the Scheduling semantics of Rx which are also covered in later posts.
Most of these methods are simple and pretty easy to understand. My problem with them was the sheer number of them. It also appears that the API has become less prone to change in recent releases. If you want to get to know these methods a bit better, check out these links, but most importantly: use them. Have a play in LINQPad or just a console app.
Useful links:
Rx Home
Exploring the Major Interfaces in Rx – MSDN
IObservable<T> interface - MSDN
IObserver<T> interface - MSDN
Observer Design pattern - MSDN
ObservableExtensions class - MSDN
Observable class - MSDN
Observer class - MSDN
Qbservable class - MSDN
Action<T> Delegate - MSDN
Func<T1, T2, TResult> - MSDN
The Rx Wiki site 101 Samples
Enumerate This
LINQPad
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Part 1 - Introduction to Rx
Forward to next post; Part 3 - Lifetime management – Completing and Unsubscribing

Tuesday, May 18, 2010

Introduction to Rx Part 1 - Key types


Microsoft has released a new library for building “reactive” applications. Its full name is Reactive Extensions for .NET, but it is generally referred to as just “Rx”. Essentially, Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern, such as multicast delegates and events. Multicast delegates (which events are built on), however, can be cumbersome to use, have a nasty interface, are difficult to compose and cannot be queried. Rx looks to solve these problems.
Here I will introduce you to the building blocks and some basic types that make up Rx.

IObservable<T>

IObservable<T> is one of the 2 core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft are so confident that this interface will be of use to you that it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a stream of T objects. So if a method returned an IObservable<Price>, I could think of it as a stream of Prices.

IObserver<T>

IObserver<T> is the other one of the 2 core interfaces for working with Rx. It too has made it into the BCL as of .NET 4.0. Don’t worry if you are not on .NET 4.0 yet, as the Rx team have included these 2 interfaces in a separate assembly for .NET 3.5 users. IObservable<T> is meant to be the “functional dual of IEnumerable<T>” (with IObserver<T> as the dual of IEnumerator<T>). If you want to know what that last statement means, then enjoy the hours of videos on Channel9 where they discuss the mathematical purity of the types. For everyone else it means that where an IEnumerable<T> can effectively yield 3 things (the next value, an exception or the end of the sequence), so too can IObservable<T> via IObserver<T>’s 3 methods: OnNext(T), OnError(Exception) and OnCompleted().
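Purely to show what the two interfaces ask of an implementer, here is a hand-written pair that uses only the BCL types (.NET 4.0 or later, no Rx reference). ListObservable<T> and LoggingObserver<T> are hypothetical names for this sketch; in day-to-day Rx code you rarely need to write these yourself.

```csharp
using System;
using System.Collections.Generic;

// Records every notification it receives (and echoes it to the console).
public sealed class LoggingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(T value) => Write($"OnNext({value})");
    public void OnError(Exception error) => Write($"OnError({error.Message})");
    public void OnCompleted() => Write("OnCompleted");

    private void Write(string message)
    {
        Log.Add(message);
        Console.WriteLine(message);
    }
}

// A "cold" source: each subscriber synchronously receives every item, then OnCompleted.
public sealed class ListObservable<T> : IObservable<T>
{
    private readonly IReadOnlyList<T> _items;

    public ListObservable(IReadOnlyList<T> items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        // All work is already done, so the returned subscription has nothing to cancel.
        return new NoOpDisposable();
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

public static class InterfacesDemo
{
    public static void Main()
    {
        // Think of this as "a stream of prices".
        IObservable<int> prices = new ListObservable<int>(new[] { 10, 11, 12 });
        prices.Subscribe(new LoggingObserver<int>());
    }
}
```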
Interestingly, while you will be exposed to the IObservable<T> interface a lot if you work with Rx, I find I don't often need to concern myself with IObserver<T>. Another interesting thing I have found with Rx is that I never actually implement these interfaces myself; Rx provides all of the implementations I need out of the box. Let's have a look at the simple ones.

Subject<T>

If you were to create your own implementation of IObservable<T>, you may find that you need to expose methods to publish items to the subscribers, publish errors and notify when the stream is complete. Hmmm, they all sound like the methods on the IObserver<T> interface. While it may seem odd to have one type implementing both interfaces, it does make life easy. This is what subjects can do for you. Subject<T> is the most basic of the subjects. Effectively you can expose your Subject<T> behind a method that returns IObservable<T>, but internally you can use the OnNext, OnError and OnCompleted methods to control the stream.
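To see why one type implementing both interfaces makes life easy, here is a deliberately naive subject in plain C# (BCL only, no Rx). NaiveSubject<T> and CollectingObserver<T> are hypothetical names; this sketch is not thread-safe and, unlike Rx's Subject<T>, it keeps forwarding after OnError or OnCompleted.

```csharp
using System;
using System.Collections.Generic;

// One object is both the stream (IObservable<T>) and the thing you push values into (IObserver<T>).
public sealed class NaiveSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Subscription(() => _observers.Remove(observer));
    }

    // Iterate over a copy so an observer may unsubscribe while being notified.
    public void OnNext(T value) { foreach (var o in _observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in _observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in _observers.ToArray()) o.OnCompleted(); }

    private sealed class Subscription : IDisposable
    {
        private readonly Action _unsubscribe;
        public Subscription(Action unsubscribe) => _unsubscribe = unsubscribe;
        public void Dispose() => _unsubscribe();
    }
}

// Collects what it is told, for inspection.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => Console.WriteLine($"Error: {error.Message}");
    public void OnCompleted() => Completed = true;
}

public static class SubjectDemo
{
    public static void Main()
    {
        var subject = new NaiveSubject<string>();
        subject.OnNext("too early");            // nobody is subscribed yet, so this is lost
        var observer = new CollectingObserver<string>();
        using (subject.Subscribe(observer))
        {
            subject.OnNext("a");
            subject.OnNext("b");
        }                                       // disposing the subscription unsubscribes
        subject.OnNext("c");                    // not seen by the disposed subscription
        Console.WriteLine(string.Join(", ", observer.Values));   // a, b
    }
}
```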
In this (awfully basic) example, I create a subject, subscribe to that subject and then publish to the stream.
using System;
using System.Collections.Generic;
using System.Reactive.Subjects; // Subject<T> and its siblings live here in current Rx releases

namespace RxConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            var subject = new Subject<string>();

            WriteStreamToConsole(subject);

            subject.OnNext("a");
            subject.OnNext("b");
            subject.OnNext("c");
            Console.ReadKey();
        }

        private static void WriteStreamToConsole(IObservable<string> stream)
        {
            stream.Subscribe(Console.WriteLine);
        }
    }
}
Note that the WriteStreamToConsole method takes an IObservable<string>, as it only wants access to the Subscribe method. Hang on, doesn’t the Subscribe method need an IObserver<string>? Surely Console.WriteLine does not match that interface. Well, no, it doesn’t, but the Rx team supply me with an extension method on IObservable<T> that just takes an Action<T>. The action will be executed every time an item is published. There are other overloads of the Subscribe extension method that allow you to pass combinations of delegates to be invoked for OnNext, OnCompleted and OnError. This effectively means I don't need to implement IObserver<T>. Cool.
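The Action<T> overload can be pictured as a thin adapter that wraps delegates in an IObserver<T>. The sketch below is hypothetical and is not Rx's source: SubscribeWith, DelegateObserver<T> and ArrayObservable<T> are names invented for illustration (SubscribeWith avoids clashing with Rx's own Subscribe overloads), and rethrowing when no OnError handler is supplied is an assumption of this sketch.

```csharp
using System;

// Adapts plain delegates to IObserver<T>.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError = null, Action onCompleted = null)
    {
        _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
        // Assumption: with no error handler supplied, rethrow rather than swallow the error.
        _onError = onError ?? (ex => throw ex);
        _onCompleted = onCompleted ?? (() => { });
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

public static class SubscribeSketch
{
    // Hypothetical extension method: wrap the delegates, then use the real interface method.
    public static IDisposable SubscribeWith<T>(this IObservable<T> source, Action<T> onNext,
        Action<Exception> onError = null, Action onCompleted = null) =>
        source.Subscribe(new DelegateObserver<T>(onNext, onError, onCompleted));
}

// Minimal source for the demo: pushes its items synchronously, then completes.
public sealed class ArrayObservable<T> : IObservable<T>
{
    private readonly T[] _items;
    public ArrayObservable(params T[] items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new EmptyDisposable();
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

public static class SubscribeDemo
{
    public static void Main()
    {
        IObservable<string> stream = new ArrayObservable<string>("a", "b", "c");
        // Console.WriteLine converts to Action<string>, just as in the Rx example.
        stream.SubscribeWith(Console.WriteLine, onCompleted: () => Console.WriteLine("Completed"));
    }
}
```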
As you can see, Subject<T> could be quite useful for getting started in Rx programming. Subject<T> is a basic implementation however. There are 3 siblings to Subject<T> that offer subtly different implementations which can drastically change the way your program runs.

ReplaySubject<T>

ReplaySubject<T> will remember all publications to it, so that any subscriptions that happen after publications have been made will still get all of the publications. Consider this example, where we have moved our first publication to occur before our subscription:
static void Main(string[] args)
{
    var subject = new Subject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
The result of this would be that “b” and “c” would be written to the console, but “a” ignored. If we were to make the minor change to make subject a ReplaySubject<T> we would see all publications again.
static void Main(string[] args)
{
    var subject = new ReplaySubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
This can be very handy for eliminating race conditions between publishing and subscribing.
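The replay behaviour can be sketched in a few lines of plain C# (BCL only, no Rx). NaiveReplaySubject<T> and ValueCollector<T> are hypothetical names; only OnNext is modelled, the history buffer is unbounded, and nothing here is thread-safe, so treat it as an illustration of the idea rather than of Rx's ReplaySubject<T>.

```csharp
using System;
using System.Collections.Generic;

// Records every value; each new subscriber first receives the whole history, then live values.
public sealed class NaiveReplaySubject<T> : IObservable<T>
{
    private readonly List<T> _history = new List<T>();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public void OnNext(T value)
    {
        _history.Add(value);
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var value in _history) observer.OnNext(value);   // replay what was missed
        _observers.Add(observer);                                // then receive live values
        return new Subscription(() => _observers.Remove(observer));
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action _unsubscribe;
        public Subscription(Action unsubscribe) => _unsubscribe = unsubscribe;
        public void Dispose() => _unsubscribe();
    }
}

public sealed class ValueCollector<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class ReplayDemo
{
    public static void Main()
    {
        // Same order as the ReplaySubject example: publish "a", subscribe, publish "b" and "c".
        var subject = new NaiveReplaySubject<string>();
        subject.OnNext("a");
        var collector = new ValueCollector<string>();
        subject.Subscribe(collector);
        subject.OnNext("b");
        subject.OnNext("c");
        Console.WriteLine(string.Join(", ", collector.Values));   // a, b, c
    }
}
```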

BehaviorSubject<T>

BehaviorSubject<T> is similar to ReplaySubject<T> except it only remembers the last publication. BehaviorSubject<T> also requires you to provide it an initial value of T. This means that all subscribers will receive a value immediately (unless it is already completed).
In this example the value “a” is written to the console.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}
In this example the value “b” is written to the console, but not “a”.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");
    subject.OnNext("b");
    WriteStreamToConsole(subject);
    Console.ReadKey();
}
In this example the values “b”, “c” & “d” are all written to the console, but again not “a”
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    WriteStreamToConsole(subject);
    subject.OnNext("c");
    subject.OnNext("d");
    Console.ReadKey();
}
Finally in this example, no values will be published as the stream has completed. Nothing is written to the console.
static void Main(string[] args)
{
    var subject = new BehaviorSubject<string>("a");

    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    WriteStreamToConsole(subject);
    
    Console.ReadKey();
}
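The four BehaviorSubject<T> examples above can be reproduced with a naive plain-C# sketch (BCL only, no Rx). NaiveBehaviorSubject<T> and Recorder<T> are hypothetical names; OnError and thread safety are not modelled. It holds a current value seeded by the constructor, hands it to each new subscriber immediately, and after completion gives late subscribers only OnCompleted.

```csharp
using System;
using System.Collections.Generic;

public sealed class NaiveBehaviorSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private T _current;
    private bool _completed;

    public NaiveBehaviorSubject(T initialValue) => _current = initialValue;

    public void OnNext(T value)
    {
        if (_completed) return;                  // ignore values after completion
        _current = value;
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public void OnCompleted()
    {
        if (_completed) return;
        _completed = true;
        foreach (var o in _observers.ToArray()) o.OnCompleted();
        _observers.Clear();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_completed)
        {
            observer.OnCompleted();              // no value for late subscribers
            return new Subscription(() => { });
        }
        observer.OnNext(_current);               // latest (or initial) value straight away
        _observers.Add(observer);
        return new Subscription(() => _observers.Remove(observer));
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action _unsubscribe;
        public Subscription(Action unsubscribe) => _unsubscribe = unsubscribe;
        public void Dispose() => _unsubscribe();
    }
}

public sealed class Recorder<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}

public static class BehaviorDemo
{
    public static void Main()
    {
        // Third example above: "b" is current when we subscribe, then "c" and "d" follow.
        var subject = new NaiveBehaviorSubject<string>("a");
        subject.OnNext("b");
        var recorder = new Recorder<string>();
        subject.Subscribe(recorder);
        subject.OnNext("c");
        subject.OnNext("d");
        Console.WriteLine(string.Join(", ", recorder.Values));   // b, c, d
    }
}
```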

AsyncSubject<T>

AsyncSubject<T> is similar to the Replay and Behavior subjects; however, it will only store the last value, and only publish it when the stream is completed.
In this example no values will be published so no values will be written to the console.
static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    Console.ReadKey();
}
In this example we invoke the OnCompleted method and the value “c” is published and therefore written to the console.
static void Main(string[] args)
{
    var subject = new AsyncSubject<string>();

    subject.OnNext("a");
    WriteStreamToConsole(subject);
    subject.OnNext("b");
    subject.OnNext("c");
    subject.OnCompleted();
    Console.ReadKey();
}
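Likewise, the AsyncSubject<T> behaviour can be sketched in plain C# (BCL only, no Rx). NaiveAsyncSubject<T> and Recorder<T> are hypothetical names; OnError and thread safety are not modelled. OnNext only remembers the latest value; nothing is published until OnCompleted, at which point every subscriber, including any that subscribe later, receives that final value followed by OnCompleted.

```csharp
using System;
using System.Collections.Generic;

public sealed class NaiveAsyncSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();
    private T _last;
    private bool _hasValue;
    private bool _completed;

    public void OnNext(T value)
    {
        if (_completed) return;
        _last = value;          // remember, but do not publish yet
        _hasValue = true;
    }

    public void OnCompleted()
    {
        if (_completed) return;
        _completed = true;
        foreach (var o in _observers.ToArray()) Deliver(o);
        _observers.Clear();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (_completed)
        {
            Deliver(observer);  // late subscribers still get the final value
            return new Subscription(() => { });
        }
        _observers.Add(observer);
        return new Subscription(() => _observers.Remove(observer));
    }

    private void Deliver(IObserver<T> observer)
    {
        if (_hasValue) observer.OnNext(_last);
        observer.OnCompleted();
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action _unsubscribe;
        public Subscription(Action unsubscribe) => _unsubscribe = unsubscribe;
        public void Dispose() => _unsubscribe();
    }
}

public sealed class Recorder<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() => Completed = true;
}

public static class AsyncDemo
{
    public static void Main()
    {
        var subject = new NaiveAsyncSubject<string>();
        subject.OnNext("a");
        var recorder = new Recorder<string>();
        subject.Subscribe(recorder);
        subject.OnNext("b");
        subject.OnNext("c");
        Console.WriteLine($"before OnCompleted: {recorder.Values.Count} value(s)");   // before OnCompleted: 0 value(s)
        subject.OnCompleted();
        Console.WriteLine($"after OnCompleted: {string.Join(", ", recorder.Values)}"); // after OnCompleted: c
    }
}
```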
So that is the very basics of Rx. With only that under your belt, it may be hard to understand why Rx is a topic of interest. To follow on from this post, I will discuss further fundamentals of Rx:
  1. Extension methods
  2. Scheduling / Multithreading
  3. LINQ syntax
Once we have covered these, it should allow you to really get Rx working for you to produce some tasty Reactive applications. Hopefully after we have covered these background topics we can knock up some samples where Rx can really help you in your day-to-day coding.
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Related links:
IObservable<T> interface - MSDN
IObserver<T> interface - MSDN
Observer Design pattern - MSDN
Rx Home
Exploring the Major Interfaces in Rx – MSDN
ObservableExtensions class - MSDN
Using Rx Subjects - MSDN
System.Reactive.Subjects Namespace - MSDN
Subject<T> - MSDN
AsyncSubject<T> - MSDN
BehaviorSubject<T> - MSDN
ReplaySubject<T> - MSDN
Subject static class - MSDN
ISubject<TSource, TResult> - MSDN
ISubject<T> - MSDN
Back to the contents page for Reactive Extensions for .NET Introduction
Forward to next post; Part 2 - Static and extension methods

Wednesday, May 12, 2010

MergedDictionaries performance problems in WPF

I don’t normally like to blatantly plagiarise other people’s comments, but this seems to be a little-known bug that deserves to be shared.

A colleague of mine sent the following email to our internal tech list:

I strongly urge everyone working with WPF to use this, or at least benchmark it in your own applications, if you use ResourceDictionary.MergedDictionaries. I consider this to be a huge problem in WPF. I’m not sure if it exists in Silverlight, but I would assume it does.

I was just debugging a very long render delay in some WPF code and I came across this little tidbit:

http://www.wpftutorial.net/MergedDictionaryPerformance.html

The quote of interest is: “Each time a control references a ResourceDictionary XAML creates a new instance of it. So if you have a custom control library with 30 controls in it and each control references a common dictionary you create 30 identical resource dictionaries!”

Normally that isn’t a huge problem, but when you consider the way I personally organize resources in Prism projects (and have suggested others organize theirs), it becomes a **serious** problem. For example, let’s say we have this project structure:

/MyProject.Resources
       /Resources
                -Buttons.xaml
                -DataGrid.xaml
                -Global.xaml
                -Brushes.xaml
                -WindowChrome.xaml
                -Icons.xaml
 
/MyProject.Module1
      /Resources
                -Module1Resources.xaml  (References all Dictionaries in /MyProject.Resources/Resources/*)
      /Views
                -View1.xaml
                -View2.xaml
      
/MyProject.Module2
      /Resources
                -Module2Resources.xaml   (References all Dictionaries in /MyProject.Resources/Resources/*)
      /Views
                -View1.xaml
                -View2.xaml
      
/MyProject.Shell
      /Resources
                -ShellResources.xaml   
      /Views
                -MainShell.xaml

If in your views you reference the module-level ResourceDictionary (which helps maintainability and modularity), then every time you create an instance of View1.xaml, for example, all the ResourceDictionaries in /MyProject.Resources/Resources/* have to be parsed again. This isn’t really a memory concern, but it is a huge performance concern. There can potentially be thousands of lines of XAML to parse, and the time really does add up.

I recently switched all of the MergedDictionary references:

<ResourceDictionary>
    <ResourceDictionary.MergedDictionaries>
        <ResourceDictionary Source="/SomeDictionary.xaml"/>
    </ResourceDictionary.MergedDictionaries>
</ResourceDictionary>

To use the attached SharedResourceDictionary, which shadows the Source property and keeps a global cache of every ResourceDictionary it has parsed:

<ResourceDictionary>
    <ResourceDictionary.MergedDictionaries>
        <SharedResourceDictionary Source="/SomeDictionary.xaml"/>
    </ResourceDictionary.MergedDictionaries>
</ResourceDictionary>

And I saw a performance increase of roughly 30 times, well over an order of magnitude: from almost 6000ms to 200ms. I’ve attached this code; I used the basic sample implementation from the link above, so this is considered public information for client purposes.

Cheers,

Charlie

Thanks to Charlie Robbins (Lab49) for expanding on Christian’s blog post and for letting me reprint his email.
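The caching idea Charlie describes can be sketched without WPF. In this plain C# version, ParsedDictionary and the parse delegate are hypothetical stand-ins for ResourceDictionary and XAML loading, and the real SharedResourceDictionary (a ResourceDictionary subclass that shadows Source) is not reproduced here. The point is the counting: thirty requests for the same source cost one parse instead of thirty. The cache is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a parsed ResourceDictionary.
public sealed class ParsedDictionary
{
    public ParsedDictionary(Uri source) => Source = source;
    public Uri Source { get; }
}

// Process-wide cache keyed by source Uri: each source is parsed at most once,
// and every later request gets the same instance back.
public static class SharedDictionaryCache
{
    private static readonly Dictionary<Uri, ParsedDictionary> Cache = new Dictionary<Uri, ParsedDictionary>();

    public static ParsedDictionary GetOrParse(Uri source, Func<Uri, ParsedDictionary> parse)
    {
        if (!Cache.TryGetValue(source, out var dictionary))
        {
            dictionary = parse(source);     // the expensive parse, done once per source
            Cache.Add(source, dictionary);
        }
        return dictionary;
    }
}

public static class CacheDemo
{
    public static void Main()
    {
        int parses = 0;
        ParsedDictionary Parse(Uri uri)
        {
            parses++;
            return new ParsedDictionary(uri);
        }

        var source = new Uri("/SomeDictionary.xaml", UriKind.Relative);
        // Thirty controls/views that all merge the same dictionary.
        for (int i = 0; i < 30; i++) SharedDictionaryCache.GetOrParse(source, Parse);
        Console.WriteLine($"{parses} parse(s) for 30 requests");   // 1 parse(s) for 30 requests
    }
}
```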