Saturday, January 11, 2014

ReplaySubject Performance improvements – code changes

In the previous post I talked about some changes that could be made to the ReplaySubject<T> implementation to squeeze large performance gains out of it. In that post I discussed mainly the result of the changes. In this post I will show some of the changes that I made.

Analysis of existing implementation

As mentioned in the previous post, all constructor overloads of ReplaySubject<T> eventually call into the same constructor, simply providing default values for the MaxCountBuffer, MaxTimeBuffer and Scheduler. For example:

public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
    _bufferSize = bufferSize;
    _window = window;
    _scheduler = scheduler;

    _stopwatch = _scheduler.StartStopwatch();
    _queue = new Queue<TimeInterval<T>>();
    _isStopped = false;
    _error = null;

    _observers = new ImmutableList<ScheduledObserver<T>>();
}

public ReplaySubject(int bufferSize, TimeSpan window)
    : this(bufferSize, window, SchedulerDefaults.Iteration)
{ }

public ReplaySubject()
    : this(InfiniteBufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }

public ReplaySubject(IScheduler scheduler)
    : this(InfiniteBufferSize, TimeSpan.MaxValue, scheduler)
{ }

public ReplaySubject(int bufferSize, IScheduler scheduler)
    : this(bufferSize, TimeSpan.MaxValue, scheduler)
{ }

public ReplaySubject(int bufferSize)
    : this(bufferSize, TimeSpan.MaxValue, SchedulerDefaults.Iteration)
{ }

public ReplaySubject(TimeSpan window, IScheduler scheduler)
    : this(InfiniteBufferSize, window, scheduler)
{ }

public ReplaySubject(TimeSpan window)
    : this(InfiniteBufferSize, window, SchedulerDefaults.Iteration)
{ }

There are a total of 8 constructor overloads, but 7 of them just delegate to the top overload above, passing default values.

Next, let's look at the Subscribe method. Here we see that the passed observer is wrapped in a ScheduledObserver<T>, which is only relevant for time-based buffers. A call to Trim() is also made, which again is only relevant for time-based buffers.

public IDisposable Subscribe(IObserver<T> observer)
{
    var so = new ScheduledObserver<T>(_scheduler, observer);
    var n = 0;
    var subscription = new RemovableDisposable(this, so);
    lock (_gate)
    {
        CheckDisposed();
        Trim(_stopwatch.Elapsed);
        _observers = _observers.Add(so);

        n = _queue.Count;
        foreach (var item in _queue)
            so.OnNext(item.Value);

        if (_error != null)
        {
            n++;
            so.OnError(_error);
        }
        else if (_isStopped)
        {
            n++;
            so.OnCompleted();
        }
    }
    so.EnsureActive(n);
    return subscription;
}

The next part of the code that is interesting is the implementation of the OnNext method.

public void OnNext(T value)
{
    var o = default(ScheduledObserver<T>[]);
    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            var now = _stopwatch.Elapsed;
            _queue.Enqueue(new TimeInterval<T>(value, now));
            Trim(now);

            o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }

    if (o != null)
        foreach (var observer in o)
            observer.EnsureActive();
}

There are several things to note here:

  1. The use of an array of ScheduledObserver<T>
  2. The use of the TimeInterval<T> envelope for the value
  3. The Trim() command

Each of the three things above becomes quite dubious when we consider the ReplayAll and ReplayOne implementations. A ReplayMany implementation may need a Trim() call, but it surely needs neither ScheduledObservers nor time-stamped values.

Next we look at the Trim command itself:

void Trim(TimeSpan now)
{
    while (_queue.Count > _bufferSize)
        _queue.Dequeue();
    while (_queue.Count > 0 && now.Subtract(_queue.Peek().Interval).CompareTo(_window) > 0)
        _queue.Dequeue();
}

Again we see that the code for the second while loop is wholly unnecessary for non-time based implementations.

Each of these small overheads starts to add up. However, where the cost really starts to kick in is in the implementation of ScheduledObserver<T>.

At a minimum, for each OnNext call to a ReplaySubject<T>, the ScheduledObserver<T> performs some condition checks, virtual method calls and some Interlocked operations. These are all fairly cheap operations. It is the unnecessary scheduling that incurs the costs. The scheduling incurs at least the following allocations:

  1. ScheduledItem<TimeSpan, TState>
  2. AnonymousDisposable and the Action
  3. IndexedItem

There are also all the queues that are involved:

  1. ReplaySubject<T>._queue (Queue<TimeInterval<T>>)
  2. ScheduledObserver<T>._queue (ConcurrentQueue<T>)
  3. CurrentThreadScheduler.s_threadLocalQueue (SchedulerQueue<TimeSpan>)

And the timing and concurrency controls:

  1. ReplaySubject<T> uses the IStopwatch from _scheduler.StartStopwatch()
  2. ScheduledObserver<T> will use Interlocked.CompareExchange 3-4 times on a standard OnNext call.
  3. ConcurrentQueue<T> uses an Interlocked.Increment and also a System.Threading.SpinWait when enqueuing a value. It also uses up to 3 SpinWaits and an Interlocked.CompareExchange to dequeue a value.
  4. The recursive scheduling extension method uses lock twice
  5. CurrentThreadScheduler uses a Thread.Sleep()

Alternative implementations

All of the code above is fairly innocent looking when looked at in isolation. However, when we consider what we probably need for a ReplayOne, ReplayMany or ReplayAll implementation, all this extra code might make you blush.

The implementations that do not have a time consideration now share a base class, and their shared OnNext implementation is now simply:

public void OnNext(T value)
{
    lock (_gate)
    {
        CheckDisposed();

        if (!_isStopped)
        {
            AddValueToBuffer(value);
            Trim();

            var o = _observers.Data;
            foreach (var observer in o)
                observer.OnNext(value);
        }
    }
}

The ReplayOne implementation is now reduced to:

private sealed class ReplayOne : ReplayBufferBase, IReplaySubjectImplementation
{
    private bool _hasValue;
    private T _value;

    protected override void Trim()
    {
        //NoOp. No need to trim.
    }

    protected override void AddValueToBuffer(T value)
    {
        _hasValue = true;
        _value = value;
    }

    protected override void ReplayBuffer(IObserver<T> observer)
    {
        if (_hasValue)
            observer.OnNext(_value);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _value = default(T);
    }
}

Note that there are no queues, schedulers, allocations etc. We have replaced all of that with simply a field to hold the single value, and a boolean flag to indicate if there has been a value buffered yet.
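To make the replay-one behaviour concrete, here is a minimal stand-alone sketch. It is not the Rx code: LastValueCache<T> is a hypothetical name, it uses plain Action<T> callbacks instead of IObserver<T>, and it assumes single-threaded, non-reentrant use, so there is no locking, completion or error handling.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of replay-one semantics; not part of Rx.
public sealed class LastValueCache<T>
{
    private readonly List<Action<T>> _subscribers = new List<Action<T>>();
    private bool _hasValue; // false until the first value arrives
    private T _value;       // the single buffered value

    public void OnNext(T value)
    {
        // "Buffering" is just overwriting one field; nothing is allocated.
        _hasValue = true;
        _value = value;
        foreach (var subscriber in _subscribers)
            subscriber(value);
    }

    public void Subscribe(Action<T> onNext)
    {
        // Replay the buffered value (if any) before adding the subscriber.
        if (_hasValue)
            onNext(_value);
        _subscribers.Add(onNext);
    }
}
```

A subscriber that joins after the values 1 and 2 have been pushed receives 2 immediately, and then any values pushed afterwards.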

This is allowed to become so simple due to the base class ReplayBufferBase:

private abstract class ReplayBufferBase : IReplaySubjectImplementation
{
    private readonly object _gate = new object();
    private bool _isDisposed;
    private bool _isStopped;
    private Exception _error;
    private ImmutableList<IObserver<T>> _observers;

    protected ReplayBufferBase()
    {
        _observers = new ImmutableList<IObserver<T>>();
    }

    protected abstract void Trim();
    protected abstract void AddValueToBuffer(T value);
    protected abstract void ReplayBuffer(IObserver<T> observer);

    public bool HasObservers
    {
        get
        {
            var observers = _observers;
            return observers != null && observers.Data.Length > 0;
        }
    }

    public void OnNext(T value)
    {
        lock (_gate)
        {
            CheckDisposed();

            if (!_isStopped)
            {
                AddValueToBuffer(value);
                Trim();

                var o = _observers.Data;
                foreach (var observer in o)
                    observer.OnNext(value);
            }
        }
    }

    public void OnError(Exception error) {/*...*/}

    public void OnCompleted() {/*...*/}

    public IDisposable Subscribe(IObserver<T> observer) {/*...*/}

    public void Unsubscribe(IObserver<T> observer) {/*...*/}

    private void CheckDisposed() {/*...*/}

    public void Dispose() {/*...*/}

    protected virtual void Dispose(bool disposing) {/*...*/}
}

The ReplayMany and ReplayAll implementations are slightly more complex as they require a Queue to store the buffered values. Again we add another base class to do most of the work.

private abstract class ReplayManyBase : ReplayBufferBase, IReplaySubjectImplementation
{
    private readonly Queue<T> _queue;

    protected ReplayManyBase(int queueSize)
        : base()
    {
        _queue = new Queue<T>(queueSize);
    }

    protected Queue<T> Queue { get { return _queue; } }

    protected override void AddValueToBuffer(T value)
    {
        _queue.Enqueue(value);
    }

    protected override void ReplayBuffer(IObserver<T> observer)
    {
        foreach (var item in _queue)
            observer.OnNext(item);
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        _queue.Clear();
    }
}

Now the only differences are the initial buffer size and whether the buffer gets trimmed or not. This leaves us with the final two implementations:

private sealed class ReplayMany : ReplayManyBase, IReplaySubjectImplementation
{
    private readonly int _bufferSize;

    public ReplayMany(int bufferSize)
        : base(bufferSize)
    {
        _bufferSize = bufferSize;
    }

    protected override void Trim()
    {
        while (Queue.Count > _bufferSize)
            Queue.Dequeue();
    }
}

private sealed class ReplayAll : ReplayManyBase, IReplaySubjectImplementation
{
    public ReplayAll()
        : base(0)
    {
    }

    protected override void Trim()
    {
        //NoOp; i.e. Don't trim, keep all values.
    }
}
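The listings above do not show how the public constructors choose between these implementations. One plausible way to do it is sketched below. Everything here is an assumption made for illustration: the ReplayFlavour enum and ReplayFlavourSelector class are hypothetical names, and InfiniteBufferSize is assumed to be int.MaxValue.

```csharp
using System;

// Hypothetical names; the post does not show the actual selection code.
public enum ReplayFlavour { ReplayOne, ReplayMany, ReplayAll, ReplayByTime }

public static class ReplayFlavourSelector
{
    // Assumption: mirrors the InfiniteBufferSize constant used by the constructors.
    public const int InfiniteBufferSize = int.MaxValue;

    public static ReplayFlavour Select(int bufferSize, TimeSpan? window)
    {
        if (bufferSize < 0)
            throw new ArgumentOutOfRangeException(nameof(bufferSize));

        // Any time window needs the stopwatch and scheduler machinery.
        if (window.HasValue)
            return ReplayFlavour.ReplayByTime;

        switch (bufferSize)
        {
            case 1: return ReplayFlavour.ReplayOne;                  // single field, no queue
            case InfiniteBufferSize: return ReplayFlavour.ReplayAll; // queue, never trimmed
            default: return ReplayFlavour.ReplayMany;                // queue, trimmed by count
        }
    }
}
```

The point of the design is that the decision is made once, at construction time, so OnNext never has to ask which flavour it is serving.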

Less code, more speed

Like Kunu says "Do less".

By removing a lot of the excess code we are able to massively improve the performance of the ReplaySubject<T> for arguably most use-cases.

Saturday, January 4, 2014

ReplaySubject performance improvements

Over the last year I have been looking at improving the performance of the ReplaySubject<T> in the .NET implementation of the Reactive Extensions. It has been a fun journey and I am really excited about the results.

How it started

In a project I was working on in early 2013, we were doing our standard pre-release performance checks when @marcuswhit found an unusual spike in allocations and therefore an increase in GC pressure. For the user, this resulted in a drop in performance for the application we had built. We had a look at the culprit code and it was simply the introduction of a Replay operator. The Replay operator was required for the new functionality, but the performance hit was unexpected.

Later when John Rayner and I looked into the underlying code in the Rx codebase we found that all flavours of a ReplaySubject shared the same code path. This meant that if you were creating a Replay-One sequence, you would incur the same costs that a Replay-By-Time sequence would. These costs are outlined in the codeplex workitem - https://rx.codeplex.com/workitem/35. To summarize: In my opinion, if you construct a ReplaySubject without a TimeSpan argument, then you shouldn’t have to pay the cost of schedulers and stopwatches. In my experience, Replay() and Replay(1) (or Replay-All and Replay-One) make up at least 80% of the uses of Replay.

Performance testing

When I opened up the code base I was able to see the excess code that would run for each variation of the ReplaySubject. I was sure that I could make it faster, but I obviously needed to get some metrics for it. So I created a little performance test harness (https://github.com/LeeCampbell/RxPerfTests) that I could use to test implementations of ReplaySubject from various versions of Rx.

Running on my little holiday traveller i3 laptop, I was able to get a maximum throughput of up to ~225k messages per second on version 2.1.30214.0 of Rx. I then pulled down the latest (v2.2.0.0) code base from GitHub and compiled the Release40 build. This also showed throughput of up to ~225k messages per second. The performance scaled in a fairly linear fashion: doubling the subscriptions seemed to halve the throughput. Here are the results of a test against v2.1.30214.0 for a Replay-All subject.

Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 218747  92306  56871  23445  13673
    10000 215906 109191  54546  26872  13244
   100000 200946 105164  52730  26595  13311
  1000000 204420 104031  53133  26775  13164

Each flavour (Replay-All, Replay-One, Replay-Many, Replay-by-Time and Replay-by-Time-and-Count) showed similar performance characteristics.

The changes

I then pulled apart the ReplaySubject on my local git repo of Rx and made some changes. I decided that as each flavour had different requirements, they could also have different implementations. I couldn’t change the public API, so I opted for creating private nested implementations for each flavour of the ReplaySubject. Each implementation started off as a copy of the original ReplaySubject and was cut back to just the code that was required. Then any duplicate code was refactored back into a base class. I found a few bugs in my own code that were not covered by the existing tests, so I added some more tests.

Using the performance test harness mentioned above, I was able to tweak here and there and get massive improvements. The Replay-by-Time and Replay-by-Time-and-Count flavours showed no performance changes, because they remained essentially the same code with just a new layer of indirection. The big improvements came from the Replay-All, Replay-One and Replay-Many flavours. Instead of single subscriptions getting ~225k msg/s, I started seeing throughput of over 9 million msg/s. While this was great, an even better improvement was the scalability profile: adding subscriptions now had a far lower impact. In the unmodified implementation, going from 1 to 16 subscriptions reduced the throughput by a factor of 16. With the new implementation, 16 subscriptions reduce the throughput by a factor of less than 2.

Rx v2.2.0.0(modified) Replay() - Throughput (msg/sec)
         Subscriptions
 Messages       1       2       4       8      16
     1000 9823183 9541985 8583691 7107321 5167959
    10000 9863878 9447331 8727527 7175660 5174912
   100000 9788280 9356550 8727450 6881744 5154719
  1000000 8998729 8885193 8108485 6720683 5031854
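The scaling claim can be checked directly from the two i3 Replay() tables. The small sketch below divides the 1-subscription throughput by the 16-subscription throughput, using the 1000-message rows. ScalingRatios and Slowdown are hypothetical names used only for this illustration.

```csharp
using System;

// Hypothetical helper; the constants are copied from the 1000-message rows
// of the i3 Replay() throughput tables (msg/sec).
public static class ScalingRatios
{
    public const double OldOneSub = 218747;
    public const double OldSixteenSubs = 13673;
    public const double NewOneSub = 9823183;
    public const double NewSixteenSubs = 5167959;

    // How many times slower 16 subscriptions are than a single subscription.
    public static double Slowdown(double oneSub, double sixteenSubs)
    {
        return oneSub / sixteenSubs;
    }

    // How many times faster the modified code is at the same subscription count.
    public static double Speedup(double oldThroughput, double newThroughput)
    {
        return newThroughput / oldThroughput;
    }
}
```

Slowdown(OldOneSub, OldSixteenSubs) comes out at roughly 16, while Slowdown(NewOneSub, NewSixteenSubs) is roughly 1.9. Speedup(OldOneSub, NewOneSub) is roughly 45 and Speedup(OldSixteenSubs, NewSixteenSubs) is roughly 378.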

Why such large performance gains?

While these are micro-benchmarks and need to be taken with a grain of salt, we can still consider why these large improvements are being seen. We can look to another metric that I collect while running these tests: Generation 0 garbage collections. With the unmodified code, in the case of a single subscription being pushed 100k messages (just integer values as the messages), each flavour consistently causes 41 or 42 Gen0 GCs to occur. With the new implementations, only Replay-by-Time and Replay-by-Time-and-Count still cause this much GC activity, because they are essentially the old implementations. In all other cases, 100k messages cause no Gen0 collections. In fact, we have to push 1 million messages to get a single Gen0 collection.

Here is a table of the unmodified implementation’s garbage collection profile:

Rx v2.1.30214.0 Replay() - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3321 6550

Here is the data for the new implementation:

Rx v2.2.0.0(modified) Replay() - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  1  1  1  1  1

This data makes it very clear that we are just doing a lot less work: fewer allocations and less garbage collection, which means a lot more speed.
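For readers who want to collect similar numbers themselves, here is a minimal sketch of one way to measure throughput and Gen0 collections around a push loop. It is not the code from the RxPerfTests harness: ThroughputProbe and Measure are hypothetical names, and the sketch ignores warm-up, JIT effects and repeated runs. Stopwatch and GC.CollectionCount are standard .NET APIs.

```csharp
using System;
using System.Diagnostics;

// Hypothetical measuring helper; not taken from the RxPerfTests harness.
public static class ThroughputProbe
{
    // Pushes `messages` integers through `onNext` and reports messages per
    // second plus the number of Gen0 collections observed during the run.
    public static (double MsgPerSec, int Gen0) Measure(int messages, Action<int> onNext)
    {
        // Start from a clean heap so earlier garbage is not counted against the run.
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        int gen0Before = GC.CollectionCount(0);
        var stopwatch = Stopwatch.StartNew();

        for (int i = 0; i < messages; i++)
            onNext(i);

        stopwatch.Stop();
        int gen0 = GC.CollectionCount(0) - gen0Before;
        return (messages / stopwatch.Elapsed.TotalSeconds, gen0);
    }
}
```

To measure a subject, pass its OnNext method as the callback after attaching the desired number of subscriptions, for example Measure(100000, subject.OnNext).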

As most people interested in performance probably don’t run an i3 processor, I also re-ran the tests on my Surface Pro that has an i5 processor. The improvements were still just as good. The i5 could squeeze out just over 400k msg/s on the unmodified code base, and still subscriptions had the same linear effect on performance.

Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 380431 218871 114416  57253  28110
    10000 398996 204956 113002  55781  27993
   100000 414960 217336 109840  55951  27422
  1000000 418740 217453 110330  56145  26576

With the modified code, however, we could now reach over 17 million msg/s. Where 16 subscriptions caused the original implementation to fall to just under 30k msg/s, the modified code can still reach over 9 million.

Rx v2.2.0.0(modified) Replay() - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 17452007 17452007 16260163 12953368  9363296
    10000 18975332 17869907 15928640 13326226  9470594
   100000 18110693 18102824 16780494 13748350  9360316
  1000000 17682715 17005559 16196903 12687618  9242486

 

Full Test results

i3 processor – Unmodified code
Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 218747  92306  56871  23445  13673
    10000 215906 109191  54546  26872  13244
   100000 200946 105164  52730  26595  13311
  1000000 204420 104031  53133  26775  13164

Rx v2.1.30214.0 Replay() - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3321 6550



Rx v2.1.30214.0 Replay(1) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 214450 110044  55577  28174  14292
    10000 182842 102424  53205  27201  13200
   100000 202244 103372  53023  26730  12901
  1000000 199423 100937  52423  26692  13175

Rx v2.1.30214.0 Replay(1) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610



Rx v2.1.30214.0 Replay(5) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 154466 112586  53806  28533  12066
    10000 209941 105418  52227  25618  12894
   100000 203439 103846  52172  26408  12773
  1000000 199373 102434  52428  26218  13154

Rx v2.1.30214.0 Replay(5) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610



Rx v2.1.30214.0 Replay(5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 165085 113025  52570  26510  12423
    10000 208303 106808  52318  26740  13070
   100000 197664 103754  51916  26164  13254
  1000000 198327 103020  52606  26467  13200

Rx v2.1.30214.0 Replay(5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  837 1664 3319 6547



Rx v2.1.30214.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 216216 112792  56599  28629  13872
    10000 203111 106995  51609  26650  13174
   100000 198527 102207  52097  26304  13025
  1000000 198140 103024  49670  26410  13067

Rx v2.1.30214.0 Replay(5, 5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610

i3 processor - Modified code
Rx v2.2.0.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages       1       2       4       8      16
     1000 9823183 9541985 8583691 7107321 5167959
    10000 9863878 9447331 8727527 7175660 5174912
   100000 9788280 9356550 8727450 6881744 5154719
  1000000 8998729 8885193 8108485 6720683 5031854

Rx v2.2.0.0 Replay() - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  1  1  1  1  1



Rx v2.2.0.0 Replay(1) - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 11560694 11976048 10256410  8077544  5707763
    10000 11444266 11689071 10127608  7958615  5674403
   100000 11923925 11776760 10193472  8016096  5638377
  1000000 11963712 11374908  9795912  7469844  5392897

Rx v2.2.0.0 Replay(1) - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  0  0  0  0  0



Rx v2.2.0.0 Replay(5) - Throughput (msg/sec)
         Subscriptions
 Messages       1       2       4       8      16
     1000 8748906 8583691 7072136 6289308 4688233
    10000 8683571 8504848 7512019 6407381 4731712
   100000 8470556 8434904 7535170 6329915 4552138
  1000000 8516944 8072641 7322719 5943437 4545893

Rx v2.2.0.0 Replay(5) - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  0  0  0  0  0



Rx v2.2.0.0 Replay(5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 203145 112941  55796  27764  13894
    10000 210778 103922  52273  25893  13369
   100000 199651 103438  52908  25945  13168
  1000000 201586 103890  51972  26753  12945

Rx v2.2.0.0 Replay(5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1663 3319 6548



Rx v2.2.0.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 207215 111986  56923  28671  13387
    10000 200563 106150  52456  25914  13460
   100000 195847 103178  53104  26365  13366
  1000000 194919 103045  52904  26238  13325

Rx v2.2.0.0 Replay(5, 5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610

 

i5 processor - unmodified code
Rx v2.1.30214.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 380431 218871 114416  57253  28110
    10000 398996 204956 113002  55781  27993
   100000 414960 217336 109840  55951  27422
  1000000 418740 217453 110330  56145  26576

Rx v2.1.30214.0 Replay() - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3321 6627



Rx v2.1.30214.0 Replay(1) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 344222 223389 113538  52829  29044
    10000 417744 219372 109963  55726  27561
   100000 415620 216711 110546  56107  28060
  1000000 419986 218789 110905  56176  28149

Rx v2.1.30214.0 Replay(1) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610



Rx v2.1.30214.0 Replay(5) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 361978 223899 113654  56229  28129
    10000 412809 212685 111607  55976  27862
   100000 422559 217703 110316  55860  28014
  1000000 423255 217803 110782  56183  28099

Rx v2.1.30214.0 Replay(5) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610



Rx v2.1.30214.0 Replay(5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 384675 221239 106288  56164  28373
    10000 416682 219457 110083  56036  27671
   100000 422329 218200 110706  55608  27383
  1000000 422114 217501 110514  56183  26502

Rx v2.1.30214.0 Replay(5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3320 6551



Rx v2.1.30214.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 375094 225119 113670  56691  28672
    10000 410477 220168 111093  55885  27876
   100000 420379 217921 110347  56104  27049
  1000000 420783 217080 111352  56076  28017

Rx v2.1.30214.0 Replay(5, 5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610

i5 processor - modified code
Rx v2.2.0.0 Replay() - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 17452007 17452007 16260163 12953368  9363296
    10000 18975332 17869907 15928640 13326226  9470594
   100000 18110693 18102824 16780494 13748350  9360316
  1000000 17682715 17005559 16196903 12687618  9242486

Rx v2.2.0.0 Replay() - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  1  1  1  1  1



Rx v2.2.0.0 Replay(1) - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 22421525 23364486 20449898 16103060 11049724
    10000 20449898 23529412 20733983 16097875 10883761
   100000 22837829 22088220 20341327 15736384 10903817
  1000000 23659399 23495633 20324584 15783451 10902201

Rx v2.2.0.0 Replay(1) - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  0  0  0  0  0



Rx v2.2.0.0 Replay(5) - Throughput (msg/sec)
         Subscriptions
 Messages        1        2        4        8       16
     1000 15948963 17094017 15797788 13054830  9578544
    10000 16417665 16716817 15852885 12655024  9417083
   100000 16009990 16974759 15540016 12802622  9371456
  1000000 17090833 16916123 15518023 12824245  9297327

Rx v2.2.0.0 Replay(5) - GCs
         Subscriptions
 Messages  1  2  4  8 16
     1000  0  0  0  0  0
    10000  0  0  0  0  0
   100000  0  0  0  0  0
  1000000  0  0  0  0  0



Rx v2.2.0.0 Replay(5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 320328 222336 112826  53494  28674
    10000 421850 221479 112933  56733  27410
   100000 412311 216091 112233  56235  27692
  1000000 416718 219312 112004  56672  26932

Rx v2.2.0.0 Replay(5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   42   83  164  325  655
  1000000  422  838 1664 3320 6550



Rx v2.2.0.0 Replay(5, 5.Seconds()) - Throughput (msg/sec)
         Subscriptions
 Messages      1      2      4      8     16
     1000 388018 222306 114519  57530  27928
    10000 415835 222314 109739  57219  28178
   100000 421980 215986 113116  56194  28354
  1000000 419603 218809 112619  56641  28380

Rx v2.2.0.0 Replay(5, 5.Seconds()) - GCs
         Subscriptions
 Messages    1    2    4    8   16
     1000    0    0    1    3    6
    10000    4    8   16   33   64
   100000   41   82  164  329  658
  1000000  416  833 1667 3325 6610

 

TL;DR

I have submitted a pull request that should give at least 25x the throughput on ReplaySubjects where the buffer has no time component. For multiple subscriptions, performance improvements are much higher, with tests for 16 subscriptions showing ~350x improvement. So if you have any Last-value caches, if you use the .Replay() extension method, or the Replay(int) overload, and you want to see less garbage collection then feel free to up vote this issue on codeplex, and hopefully we will see the Rx team at Microsoft accept the pull request.

Tuesday, November 6, 2012

Rx now open source

Awesome news today that will please many (Thargy?) is that Rx is now open source.

The new home page has just gone up
http://rx.codeplex.com/

Scott Hanselman himself is plugging it
http://www.hanselman.com/blog/ReactiveExtensionsRxIsNowOpenSource.aspx

And we even get a mention for the IntroToRx.com site (Yay!)


Friday, September 21, 2012

.NET 4 dynamic feature

When .NET 4 was coming out a couple of years ago, one of the neat new features was the inclusion of (better support for?) dynamic languages. C# even got some love and the dynamic keyword was added. I was quite excited about the prospect of having dynamic coding features in my C# code base. Now about 3 years later I have finally used the dynamic keyword; once in "real" code and once for debugging purposes.
 
Now that I have used the feature, it has opened my eyes to other opportunities to leverage it. I hope this helps someone else too.

The first time I used the dynamic keyword was when I needed to access a COM interop component. My specific example is when I wanted access to the document of a WPF WebBrowser object. The problem is that the WebBrowser.Document property is of type System.Object. As I debug, I can peek into the object and see that it has the title property I want with all the data I need. However, you need to cast it to the COM object to use it in code. I have always been a bit queasy when I need to do COM interop.

Regardless of my feelings towards COM interop, there could be COM interface version issues depending on the version of Browser installed and it just seems like a big hammer for simply getting the title from the Document property. Well this clever clogs had already posted a super simple solution to my problem; just use the dynamic keyword. I know I want the title property, I just don't care which interface is used to get it. Great stuff!

Today I find myself hacking around a part of a code base that I am unfamiliar with. There seems to be some sort of O(n^2) operation introduced into the code base. I want to hack some logging into a generic UserControl that is doing too much work. When I go to log the events I just get the Infragistics* DataRecord's ToString() implementation filling up my logs. I know that in this case I have a Deal in the data context for the problem I am logging, but it is an open generic type of T in the control. The control has no reference to the assembly that the Deal object lives in, but I want to log the Deal.DealId property. I want to see the DealId so I know if we are looping over the same deal the whole time, or if we have jitter, or just double handling, etc.

Well you guessed it, where I need to get the DealId, I just assign the DataRecord's DataItem to a dynamic variable and then just log the magical DealId property on that dynamic object. Super! (Obviously once we found the issue, I ripped out my hack)
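A minimal sketch of that trick follows. The Deal class here is a stand-in declared locally so the example compiles on its own, and DynamicLoggingSketch.Describe is a hypothetical helper name. In the real scenario the object would come from the DataRecord's DataItem and its type would live in an assembly the control does not reference.

```csharp
using System;

// Stand-in for the real Deal type, which lives in another assembly.
public sealed class Deal
{
    public int DealId { get; set; }
}

public static class DynamicLoggingSketch
{
    // dataItem is typed as object, just as it appears from inside the generic control.
    public static string Describe(object dataItem)
    {
        // The member lookup is deferred to run time, so no compile-time
        // reference to the Deal type is needed here.
        dynamic item = dataItem;
        object dealId = item.DealId;
        return "DealId=" + dealId;
    }
}
```

If the object passed in has no DealId property, the lookup fails at run time with a RuntimeBinderException rather than at compile time, which is another reason to treat this as a temporary debugging aid.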
 
So I don’t think this is a feature I will use a lot of, but it really helped in these two scenarios.

Monday, August 20, 2012

Intro To Rx on TWC9

Brian Keller and Dan Fernandez mentioned IntroToRx.com on their Channel9 show This week on Channel 9. It was somewhat dwarfed by the talk of Win8, VS2012 etc as one would expect.

For those that don't know, Channel9 is Microsoft's virtual/online TV channel for keeping the public in touch with all that is happening around the world relating to Microsoft. This week on Channel9 is a great show that quickly summarizes what has been published on the site for the week. Other shows to check out are the Defrag and Ping.

Sunday, July 1, 2012

Introduction to Rx on Amazon


We finally got the book published on Amazon in Kindle format. So if you use Kindle and want that quick & easy way to get offline content in your hands, then this is for you. If you don't own a Kindle, you can still read the book offline with Kindle for PC/Mac/iOS/Android.

Sorry about the 99c (77p) price tag, but I couldn't find a way to get it up there for free, while keeping the agreement non-exclusive.

Introduction to Rx @ Amazon.com

Introduction to Rx @ Amazon.co.uk

It should also be available on the other Amazon sites.

The completely free version is available for manual download at www.IntroToRx.com

Thursday, June 21, 2012

Rx v2.0 release candidate is available

Last night at the LDNUG meetup, Bart De Smet gave his presentation on Rx. At the end of the presentation he announced that Rx v2.0 was now officially a release candidate. He then went on to release it to NuGet in front of the team. You could feel the nerdy excitement in the room.

The release notes can be found on the Rx team blog. The post is huge (~48 printed pages).

Now I have a race to update the Introduction to Rx book to have all the v2 features before they actually release it.