Subject
Emitting values
Subscribing to values
To sum it up the following operations exist on it:
next([value])
error([error message])
complete()
subscribe()
unsubscribe()
A Subject
can act as a proxy, i.e receive values from another stream that the subscriber of the Subject
can listen to.
let source$ = Rx.Observable.interval(500).take(3);
const proxySubject = new Rx.Subject();
let subscriber = source$.subscribe( proxySubject );
proxySubject.subscribe( (value) => console.log('proxy subscriber', value ) );
proxySubject.next( 3 );
So essentially proxySubject listens
to source$
But it can also add its own contribution
So what’s interesting about this? It can listen to some source when that data arrives as well as it has the ability to emit its own data and all arrives to the same subscriber. Ability to communicate between components in a bus like manner is the most obvious use case I can think of. Component 1 can place its value through next()
and Component 2 can subscribe and conversely Component 2 can emit values in turn that Component 1 can subscribe to.
sharedService.getDispatcher = function(){
return subject;
}
subject.next(value)
}
prototype:
new Rx.ReplaySubject([bufferSize], [windowSize], [scheduler])
example:
let replaySubject = new Rx.ReplaySubject( 2 );
replaySubject.next( 0 );
replaySubject.next( 1 );
replaySubject.next( 2 );
// 1, 2
let replaySubscription = replaySubject.subscribe((value) => {
console.log('replay subscription', value);
});
Wow, what happened here, what happened to the first number?
So a .next()
that happens before the subscription is created, is normally lost. But in the case of a ReplaySubject
we have a chance to save emitted values in the cache. Upon creation the cache has been decided to save two values.
Let’s illustrate how this works:
GOTCHA
It matters both when the .next()
operation happens, the size of the cache as well as when your subscription is created.
It’s quite easy to imagine the business case here. You fetch some data and want the app to remember what was fetched latest, and what you fetched might only be relevant for a certain time and when enough time has passed you clear the cache.
let asyncSubject = new Rx.AsyncSubject();
asyncSubject.subscribe(
(value) => console.log('async subject', value),
(error) => console.error('async error', error),
() => console.log('async completed')
);
asyncSubject.next( 1 );
asyncSubject.next( 2 );
Looking at this we expect 1,2 to be emitted right? WRONG.
Nothing will be emitted unless complete()
happen
asyncSubject.next( 3 )
asyncSubject.complete()
complete()
needs to happen regardless of the finishing operation before it succeeds or fails so
asyncSubject( 3 )
asyncSubject.error('err')
asyncSubject.complete()
// will emit 'err' as the last action
When you care about preserving the last state just before the stream ends, be it a value or an error. So NOT last emitted state generally but last before closing time. With state I mean value or error.
This Subject emits the following:
- the initial value
- the values emitted generally
- last emitted value.
methods:
let behaviorSubject = new Rx.BehaviorSubject(42);
behaviorSubject.subscribe((value) => console.log('behaviour subject',value) );
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(1);
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(2);
console.log('Behaviour current value',behaviorSubject.getValue());
behaviorSubject.next(3);
console.log('Behaviour current value',behaviorSubject.getValue());
// emits 42
// current value 42
// emits 1
// current value 1
// emits 2
// current value 2
// current value 3