Subject
Introduction#
Subjects are classes that behave as Observables and observers at the same time.
https://reactivex.io/documentation/subject.html
Subject and its internal state
In Rx Subjects have internal states that can control their behavior.
A common use-case form Subject is subscribing it to multiple Observables. The following example creates two different Observables and subscribes a Subject to both of them. Then it tries to print all values that went through:
let subject = new Subject();
subject.subscribe(val => console.log(val));
Observable.range(1, 5).subscribe(subject);
Observable.from(['a', 'b', 'c']).subscribe(subject);
See live demo: https://jsbin.com/pesumup/2/edit?js,console
This example just prints numbers 1 - 5
and didn’t print any of the characters a
, b
, c
.
1
2
3
4
5
The question is what happened? The problem here is the internal state of the Subject instance when it received the complete
notification. When a Subject receives an error
or complete
notifications it marks itself as stopped and will never emit any other signal.
It needs to be this way because Subjects are basically Observables and Observables can only emit one complete
or error
notification at the end of the stream but never both.
The problem with the example above is that the first Observable Observable.range()
emits also the complete
notification which is then received by the Subject and therefore it doesn’t reemit any value when subscribed to the second Observable.
We can see that the Subject really receives the complete
notification by setting also the complete callback.
subject.subscribe(val => console.log(val), null, () => console.log('complete'));
The output is the same just at the end it also prints complete
.
1
2
3
4
5
complete
So if we don’t want the Subject to receive the complete
notification, we can just manually send the next
signals. This means instead of subscribing the Subject directly we’ll subscribe a callback that calls the next()
method on the Subject:
Observable.range(1, 5).subscribe(val => subject.next(val));
Observable.from(['a', 'b', 'c']).subscribe(val => subject.next(val));
See live demo: https://jsbin.com/funeka/1/edit?js,console
1
2
3
4
5
a
b
c
Note that this exact same principle applies everywhere where we use Subjects.
For example operators such as publish()
, share()
and all their variants that use the same instance of Subject under the hood are affected by this.