[컴] Rx.js 에서 Observable.subscribe 의 동작

rxjs

Rx.js 에서 Observable.subscribe 의 동작

전체 흐름

  1. Observable.subscribe 을 호출
  2. –> Observable 생성시 등록한 callback(subscriber) 호출
  3. –> callback 안에서 subscriber.next(val)
  4. –> subscribe 시 등록한 subscriber function 을 호출
console.clear();
const myObservable = new rxjs.Observable(subscriber => {
  console.log('Observalbe');
  subscriber.next(42);
  subscriber.next(100);
  setTimeout(() => {
    subscriber.next(300); // happens asynchronously
  }, 1000);
});

myObservable.subscribe(x => {
  console.log(x);
});

myObservable.subscribe(x => {
  console.log('x2 : ' + x);
});

event 마다 해당하는 handler 를 전부 호출 하는 것이 아니다.

대체로 event 를 dispatch 하는 경우에는 event 마다 handler 의 list 를 가지고 있고, 그 event 가 호출될 때마다 해당하는 handler 를 전부 호출하는 구조이다.

하지만 여기는 다르다. 직접 실행해 보면 알겠지만, myObservable.subscribe 를 호출하는 시점에 Observable 로 등록한 function 이 동작을 시작한다. 그래서 그 안에서 subscriber.next() 를 호출하면 비로서 우리가 subscribe 을 호출하면서 등록한 function 이 실행된다.

그래서 위처럼 2번을 subscribe 해보면, 순차적으로 첫번째 subscribe 에 대한 호출이 끝나고 나서, x2에 대한 처리가 되는 것을 확인할 수 있다.

아래 글을 참고하면 좋을 듯 하다.

- ref. 1 에서 -

This is drastically different to event handler APIs like addEventListener / removeEventListener. With observable.subscribe, the given Observer is not registered as a listener in the Observable. The Observable does not even maintain a list of attached Observers.

A subscribe call is simply a way to start an “Observable execution” and deliver values or events to an Observer of that execution.

자세한 code

subscriber.next(10)
  Subscriber._next
    callback function of myObservable

subscriber.next(10)
Subscriber.prototype.next = function (value) {
    if (this.isStopped) {
        ...
    }
    else {
        this._next(value);
    }
};
Subscriber.prototype._next = function (value) {
    this.destination.next(value);
};
function wrapForErrorHandling(handler, instance) {
    return function () {
        ...
        try {
            handler.apply(void 0, __spreadArray([], __read(args)));
        }
        catch (err) {
            ...
        }
    };
}
// myObservable.subscribe(
x => {
    console.log(x);
}

myObservable.subscribe()
    Observable._trySubscribe(subscriber)
      Observable._subscribe(sink)
        Observable's callback function
       

myObservable.subscribe(x => console.log(x))
Observable.prototype.subscribe = function (observerOrNext, error, complete) {
    var subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
    if (config.useDeprecatedSynchronousErrorHandling) {
        ...
    }
    else {
        ...
        this._trySubscribe(subscriber));
    }
    return subscriber;
};
Observable.prototype._trySubscribe = function (sink) {
    try {
        return this._subscribe(sink);
    }
    ...
};
// const myObservable = new rxjs.Observable(
subscriber => {
  console.log('Observalbe');
  subscriber.next(42);
  ...

sample source

위에서 사용한 code이다.

See Also

  1. 쿠…sal: [컴] Rx.js 에서 새로운 스케쥴러를 사용하는 경우
  2. https://gist.github.com/staltz/868e7e9bc2a7b8c1f754#request-and-response : request, response 의 경우 어떻게 stream 으로 만드는 지에 대한 좋은 예시

References

  1. RxJS - Observable

댓글 없음:

댓글 쓰기