Subject

    Emitting values

    Subscribing to values

    To sum up, a Subject supports the following operations:

    1. next([value])
    2. error([error message])
    3. complete()
    4. subscribe()
    5. unsubscribe()
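    To make these operations concrete, here is a minimal sketch of what a Subject does with its subscribers. ToySubject is a hypothetical stand-in written for this illustration, not the RxJS class, and it models unsubscribe() on the handle returned by subscribe() rather than on the Subject itself.

```javascript
// Toy stand-in for a Subject, for illustration only; this is not RxJS code.
class ToySubject {
  constructor() {
    this.observers = [];
    this.stopped = false;
  }
  subscribe(onNext, onError = () => {}, onComplete = () => {}) {
    const observer = { onNext, onError, onComplete };
    this.observers.push(observer);
    // The returned handle removes only this observer.
    return {
      unsubscribe: () => {
        this.observers = this.observers.filter((o) => o !== observer);
      },
    };
  }
  next(value) {
    if (this.stopped) return;
    this.observers.forEach((o) => o.onNext(value));
  }
  error(err) {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach((o) => o.onError(err));
  }
  complete() {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach((o) => o.onComplete());
  }
}

const log = [];
const subject = new ToySubject();
subject.next('lost'); // nobody is listening yet, so this value is dropped
const first = subject.subscribe((v) => log.push('first ' + v));
subject.subscribe(
  (v) => log.push('second ' + v),
  undefined,
  () => log.push('second done')
);
subject.next(1);      // delivered to both subscribers
first.unsubscribe();
subject.next(2);      // delivered to the second subscriber only
subject.complete();
subject.next(3);      // ignored after complete()
console.log(log);
// [ 'first 1', 'second 1', 'second 2', 'second done' ]
```

    Note that the value sent before anyone subscribed never shows up in the log.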

    A Subject can act as a proxy, i.e. it receives values from another stream and passes them on to its own subscribers.

    let source$ = Rx.Observable.interval(500).take(3);
    const proxySubject = new Rx.Subject();
    // The Subject is itself an observer, so it can subscribe to source$.
    let subscriber = source$.subscribe( proxySubject );
    proxySubject.subscribe( (value) => console.log('proxy subscriber', value ) );
    // The Subject can also emit a value of its own.
    proxySubject.next( 3 );

    So essentially proxySubject listens to source$.

    But it can also add its own contribution.

    So what’s interesting about this? A Subject can listen to a source and relay its data as it arrives, and it can also emit data of its own; both arrive at the same subscriber. The most obvious use case I can think of is communication between components in a bus-like manner. Component 1 can publish a value through next() and Component 2 can subscribe to it; conversely, Component 2 can emit values that Component 1 subscribes to.

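    const subject = new Rx.Subject();
    const sharedService = {};
    // Components subscribe to the shared stream through this accessor.
    sharedService.getDispatcher = function () {
      return subject;
    };
    // Assumption: this function's header is missing in the source, so the name dispatch is a guess.
    sharedService.dispatch = function (value) {
      subject.next(value);
    };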

    prototype:

    new Rx.ReplaySubject([bufferSize], [windowSize], [scheduler])

    example:

    let replaySubject = new Rx.ReplaySubject( 2 );
    replaySubject.next( 0 );
    replaySubject.next( 1 );
    replaySubject.next( 2 );
    let replaySubscription = replaySubject.subscribe((value) => {
      console.log('replay subscription', value);
    });
    // logs 1, 2

    Wow, what happened here? Where did the first number, 0, go?
    A .next() that happens before the subscription is created is normally lost. A ReplaySubject, however, can save emitted values in a cache. Here the cache was created with room for two values, so only the last two emissions, 1 and 2, are replayed to the new subscriber.

    Let’s illustrate how this works:

    GOTCHA
    Three things matter: when the .next() operation happens, the size of the cache, and when your subscription is created.
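    The interplay of those three factors can be sketched with a small model of the cache. ToyReplaySubject below is a hypothetical class written for this illustration; it only mimics the size-limited buffer and is not how RxJS implements ReplaySubject.

```javascript
// Toy model of a size-limited replay cache; not the RxJS implementation.
class ToyReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }
  next(value) {
    this.buffer.push(value);
    // Drop the oldest entry once the cache is over capacity.
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.forEach((onNext) => onNext(value));
  }
  subscribe(onNext) {
    // A new subscriber first receives whatever is cached...
    this.buffer.forEach((value) => onNext(value));
    // ...and then every later value as it is emitted.
    this.observers.push(onNext);
  }
}

const early = [];
const late = [];
const replay = new ToyReplaySubject(2);
replay.next(0);
replay.subscribe((v) => early.push(v)); // cache holds [0] at this point
replay.next(1);
replay.next(2);
replay.subscribe((v) => late.push(v));  // cache holds [1, 2] at this point
replay.next(3);
console.log(early); // [ 0, 1, 2, 3 ]
console.log(late);  // [ 1, 2, 3 ]
```

    The early subscriber sees everything because it arrived while 0 was still cached; the late one misses 0 because the cache had already pushed it out.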

    It’s quite easy to imagine the business case here. You fetch some data and want the app to remember what was fetched most recently. That data might only be relevant for a certain time, so once enough time has passed you clear the cache.
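    The time-based expiry can be sketched in the same spirit. ToyTimedReplay and its explicit now argument are hypothetical, chosen so the example runs without timers; in RxJS the equivalent limit is set by the windowSize argument shown in the prototype above.

```javascript
// Toy model of a time-limited replay cache; not the RxJS implementation.
// The current time is passed in explicitly so the example needs no timers.
class ToyTimedReplay {
  constructor(windowMs) {
    this.windowMs = windowMs;
    this.entries = []; // each entry is { value, at }
  }
  next(value, now) {
    this.entries.push({ value, at: now });
  }
  // Returns the values a subscriber arriving at time `now` would receive.
  replayAt(now) {
    // Entries older than the window are cleared from the cache.
    this.entries = this.entries.filter((e) => now - e.at <= this.windowMs);
    return this.entries.map((e) => e.value);
  }
}

const cache = new ToyTimedReplay(1000);
cache.next('a', 0);
cache.next('b', 600);
const at1200 = cache.replayAt(1200); // 'a' is 1200 ms old, 'b' is 600 ms old
const at3000 = cache.replayAt(3000); // both entries are older than 1000 ms
console.log(at1200); // [ 'b' ]
console.log(at3000); // []
```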

    let asyncSubject = new Rx.AsyncSubject();
    asyncSubject.subscribe(
      (value) => console.log('async subject', value),
      (error) => console.error('async error', error),
      () => console.log('async completed')
    );
    asyncSubject.next( 1 );
    asyncSubject.next( 2 );

    Looking at this, we expect 1, 2 to be emitted, right? WRONG.
    An AsyncSubject emits nothing until complete() is called, and then it emits only the last value.

    asyncSubject.next( 3 );
    asyncSubject.complete();
    // logs 'async subject 3', then 'async completed'

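    complete() is what releases the last value. If the stream fails instead, subscribers receive the error as soon as error() is called, the held value is dropped, and a later complete() has no effect:

    asyncSubject.next( 3 );
    asyncSubject.error('err');
    asyncSubject.complete();
    // the error callback receives 'err'; no value is emitted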

    Use an AsyncSubject when you care about preserving the last state just before the stream ends, be it a value or an error. That is not the last emitted state in general, but the last one before the stream closes. By state I mean a value or an error.
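    The two endings can be compared side by side with a small model. ToyAsyncSubject and the run helper are hypothetical names for this sketch, which assumes that an error is delivered immediately and discards the held value; it is not the RxJS implementation.

```javascript
// Toy model of "hold the last value until completion"; not the RxJS implementation.
class ToyAsyncSubject {
  constructor() {
    this.observers = [];
    this.hasValue = false;
    this.last = undefined;
    this.stopped = false;
  }
  subscribe(onNext, onError, onComplete) {
    this.observers.push({ onNext, onError, onComplete });
  }
  next(value) {
    if (this.stopped) return;
    // Remember the value, but do not deliver it yet.
    this.hasValue = true;
    this.last = value;
  }
  error(err) {
    if (this.stopped) return;
    this.stopped = true;
    // The held value is dropped; subscribers only see the error.
    this.observers.forEach((o) => o.onError(err));
  }
  complete() {
    if (this.stopped) return;
    this.stopped = true;
    this.observers.forEach((o) => {
      if (this.hasValue) o.onNext(this.last);
      o.onComplete();
    });
  }
}

// Emits 1, 2, 3 and then ends the stream the way `finish` dictates.
function run(finish) {
  const log = [];
  const s = new ToyAsyncSubject();
  s.subscribe(
    (v) => log.push('value ' + v),
    (e) => log.push('error ' + e),
    () => log.push('completed')
  );
  s.next(1);
  s.next(2);
  s.next(3);
  finish(s);
  return log;
}

const completedLog = run((s) => s.complete());
const failedLog = run((s) => { s.error('err'); s.complete(); });
console.log(completedLog); // [ 'value 3', 'completed' ]
console.log(failedLog);    // [ 'error err' ]
```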

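    A BehaviorSubject emits the following:

    • the initial value, to anyone who subscribes before the first next()
    • every value emitted while you are subscribed
    • the last emitted value, to anyone who subscribes later.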

    methods:

    let behaviorSubject = new Rx.BehaviorSubject(42);
    behaviorSubject.subscribe((value) => console.log('behaviour subject',value) );
    console.log('Behaviour current value',behaviorSubject.getValue());
    behaviorSubject.next(1);
    console.log('Behaviour current value',behaviorSubject.getValue());
    behaviorSubject.next(2);
    console.log('Behaviour current value',behaviorSubject.getValue());
    behaviorSubject.next(3);
    console.log('Behaviour current value',behaviorSubject.getValue());
    // emits 42
    // current value 42
    // emits 1
    // current value 1
    // emits 2
    // current value 2
    // emits 3
    // current value 3
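    What the example above does not show is a subscriber that arrives late. The sketch below models that case with ToyBehaviorSubject, a hypothetical class written for this illustration that keeps exactly one current value; it is not the RxJS implementation.

```javascript
// Toy model of a Subject that always holds a current value; not the RxJS implementation.
class ToyBehaviorSubject {
  constructor(initialValue) {
    this.value = initialValue;
    this.observers = [];
  }
  getValue() {
    return this.value;
  }
  next(value) {
    this.value = value;
    this.observers.forEach((onNext) => onNext(value));
  }
  subscribe(onNext) {
    // The current value is delivered immediately on subscription.
    onNext(this.value);
    this.observers.push(onNext);
  }
}

const earlyValues = [];
const lateValues = [];
const behavior = new ToyBehaviorSubject(42);
behavior.subscribe((v) => earlyValues.push(v)); // receives the initial value 42
behavior.next(1);
behavior.next(2);
behavior.subscribe((v) => lateValues.push(v));  // receives the current value 2
behavior.next(3);
console.log(earlyValues); // [ 42, 1, 2, 3 ]
console.log(lateValues);  // [ 2, 3 ]
```

    Unlike a plain Subject, the late subscriber does not start empty-handed: it gets the most recent value first and then follows along.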