rxjs

Operator: PublishReplay

How does PublishReplay work

It internally creates a ReplaySubject and makes it multicast compatible. The minimal replay value of ReplaySubject is 1 emission. This results in the following:

  • First subscription will trigger the publishReplay(1) to internally subscribe to the source stream and pipe all emissions through the ReplaySubject, effectively caching the last n(=1) emissions
  • If a second subscription is started while the source is still active the multicast() will connect us to the same replaySubject and we will receive all next emissions until the source stream completes.
  • If a subscription is started after the source is already completed the replaySubject has cached the last n emissions and it will only receive those before completing.
const source = Rx.Observable.from([1,2])
  .mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
  .do(null,null,() => console.log('source stream completed'))
  .publishReplay(1)
  .refCount();

// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));

// new subscription after the stream has completed already
setTimeout(() => {
  source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

Unexpected emissions when using publishReplay

Based on a question. The following snippets Does not cache the expected emission and prevents further calls. Instead it re-subscribes to the realSource for every subscription.

var state = 5
var realSource = Rx.Observable.create(observer =>  {
  console.log("creating expensive HTTP-based emission"); 
  observer.next(state++);
//  observer.complete(); //absent on purpose
  
  return () => {
    console.log('unsubscribing from source')
  }
});


var source = realSource
.do(null, null, () => console.log('stream completed'))
.publishReplay()
.refCount();
    
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();

subscription2 = source.subscribe(v => console.log('observerB: ' + v));
subscription2.unsubscribe();
        
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

When running this snippet we can see clearly that it is not emitting duplicate values for Observer B, it is in fact creating new emissions for every subscription. How come?

Every subscription is unsubscribed before the next subscription takes place. This effectively makes the refCount decrease back to zero, no multicasting is being done.

The issue resides in the fact that the realSource stream does not complete. Because we are not multicasting the next subscriber gets a fresh instance of realSource through the ReplaySubject and the new emissions are prepended with the previous already emitted emissions.


This modified text is an extract of the original Stack Overflow Documentation created by the contributors and released under CC BY-SA 3.0 This website is not affiliated with Stack Overflow