rxjs
Rx.js 에서 Observable.subscribe 의 동작
전체 흐름
- Observable.subscribe 을 호출
- –> Observable 생성시 등록한 callback(subscriber) 호출
- –> callback 안에서 subscriber.next(val)
- –> subscribe 시 등록한 subscriber function 을 호출
console.clear();
const myObservable = new rxjs.Observable(subscriber => {
console.log('Observalbe');
subscriber.next(42);
subscriber.next(100);
setTimeout(() => {
subscriber.next(300); // happens asynchronously
}, 1000);
});
myObservable.subscribe(x => {
console.log(x);
});
myObservable.subscribe(x => {
console.log('x2 : ' + x);
});
event 마다 해당하는 handler 를 전부 호출 하는 것이 아니다.
대체로 event 를 dispatch 하는 경우에는 event 마다 handler 의 list 를 가지고 있고, 그 event 가 호출될 때마다 해당하는 handler 를 전부 호출하는 구조이다.
하지만 여기는 다르다. 직접 실행해 보면 알겠지만, myObservable.subscribe
를 호출하는 시점에 Observable 로 등록한 function 이 동작을 시작한다. 그래서 그 안에서 subscriber.next()
를 호출하면 비로서 우리가 subscribe 을 호출하면서 등록한 function 이 실행된다.
그래서 위처럼 2번을 subscribe 해보면, 순차적으로 첫번째 subscribe 에 대한 호출이 끝나고 나서, x2에 대한 처리가 되는 것을 확인할 수 있다.
아래 글을 참고하면 좋을 듯 하다.
- ref. 1 에서 -
This is drastically different to event handler APIs like addEventListener / removeEventListener. With observable.subscribe, the given Observer is not registered as a listener in the Observable. The Observable does not even maintain a list of attached Observers.
A subscribe call is simply a way to start an “Observable execution” and deliver values or events to an Observer of that execution.
자세한 code
subscriber.next(10)
Subscriber._next
callback function of myObservable
subscriber.next(10)
Subscriber.prototype.next = function (value) {
if (this.isStopped) {
...
}
else {
this._next(value);
}
};
Subscriber.prototype._next = function (value) {
this.destination.next(value);
};
function wrapForErrorHandling(handler, instance) {
return function () {
...
try {
handler.apply(void 0, __spreadArray([], __read(args)));
}
catch (err) {
...
}
};
}
// myObservable.subscribe(
x => {
console.log(x);
}
myObservable.subscribe()
Observable._trySubscribe(subscriber)
Observable._subscribe(sink)
Observable's callback function
myObservable.subscribe(x => console.log(x))
Observable.prototype.subscribe = function (observerOrNext, error, complete) {
var subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
if (config.useDeprecatedSynchronousErrorHandling) {
...
}
else {
...
this._trySubscribe(subscriber));
}
return subscriber;
};
Observable.prototype._trySubscribe = function (sink) {
try {
return this._subscribe(sink);
}
...
};
// const myObservable = new rxjs.Observable(
subscriber => {
console.log('Observalbe');
subscriber.next(42);
...
sample source
위에서 사용한 code이다.
See Also
- 쿠…sal: [컴] Rx.js 에서 새로운 스케쥴러를 사용하는 경우
- https://gist.github.com/staltz/868e7e9bc2a7b8c1f754#request-and-response : request, response 의 경우 어떻게 stream 으로 만드는 지에 대한 좋은 예시
댓글 없음:
댓글 쓰기