Tuesday, June 21, 2011

The definitive list of Rx sites

Over at the Rx forums, Eamon_OTuathail has created what seems to be the definitive list of Rx sites. It is a list of blogs and open source projects.
Great stuff. This should save a lot of time for a lot of people.
http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2cbd3b1c-d535-46ba-a9cf-3cd576a8e7c2
Humbled to make the grade.

EDIT: Hopefully www.IntroToRx.com will also make the grade. It is the online book that evolved from the blog series.

Saturday, June 18, 2011

Isolating custom Library dependencies versions from consumer dependency versions

This post is more about the CLR and dependency management than it is about Rx. However, at this point in Rx’s lifecycle it seems relevant to comment on. The same principles could obviously be applied to open source projects et al.

I have recently heard of an interesting situation that no doubt has proved troublesome to people before. This problem is particularly interesting with Rx and its recent rate of releases. If you incorporate Rx into a library of yours and then publish that library, you are effectively forcing users to use the same version of Rx that you do. Considering that a version of Rx comes out roughly every two months and that there are often breaking changes, this can create quite a mess. It is made more interesting by the fact that, up until recently, the Rx team had not specified whether a release was experimental or considered stable.

To give an example that illustrates my point, imagine you have a library that wraps a messaging platform. You want to avoid the use of events and APM, and expose things via IObservable<T>. You feel happy that IObservable<T> is exposed natively in .NET 4, so you should not have to expose the Rx implementation you use. You do, however, want to use Rx as it has features you need and don’t want to (re-)write yourself. Your standard approaches to packaging/deployment are:

  • deploy the parts of the Rx libraries you use with your code. Users can just put them all into a “lib\MyFramework” folder and reference them.
  • exclude the libraries from your code, set Specific Version = False and hope your code will work with the consumer’s version of Rx
  • use a package management tool like Nuget to publish your package and specify the valid versions of Rx your library will work with.
  • rely on things being in the GAC so you can utilise the side-by-side versioning it provides
  • just try and implement the parts of Rx you want and avoid DLL Hell.

I gave this some thought and I think I have come up with a solution that could help library authors protect themselves. While there is the obvious option of using Nuget as part of your dependency management, this does not solve the problem, it just eases the pain. If the customer wants to use the latest version of Rx and you only support the 3 previous versions, your customer is still in some trouble.

The theory I had was that I could specify the specific version of Rx I want to reference in my library; the problem is that it may be named the same as the client’s referenced version. Depending on where their references are built to, they could overwrite each other. It seemed the solution was to embed the dependency into my library.

This turns out to be quite easy. If you have a project in Visual Studio that references your version of Rx, follow these steps:

  1. Ensure you have a file reference (not a project or a GAC reference) to the dependency, in this case System.Reactive.dll
  2. Set the reference to be Specific Version = True
  3. Set the reference Copy Local = False
  4. Embed the dependency into your library. I created a folder in my project called EmbeddedAssemblies. I “Add Existing…” to this folder, navigate to the dependencies (just System.Reactive.dll in this case), and then choose “Add as Link…”
  5. Set the “Build Action” of the newly added link to Embedded Resource
  6. Ensure that the resource is loaded correctly at run time…

The last part of that list proves to be not too hard. You can hook on to the AppDomain.AssemblyResolve event and load your embedded resource there. In the event handler you read the resource's byte stream, create the assembly from those bytes and return it.

using System;
using System.Linq;
using System.Reflection;
using System.Threading;

internal static class DependencyResolver
{
  private static int _isSet = 0;

  internal static void Ensure()
  {
    if (Interlocked.CompareExchange(ref _isSet, 1, 0) == 0)
    {
      var thisAssembly = Assembly.GetExecutingAssembly();
      var assemblyName = new AssemblyName(thisAssembly.FullName).Name;
      var embededAssemblyPrefix = assemblyName + ".EmbeddedAssemblies.";

      var myEmbeddedAssemblies =
        Assembly.GetExecutingAssembly().GetManifestResourceNames()
          .Where(name => name.StartsWith(embededAssemblyPrefix))
          .Select(resourceName =>
                    {
                      using (var stream = Assembly.GetExecutingAssembly().GetManifestResourceStream(resourceName))
                      {
                        var assemblyData = new Byte[stream.Length];
                        stream.Read(assemblyData, 0, assemblyData.Length);
                        return Assembly.Load(assemblyData);
                      }
                    })
          .ToDictionary(ass => ass.FullName);

      AppDomain.CurrentDomain.AssemblyResolve += (sender, args) =>
                                                    {
                                                      Assembly assemblyToLoad = null;
                                                      myEmbeddedAssemblies.TryGetValue(args.Name, out assemblyToLoad);
                                                      return assemblyToLoad;
                                                    };
    }
  }
}

Returning null from the event handler says "I don't know how to load this assembly". This gives other handlers the chance to have a go at loading the assembly in the same way.
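One detail the resolver above glosses over is that Stream.Read is allowed to return fewer bytes than were asked for. The following is a minimal sketch of a more defensive read, assuming a hypothetical helper called ReadAllBytes (it is not part of the original sample) and .NET 4's Stream.CopyTo:

```csharp
using System;
using System.IO;

internal static class StreamHelper
{
    // Hypothetical helper: copies the whole stream into a byte array,
    // however many underlying Read calls that takes.
    internal static byte[] ReadAllBytes(Stream stream)
    {
        using (var buffer = new MemoryStream())
        {
            stream.CopyTo(buffer); // Stream.CopyTo is available from .NET 4
            return buffer.ToArray();
        }
    }

    private static void Main()
    {
        var source = new MemoryStream(new byte[] { 1, 2, 3, 4 });
        Console.WriteLine(ReadAllBytes(source).Length); // prints 4
    }
}
```

The resulting byte array could then be passed to Assembly.Load exactly as the resolver does today.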

Next, to ensure that my embedded dependency resolver is called, I opted to make a call to it in the static constructor of the key types in my library, e.g.

public class SomeProvider
{
  static SomeProvider()
  {
    DependencyResolver.Ensure();
  }

  //other stuff goes here...
}

Have a look at the example code by downloading this zip file

Caveats: This is a thought experiment, and the code and concepts are only demo quality. I have not used this in production-quality code. I also have not verified that the license allows for this style of packaging.


Sunday, June 5, 2011

Rx v.1.0.10425–Breaking changes!

For those that follow Rx, many will have noticed a new drop was made available in late April 2011. This was interesting for numerous reasons:

  1. I believe that it was the first drop since the Rx team moved off the DevLabs site and on to the Microsoft Data site
  2. They have supplied two downloads, Supported and Experimental
  3. It is available via Nuget
  4. It broke most of the sample code of this blog
  5. ...It has massive breaking changes!

The first thing I noticed was that only the more recent frameworks were in the “supported” release i.e. .NET 4, Silverlight 4 and Windows Phone. Next, when you open up the install directory (which has moved, again) the files are different to previous releases.

This is a comparison of the location and files of the old vs. new version (.NET 4 target):

C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net4
System.CoreEx.dll
System.Interactive.dll
System.Linq.Async.dll
System.Reactive.ClientProfile.dll
System.Reactive.dll
System.Reactive.ExtendedProfile.dll
System.Reactive.Testing.dll

C:\Program Files\Microsoft Reactive Extensions SDK\v1.0.10425\Binaries\.NETFramework\v4.0
Microsoft.Reactive.Testing.dll
System.Reactive.dll
System.Reactive.Providers.dll
System.Reactive.Windows.Forms.dll
System.Reactive.Windows.Threading.dll

Note that the files are now all named System.Reactive.*, and the types live in System.Reactive namespaces instead of hijacking existing BCL namespaces such as System, System.IO, System.Linq etc. Here is a list of the old and new namespaces found in the Rx libraries, with the count of the types in each namespace.

C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net4
System                         6
System.Collections.Generic     21
System.Concurrency             14
System.Diagnostics             1
System.Disposables             8
System.IO                      1
System.Joins                   34
System.Linq                    13
System.Reactive.Testing        4
System.Reactive.Testing.Mocks  10
System.Threading.Tasks         1
System.Web                     1
System.Windows.Forms           1
System.Windows.Threading       1

C:\Program Files\Microsoft Reactive Extensions SDK\v1.0.10425\Binaries\.NETFramework\v4.0
Microsoft.Reactive.Testing       7
System                           1
System.Reactive                  10
System.Reactive.Concurrency      16
System.Reactive.Disposables      9
System.Reactive.Joins            34
System.Reactive.Linq             8
System.Reactive.Subjects         8
System.Reactive.Threading.Tasks  1

Summary of impact

Here is a quick list of the changes that have affected the code from the samples off this blog

Unit has been moved to System.Reactive. I imagine to prevent any conflicts with FSharp’s Unit?

EventLoopScheduler no longer has a constructor that takes a string for the name of the thread. It instead takes a function that acts as a thread factory, so you can use that to specify a name for the thread.
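As a rough sketch of what that looks like, the snippet below names the scheduler's thread via the factory. The thread name "MyEventLoop" and the helper method are made up for illustration, and this is written against the System.Reactive API as I know it rather than verified against this exact drop.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Threading;

internal static class EventLoopSchedulerExample
{
    // Schedules one action and reports the name of the thread it ran on.
    internal static string NameOfSchedulerThread()
    {
        string name = null;
        using (var done = new ManualResetEvent(false))
        using (var scheduler = new EventLoopScheduler(
            // The factory is handed the ThreadStart the scheduler wants to run
            // and returns the Thread that should run it.
            start => new Thread(start) { Name = "MyEventLoop", IsBackground = true }))
        {
            scheduler.Schedule(() =>
            {
                name = Thread.CurrentThread.Name;
                done.Set();
            });
            done.WaitOne(); // block until the scheduled action has run
        }
        return name;
    }

    private static void Main()
    {
        Console.WriteLine(NameOfSchedulerThread()); // prints MyEventLoop
    }
}
```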

Similar methods have been collapsed into overloads. For example, BufferWithTime, BufferWithCount & BufferWithTimeOrCount are all now just Buffer with various overloads. The same goes for WindowWithTime, WindowWithCount & WindowWithTimeOrCount. GenerateWithTime is now just an overload of Generate. CreateWithDisposable is now just an overload of Create.
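To make the collapsed overloads concrete, here is a small sketch of the Generate and Create replacements. The class and method names are my own, and the overloads shown are the ones I know from System.Reactive; treat it as an illustration rather than a migration guide.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;

internal static class CollapsedOverloads
{
    // Was GenerateWithTime: the extra timeSelector argument spaces the values out.
    internal static List<int> GenerateWithATimeSelector()
    {
        return Observable.Generate(
                0,                                   // initial state
                i => i < 3,                          // keep going while this holds
                i => i + 1,                          // next state
                i => i,                              // value to publish
                i => TimeSpan.FromMilliseconds(20))  // delay before each value
            .ToEnumerable()
            .ToList();
    }

    // Was CreateWithDisposable: the subscribe function returns an IDisposable.
    internal static List<string> CreateReturningADisposable()
    {
        return Observable.Create<string>(observer =>
            {
                observer.OnNext("hello");
                observer.OnCompleted();
                return Disposable.Empty;
            })
            .ToEnumerable()
            .ToList();
    }

    private static void Main()
    {
        Console.WriteLine(string.Join(",", GenerateWithATimeSelector())); // prints 0,1,2
        Console.WriteLine(string.Join(",", CreateReturningADisposable())); // prints hello
    }
}
```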

Potentially confusing names have been adjusted. I must admit, I found AsyncSubject an odd name, and also found it odd that Prune was the method you would use to transform an existing observable into one that exhibited AsyncSubject behaviour. The new version of this is the far more appropriately named TakeLast(int). Just pass a value of 1 to get the old semantics of Prune.
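A quick sketch of TakeLast(1) in action, assuming a simple Range source (the class name and the logging are mine): only the final value is published, and only once the source completes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

internal static class TakeLastExample
{
    // Collects the notifications produced by TakeLast(1) over the values 1..5.
    internal static List<string> Run()
    {
        var log = new List<string>();
        Observable.Range(1, 5)
            .TakeLast(1) // only the final value survives, once the source completes
            .Subscribe(
                value => log.Add("OnNext(" + value + ")"),
                () => log.Add("OnCompleted"));
        return log;
    }

    private static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
        // prints:
        // OnNext(5)
        // OnCompleted
    }
}
```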

The Run method has been renamed to the more obvious (well, to newcomers) ForEach. It is a bit of an enumerable construct, but that is cool.

My big problem has been the disappearance of the IsEmpty extension method. So I had a long think about this and have decided that you can replace it by creating your own extension method that looks like this

using System;
using System.Reactive;
using System.Reactive.Linq;

public static class ObservableExtensions
{
  //Is missing now that System.Interactive has been dropped
  public static IObservable<bool> IsEmpty<T>(this IObservable<T> source)
  {
    return source.Materialize()
        .Take(1)
        .Select(n => n.Kind == NotificationKind.OnCompleted
                      || n.Kind == NotificationKind.OnError);
  }
}

For even more details, check out the forum post that has the summary release notes and all sorts of rants about what has changed and what is causing problems for others (plus some helpful tips).

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/527002a3-18af-4eda-8e35-760ca0006b98

This is not a change that you can just take and expect all to be well. However, assuming that you can just update some references and add the missing extensions to your own library, you could be fine.

Changes to TestScheduler

The real problems come from the big changes to the testing part of Rx. This appears to have undergone a complete change and most of my existing code does not work. This looks like a big and possibly welcome change to the TestScheduler.

Problems with the previous TestScheduler

There were some fundamental problems with the TestScheduler as it was. It could not reliably be run to a specific point in time if no actions were scheduled for that point in time. For example, the old TestScheduler would allow you to request that it run to, say, 5 seconds. If there were no actions scheduled, then the clock would not actually advance. You could then schedule an action to happen at 3 seconds (effectively in the past), call Run, and it would execute the action. Hmmm.

TestScheduler scheduler = new TestScheduler();
scheduler.Schedule(()=>{Console.WriteLine("Running at 1seconds");}, TimeSpan.FromSeconds(1));
scheduler.RunTo( scheduler.FromTimeSpan(TimeSpan.FromSeconds(5)));
//The next line is scheduled in the past and somehow this all works!
scheduler.Schedule(()=>{Console.WriteLine("Running at 3seconds");}, TimeSpan.FromSeconds(3));
scheduler.Run();

There also seemed to be a lack of functionality for running to a relative point in time. I have used these extension methods to work around this shortcoming.

public static class TestSchedulerExtensions
{
  /// <summary>
  /// Runs the scheduler from now to the given TimeSpan. Advances relative to its <c>Now</c> value.
  /// </summary>
  public static void RunNext(this TestScheduler scheduler, TimeSpan interval)
  {
    var tickInterval = scheduler.FromTimeSpan(interval);
    scheduler.RunTo(scheduler.Ticks + tickInterval + 1);
  }

  public static void RunTo(this TestScheduler scheduler, TimeSpan interval)
  {
    var tickInterval = scheduler.FromTimeSpan(interval);
    scheduler.RunTo(tickInterval);
  }

  public static void Step(this TestScheduler scheduler)
  {
    scheduler.RunTo(scheduler.Ticks + 1);
  }

  /// <summary>
  /// Provides a fluent interface so that you can write <c>7.Seconds()</c> instead of <c>TimeSpan.FromSeconds(7)</c>.
  /// </summary>
  /// <param name="seconds">A number of seconds</param>
  /// <returns>Returns a System.TimeSpan that represents the specified number of seconds.</returns>
  public static TimeSpan Seconds(this int seconds)
  {
    return TimeSpan.FromSeconds(seconds);
  }
}

However, on initial inspection of the new TestScheduler, there appear to be features that support running to an absolute or a relative time. It looks like the post on testing Rx will need a re-write :(
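As a sketch of what absolute and relative advancement can look like, the snippet below uses AdvanceTo and AdvanceBy from Microsoft.Reactive.Testing. These are the member names I know from later Rx releases; I have not checked them against this exact drop, so treat the names as an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;

internal static class TestSchedulerExample
{
    internal static List<string> Run()
    {
        var log = new List<string>();
        var scheduler = new TestScheduler();
        scheduler.Schedule(TimeSpan.FromSeconds(1), () => log.Add("ran first action"));

        // Absolute: run everything due up to 5 seconds of virtual time.
        scheduler.AdvanceTo(TimeSpan.FromSeconds(5).Ticks);
        log.Add("clock is " + TimeSpan.FromTicks(scheduler.Clock));

        // A relative due time is measured from the current virtual clock.
        scheduler.Schedule(TimeSpan.FromSeconds(3), () => log.Add("ran second action"));

        // Relative: move the virtual clock forward by another 3 seconds.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(3).Ticks);
        log.Add("clock is " + TimeSpan.FromTicks(scheduler.Clock));
        return log;
    }

    private static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Here the clock moves to the requested time even when nothing is scheduled at that instant, which is the behaviour the old scheduler lacked.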


Sunday, May 29, 2011

Rxx - Extension to the Reactive Extensions…

For those of you that use the Rx forums you will have no doubt found an answer or even had your question answered by either James Miles or Dave Sexton. These two have put their mighty brains together to produce some handy extensions to Rx. They went with the fairly obvious (but tongue-in-cheek) name Rxx.

http://social.msdn.microsoft.com/Forums/hu-HU/rx/thread/32f6ee34-5edf-4038-894c-ab47fc893a78

The key features that anyone that has been using Rx for a while will immediately be interested in are the

  • Tracing/Logging
  • PropertyChanged and PropertyDescriptor extensions
  • FileSystemWatcher
  • and the uber-funky OperationalObservable, e.g. var os = xs + ys - zs;

For a more complete list look at the documentation.

This could well be a community contribution that helps guide the actual Rx implementations that come out of Microsoft (like the community extensions around P&P projects). James and Dave seem to be taking this quite seriously. They have been checking in regularly (with sensible comments!), have constructed the CodePlex site very well and already have their issue tracker running.

This is one to keep an eye on.

http://rxx.codeplex.com/


Wednesday, May 25, 2011

Rx code from Perth Presentation

Sorry about the delay in getting this code up. For those who could not make it, my part of the presentation gave a bit of an intro and then discussed the testability of Rx and the easy way to deal with streaming data, such as pricing in the finance industry.

[Screenshot: Rx samples photo gallery]

[Screenshot: Rx samples TWAP chart]

The key sample from my half of the presentation that raised some interest was the testability of the photo gallery ViewModel. The gallery ViewModel was effectively this:

/// <summary>
/// Tested Rx implementation of the ViewModel
/// </summary>
public sealed class RxPhotoGalleryViewModel : INotifyPropertyChanged
{
    public RxPhotoGalleryViewModel(IImageService imageService, ISchedulerProvider scheduler)
    {
        IsLoading = true;
        var files = imageService.EnumerateImages()
                                .ToObservable();

        files
            .SubscribeOn(scheduler.ThreadPool)
            .ObserveOn(scheduler.Dispatcher)
            .Subscribe(
                imagePath =>
                {
                    Images.Add(imagePath);
                },
                () =>
                {
                    IsLoading = false;
                });
    }

    private readonly ObservableCollection<string> _images = new ObservableCollection<string>();
    public ObservableCollection<string> Images
    {
        get { return _images; }
    }

    private bool _isLoading;
    public bool IsLoading
    {
        get { return _isLoading; }
        set
        {
            if (_isLoading != value)
            {
                _isLoading = value;
                InvokePropertyChanged("IsLoading");
            }
        }
    }

    #region Implementation of INotifyPropertyChanged

    public event PropertyChangedEventHandler PropertyChanged;

    public void InvokePropertyChanged(string propertyName)
    {
        PropertyChangedEventHandler handler = PropertyChanged;
        if (handler != null) handler(this, new PropertyChangedEventArgs(propertyName));
    }

    #endregion
}

Apart from the constructor, there are really just two properties that expose change notification for the WPF binding engine. The code in the constructor is demo quality, in the sense that it is not good practice to do so much work in a constructor. Maybe this would be better:

public sealed class RxPhotoGalleryViewModel
{
    private readonly IImageService _imageService;
    private readonly ISchedulerProvider _scheduler;

    public RxPhotoGalleryViewModel(IImageService imageService, ISchedulerProvider scheduler)
    {
        _imageService = imageService;
        _scheduler = scheduler;
    }

    public void Start()
    {
        IsLoading = true;
        var files = _imageService.EnumerateImages()
                                .ToObservable();

        files
            .SubscribeOn(_scheduler.ThreadPool)
            .ObserveOn(_scheduler.Dispatcher)
            .Subscribe(
                imagePath => Images.Add(imagePath),
                () =>IsLoading = false);
    }
//....
}

The test fixture is fairly simple. We pass in a mock implementation of the IImageService and the TestSchedulerProvider similar to the one shown in Rx Part 8 – Testing Rx.

[TestClass]
public class RxPhotoGalleryViewModelTests
{
    private Mock<IImageService> _imageSrvMock;
    private TestSchedulderProvider _testSchedulderProvider;
    private List<string> _expectedImages;

    [TestInitialize]
    public void SetUp()
    {
        _imageSrvMock = new Mock<IImageService>();
        _testSchedulderProvider = new TestSchedulderProvider();

        _expectedImages = new List<string> { "one.jpg", "two.jpg", "three.jpg" };
        _imageSrvMock.Setup(svc => svc.EnumerateImages())
                        .Returns(_expectedImages);

    }

    [TestMethod]
    public void Should_add_ImagesServiceResults_to_Images()
    {
        //Arrange
        // done in setup

        //Act
        var sut = new RxPhotoGalleryViewModel(_imageSrvMock.Object, _testSchedulderProvider);
        _testSchedulderProvider.ThreadPool.Run();
        _testSchedulderProvider.Dispatcher.Run();

        //Assert
        CollectionAssert.AreEqual(_expectedImages, sut.Images);
    }

    [TestMethod]
    public void Should_set_IsLoading_to_true()
    {
        //Arrange
        // done in setup

        //Act
        var sut = new RxPhotoGalleryViewModel(_imageSrvMock.Object, _testSchedulderProvider);
            
        //--NOTE-- note the missing TestScheduler.Run() calls. This will stop any observable being processed. Cool.

        //Assert
        Assert.IsTrue(sut.IsLoading);
    }

    [TestMethod]
    public void Should_set_IsLoading_to_false_when_completed_loading()
    {
        //Arrange
        // done in setup

        //Act
        var sut = new RxPhotoGalleryViewModel(_imageSrvMock.Object, _testSchedulderProvider);
        _testSchedulderProvider.ThreadPool.Run();
        _testSchedulderProvider.Dispatcher.Run();

        //Assert
        Assert.IsFalse(sut.IsLoading);
    }
}

As we now control the scheduling/concurrency, we don't have to do anything fancy with Dispatchers, BackgroundWorkers, ThreadPools or Tasks, which are very difficult to unit test. Check out the pain that I went through to test responsive WPF apps in this post on Testing Responsive WPF, complete with DispatcherFrame and Thread.Sleep(300) in my tests :(

If you want the running code you can either pull the code down via SVN by following this http://code.google.com/p/rx-samples/source/checkout or you can download the zip.

The PowerPoint presentation is here

You may also be interested in the Design Guidelines produced by the Rx team at Microsoft and also where to get the latest version of Rx

Back to the contents page for Reactive Extensions for .NET Introduction

Wednesday, May 11, 2011

Rx session in Perth, Australia

James Miles and I will be giving a presentation on Rx to a Perth audience this Thursday. If you are in town then come on down!

As those that read this blog will know, Rx is my thing at the moment so feel free to bring plenty of questions.

Full details can be found at the Perth .NET Community of Practice.

Introduction to Rx with Lee Campbell and James Miles

See you there.
Lee

Monday, March 14, 2011

Rx Part 9–Join, Window, Buffer and Group Join

STOP THE PRESS! This series has now been superseded by the online book www.IntroToRx.com. The new site/book offers far better explanations, samples and depth of content. I hope you enjoy!

While this series of posts is labelled as an introduction, this post takes us past the 100-level style posts. It tackles the interesting ways of combining multiple streams of data. We have looked at ways of combining streams in earlier posts with SelectMany, Merge, Zip, Concat etc. The operators that we look at in this post are different for several reasons:
  1. They are new as of 2011
  2. Their matching is based on events coinciding with each other based on some given window of time
  3. They offer some pretty powerful stuff that would otherwise be complex to code and just nasty if you went sans-Rx.

Buffer

Buffer is not a new operator to us, however it can now be conceptually grouped with the group-join operators. These operators all do something with a stream and a window of time. Each operator will open a window when the source stream produces a value. The way the window is closed and which values are exposed is the main difference between each of the operators. Let us first go back to our old friend BufferWithCount that we saw in the second post in this series.
BufferWithCount will create a window when the first value is produced. It will then put that value in to an internal cache. The window will stay open until the count of values has been reached. Each of these values will have been cached. Now that the count has been reached the window will close and the cache will be OnNext’ed as an IList<T>. When the next value is produced from the source, the cache is cleared and we start again. This means that BufferWithCount will take an IObservable<T> and return an IObservable<IList<T>>.
Example Buffer with count of 3
source|-0-1-2-3-4-5-6-7-8-9|
result|-----0-----3-----6-9|
            1     4     7
            2     5     8 
In this marble diagram, I have represented the list of values being returned at a point in time as a column of data. i.e. the values 0, 1 & 2 are all returned in the first buffer.
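The count-based marble diagram above can be reproduced with a few lines. This sketch uses the collapsed Buffer(int) overload that replaces BufferWithCount in newer drops, with a Range source standing in for the stream; the class name is mine.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

internal static class BufferExample
{
    // Buffers the values 0..9 into lists of (at most) three values.
    internal static List<IList<int>> Run()
    {
        return Observable.Range(0, 10)
            .Buffer(3)      // was BufferWithCount(3)
            .ToEnumerable() // block until the source completes
            .ToList();
    }

    private static void Main()
    {
        foreach (var buffer in Run())
        {
            Console.WriteLine(string.Join(",", buffer));
        }
        // prints:
        // 0,1,2
        // 3,4,5
        // 6,7,8
        // 9
    }
}
```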
With that understood, it is not much of a leap to understand BufferWithTime. Instead of passing a count we pass a TimeSpan. The closing of the window (and therefore the buffer’s cache) is now dictated by time instead of the count of values produced. This is ever so slightly more complicated, as we have now introduced some sort of scheduling. To produce the IList<T> at the correct point in time we need a scheduler assigned to perform the timing. This also makes testing this stuff a lot easier.
Example Buffer with time of 5 units
source|-0-1-2-3-4-5-6-7-8-9-|
result|----0----2----5----7-|
           1    3    6    8
                4         9

Window

The Window operators are very similar to the Buffer operators; they only really differ in their return type. Where Buffer would take an IObservable<T> and return an IObservable<IList<T>>, the Window operators return an IObservable<IObservable<T>>. It is also worth noting that the Buffer operators will not yield their buffers until the window closes.
Example of Window with a count of 3
source |-0-1-2-3-4-5-6-7-8-9|
window0|-0-1-2|
window1        3-4-5|
window2              6-7-8|
window3                    9|
Example of Window with time of 5 units
source |-0-1-2-3-4-5-6-7-8-9|
window0|-0-1-|
window1      2-3-4|
window2           -5-6-|
window3                7-8-9|
So the obvious difference here is that with the Window operators you get hold of the values from the source as soon as they are produced, whereas with the Buffer operators you must wait until the window closes before the values are accessible.
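To see that difference in code, here is a sketch that logs when each count-based window opens, receives a value and closes. It uses the collapsed Window(int) overload and a Range source; the log format and class name are my own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

internal static class WindowExample
{
    // Logs window activity for the values 0..9 split into windows of three.
    internal static List<string> Run()
    {
        var log = new List<string>();
        var windowIndex = 0;
        Observable.Range(0, 10)
            .Window(3) // was WindowWithCount(3)
            .Subscribe(window =>
            {
                var id = windowIndex++;
                log.Add("window" + id + " opened");
                // Values are pushed through as the source produces them,
                // not held back until the window closes.
                window.Subscribe(
                    value => log.Add("window" + id + " --> " + value),
                    () => log.Add("window" + id + " closed"));
            });
        return log;
    }

    private static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Each value should appear in the log before its window's "closed" entry, whereas Buffer hands the same values over only once the window has closed.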

Switch is the Anti Window :)

I think it is worth noting, at least from an academic point, that the Window operators produce IObservable<IObservable<T>> and that the Switch operator takes an IObservable<IObservable<T>> and returns an IObservable<T>. As the Window operators ensure that the windows (child streams) do not overlap, we can use the Switch operator to turn a windowed stream back into its original stream.
//is the same as Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(10)
var switchedWindow = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(10)
    .WindowWithTime(TimeSpan.FromMilliseconds(500))
    .Switch();
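The same round trip can be sketched deterministically with a count-based window and the newer overload names (Window rather than WindowWithTime or WindowWithCount). The Range source and the class name are my own choices for illustration.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

internal static class SwitchExample
{
    // Windows a sequence and then flattens it again with Switch.
    internal static bool RoundTripsToOriginal()
    {
        var source = Observable.Range(0, 10);
        var switchedWindow = source
            .Window(3)   // IObservable<IObservable<int>>, windows do not overlap
            .Switch();   // back to IObservable<int>

        return source.ToEnumerable()
            .SequenceEqual(switchedWindow.ToEnumerable());
    }

    private static void Main()
    {
        Console.WriteLine(RoundTripsToOriginal());
    }
}
```

The expectation is that both sequences contain the same values in the same order, because each window has completed before the next one opens.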

Join

Join is not a new method in the Rx library, but the overload we are interested in today is new. From what I can see of the two original overloads that take an array or an IEnumerable of Plan<T>, their usage can be replicated with Merge and Select. They are a bit of a mystery to me.
The overload we are interested in is
public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>
(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, TRight, TResult> resultSelector
)
Now this is a fairly hairy signature to try and understand in one go, so let’s take it one parameter at a time.
IObservable<TLeft> left is the source stream that defines when a window starts. This is just like the Buffer and Window operators, except that every value published from this source opens a new window. In Buffer and Window, some values just fell into an existing window.
I like to think of IObservable<TRight> right as the window value stream. While the left stream controls opening the windows, the right stream will try to pair up with a value from the left stream.
Let us imagine that our left stream produces a value, which creates a new window. If the right stream produces a value while the window is open, then the resultSelector function is called with the two values. This is the crux of Join: pairing two values from different streams that occur within the same window. This leads us to our next question: when does the window close? The answer to this question is both the power and the complexity of the Join operator.
When left produces a value, a window is opened. That value is also passed to the leftDurationSelector function. The result of this function is an IObservable<TLeftDuration>. When that IObservable OnNext’s or completes, the window for that value is closed. Note that it is irrelevant what the type of TLeftDuration is. This initially left me with the feeling that IObservable<TLeftDuration> was a bit of overkill, as you effectively just need some sort of event to say “Closed!”. However, by allowing you to use IObservable you can do some clever stuff, as we will see later.
So let us first imagine a scenario where we have the left stream producing values twice as fast as the right stream. Imagine that we also never close the windows. We could do this by always returning Observable.Never<Unit>() from the leftDurationSelector function. This would result in the following pairs being produced.
left  |-0-1-2-3-4-5|
right |---A---B---C|

result|---0---0---0
          A   B   C

          1   1   1
          A   B   C

              2   2
              B   C

              3   3
              B   C

                  4
                  C

                  5
                  C
As you can see the left values are cached and replayed each time the right produces a value.
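A sketch of that scenario using subjects, so the interleaving is under our control. I am assuming here that the right windows close immediately (Observable.Empty), which the diagram implies but the text does not state; the class name and pair formatting are mine.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

internal static class JoinExample
{
    internal static List<string> Run()
    {
        var pairs = new List<string>();
        var left = new Subject<int>();
        var right = new Subject<char>();

        left.Join(
                right,
                _ => Observable.Never<Unit>(), // left windows never close
                _ => Observable.Empty<Unit>(), // assumption: right windows close at once
                (l, r) => l.ToString() + r)
            .Subscribe(pairs.Add);

        // left produces values twice as fast as right
        left.OnNext(0);
        left.OnNext(1); right.OnNext('A');
        left.OnNext(2);
        left.OnNext(3); right.OnNext('B');
        left.OnNext(4);
        left.OnNext(5); right.OnNext('C');
        left.OnCompleted();
        right.OnCompleted();
        return pairs;
    }

    private static void Main()
    {
        Console.WriteLine(string.Join(" ", Run()));
    }
}
```

This should yield the same twelve pairs as the diagram: A paired with 0 and 1, B with 0 to 3, and C with 0 to 5. The order of the pairs within each batch is an implementation detail I would not rely on.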
Now it seems fairly obvious that if I immediately closed the window by returning Observable.Empty<Unit>() or perhaps Observable.Return(0), then windows would never really be open, so no pairs would ever get produced. However, what could I do to make sure that these windows did not overlap, so that once a second value was produced I would no longer see the first value? Well, if we returned the left stream from the leftDurationSelector, that could do it. But wait, when we return the left stream from the leftDurationSelector it would try to create another subscription, and that may introduce side effects. The quick answer to that is to Publish and RefCount the left stream. If we do that, the results look more like this.
left  |-0-1-2-3-4-5|
right |---A---B---C|
result|---1---3---5
          A   B   C
This made me think that I could use Join to produce my own version of CombineLatest that we saw in the 5th post in the series. If I had the values from left expire when the next value from left was OnNext’ed then I would be well on my way. However I need the same thing to happen for the right. Luckily the Join operator also provides us with a rightDurationSelector that works just like the leftDurationSelector. This is simple to implement, all I need to do is return a reference to the same left stream when a left value is produced and then the same for the right. The code looks like this.
public static IObservable<TResult> MyCombineLatest<TLeft, TRight, TResult>
(
    IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, TRight, TResult> resultSelector)
{
    var refcountedLeft = left.Publish().RefCount();
    var refcountedRight = right.Publish().RefCount();
    return Observable.Join(
            refcountedLeft,
            refcountedRight,
            value => refcountedLeft,
            value => refcountedRight,
            resultSelector);
}
While the code above is not production quality (it would need to have some gates in place to mitigate race conditions), it shows us the power that we could get with Join; we can actually use it to create other operators!

GroupJoin

When the Join operator pairs up values that coincide within a window, it always passes just the left value and the right value to the resultSelector. The GroupJoin operator takes this one step further by passing the left value immediately to the resultSelector, along with an IObservable of the right values that occur within the window. Its signature is very similar to Join, but note the difference in the resultSelector Func.
public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>
(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, IObservable<TRight>, TResult> resultSelector
)
If we went back to our first Join example where we had
  • the left producing values twice as fast as the right,
  • the left never expiring
  • the right immediately expiring
this is what the result may look like
left              |-0-1-2-3-4-5|
right             |---A---B---C|

0th window values   --A---B---C|
1st window values     A---B---C|
2nd window values       --B---C|
3rd window values         B---C|
4th window values           --C|
5th window values             C|
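The marble diagram above can be reproduced with a small sketch. It assumes a current System.Reactive package, and it uses Observable.Never for "never expires" and Observable.Empty for "expires immediately". GroupJoinSketch and Run are hypothetical names, and the values are pushed by hand rather than on a timer so the result is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GroupJoinSketch
{
    public static List<string> Run()
    {
        var left = new Subject<int>();
        var right = new Subject<string>();
        var log = new List<string>();

        var windows = Observable.GroupJoin(
            left,
            right,
            _ => Observable.Never<Unit>(),  // left values never expire
            _ => Observable.Empty<Unit>(),  // right values expire immediately
            (leftValue, rightValues) => new { leftValue, rightValues });

        // Subscribe to each window as soon as it is handed to us and tag its values
        // with the left value that opened the window.
        using (windows.Subscribe(w =>
        {
            w.rightValues.Subscribe(r => log.Add(w.leftValue + ":" + r));
        }))
        {
            left.OnNext(0);
            left.OnNext(1);
            right.OnNext("A"); // reaches windows 0 and 1
            left.OnNext(2);
            left.OnNext(3);
            right.OnNext("B"); // reaches windows 0 to 3
        }
        return log;
    }
}
```

Each right value is delivered to every window that is open at the time it arrives, so A shows up in two windows and B in four. Windows 2 and 3 never see A because it had already expired when they opened.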
Now we could switch it around so that the left expired immediately and the right never expired. The result may look like this
left              |-0-1-2-3-4-5|
right             |---A---B---C|

0th window values   |
1st window values     A|
2nd window values       A|
3rd window values         AB|
4th window values           AB|
5th window values             ABC|
This starts to make things interesting. Sharp readers may have noticed that with GroupJoin you could effectively re-create your own Join by doing something like this
public IObservable<TResult> MyJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    IObservable<TLeft> left,
    IObservable<TRight> right,
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
    Func<TLeft, TRight, TResult> resultSelector)
{
    return Observable.GroupJoin
        (
            left, 
            right,
            leftDurationSelector,
            rightDurationSelector,
            (leftValue, rightValues)=>rightValues.Select(rightValue=>resultSelector(leftValue, rightValue))
        )
        .Merge();
}
I was even able to knock up my own version of WindowWithTime with the code below
public IObservable<IObservable<T>> MyWindowWithTime<T>(IObservable<T> source, TimeSpan windowPeriod)
{
    return Observable.CreateWithDisposable<IObservable<T>>(o =>
        {
            var windower = new Subject<long>();
            var intervals = Observable.Concat(
                    Observable.Return(0L),
                    Observable.Interval(windowPeriod)
                )
                .Publish()
                .RefCount();

            var subscription = Observable.GroupJoin
                (
                    windower,
                    source.Do(_ => { }, windower.OnCompleted),
                    _ => windower,
                    _ => Observable.Empty<Unit>(),
                    (left, sourceValues) => sourceValues
                )
                .Subscribe(o);
            var intervalSubscription = intervals.Subscribe(windower);
            return new CompositeDisposable(subscription, intervalSubscription);
        });
}
Yeah it is not so pretty, but it is an academic exercise to showcase GroupJoin. Those that have read Bart De Smet’s excellent MiniLinq post (and follow-up video) can see that GroupJoin could almost be added to the 3 basic operators Cata, Ana and Bind.
GroupJoin and the other window operators can make otherwise fiddly and difficult tasks a cinch to put together. For example, those in the Finance game can now pretty much use GroupJoin to create their own VWAP and TWAP extension methods. Nice!
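As a rough illustration of the VWAP idea, here is a sketch of a volume weighted average price built on GroupJoin. It is not production code and makes several assumptions: a current System.Reactive package, a hypothetical Trade type with only a price and a volume, and a windowOpenings stream where each signal closes the previous window and opens the next. VwapSketch, Vwap and Run are invented names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class VwapSketch
{
    // Hypothetical trade type; only price and volume matter for VWAP.
    public sealed class Trade
    {
        public Trade(decimal price, decimal volume) { Price = price; Volume = volume; }
        public decimal Price { get; }
        public decimal Volume { get; }
    }

    // Produces one VWAP per window; a window runs from one opening signal to the next.
    public static IObservable<decimal> Vwap(IObservable<Trade> trades, IObservable<Unit> windowOpenings)
    {
        var openings = windowOpenings.Publish().RefCount();
        return Observable.GroupJoin(
                openings,
                trades,
                _ => openings,                  // a window closes when the next one opens
                _ => Observable.Empty<Unit>(),  // a trade only reaches windows that are already open
                (_, window) => window
                    .Aggregate(
                        new { Notional = 0m, Volume = 0m },
                        (acc, t) => new
                        {
                            Notional = acc.Notional + t.Price * t.Volume,
                            Volume = acc.Volume + t.Volume
                        })
                    .Where(acc => acc.Volume > 0m)  // skip windows with no trades
                    .Select(acc => acc.Notional / acc.Volume))
            .Merge();
    }

    // Hypothetical driver with hand-pushed trades and window signals.
    public static List<decimal> Run()
    {
        var trades = new Subject<Trade>();
        var ticks = new Subject<Unit>();
        var results = new List<decimal>();

        using (Vwap(trades, ticks).Subscribe(results.Add))
        {
            ticks.OnNext(Unit.Default);           // opens window 1
            trades.OnNext(new Trade(10m, 100m));
            trades.OnNext(new Trade(20m, 300m));
            ticks.OnNext(Unit.Default);           // closes window 1, opens window 2
            trades.OnNext(new Trade(30m, 50m));
            ticks.OnNext(Unit.Default);           // closes window 2
        }
        return results;
    }
}
```

For the first window the arithmetic is (10 × 100 + 20 × 300) / 400 = 17.5, and the second window holds a single trade so its VWAP is just that trade's price, 30. A TWAP would follow the same shape with time in the market used as the weight instead of volume.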
The full source code is now available either via svn at http://code.google.com/p/rx-samples/source/checkout or as a zip file.
Back to the contents page for Reactive Extensions for .NET Introduction
Back to the previous post; Rx Part 8 - Testing Rx

Sunday, February 20, 2011

Changes to the Rx API

Readers of the Reactive Extensions series on this blog will have noticed recently that the code does not always work* if you download the Rx assemblies and then copy the code off the blog. This is due to the quite frequent changes to the API. I personally have 7 different versions of the libraries and I don’t think I have downloaded them all as they have been released! Also, the blog posts were started all the way back in May 2010, so it is fair that there has been some movement in the API.

*If you get the code from http://code.google.com/p/rx-samples/source/checkout or as a zip file then it will work, as the correct version of the assembly is included.

For fun I thought I would try to exercise my LINQ skills and write a quick diff tool so I can see what in the public API is actually changing on me. I threw this class together in LinqPad.

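// Needs the System.Collections.Generic, System.IO, System.Linq and System.Reflection namespaces
// (LinqPad imports these by default).
public class AssemblyDiff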
{
    private readonly string _oldAssemblyPath;
    private readonly string _newAssemblyPath;
    
    public AssemblyDiff(string oldAssemblyPath, string newAssemblyPath)
    {
        _oldAssemblyPath = oldAssemblyPath;
        _newAssemblyPath = newAssemblyPath;
    }
    
    public IEnumerable<String> NewMethodNames()
    {
        return MethodNameDelta(GetDeclaredMethods(_newAssemblyPath), GetDeclaredMethods(_oldAssemblyPath));
    }
    
    public IEnumerable<String> DeprecatedMethodNames()
    {
        return MethodNameDelta(GetDeclaredMethods(_oldAssemblyPath), GetDeclaredMethods(_newAssemblyPath));
    }
    
    public static IEnumerable<String> MethodNameDelta(IEnumerable<MethodInfo> original, IEnumerable<MethodInfo> modified)
    {
        return from methodName in original.Select(MethodName).Except(modified.Select(MethodName))
                    orderby methodName
                    select methodName;
    }
    public IEnumerable<MethodInfo> NewMethods()
    {
        var oldMethods = GetDeclaredMethods(_oldAssemblyPath);
        var currentMethods = GetDeclaredMethods(_newAssemblyPath);
        
        return MethodDelta(oldMethods, currentMethods);
    }
    
    public IEnumerable<MethodInfo> DeprecatedMethods()
    {
        var oldMethods = GetDeclaredMethods(_oldAssemblyPath);
        var currentMethods = GetDeclaredMethods(_newAssemblyPath);
        
        return MethodDelta(currentMethods, oldMethods);
    }
    
    public static IEnumerable<MethodInfo> MethodDelta(IEnumerable<MethodInfo> original, IEnumerable<MethodInfo> changed)
    {
        var existingTypes = original.Select(m => m.ReflectedType.FullName)
                                        .Distinct()
                                        .ToList();
        
        return from method in changed.Except(original, new MethodSignatureComparer())
                where existingTypes.Contains(method.ReflectedType.FullName)
                orderby method.ReflectedType.Name, method.Name
                select method;
    }
    
    public IEnumerable<Type> NewTypes()
    {
        var currentTypes = GetTypes(_newAssemblyPath);
        var oldTypes = GetTypes(_oldAssemblyPath);
        
        return from type in currentTypes
                where !oldTypes.Select (t => t.FullName).Contains(type.FullName)
                select type;
    }
    
    public IEnumerable<Type> DeprecatedTypes()
    {
        var currentTypes = GetTypes(_newAssemblyPath);
        var oldTypes = GetTypes(_oldAssemblyPath);
        
        return from type in oldTypes
                where !currentTypes.Select (t => t.FullName).Contains(type.FullName)
                select type;
    }
    
    private static IEnumerable<MethodInfo> GetAllMethods(string path)
    {
        return  from type in GetTypes(path)
                from method in type.GetMethods()
                where method.IsPublic
                select method;
    }
    
    private static IEnumerable<MethodInfo> GetDeclaredMethods(string path)
    {
        return GetAllMethods(path).Where(method => method.DeclaringType == method.ReflectedType);
    }
    
    private static IEnumerable<Type> GetTypes(string path)
    {
        return  from file in Directory.EnumerateFiles(path, "*.dll")
                from module in Assembly.LoadFrom(file).GetModules()
                from type in module.GetTypes()
                where type.IsPublic
                select type;
    }
    
    private static string MethodName(MethodInfo m)
    {
        return string.Format("{0}.{1}", m.ReflectedType.Name, m.Name);
    }
    
    public static string MethodSignature(MethodInfo m)
    {
        //return m.ToString();
        var ps = m.GetParameters();
        var args = ps.Select(p=>ParameterSignature(p, ps.Length));
        var argsDelimited = string.Join(",", args);
        
        return string.Format("{0} {1}.{2}({3})", m.ReturnType.Name, m.ReflectedType.Name, m.Name, argsDelimited);
    }
        
    private static string ParameterSignature(ParameterInfo parameter, int parameterCount)
    {
        var modifier = "";//out/ref/params/
        var defaultValue = "";
        if(parameter.IsOut) modifier = "out ";
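        if(parameter.IsOptional)
        {
            modifier = "optional ";
            // DefaultValue can be null (e.g. "string s = null"), so don't call ToString() on it directly.
            defaultValue = string.Format(" = {0}", parameter.DefaultValue ?? "null");
        }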
        if(parameter.IsRetval) modifier += "isretval ";
        if(parameter.IsIn) modifier += "IsIn ";
        if(parameter.IsLcid) modifier += "IsLcid ";
        if(parameter.Position== parameterCount-1 && parameter.ParameterType.IsArray)
        {
            modifier = "params ";
        }
        return string.Format("{0}{1}{2}", modifier,parameter.Name, defaultValue);
    }
    
    private class MethodSignatureComparer : IEqualityComparer<MethodInfo>
    {
        public bool Equals(MethodInfo lhs, MethodInfo rhs)
        {                
            return string.Equals(lhs.ToString(), rhs.ToString());
        }
        
        public int GetHashCode(MethodInfo method)
        {
            return method.ToString().GetHashCode();
        }
    }
}

and then I used it like this

var old =         @"C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2787.0\Net4";
var current =     @"C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2838.0\Net4";

var dllDiff = new AssemblyDiff(old, current);

"New Methods".Dump();
dllDiff.NewMethodNames().Dump();
"Deprecated Methods".Dump();
dllDiff.DeprecatedMethodNames().Dump();
"New Types".Dump();
dllDiff.NewTypes().Select (t => t.FullName).Dump();
"Deprecated Types".Dump();
dllDiff.DeprecatedTypes().Select (t => t.FullName).Dump();
"New overloads".Dump();
dllDiff.NewMethods().Select(AssemblyDiff.MethodSignature).Dump();
"Deprecated overloads".Dump();
dllDiff.DeprecatedMethods().Select(AssemblyDiff.MethodSignature).Dump();

and I get this neat output.

New Methods (7 Items)

  • ConnectableObservable`2.Connect
  • ConnectableObservable`2.Subscribe
  • Observable.GroupJoin
  • Observable.Multicast
  • Observable.Window
  • Qbservable.GroupJoin
  • Qbservable.Window

Deprecated Methods (6 Items)

  • ConnectableObservable`1.Connect
  • ConnectableObservable`1.Subscribe
  • Observable.Prune
  • Observable.Replay
  • Qbservable.Prune
  • Qbservable.Replay

New Types (1 Item)

  • System.Collections.Generic.ConnectableObservable`2

Deprecated Types (0 Items)

New overloads (30 Items)

  • IEnumerable`1 EnumerableEx.Generate(initialState,condition,iterate,resultSelector)
  • IObservable`1 Observable.BufferWithTime(source,timeSpan,timeShift,scheduler)
  • IObservable`1 Observable.BufferWithTime(source,timeSpan,scheduler)
  • IObservable`1 Observable.BufferWithTime(source,timeSpan,timeShift)
  • IObservable`1 Observable.BufferWithTime(source,timeSpan)
  • IObservable`1 Observable.BufferWithTimeOrCount(source,timeSpan,count,scheduler)
  • IObservable`1 Observable.BufferWithTimeOrCount(source,timeSpan,count)
  • IObservable`1 Observable.GroupJoin(left,right,leftDurationSelector,rightDurationSelector,resultSelector)
  • IObservable`1 Observable.If(condition,thenSource)
  • IObservable`1 Observable.Join(left,right,leftDurationSelector,rightDurationSelector,resultSelector)
  • IConnectableObservable`1 Observable.Multicast(source,subject)
  • IObservable`1 Observable.Publish(source,subject)
  • IObservable`1 Observable.Publish(source,subject,selector)
  • IObservable`1 Observable.Window(source,windowOpenings,windowClosingSelector)
  • IObservable`1 Observable.Window(source,windowClosingSelector,scheduler)
  • IObservable`1 Observable.Window(source,windowClosingSelector)
  • IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,timeShift,scheduler)
  • IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,scheduler)
  • IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,timeShift)
  • IQbservable`1 Qbservable.BufferWithTime(source,timeSpan)
  • IQbservable`1 Qbservable.BufferWithTimeOrCount(source,timeSpan,count,scheduler)
  • IQbservable`1 Qbservable.BufferWithTimeOrCount(source,timeSpan,count)
  • IQbservable`1 Qbservable.GroupJoin(left,right,leftDurationSelector,rightDurationSelector,resultSelector)
  • IQbservable`1 Qbservable.If(provider,condition,thenSource)
  • IQbservable`1 Qbservable.Join(left,right,leftDurationSelector,rightDurationSelector,resultSelector)
  • IQbservable`1 Qbservable.Publish(source,subject,selector)
  • IQbservable`1 Qbservable.Publish(source,subject)
  • IQbservable`1 Qbservable.Window(source,windowOpenings,windowClosingSelector)
  • IQbservable`1 Qbservable.Window(source,windowClosingSelector,scheduler)
  • IQbservable`1 Qbservable.Window(source,windowClosingSelector)

Deprecated overloads (71 Items)

  • IEnumerable`1 EnumerableEx.Generate(initialState,condition,resultSelector,iterate)
  • IEnumerable`1 EnumerableEx.Generate(function)
  • IEnumerable`1 EnumerableEx.Generate(initialState,resultSelector,iterate)
  • IEnumerable`1 EnumerableEx.Generate(initial,resultSelector,iterate)
  • IEnumerable`1 EnumerableEx.Generate(initial,condition,resultSelector,iterate)
  • IObservable`1 Observable.BufferWithTime(source,timeSpan,timeShift,scheduler)
  • IObservable`1 Observable.BufferWithTime(source,timeSpan,scheduler)
  • IObservable`1 Observable.BufferWithTime(source,timeSpan,timeShift)
  • IObservable`1 Observable.BufferWithTime(source,timeSpan)
  • IObservable`1 Observable.BufferWithTimeOrCount(source,timeSpan,count,scheduler)
  • IObservable`1 Observable.BufferWithTimeOrCount(source,timeSpan,count)
  • IConnectableObservable`1 Observable.Prune(source)
  • IConnectableObservable`1 Observable.Prune(source,scheduler)
  • IObservable`1 Observable.Prune(source,selector)
  • IObservable`1 Observable.Prune(source,selector,scheduler)
  • IObservable`1 Observable.Publish(source1,source2,selector)
  • IObservable`1 Observable.Publish(source1,source2,selector,scheduler)
  • IObservable`1 Observable.Publish(source1,source2,source3,selector)
  • IObservable`1 Observable.Publish(source1,source2,source3,selector,scheduler)
  • IObservable`1 Observable.Publish(source1,source2,source3,source4,selector)
  • IObservable`1 Observable.Publish(source1,source2,source3,source4,selector,scheduler)
  • IConnectableObservable`1 Observable.Publish(source,initialValue)
  • IConnectableObservable`1 Observable.Publish(source,initialValue,scheduler)
  • IObservable`1 Observable.Publish(source,selector,initialValue)
  • IObservable`1 Observable.Publish(source,selector,initialValue,scheduler)
  • IConnectableObservable`1 Observable.Publish(source)
  • IConnectableObservable`1 Observable.Publish(source,scheduler)
  • IObservable`1 Observable.Publish(source,selector)
  • IObservable`1 Observable.Publish(source,selector,scheduler)
  • IConnectableObservable`1 Observable.Replay(source)
  • IConnectableObservable`1 Observable.Replay(source,scheduler)
  • IObservable`1 Observable.Replay(source,selector)
  • IObservable`1 Observable.Replay(source,selector,scheduler)
  • IConnectableObservable`1 Observable.Replay(source,window)
  • IObservable`1 Observable.Replay(source,selector,window)
  • IConnectableObservable`1 Observable.Replay(source,window,scheduler)
  • IObservable`1 Observable.Replay(source,selector,window,scheduler)
  • IConnectableObservable`1 Observable.Replay(source,bufferSize,scheduler)
  • IObservable`1 Observable.Replay(source,selector,bufferSize,scheduler)
  • IConnectableObservable`1 Observable.Replay(source,bufferSize)
  • IObservable`1 Observable.Replay(source,selector,bufferSize)
  • IConnectableObservable`1 Observable.Replay(source,bufferSize,window)
  • IObservable`1 Observable.Replay(source,selector,bufferSize,window)
  • IConnectableObservable`1 Observable.Replay(source,bufferSize,window,scheduler)
  • IObservable`1 Observable.Replay(source,selector,bufferSize,window,scheduler)
  • IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,timeShift,scheduler)
  • IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,scheduler)
  • IQbservable`1 Qbservable.BufferWithTime(source,timeSpan,timeShift)
  • IQbservable`1 Qbservable.BufferWithTime(source,timeSpan)
  • IQbservable`1 Qbservable.BufferWithTimeOrCount(source,timeSpan,count,scheduler)
  • IQbservable`1 Qbservable.BufferWithTimeOrCount(source,timeSpan,count)
  • IQbservable`1 Qbservable.Prune(source,selector,scheduler)
  • IQbservable`1 Qbservable.Prune(source,selector)
  • IQbservable`1 Qbservable.Publish(source,selector,initialValue)
  • IQbservable`1 Qbservable.Publish(source,selector,scheduler)
  • IQbservable`1 Qbservable.Publish(source,selector,initialValue,scheduler)
  • IQbservable`1 Qbservable.Publish(source,selector)
  • IQbservable`1 Qbservable.Publish(source1,source2,selector)
  • IQbservable`1 Qbservable.Publish(source1,source2,selector,scheduler)
  • IQbservable`1 Qbservable.Publish(source1,source2,source3,selector)
  • IQbservable`1 Qbservable.Publish(source1,source2,source3,selector,scheduler)
  • IQbservable`1 Qbservable.Publish(source1,source2,source3,source4,selector)
  • IQbservable`1 Qbservable.Publish(source1,source2,source3,source4,selector,scheduler)
  • IQbservable`1 Qbservable.Replay(source,selector,bufferSize)
  • IQbservable`1 Qbservable.Replay(source,selector,bufferSize,window)
  • IQbservable`1 Qbservable.Replay(source,selector,bufferSize,window,scheduler)
  • IQbservable`1 Qbservable.Replay(source,selector)
  • IQbservable`1 Qbservable.Replay(source,selector,scheduler)
  • IQbservable`1 Qbservable.Replay(source,selector,window)
  • IQbservable`1 Qbservable.Replay(source,selector,window,scheduler)
  • IQbservable`1 Qbservable.Replay(source,selector,bufferSize,scheduler)

From memory, Generate has been a constant source of change, which has confused some readers who are using a different version of the library from the one the Part 2 post was written with. This diff script goes to show that it is still undergoing changes.
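The overload comparison in AssemblyDiff boils down to Enumerable.Except with an IEqualityComparer that keys on MethodInfo.ToString(). The sketch below shows that one technique in isolation using only the BCL. ApiV1 and ApiV2 are made-up stand-ins for the old and new assemblies, and OverloadDiffSketch, Added and Removed are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

public static class OverloadDiffSketch
{
    // Made-up "old" and "new" versions of the same API surface.
    public static class ApiV1
    {
        public static int Replay(int source) => source;
        public static int Prune(int source) => source;
    }

    public static class ApiV2
    {
        public static int Replay(int source) => source;
        public static int Replay(int source, int bufferSize) => source;
    }

    // Two methods are "the same" when their textual signatures match.
    // MethodInfo.ToString() covers the return type, name and parameter types,
    // but not the declaring type.
    private sealed class SignatureComparer : IEqualityComparer<MethodInfo>
    {
        public bool Equals(MethodInfo x, MethodInfo y) => x.ToString() == y.ToString();
        public int GetHashCode(MethodInfo m) => m.ToString().GetHashCode();
    }

    // Public static methods declared on the type itself (nothing inherited from object).
    private static IEnumerable<MethodInfo> Declared(Type type) =>
        type.GetMethods(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly);

    // Signatures present in "modified" but not in "original".
    private static List<string> Delta(Type original, Type modified) =>
        Declared(modified)
            .Except(Declared(original), new SignatureComparer())
            .Select(m => m.ToString())
            .OrderBy(s => s)
            .ToList();

    public static List<string> Added() => Delta(typeof(ApiV1), typeof(ApiV2));
    public static List<string> Removed() => Delta(typeof(ApiV2), typeof(ApiV1));
}
```

Here Added() yields "Int32 Replay(Int32, Int32)" and Removed() yields "Int32 Prune(Int32)". Because the signature text ignores the declaring type, a same-shaped method on a different type would hide a change, which is worth keeping in mind when reading the overload lists above.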


Links:

Reactive Extensions for .NET an Introduction

DevLabs: Reactive Extensions for .NET (Rx)

Sunday, February 13, 2011

Silverlight testing

I am putting this out there to see if I can get some traction with other Test Driven Silverlight coders. If you are one of these people, you will know the strife of your day-to-day coding. For those who don't know what I mean, these are the three options a Silverlight developer has with regards to test driven coding:

1) Use the Silverlight Unit Test Framework. The problem with this is that you lose any integrated development support, it is amazingly slow (in the area of 1-5 tests per second), and doesn't have any useful build tool support (coverage, TFS, TeamCity). Massive Fail.

2) Cross compile to .NET 4 all of your Models, ViewModels, Controllers, Modules, Presenters (i.e. everything that is not a View or a Control). Now write unit tests against this project. This means you get back to fast tests (100s of tests per second), but you take a hit on compiling twice, managing the project linking and having twice as many projects floating around.

3) What I imagine as the most popular option, just don't write any tests.

Looking at what most of the requests are for tells me that most people are using Silverlight for marketing websites that stream rich content. Business applications have yet to stake any dominance. What I am hoping is that anyone reading this who feels my pain will just go to this link and vote. It seems that these polls really do have an effect; DataTemplates appear to be part of SL 5 due to massive demand. I am hoping that Microsoft can focus on getting the underlying framework right before they go off and give us a 3D-multitouch-proximity aware API :)

http://dotnet.uservoice.com/forums/4325-silverlight-feature-suggestions/suggestions/313397-unit-testing-integrated-in-visual-studio-and-msbui?ref=title