[컴] RxJS 는 어떤 이득이 있는가

react / reactorx / reactor extension / 원리 / RxJS 에 대한 이해 / frp 의 이득 / benefit / 무엇이 더 나은가 / 무엇때문에 왜 frp 를 쓰는가.

RxJS 는 어떤 이득이 있는가

개인적으로 이 programming model 이 기존의 방법론과 어떤 점이 다른지에 대한 개념을 잡기가 어려웠다. Observable sequence 로 굳이 만들지 않아도 충분히 코드를 짤 수 있다. 그런데 굳이 왜 이것을 사용하는가? 에 대한 의문을 해소하고 싶어서 글을 쓴다.

MS 가 만든 programming model

이 prgramming model 은 ms 에서 처음에 만들어서 (Erik Meijer and his team) open source로 공개했다고 한다. 아래는 관련글이다.

언제 사용할까?

현재까지 판단하기로 이것은 data 를 asynchronous 하게 공급하는 것들을 감싸서 event 를 날리는 어떤 객체로 만들어 쓸 수 있다.

즉 data stream (예를 들면, file read, network request 등)을 만들어 주는 library 라고 보면 될 것 같다. synchrnouse 하게 작업할 때 blocking 이 생길만한 요소가 있다면, 그것을 data stream 으로 만들면 되는 것이다.

data stream

이것을 data stream 이라고 표현하는 이유는, 만약 우리가 event emitter로 구성하면 그것은 callback function 하나를 등록하는 것으로 여길 수 있다. 그런데 stream 이라면 이것은 마치 list 에 대해 loop 을 돌면서 data 를 처리하는 개념으로 이것을 이해하고, 구성할 수 있다.

개념이 달라지기에 code 의 모습도 조금 달라진다. 하지만 결국 특정 event 가 오면 그에 대한 처리를 한다는 점에서는 event-subscribe pattern 과 맥이 닿아 있다. 하지만 모습이 바뀌고, 개념이 바뀌면서 code를 좀 다른 모양으로 할 수 있다. (event listner 를 등록하는 code와 list 를 iterate 하는 code는 사실 data 가 있을때 동작한다는 점은 같지만 다른 모습으로 구현되어 있다. 그리고 그 code를 읽는 우리의 이해도 다르다.)

간략한 코드로 설명

간략하게 code를 보면 아래처럼 myObservable을 만들게 된다. 이렇게 Obserable로 감싸면 그냥 단순하게 data-list 가 된다고 이해하자.(다만 그 data 를 주는 시점은 알수없는)

const myObservable = new rxjs.Observable(subscriber => {
  data = requestData()
  subscriber.next(data);
});

그리고 우리는 이 data-list 에서 data 가 있다고 event 를 줄때 해야 할 일을 subscribe으로 정해놓을 수 있다.

myObservable.subscribe( data => {
    // data 를 가지고 할 일
    console.log(data)
})

개념

  • Bridging to Callbacks · rxjs : callback 을 Observable sequence 형태로 변경하고, Observable sequence 를 callback 형태로 변경하는 예제를 확인할 수 있다.
  • Bridging to Promises · rxjs : Promise 을 Observable sequence 형태로 변경하고, Observable sequence 를 Promise 형태로 변경하는 예제를 확인할 수 있다.

이 Rxjs 를 이해하기에는 ref.1 의 글이 가장 좋았다.

이 rxjs 를 iterable pattern 을 구현하는 것에 대입해보자. iterable pattern 은 간단하게 생각하면 array 처럼 traversing 할 수 있도록 만들어주는 것이다. 실제로 data 가 있는데, 이것의 traverse 방법을 정의해주는 것으로 보면 될 것 같다.

이 때 우리는 next() 함수를 만들어서 그 다음의 element(value) 를 가져오게 된다. observer.next() 를 이 next() 와 같은 느낌으로 보면 이해하면 된다. 이 observer.next() 를 호출하는 시점에 특정한 value 를 “subscribe 시점에 등록해 놓은 callback 함수”에 그 value 를 전달해 주는 것이다.

const myarr = []
for(let i=0; i<myarr.length; i++){
  console.log(myarry[i])
}
class MyIterator{

  private data: {value: number, index: number}[]
  constructor(data){
    this.data = data
  }

  public next(): any{
    while(this.cursor < this.data.length){
      const curdata = this.data[this.cursor]
      this.cursor++
      if(curdata.index % 2 === 0){
        return curdata
      }
    }
  }

  public hasNext(): any{
    const cursor0 = this.cursor
    while(cursor0 < this.data.length){
      const curdata = this.data[cursor0]
      cursor0++
      if(curdata && curdata.index % 2 === 0){
        return true
      }
    }
    return false
  }

}

const miter = new MyIterator([{index:0, value: 1}, {index:1, value: 10}, {index:2, value: 100}, {index:3, value: 1000}]);

while(miter.hasNext()){
  console.log(miter.next())
}

이제 우리가 자주 쓰는 ajax 로 server에서 data 를 requst하는 부분을 봐보자. 대체로 아래와 같은 모습을 하고 있다.

var request = new XMLHttpRequest();
request.open('GET', url);
request.onload = () => {

    if (request.status === 200) {
        // do something
        console.log('success')
    } else {
        console.log(new Error(request.statusText))
    }
};

request.send();

아래처럼 observable 로 만들 수 있다. (ref. 1 참고)

const request = Observable.create((observer: Observer<any>) => {

    var request = new XMLHttpRequest();
    request.open('GET', url);
    request.onload = () => {

        if (request.status === 200) {
            observer.next(request.responseText);
            observer.complete();
        } else {

            observer.error(new Error(request.statusText));
            observer.complete();

        }

    };

    request.send();

});

request.subscribe(function onSuccess(text: string){
    console.log(text)
})

data 를 얻어오는 대리자(observerable)가 존재한다.

위의 코드에서 observer.next() 를 호출하면 우리가 subscribe 에 등록한 callback 함수를 호출하게 된다.(see also. 1 참고) 이 observer.next() 를 호출하는 주체는 Observable 이다.

Functional Reactive Programming(FRP) 에서는 observable.subscribe() 이 iterable 의 next() 의 역할을 한다고 보면 될 듯 하다. iterable 에서는 직접적으로 next()를 호출했다면, FRP 에서는 subscribe 를 한번 거쳐서 next() 를 호출하는 차이가 있다.

어쨌든 결과적으로 subscribe을 호출하면, iterable.next() 를 호출하는 것처럼, data 가 넘어오게 된다. 그러면 그것에 대한 처리를 해주면 된다.

조금 다른 점은 iterable 은 synchronous 하게 next() 로 값을 얻어온 후, 그 다음 우리가 필요한 작업(operation) 을 바로 수행하게 된다. 즉, next() 가 synchronous 하게 동작하는 것을 전제로 programming 을 작성한다.

그런데 subscribe 을 호출하면, 이것은 우리가 직접적으로 next() 를 호출하는 것이 아니기에, 실제로 값을 전달해주는 next() 가 언제 호출될 지 모른다. 바로 전달될 수도 있지만, 한참있다가 전달될 수도 있다. 그래서 우리는 asynchronous 하게 처리하기 위해 직접 subscribe 을 호출한 후 return 값을 받아서 값을 처리하지 않고, callback function 을 전달한다.

FRP 가 data traversing 의 변경이 조금 더 수월하다.

우리는 이 next() 가 호출되기 위해서는 observable.subscribe() 을 호출해야 한다. 즉 직접적으로 next() 를 호출하지 않는다. 그래서 우리는 iterable pattern 에서처럼 직접적으로 next() 를 호출하지 않고, next() 를 호출해 주기 위해 그저 subscribe할 뿐이다.

iterable pattern 에서는 우리가 직접 next() 가 어떻게 동작하는지에 대한 code를 작성했다. FRP 에서는 이 부분을 우리가 observable 에 작성한다고 보면 된다.

이 observable 에 이것을 작성해 놓으면, obserable 에서 제공하는 다양한 함수들(operation) 을 이용해서 observer.next() 를 호출하기 전에 조작이 가능하다. 예를 들면, 특정 범위에 대해서만 observer.next()를 호출하거나, 일정시간후(delay)에 observer.next()를 호출하는 등이 가능해 진다.

즉 next() 를 호출하기 전에, 우리가 만든 observable 이 전달하는 data 의 모습을 더 쉽게 변경할 수 있다. iterable 에서는 next() 를 직접호출하기 때문에, next() 를 직접 수정해서 어떤 data 를 넘겨줄 것인가를 정하게 되지만(또는 ’상속’등을 통해서 next()를 override하거나), FRP 에서는, Observable 이 제공하는 operation 등을 통해서, 어떤 iterable 을 하는 조건을(예를 들면, filtering 이나, delay 등) 쉽게 적용할 수 있도록 해주는 것이다.(see also 2 참고)

iterable pattern 이나 FRP 나 data traverse 를 어떻게 할 것인가 대해서는 똑같이 user 가 직접 작성한다. 하지만 FRP 는 이부분 중간에 layer를 하나둬서, user가 작성한 data traverse 를, 또는 data 자체의 모양을, 좀 더 쉽게 변경할 수 있도록 해놓은 것이라고 보면 될 것 같다.

code를 보면 이해하기 더 쉬울수 있다. observable 에 우리가 subscribe을 호출하는데, chaining을 통해서 이 observable 를 다른 모습으로 변경시키고, 그것을 우리가 subscribe 하기 때문이다. (see also 2 를 보면 이해가 더 쉬울 듯 하다.)

아래 코드를 보면, myObservable2 은 순수한 rxjs.Observable 이 아니다. rxjs.Observable 뒤에 .pipe() 에 의해 변형된 Observable 이다. 우리는 이 변형된 Obeservable 에 subscribe 을 하게 된다.

const myObservable2 = new rxjs.Observable(proxyObserver => {
    console.log('Observalbe');
    proxyObserver.next(42);
    proxyObserver.next(100);
    proxyObserver.complete();
  }
).pipe(
  rxjs.operators.observeOn(rxjs.asyncScheduler)
);


myObservable2.subscribe({
    next(x){
        console.log(x)
    },
    ...
});

See Also

  1. 쿠…sal: [컴] Rx.js 에서 Observable.subscribe 의 동작
  2. 쿠…sal: [컴] Rx.js 에서 새로운 스케쥴러를 사용하는 경우
  3. The introduction to Reactive Programming you've been missing · GitHub : 좋은 글, 이해가 쉽다.

Reference

  1. RxJs — An Introduction in TypeScript — Part 1 | by Yarlen Mailler | The Startup | Medium

P

댓글 없음:

댓글 쓰기