[컴] Rx.js 에서 새로운 스케쥴러를 사용하는 경우

rxjs / reactx

Rx.js 에서 새로운 스케쥴러를 사용하는 경우

Observable 에 새로운 scheduler 를 set 하는 경우를 보자. 코드는 다음과 같다.

const myObservable2 = new rxjs.Observable(proxyObserver => {
    console.log('Observalbe');
    proxyObserver.next(42);
    proxyObserver.next(100);
    proxyObserver.complete();
  }
).pipe(
  rxjs.operators.observeOn(rxjs.asyncScheduler)
);

var finalObserver = {
    next(x) {
      console.log('got value ' + x)
    },
    error(err) {
      console.error('something wrong occurred: ' + err);
    },
    complete() {
       console.log('done');
    }
  };
console.log('just before subscribe');
myObservable2.subscribe(finalObserver);
console.log('just after subscribe');

간략한 코드 흐름, code flow

Observable()
  operators.observeOn(rxjs.asyncScheduler)
    const operateRetFn = operate(callback1)
  Observable.pipe(operateRetFn)
    pipeFromArray(operations)(this)
      operateRetFn(this)
        const newObservable = Observable.lift(callback2)

자세한 code flow

function Observable(subscribe) {
    if (subscribe) {
        this._subscribe = subscribe;
    }
}

function observeOn(scheduler, delay) {
    if (delay === void 0) { delay = 0; }
    return operate(function (source, subscriber) {  // callback1
        source.subscribe(
            new OperatorSubscriber(subscriber, function (value) { 
                    return subscriber.add(
                        scheduler.schedule(function () { 
                            return subscriber.next(value); 
                        }, delay)
                    ); 
                }, function () { 
                    return subscriber.add(
                        scheduler.schedule(function () { 
                            return subscriber.complete(); 
                        }, delay)
                    ); 
                }, function (err) { 
                    return subscriber.add(
                        scheduler.schedule(function () { 
                            return subscriber.error(err); 
                        }, delay)
                    ); 
                }
            )
        );
    });
}


function operate(init) {
    return function (source) {  // `operateRetFn`
        if (hasLift(source)) {
            return source.lift(function (liftedSource) {    // `callback2`
                try {
                    return init(liftedSource, this);    // `callback1` is invoked
                }
                ...
            });
        }
        throw new TypeError('Unable to lift unknown Observable type');
    };
}

Observable.prototype.pipe = function () {
    var operations = [];
    for (var _i = 0; _i < arguments.length; _i++) {
        operations[_i] = arguments[_i];
    }
    return operations.length ? pipeFromArray(operations)(this) : this;
};

function pipeFromArray(fns) {
    ...
    if (fns.length === 1) {
        return fns[0];  // `operateRetFn` function
    }
    return function piped(input) {
        return fns.reduce(function (prev, fn) { return fn(prev); }, input);
    };
}


Observable.prototype.lift = function (operator) {
    var observable$$1 = new Observable();
    observable$$1.source = this;
    observable$$1.operator = operator;  // `callback2`
    return observable$$1;
};


Observable.prototype.subscribe = function (observerOrNext, error, complete) {
    var subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
    if (config.useDeprecatedSynchronousErrorHandling) {
        ...
    }
    else {
        var _a = this, operator = _a.operator, source = _a.source;
        subscriber.add(operator
            ? operator.call(subscriber, source)  // 최종적으로 callback1 이 호출된다.`
            : ...
    }
    return subscriber;
};

See Also

  1. 쿠…sal: [컴] Rx.js 에서 Observable.subscribe 의 동작

References

  1. RxJS - Scheduler

댓글 없음:

댓글 쓰기