Sunday, May 22, 2016

Rx.NET community improvments

In recent months (early to mid 2016) there has been rising confusion and frustration with the lack of openness and direction of the Rx.NET code base. Code being open-sourced and hosted on GitHub in late 2012 was cause for celebration. The community could see the code without the need for a decompiler, they could raise issues and could they even submit code changes via a Pull Request.

Bright future

Over time, the community created issues and Pull Requests accumulated, but activity in the master branch seemed to come to a halt in Oct 2015. However anticipation was in the air as one of the members of the original team (and who I think is the lead of the Rx team inside Microsoft), Bart De Smet announced that Rx3.0 was underway at NDC in Mid 2015. Features seemed to include cross platform version of expression trees - Bonsai Trees. "We are going to ship this later this year" 19:33. The community was very excited.

Another key event was happening around the same time; Microsoft was making a large change to .NET. .NET Core was to be a massive undertaking that allowed .NET code to run cross platform, not just Windows, Windows RT and Windows Phone, but on Linux and OS X too.

On top of these key events, there was the growing organic momentum of the Reactive movement. A movement that in my opinion the Rx.NET team had a large hand in popularizing and setting the standard for. Rx had matured in the eyes of the development community. It had been ported from the original implementation in .NET to JVM, JavaScript, C++, Ruby and others. This widespread adoption across the industry gave developers and managers alike confidence in the tool.

Bright future?

However, as time passed new cautious developers looking to adopt Rx wanted to know that this technology wasn't at a dead end. With recent effective end-of-life for technologies such as browser plugins (Silverlight, Flash etc) and Parse, it is understandable why people with long term plans are looking for assurances. Even intrepid developers were looking for some assurances that things were still moving. After the announcement of Rx3.0 and it being inferred that we would see a change in late 2015, in conjunction with the .NET Core activity questions started coming.

In the gitter chat room for Rx.NET, questions about the next release started as far back as Sept 2015. Then in Jan 2016 more questions about the next release and if Rx is still a supported library. And again in February, March, April and May. The tone of the chat room seemed glummer and glummer each time the question was asked, and we realized that no-one from the Rx Team was answering. Even Tamir Dresher who was writing the next book on Rx couldn't get any response from Microsoft. To add to the feeling of rejection the community was feeling, we became aware of a private chat room that members of the Rx Team were engaging with a select few people from outside of Microsoft. While I was aware of one person in this chat who I would consider a key member of the community, most of the other people I would consider key members with a wealth of experience in Rx and relevant streaming technologies were left in the dark, excluded.

Just because we can't get a transparent and comforting answer about the future of Rx from someone at Microsoft doesn't mean the project is dead. However the 18 Pull Requests that have had no interaction from the Rx Team, and the lack of any commits to master in 7 months did leave a lot of people pretty uneasy about what was happening. Not only were we getting radio silence, but this was being contrasted with what appeared to be massive amounts of activity happening in the RxJs space.

Bright Future!

And then this tweet happened
The future of Rx .NET is still bright, and yes we are transitioning it to the .NET Foundation
To me it seemed to be a self congratulatory tweet. In my opinion, there wasn't much worth celebrating. Some people did some good work porting some Rx code to CoreCLR and did so on a private fork/repo. Not on a feature branch on the main repo where others could watch, but somewhere else. A heated exchange followed on twitter, for which I apologize. It is not the forum for an exchange like that. But it did prompt me to write what I hope is constructive feedback below.

A more collaborative and transparent future?

The Rx code base is open source, which I understand doesn't mean a free-for-all. However, I think it is reasonable to expect a little more from a project than what we are getting. Here are some things I think are reasonable expectations:

Be clear about the status of the Repo/Project

In the front page (Readme.md) have the status of the repo. This should probably include which version is the current latest, which is the current pre-release. Link to them in nuget and their tags/branches (like RxJs). Highlight what the roadmap is (like ASP.NET).

Instead of people having to ask in chat rooms, twitter and forums about the status of the project it should be right their front and center. Something like
We're hoping to release a new batch of functionality later this year, based on internal developments that took place within the Bing organization. As part of this, we're looking at doing future developments in the area of our reactive programming cloud platform in the open here on GitHub. Currently, we're working out the details to transition parts of the technology to the .NET Foundation.
Followed up with something like "Transition to the .NET foundation can be slow, so this may take til Q3 2016. After that we can start porting our internal work in Q4 2016. In the meantime we would love to get your help on the issues that are up for grabs." (quote my own and made up). Now we would all know what was going on. It would take 10min to update that readme.md file.

Be clear about how to contribute

Contribution is not just about signing a CLA (Contributor License Agreement). It should include some guidance on how to raise an issue, link it to a PR and create some dialogue to validate that the change is inline with the direction of the project. This should include which branch to target 'master', 'develop' or perhaps a feature branch? It should also set the expectation of what the project custodians will do for the contributor with regards to labeling, setting milestones or closing the issue.

Documentation

The readme, the wiki and the reactivex.io documentation is sub-par. Allow the community to add more links to other helpful resources. There are 2 PRs to update documentation, but they appear to have been ignored.

If this is Microsoft's documentation for Rx, then maybe the link should be front and center. However, it suggests that you "download" the Rx SDK from the Microsoft download center instead of using Nuget, and the samples kick you straight off in the wrong direction by sticking subjects in your face as the first example. I would imagine that there would be links to external resources like the various books and websites that are out there. A good starting point for the loads of options could be this compilation -http://stackoverflow.com/questions/1596158/good-introduction-to-the-net-reactive-framework/. A link to ReactiveX.io would make sense. On that note, the ReactiveX.io samples for .NET are incomplete. Even the basic `subscribe` and `create` methods are missing. 

Clean up the branches

It appears that master is the only real branch (and gh-pages). So can 'develop' and 'BetterErrors' be merged, deleted or documented?

Creating a tag that relates to the last commit that was used to create a package is a helpful thing to do. This allows people to identify bugs for specific versions and see if a fix is already in, or if they should consider raising an issue or a PR.

Engage with issues

In the last few days there appears to be a flurry of activity, however I think we can improve in this area. Issues seem to be one of three broad categories: "Acceptable", "Maybe Later" and "Not acceptable". For things that won't get accepted into the repository, let's be honest and transparent. Say this doesn't follow the direction of the project, but thanks for your contribution. Then close the issue, or give the author a few days for rebuttal or to close it themselves. For issues that look like they match to the current planned release or would be suitable for a future release then label it as such. The CoreFx team do a great job of this in their issues

Labels are not the only tool the team could use to communicate with. Milestones also are a great way to give visibility to what is current and what is for later. They also allow you to see how close to complete a milestone is. It seems that this was a prime opportunity to use them. Milestone 2.2.6 could have had just two issues. CI and CoreCLR build. These issues could have been labeled as Up for grabs. The community would have jumped at the task. However with 18 of the current 23 PRs having had no official interaction, you can see why the community appears aloof when the task is significant, but may not even get noticed.

As a concrete challenge to the Rx.NET team: Aim to close or label every issue in the Repo. 

Maybe create some new issues, and mark them as up for grabs. Watch the community jump to action.

Which brings me on to my last ask, which is basically the same as for issues, but for PRs. Engage. People that have raised a PR, probably have used the library heavily, found an opportunity for change, then forked and cloned the repo. From here figured out how to get it build (because it doesn't out of the box in Debug or Release targets). Figured out the code base, made a change and ideally some tests to support it. Finally they have submitted the PR. Best case scenario, I think would be 4hrs of their life they could have been doing something else. I imagine it would be quite a disappointing feeling to have 0 comments on your PR.

Symbiotic Relationship

I am not hating on Rx, nor am I hating on @ReactiveX quite the opposite. I think what the Rx (Volta) team have done is brilliant. I enjoy it so much I poured 6 months into writing documentation for it and a Christmas holiday writing a PR for it. There are others too out there that are equally as passionate.

But like in a loving family, sometimes you need to be the unpopular one and point out that hey, this is not okay.
All I am asking from the Rx.NET team is a little back-and-forth, a little transparency. I would think they have far more to gain from doing this than anyone else. I have worked with hundreds of people that have used Rx. A handful have contributed to RxJava and RxJs but none to Rx.NET, because it is too hard. I think there is a great opportunity for positive change here.

RxJS is almost 100% community run at this point with Microsoft helping steer the project

Can Rx.NET take a step in this direction? With brains like Bart's at the helm and the army of the willing already out there, then yes, we would have a bright future indeed. Just tell us what you want us to do.

Friday, March 18, 2016

Measuing latency with HdrHistogram

I had the pleasure last year to meet with Gil Tene, an authority on building high performance software and specifically high performance JVM implementations. He gave a brilliant presentation at React San Francisco and then again at YOW in Australia on common mistakes made when measuring performance. He he explained that measuring latency is not about getting a number, but identifying behavior and characteristics of a system.

Often when we set out to measure the performance of our software we can be guided by NFR (Non-Functional Requirements) that really don't make too much sense. More than once I have been presented with a requirement that the system must  process x requests per time-period e.g 5 messages per second. However as Gil points out this single number is either unreasonable, or misleading. If the system must always operate in a state to support these targets then it may be cost prohibitive. This requirement must also define 100% up-time. To work around that, some requirements specify that the mean response time should be y. However this is potentially less useful. By definition what we are really specifying is the 50% of requests must see worse performance than the target.

A useful visualization for pointing out the folly of chasing a mean measurement is illustrated below.

File:Anscombe's quartet 3.svg
[Source - https://en.wikipedia.org/wiki/Anscombe%27s_quartet]

All of these charts have the same mean value, but clearly show different shapes of data. If you measuring latency in your application and were targeting a mean value, you may be able to hit these targets but still have unhappy customers.

When discussing single value targets, a mean value can be thought of as just the 50th percentile. In the first case the requirement was for the 100th percentile.

Perhaps what is more useful is to measure and target several values. Maybe the 99th percentile plus targets at 99.9% and 99.99% etc is what you really are looking for.

Measuring latency with histograms

Instead of capturing a count and a sum of all latency recorded to then calculate a mean latency, you can capture latency values and assign them to a bucket. The assignment of this value to a bucket is to simply increment the count of that bucket. This now allows us to analyse the spread of latency recordings.

The example of a histogram from Wikipedia shows how to represent heights by grouping into buckets of 5cm ranges. For each value of the 31 Black Cheery Trees measured, the height is assigned to the bucket and the count for that bucket increased. Note that the x axis is linear.

An example histogram of the heights of 31 Black Cherry trees

A naive implementation of a histogram however, may require you to pre-plan your number and width of your buckets. Gil Tene has helped out here by creating an implementation of a histogram that specifically is design for high dynamic ranges, hence its name HdrHistogram.

When you create an instance of an HdrHistogram you simply specify
  1. a maximum value that you will support
  2. the precision you want to capture as the number of significant digits
  3. optionally, the minimum value you will support
The internal data structures of the HdrHistogram are such that you can very cheaply specify a maximum value that is an order of magnitude larger than you will expect, thus giving you enough headroom for your recorded values. As the HdrHistogram is designed to measure latency a common usage would be to measure a range from the minimum supported value for the platform (nanoseconds on JVM+Linux, or ticks on .NET+Windows) up to an hour, with a fidelity of 3 significant figures.


For example, a Histogram could be configured to track the counts of observed integer values between 0 and 36,000,000,000 while maintaining a value precision of 3 significant digits across that range. Value quantization within the range will thus be no larger than 1/1,000th (or 0.1%) of any value. This example Histogram could be used to track and analyze the counts of observed response times ranging between 1 tick (100 nanoseconds) and 1 hour in magnitude, while maintaining a value resolution of 100 nanosecond up to 100 microseconds, a resolution of 1 millisecond(or better) up to one second, and a resolution of 1 second (or better) up to 1,000 seconds. At it's maximum tracked value(1 hour), it would still maintain a resolution of 3.6 seconds (or better).

Application of the HdrHistogram

When Matt (@mattbarrett) and I presented Reactive User Interfaces, we used the elements of drama and crowd reaction to illustrate the differences between various ways of conflating fast moving data from a server into a client GUI application. To best illustrate the problems of flooding a client with too much data in a server-push system, we used a modestly powered Intel i3 laptop. This worked fairly well in showing the client application coming to its knees when overloaded. However it also occasionally showed Windows coming to its knees too, which was a wee bit too much drama to have on stage during a live presentation.

Instead we thought it better to provide a static visualization of what was happening in our system when it was overloaded with data from the server. We could then contrast that with alternative implementations showing how we can perform load-shedding on the client. This also meant we could present with a single high powered laptop, instead of bringing the toy i3 along with us just to demo.

We added a port of the original Java HdrHistogram to our .NET code base. We used it to capture the latency of prices from the server, to the client, and then the additional latency for the client to actually dispatch the rendering of the price. As GUI applications are single threaded, if you provided more updates than the GUI can render, there are two things that can happen:

  • updates are queued
  • updates are conflated
What you do in your client application depends on your requirements. Some systems will need to process every message. In this case they may choose to just allow the updates to be queued. Other systems may allow updates to be conflated. Conflation is the act of taking many and reducing to one. So for some systems, they maybe able to conflate many updates and average them or aggregate them. For other systems, it may only be the last message that is the most important, so the conflation algorithm here would be to only process the last message. Matt discusses this in more detail on the Adaptive Blog.

In the demo for ReactiveTrader we demo queuing all updates and 3 styles of conflation. When we applied the HdrHistogram to our code base, we were quick to see we actually had a bug in our code base.



We had two problems. The first problem was an assumption that what worked for Silverlight, would also work for WPF. As WPF has two threads dedicated to presentation (a UI thread and a dedicated Render thread), we were actually only measuring how long it took for us to put a price on another queue! You can see that the ObserveLatest1 and ObserverLatest2 (red and yellow) lines show worse performance than just processing all items on the dispatcher. I believe this is due to us just doing more work to conflate before sending to the render thread. Unlike in Silverlight, once we send something to the Render thread in WPF we can no longer measure the time taking to actually render the change. So our measurements here were not really telling us the full story.

The second problem we see is that there was actually a bug in the code we copied from our original silverlight (Rx v1) code. The original code (red line) accidentally used a MultipleAssignmentDisposable instead of a SerialDisposable. The simple change gave us the improvements seen in the yellow line.

We were happy to see that the Conflate and ConstantRate algorithms were measuring great results, which were clearly supported visually when using the application.



To find out more about the brilliant Gil Tene
I am currently working on the final details of a complete port of the original Java HdrHsitogram to .NET. You can see my work here - https://github.com/LeeCampbell/HdrHistogram.NET

Saturday, January 11, 2014

ReplaySubject Performance improvments – code changes

In the previous post I talked about some changes that could be made to the ReplaySubject<T> implementation to squeeze large performance gains out of it. In that post I discussed mainly the result of the changes. In this post I will show some of the changes that I made.

Analysis of existing implementation

As mentioned in the previous post, all constructor overloads of the ReplaySubject<T> eventually called into the same constructor, but just provided default values for the MaxCountBuffer, MaxTimeBuffer and Scheduler. For example :

public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
    _bufferSize = bufferSize;
    _window = window;
    _scheduler = scheduler;

    _stopwatch = _scheduler.StartStopwatch();
    _queue = new Queue<TimeInterval<T>>();
    _isStopped = false;
    _error = null;

    _observers = new ImmutableList<ScheduledObserver<T>>();
}

public ReplaySubject(int bufferSize, TimeSpan window)
    : this(bufferSize, window, SchedulerDefaults.Iteration)
{ }

public ReplaySubject()
    : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }

public ReplaySubject(IScheduler scheduler)
    : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
{ }

public ReplaySubject(int bufferSize, IScheduler scheduler)
    : this(bufferSize, TimeSpan.MaxValue, scheduler)
{ }

public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }

public ReplaySubject(TimeSpan window, IScheduler scheduler)
    : this(InfiniteBufferSize, window, scheduler)
{ }

public ReplaySubject(TimeSpan window)
    : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{ }

There are a total of 8 constructor overloads, but 7 of them just delegate to the top overload above, passing default values.

Next, lets look at the Subscribe method. Here we see the passed observer is wrapped in a ScheduledObserver<T>, which is only relevant for time based buffers. Also a Trim() command is made which again is only relevant for time based buffers.

public IDisposable Subscribe(IObserver observer)
{
    var so = new ScheduledObserver(_scheduler, observer);
    var n = 0;
    var subscription = new RemovableDisposable(this, so);
    lock (_gate)
    {
        CheckDisposed();
        Trim(_stopwatch.Elapsed);
        _observers = _observers.Add(so);

        n = _queue.Count;
        foreach (var item in _queue)
            so.OnNext(item.Value);

        if (_error != null)
        {
            n++;
            so.OnError(_error);
        }
        else if (_isStopped)
        {
            n++;
            so.OnCompleted();
        }
    }
    so.EnsureActive(n);
    return subscription;
}

The next part of the code that is interesting is the implementation of the OnNext method.

public void OnNext(T value)
{
    var o = default(ScheduledObserver<T>[]);
    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            var now = _stopwatch.Elapsed;
            _queue.Enqueue(new TimeInterval<T>(value, now));
            Trim(now);

            o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }

    if (o != null)
        foreach (var observer in o)
            observer.EnsureActive();
}

There are several things to note here:

  1. The use of an array of ScheduledObserver<T>
  2. The use of the TimeInterval<T> envelope for the value
  3. The Trim() command

Each of the three things above become quite dubious when we consider ReplayAll and ReplayOne implementations. A ReplayMany implementation, may need a Trim() command, but surely does not need ScheduledObservers nor its values time-stamped.

Next we look at the Trim command itself:

void Trim(TimeSpan now)
{
    while (_queue.Count > _bufferSize)
        _queue.Dequeue();
    while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
        _queue.Dequeue();
}

Again we see that the code for the second while loop is wholly unnecessary for non-time based implementations.

Each of these small overheads start to add up. However where the cost really start to kick in is in the implementation of the ScheduledObserver<T>.

At a minimum, for each OnNext call to a ReplaySubject<T>, the ScheduledObserver<T> performs some condition checks, virtual method calls and some Interlocked operations. These are all fairly cheap operations. It is the unnecessary scheduling that incurs the costs. The scheduling incurs at least the following allocations

  1. ScheduledItem<TimeSpan, TState>
  2. AnonymousDisposable and the Action
  3. IndexedItem

There is also all the queues that are involved.

  1. ReplaySubject<T>._queue (Queue<TimeInterval<T>>)
  2. ScheduledObserver<T>._queue (ConcurrentQueue<T>)
  3. CurrentThreadScheduler.s_threadLocalQueue (SchedulerQueue<TimeSpan>)

And the concurrency controls

  1. ReplaySubject uses the IStopWatch from _scheduler.StartStopwatch()
  2. ScheduledObserver<T>; will use Interlocked.CompareExchange 3-4 times on a standard OnNext call.
  3. ConcurrentQueue uses a Interlocked.Increment and also a System.Threading.SpinWait when en-queuing a value. It also uses up to 3 SpinWaits and a Interlocked.CompareExchange to de-queue a value.
  4. The recursive scheduling extension method use lock twice
  5. CurrentThreadScheduler uses a Thread.Sleep()

Alternative implementations

All of the code above is fairly innocent looking when looked at in isolation. However, when we consider what we probably need for a ReplayOne, ReplayMany or ReplayAll implementation, all this extra code might make you blush.

The implementations that do not have a time consideration now share a base class and its updated OnNext implementation is now simply:

public void OnNext(T value)
{
    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            AddValueToBuffer(value);
            Trim();

            var o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }
}

The ReplayOne implementation is now reduced to :

private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
{
    private bool _hasValue;
    private T _value;

    protected override void Trim()
    {
        //NoOp. No need to trim.
    }

    protected override void AddValueToBuffer(T value)
    {
        _hasValue = true;
        _value = value;
    }

    protected override void ReplayBuffer(IObserver<T> observer)
    {
        if (_hasValue)
            observer.OnNext(_value);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _value = default(T);
    }
}

Note that there are no queues, schedulers, allocations etc. We have replaced all of that with simply a field to hold the single value, and a boolean flag to indicate if there has been a value buffered yet.

This is allowed to become so simple due to the base class ReplayBufferBase:

private abstract class ReplayBufferBase : IReplaySubjectImplementation
{
    private readonly object _gate = new object();
    private bool _isDisposed;
    private bool _isStopped;
    private Exception _error;
    private ImmutableList<IObserver<T>> _observers;

    protected ReplayBufferBase()
    {
        _observers = new ImmutableList<IObserver<T>>();
    }

    protected abstract void Trim();
    protected abstract void AddValueToBuffer(T value);
    protected abstract void ReplayBuffer(IObserver<T> observer);

    public bool HasObservers
    {
        get
        {
            var observers = _observers;
            return observers != null && observers.Data.Length > 0;
        }
    }

    public void OnNext(T value)
    {
        lock (_gate)
        {
            CheckDisposed();

            if (!_isStopped)
            {
                AddValueToBuffer(value);
                Trim();

                var o = _observers.Data;
                foreach (var observer in o)
                    observer.OnNext(value);
            }
        }
    }

    public void OnError(Exception error) {/*...*/}

    public void OnCompleted() {/*...*/}

    public IDisposable Subscribe(IObserver<T> observer) {/*...*/}

    public void Unsubscribe(IObserver<T> observer) {/*...*/}

    private void CheckDisposed() {/*...*/}

    public void Dispose() {/*...*/}

    protected virtual void Dispose(bool disposing) {/*...*/}
}

The ReplayMany and ReplayAll implementations are slightly more complex as they require a Queue to store the buffered values. Again we add another base class to do most of the work.

private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
{
    private readonly Queue<T> _queue;

    protected ReplayManyBase(int queueSize)
        : base()
    {
        _queue = new Queue<T>(queueSize);
    }

    protected Queue<T> Queue { get { return _queue; } }

    protected override void AddValueToBuffer(T value)
    {
        _queue.Enqueue(value);
    }

    protected override void ReplayBuffer(IObserver<T> observer)
    {
        foreach (var item in _queue)
            observer.OnNext(item);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _queue.Clear();
    }
}

Now the only differences are the initial buffer size and whether the buffer gets trimmed or not. This leaves us with the final two implementations:

private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
{
    private readonly int _bufferSize;

    public ReplayMany(int bufferSize)
        : base(bufferSize)
    {
        _bufferSize = bufferSize;
    }

    protected override void Trim()
    {
        while (Queue.Count > _bufferSize)
            Queue.Dequeue();
    }
}

private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
{
    public ReplayAll()
        : base(0)
    {
    }

    protected override void Trim()
    {
        //NoOp; i.e. Dont' trim, keep all values.
    }
}

Less code, more speed

Like Kunu says "Do less".

By removing a lot of the excess code we are able to massively improve the performance of the ReplaySubject<T> for arguably most use-cases.

Saturday, January 4, 2014

ReplaySubject performance improvements

Over the last year I have been looking at improving the performance of the ReplaySubject<T> in the .NET implementation of the Reactive Extensions. It has been a fun journey and I am really excited about the results.

How it started

In a project i was working on in early 2013, we were doing our standard pre-release performance checks when @marcuswhit found an unusual spike in allocations and therefore increase in GC pressure. For the user, this resulted in a drop in performance for the application we had built. We had a look at the culprit code and it was simply the introduction of a Replay operator. The replay operator was required for the new functionality, but the performance hit was unexpected.

Later when John Rayner and I looked into the underlying code in the Rx codebase we found that all flavours of a ReplaySubject shared the same code path. This meant that if you were creating a Replay-One sequence, you would incur the same costs that a Replay-By-Time sequence would. These costs are outlined in the codeplex workitem - https://rx.codeplex.com/workitem/35. To summarize: In my opinion, if you construct a ReplaySubject without a TimeSpan argument, then you shouldn’t have to pay the cost of schedulers and stopwatches. In my experience, Replay() and Replay(1) (or Replay-All and Replay-One) make up at least 80% of the uses of Replay.

Performance testing

When I opened up the code base I was able to see the excess code that would run for each variation of the ReplaySubject. I was sure that I could make it faster, but I obviously needed to get some metrics for it. So I created a little performance test harness (https://github.com/LeeCampbell/RxPerfTests) that I could use to test implementations of ReplaySubject from various versions of Rx.

Running on my little holiday traveller i3 laptop i was able to get maximum throughput of up to ~225k messages per second on version 2.1.30214.0 of Rx. I then pulled down the latest (v2.2.0.0) code base from GitHub, compiled the Release40 build. This also showed throughput of up to ~225k messages per second. The performance scaled in a fairly linear fashion: doubling the subscriptions seemed to halve the throughput. Here is the results of a test against v2.1.30214.0 for the a Replay-All subject.

Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 218747  92306  56871  23445  13673
    10000 215906 109191  54546  26872  13244
   100000 200946 105164  52730  26595  13311
  1000000 204420 104031  53133  26775  13164

Each flavour (Replay-All, Replay-One, Replay-Many, Replay-by-Time and Replay-by-Time-and-Count) showed similar performance characteristics.

The changes

I then pulled apart the ReplaySubject on my local git repo of Rx and made some changes. I decided that as each flavour had different requirements, that they also could have different implementations. I couldn’t change the public API, so I opted for creating private nested implementations for each flavour of the ReplaySubject. Each implementation started off as a copy of the original ReplaySubject and was cut back to just the code that was required. Then any duplicate code was refactored back into a base class. I found a few bugs in my own code that were not covered by the existing test, so I added some more tests.

Using the performance test harness mentioned above, I was able to tweak here and there and get massive improvements. The Replay-by-Time and Replay-by-Time-and-Count flavours showed no performance changes. This is because they basically remained the same code base with just a new layer of indirection. The big improvements came from the Replay-All, Replay-One and Replay-Many flavours. Now instead of single subscriptions getting ~225msg/s, I now started seeing throughput of over 9million msg/s. While this was great, and even better improvement was the scalability profile. Now adding subscriptions had a far lower impact. In the unmodified implementations adding 16 subscriptions had the effect of reducing the throughput by a factor of 16. The new implementation now could add 16 subscriptions and see a reduction in throughput by a factor of less than 2.

Rx v2.2.0.0(modified) Replay() - Throughput (msg/sec)
         Subscriptions
 Messages       1       2       4       8      16
     1000 9823183 9541985 8583691 7107321 5167959
    10000 9863878 9447331 8727527 7175660 5174912
   100000 9788280 9356550 8727450 6881744 5154719
  1000000 8998729 8885193 8108485 6720683 5031854

Why such large performance gains?

While these are micro-benchmarks and need to be taken with a grain of salt, we can still consider why these large improvements are being seen. We can look to another metric that I collect while running these tests : Generation 0 Garbage collections. In the case of a single subscription being pushed 100k messages (just integer values as the messages), each flavour consistently causes 41 or 42 Gen0 GC’s to occur. With the new implementations, only Replay-by-Time and Replay-by-Time-and-Count still cause this much GC activity. This is due to them being essentially the old implementations. In all other cases, 100k messages cause no Gen0 collections. In fact we have to push the messages up to 1 million messages to get a single Gen0 Collection.

Here is table of the unmodified implementation’s Garbage Collection profile:

Rx v2.1.30214.0 Replay() - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3321 6550

Here is the data for the new implementation:

Rx v2.2.0.0(modified) Replay() - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  1  1  1  1  1

This data makes it very clear to see that we are just doing a lot less work, less allocations & less garbage collection which means a lot more speed.

As most people interested in performance probably don’t run an i3 processor, I also re-ran the tests on my Surface Pro that has an i5 processor. The improvements were still just as good. The i5 could squeeze out just over 400k msg/s on the unmodified code base, and still subscriptions had the same linear effect on performance.

Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 380431 218871 114416  57253  28110
    10000 398996 204956 113002  55781  27993
   100000 414960 217336 109840  55951  27422
  1000000 418740 217453 110330  56145  26576

With the modified code however, we could now reach over 17million msg/s. Where 16 subscriptions caused the original implementation to fall down to just under 30k msg/s, we can still reach over 9million.

Rx v2.2.0.0(modified) Replay() - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 17452007 17452007 16260163 12953368  9363296
    10000 18975332 17869907 15928640 13326226  9470594
   100000 18110693 18102824 16780494 13748350  9360316
  1000000 17682715 17005559 16196903 12687618  9242486

 

Full Test results

i3 processor – Unmodified code
Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 218747  92306  56871  23445  13673
    10000 215906 109191  54546  26872  13244
   100000 200946 105164  52730  26595  13311
  1000000 204420 104031  53133  26775  13164

Rx v2.1.30214.0 Replay() - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3321 6550



Rx v2.1.30214.0 Replay(1) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 214450 110044  55577  28174  14292
    10000 182842 102424  53205  27201  13200
   100000 202244 103372  53023  26730  12901
  1000000 199423 100937  52423  26692  13175

Rx v2.1.30214.0 Replay(1) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610



Rx v2.1.30214.0 Replay(5) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 154466 112586  53806  28533  12066
    10000 209941 105418  52227  25618  12894
   100000 203439 103846  52172  26408  12773
  1000000 199373 102434  52428  26218  13154

Rx v2.1.30214.0 Replay(5) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610



Rx v2.1.30214.0 Replay(5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 165085 113025  52570  26510  12423
    10000 208303 106808  52318  26740  13070
   100000 197664 103754  51916  26164  13254
  1000000 198327 103020  52606  26467  13200

Rx v2.1.30214.0 Replay(5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  837 1664 3319 6547



Rx v2.1.30214.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 216216 112792  56599  28629  13872
    10000 203111 106995  51609  26650  13174
   100000 198527 102207  52097  26304  13025
  1000000 198140 103024  49670  26410  13067

Rx v2.1.30214.0 Replay(5, 5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610
i3 processor - Modified code
Rx v2.2.0.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages       1       2       4       8      16
     1000 9823183 9541985 8583691 7107321 5167959
    10000 9863878 9447331 8727527 7175660 5174912
   100000 9788280 9356550 8727450 6881744 5154719
  1000000 8998729 8885193 8108485 6720683 5031854

Rx v2.2.0.0 Replay() - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  1  1  1  1  1



Rx v2.2.0.0 Replay(1) - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 11560694 11976048 10256410  8077544  5707763
    10000 11444266 11689071 10127608  7958615  5674403
   100000 11923925 11776760 10193472  8016096  5638377
  1000000 11963712 11374908  9795912  7469844  5392897

Rx v2.2.0.0 Replay(1) - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  0  0  0  0  0



Rx v2.2.0.0 Replay(5) - Throughput (msg/sec)
         Subscriptions
 Messages       1       2       4       8      16
     1000 8748906 8583691 7072136 6289308 4688233
    10000 8683571 8504848 7512019 6407381 4731712
   100000 8470556 8434904 7535170 6329915 4552138
  1000000 8516944 8072641 7322719 5943437 4545893

Rx v2.2.0.0 Replay(5) - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  0  0  0  0  0



Rx v2.2.0.0 Replay(5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 203145 112941  55796  27764  13894
    10000 210778 103922  52273  25893  13369
   100000 199651 103438  52908  25945  13168
  1000000 201586 103890  51972  26753  12945

Rx v2.2.0.0 Replay(5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1663 3319 6548



Rx v2.2.0.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 207215 111986  56923  28671  13387
    10000 200563 106150  52456  25914  13460
   100000 195847 103178  53104  26365  13366
  1000000 194919 103045  52904  26238  13325

Rx v2.2.0.0 Replay(5, 5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610

 

i5 processor - unmodified code
Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 380431 218871 114416  57253  28110
    10000 398996 204956 113002  55781  27993
   100000 414960 217336 109840  55951  27422
  1000000 418740 217453 110330  56145  26576

Rx v2.1.30214.0 Replay() - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3321 6627



Rx v2.1.30214.0 Replay(1) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 344222 223389 113538  52829  29044
    10000 417744 219372 109963  55726  27561
   100000 415620 216711 110546  56107  28060
  1000000 419986 218789 110905  56176  28149

Rx v2.1.30214.0 Replay(1) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610



Rx v2.1.30214.0 Replay(5) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 361978 223899 113654  56229  28129
    10000 412809 212685 111607  55976  27862
   100000 422559 217703 110316  55860  28014
  1000000 423255 217803 110782  56183  28099

Rx v2.1.30214.0 Replay(5) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610



Rx v2.1.30214.0 Replay(5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 384675 221239 106288  56164  28373
    10000 416682 219457 110083  56036  27671
   100000 422329 218200 110706  55608  27383
  1000000 422114 217501 110514  56183  26502

Rx v2.1.30214.0 Replay(5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3320 6551



Rx v2.1.30214.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 375094 225119 113670  56691  28672
    10000 410477 220168 111093  55885  27876
   100000 420379 217921 110347  56104  27049
  1000000 420783 217080 111352  56076  28017

Rx v2.1.30214.0 Replay(5, 5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610
i5 processor - modified code
Rx v2.2.0.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 17452007 17452007 16260163 12953368  9363296
    10000 18975332 17869907 15928640 13326226  9470594
   100000 18110693 18102824 16780494 13748350  9360316
  1000000 17682715 17005559 16196903 12687618  9242486

Rx v2.2.0.0 Replay() - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  1  1  1  1  1



Rx v2.2.0.0 Replay(1) - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 22421525 23364486 20449898 16103060 11049724
    10000 20449898 23529412 20733983 16097875 10883761
   100000 22837829 22088220 20341327 15736384 10903817
  1000000 23659399 23495633 20324584 15783451 10902201

Rx v2.2.0.0 Replay(1) - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  0  0  0  0  0



Rx v2.2.0.0 Replay(5) - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 15948963 17094017 15797788 13054830  9578544
    10000 16417665 16716817 15852885 12655024  9417083
   100000 16009990 16974759 15540016 12802622  9371456
  1000000 17090833 16916123 15518023 12824245  9297327

Rx v2.2.0.0 Replay(5) - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  0  0  0  0  0



Rx v2.2.0.0 Replay(5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 320328 222336 112826  53494  28674
    10000 421850 221479 112933  56733  27410
   100000 412311 216091 112233  56235  27692
  1000000 416718 219312 112004  56672  26932

Rx v2.2.0.0 Replay(5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3320 6550



Rx v2.2.0.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 388018 222306 114519  57530  27928
    10000 415835 222314 109739  57219  28178
   100000 421980 215986 113116  56194  28354
  1000000 419603 218809 112619  56641  28380

Rx v2.2.0.0 Replay(5, 5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610

 

TL;DR

I have submitted a pull request that should give at least 25x the throughput on ReplaySubjects where the buffer has no time component. For multiple subscriptions, performance improvements are much higher, with tests for 16 subscriptions showing ~350x improvement. So if you have any Last-value caches, if you use the .Replay() extension method, or the Replay(int) overload, and you want to see less garbage collection then feel free to up vote this issue on codeplex, and hopefully we will see the Rx team at Microsoft accept the pull request.

Tuesday, November 6, 2012

Rx now open source

Awesome news today that will please many (Thargy?) is that Rx is now open source.

The new home page has just gone up
http://rx.codeplex.com/

Scott Hanselman himself is plugging it
http://www.hanselman.com/blog/ReactiveExtensionsRxIsNowOpenSource.aspx

And we even get a mention for the IntroToRx.com site (Yay!)


Friday, September 21, 2012

.NET 4 dynamic feature

When .NET 4 was coming out a couple of years ago, one of the neat new features was the inclusion of (better support for?) dynamic languages. C# even got some love and the dynamic keyword was added. I was quite excited about the prospect of having dynamic coding features in my C# code base. Now about 3 years later I have finally used the dynamic keyword; once in "real" code and once for debugging purposes.
 
Now that I have used the feature, it has opened my eyes to other opportunities to leverage it. I hope this helps someone else too.

The first time I used the dynamic keyword was when I needed to access some COM interop component. My specific example is when I wanted access to the document of a WPF WebBroswer object. The problem is the WebBrowser.Document property is of type System.Object. As I debug, I can peek into the object and see that it has the title property I want with all the data I need. However, you need to cast it to the COM object to use it in code. I have always been a bit queasy when I need to do COM interop.

Regardless of my feelings towards COM interop, there could be COM interface version issues depending on the version of Browser installed and it just seems like a big hammer for simply getting the title from the Document property. Well this clever clogs had already posted a super simple solution to my problem; just use the dynamic keyword. I know I want the title property, I just don't care which interface is used to get it. Great stuff!

Today I find myself hacking around a part of a code base that I am unfamiliar with. There seems to be some sort of On^2 operation introduced into the code base. I want to hack in some logging into a Generic usercontrol that is doing too much work. When I go to log the events I just get the Infragistics* DataRecord's ToString() implementation filling up my logs. I know that in this case I have a Deal in the data context for the problem I am logging, but it is an open generic type of T in the control. The control has no reference to the assembly that the Deal object lives but I want to log the Deal.DealId property. I want to see the DealId so I know if we are looping over the same deal the whole time or if we have jitter or just double handling etc...

Well you guessed it, where I need to get the DealId, I just assign the DataRecord's DataItem to a dynamic variable and then just log the magical DealId property on that dynamic object. Super! (Obviously once we found the issue, I ripped out my hack)
 
So I don’t think this is a feature I will use a lot of, but it really helped in these two scenarios.

Monday, August 20, 2012

Intro To Rx on TWC9

Brian Keller and Dan Fernandez mentioned IntroToRx.com on their Channel9 show This week on Channel 9. It was somewhat dwarfed by the talk of Win8, VS2012 etc as one would expect.

For those that don't know, Channel9 is Microsoft's virtual/online TV channel for keep the public in touch with  all that is happening around the world relating to Microsoft. This week on Channel9 is a great show that quickly summarizes what has been published on the site for the week. Other shows to check out is the Defrag and Ping.