rxjs / reactx
RxJS에서 새로운 스케줄러를 사용하는 경우
Observable에 새로운 scheduler를 설정(set)하는 경우를 보자. 코드는 다음과 같다.
// Demo: deliver an Observable's notifications through `asyncScheduler` by
// piping it through `observeOn`.
//
// The producer function still runs synchronously inside `subscribe()`, but
// `observeOn` re-emits every `next`/`complete` notification via the scheduler,
// so the observer only sees them after the current call stack has unwound.
// Expected console output:
//   just before subscribe
//   Observable
//   just after subscribe
//   got value 42
//   got value 100
//   done
const myObservable2 = new rxjs.Observable((proxyObserver) => {
  console.log('Observable');
  proxyObserver.next(42);
  proxyObserver.next(100);
  proxyObserver.complete();
}).pipe(
  rxjs.operators.observeOn(rxjs.asyncScheduler)
);

// Plain observer object that receives the (re-scheduled) notifications.
const finalObserver = {
  next(x) {
    console.log(`got value ${x}`);
  },
  error(err) {
    console.error(`something wrong occurred: ${err}`);
  },
  complete() {
    console.log('done');
  },
};

console.log('just before subscribe');
myObservable2.subscribe(finalObserver);
console.log('just after subscribe');
간략한 코드 흐름 (code flow)
Observable()
operators.observeOn(rxjs.asyncScheduler)
const operateRetFn = operate(callback1)
Observable.pipe(operateRetFn)
pipeFromArray(operations)(this)
operateRetFn(this)
const newObservable = Observable.lift(callback2)
자세한 코드 흐름 (code flow)
/**
 * Observable constructor (trimmed excerpt).
 *
 * @param {Function} [subscribe] Producer function. When truthy it is stored on
 *   the instance as `_subscribe`; when falsy the instance gets no own
 *   `_subscribe` property.
 */
function Observable(subscribe) {
  if (!subscribe) {
    return;
  }
  this._subscribe = subscribe;
}
/**
 * Builds an operator that forwards every notification of the source to the
 * downstream subscriber through `scheduler.schedule`.
 *
 * @param {object} scheduler Object exposing `schedule(work, delay)`.
 * @param {number} [delay] Passed as the second argument to
 *   `scheduler.schedule`; only `undefined` is replaced by 0.
 * @returns {*} Whatever `operate` returns for the init callback (`callback1`).
 */
function observeOn(scheduler, delay) {
  const dueTime = delay === undefined ? 0 : delay;

  return operate((source, subscriber) => { // callback1
    // Schedule `work` and register the value returned by the scheduler with
    // the downstream subscriber via `subscriber.add`.
    const defer = (work) => subscriber.add(scheduler.schedule(work, dueTime));

    source.subscribe(
      new OperatorSubscriber(
        subscriber,
        // Argument order after `subscriber`: next, complete, error.
        (value) => defer(() => subscriber.next(value)),
        () => defer(() => subscriber.complete()),
        (err) => defer(() => subscriber.error(err))
      )
    );
  });
}
/**
 * Turns an init callback into an operator function (trimmed excerpt).
 *
 * @param {Function} init Called as `init(liftedSource, this)` from inside the
 *   function handed to `source.lift` (e.g. `callback1` created in `observeOn`).
 * @returns {Function} `operateRetFn`: takes a source and returns the result of
 *   `source.lift(...)` when `hasLift(source)` is true.
 * @throws {TypeError} From `operateRetFn`, when `hasLift(source)` is false.
 */
function operate(init) {
return function (source) { // `operateRetFn`
if (hasLift(source)) {
// `callback2` is only handed to `lift` here; it is not executed yet.
return source.lift(function (liftedSource) { // `callback2`
try {
// `this` is whatever the caller binds; the `subscribe` excerpt invokes the
// stored operator as `operator.call(subscriber, source)`.
return init(liftedSource, this); // `callback1` is invoked
}
// NOTE: the remainder of this `try` statement is elided in the excerpt.
...
});
}
throw new TypeError('Unable to lift unknown Observable type');
};
}
/**
 * Applies the given operator functions to this Observable.
 *
 * @param {...Function} operations Operator functions, applied via
 *   `pipeFromArray(operations)(this)`.
 * @returns {*} `this` when called without arguments; otherwise the result of
 *   the composed operator function applied to `this`.
 */
Observable.prototype.pipe = function (...operations) {
  if (operations.length === 0) {
    return this;
  }
  return pipeFromArray(operations)(this);
};
/**
 * Composes an array of unary functions into a single function (trimmed excerpt).
 *
 * @param {Function[]} fns Functions to compose.
 * @returns {Function} `fns[0]` itself when there is exactly one function;
 *   otherwise `piped`, which feeds its input through every function in order.
 */
function pipeFromArray(fns) {
// NOTE: code preceding the single-function check is elided in the excerpt.
...
if (fns.length === 1) {
// With a single operator no wrapper is created, so `pipe` calls it directly.
return fns[0]; // `operateRetFn` function
}
return function piped(input) {
// Left-to-right fold: each function receives the previous function's result.
return fns.reduce(function (prev, fn) { return fn(prev); }, input);
};
}
/**
 * Creates a new Observable that records this Observable and the given
 * operator, without running the operator.
 *
 * @param {Function} operator Stored as `operator` on the new instance
 *   (`callback2` when called from `operate`).
 * @returns {Observable} New instance whose `source` is `this`.
 */
Observable.prototype.lift = function (operator) {
  const lifted = new Observable();
  // Assigned in the same order as before: `source` first, then `operator`.
  return Object.assign(lifted, { source: this, operator });
};
/**
 * Subscribes an observer to this Observable (trimmed excerpt; `...` marks
 * code that was elided).
 *
 * @param {*} observerOrNext Used as-is when `isSubscriber` accepts it;
 *   otherwise wrapped in a new `SafeSubscriber` together with `error` and
 *   `complete`.
 * @param {Function} [error] Forwarded to `SafeSubscriber`.
 * @param {Function} [complete] Forwarded to `SafeSubscriber`.
 * @returns {*} The subscriber that was used or created.
 */
Observable.prototype.subscribe = function (observerOrNext, error, complete) {
var subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
if (config.useDeprecatedSynchronousErrorHandling) {
...
}
else {
// `operator` and `source` are the properties assigned by `lift`.
var _a = this, operator = _a.operator, source = _a.source;
// When an operator is present it runs with `this` bound to the subscriber,
// and its return value is handed to `subscriber.add`.
subscriber.add(operator
? operator.call(subscriber, source) // this is where `callback1` ultimately gets invoked
: ...
}
return subscriber;
};
댓글 없음:
댓글 쓰기