[컴] Reactor Pattern 에 대해 알아보자.

reactor pattern 이란 무엇인가.


Reactor pattern


reactor pattern 에 대해서 알아보자. Node.js 에서 Event-driven network programming 이 유명해 졌는데, 찾아보니 이미 여러개의 framework 들이 존재했다. 그런데 이런 pattern 을 Reactor pattern 이라고 해서, Reactor pattern 에 대해 찾아보고 설명을 몇자 적어놓는다.


정의

일단 정의를 보면,

서비스를 처리하는 쪽(서버라고 생각하면 될 듯 하다.)으로 동시에 요청이 들어온 service request 를 처리하기 위한 event handling pattern 이라고 한다.
이 때 서버에서는 들어온 요청들을 분류(demux)해서 이 녀석들을 적절한 request handler 에 dispatch 한다. 이 때 동시에 여러개의 요청을 처리하는 것이 아니라, 하나에 대한 처리를 끝내고 그 다음 것을 처리하는 방식으로 하나씩 처리한다.
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.[ref. 1]


이런 Reactor pattern 을 이용한 여러 web server 가 있는데, Node.js 도 그중에 하나이다. 이외에 여러 개들이 있는데, ref. 1에서 확인하자.



기존의 Design


from : ref. 3


기존의 ThreadPool 을 이용하는 경우를 고려 해 보자.


  1. ServerSocket 으로 request A 가 들어오면 Thread 를 할당 해 준다.
  2. 그럼 이 Thread 는 그 socket 을 가지고 read, write 작업(IO) 등을 할 것이다. 그런데 이 와중에 ServerSocket 에 request B가 들어오면, context switching 이 일어난다.
  3. 그럼 새로 들어온 request 에 대해 Thread 를 배분해주고, 또 이 socket 으로 read()/write() 등의 작업을 할 것이다.
  4. 그러면서 A 의 작업을 하기 위해 중간에 다시 context switching 하고, 그러다 다시 B 작업하려고 context switching 하면서 IO 작업을 모두 처리할 것이다.

그런데 reqest A 가 들어와서 Thread 가 만들어진 후에 또다른 reqeust 가 들어오지 않는 상황을 가정 해 보자.

그럼 request A 를 처리하다가 중간에 request 가 들어왔는지 확인하기 위해 context switching 을 해야 한다. 만약 들어왔으면 Thread 를 만들어주고, 그렇지 않으면 다시 request A 를 처리하러 돌아갈 것이다.

그런데 이렇게 확인하는 시간이 아깝다.

codes

class Server implements Runnable {
   public void run() {
       try {
           ServerSocket ss = new ServerSocket(PORT);
           while (!Thread.interrupted())
               new Thread(new Handler(ss.accept())).start();
        // or, single-threaded, or a thread pool
       } catch (IOException ex) { /* ... */ }
   }
   static class Handler implements Runnable {
       final Socket socket;
       Handler(Socket s) { socket = s; }
       public void run() {
           try {
               byte[] input = new byte[MAX_INPUT];
               socket.getInputStream().read(input);
               byte[] output = process(input);
               socket.getOutputStream().write(output);
           } catch (IOException ex) { /* ... */ }
       }
       private byte[] process(byte[] cmd) { /* ... */ }
    }
}




Reactor Pattern

그래서 나온 것이 Reactor Pattern 이라고 한다.[ref. 2]

Reactor 는 우리나라 말로 하면 "반응로" 이다. 그러니까 event 가 발생하면 반응하는 녀석이다.

Reactor 는 event 가 발생하기를 기다리고, event 가 발생하면 Reactor 는 이 event 를 적절한 event handler 에 넘겨주는 역할을 하게 된다.


********

여기서 부터는 ref. 2 의 내용을 번역하고 각색했다.

ref. 3 에서는 java code 로 Reactor pattern 을 설명해 주는 ppt 이다. ref. 3 의 코드를 바탕으로 설명할 것이다. 참고로 이 java code 는 non blocking io library 인 nio package 를 사용한다.

Reactor pattern 에 중요한 2가지 요소가 있다. 하나는 event 를 받고 전달해주는 Reactor 와 다른 하나는 Reactor 가 보낸 event 를 실제로 처리하는 Handler 가 있다. ref. 3 에서 UI programming 과 연관지어서 설명한 것을 인용하면,
  • Reactor 는 UI 를 처리하는 thread 이고,
  • Handlers 는 UI의 ActionListeners 라고 생각하면 된다.
다시 얘기하면,
  • Reactor 로 event 가 들어오면 알맞는 handler 로 dispatch 주는 역할을 담당하고, 
  • Handler 는 이 dispatch 된 event 를 받아서 처리하는 역할을 하게 된다.
이 추상적인 이야기를 ref. 3 에 있는 java 코드를 통해서 확인하자.


Sigle-threaded codes

먼저, 하나의 thread 를 사용해서 동작하는 것만 고려하자.

소스코드를 이해하기 위해서 몇가지 class 에 대한 이해를 해야 한다.
  1. Channels :non-blocking read 를 지원해 주는 녀석이다.
  2. Selector : Selector 는 어느 channel set 이 IO event 를 가지고 있는지를 알려준다. Selector.select() 는 I/O 가 발생한 channel 을 return 해주게 되는데, return 할 channel 이 없다면 계속 기다리게 된다.(block). 이 block 된 녀석을 바로 return 시켜주는 녀석이 Selector.wakeup() 이다.
    Selector.selectedKeys() 는 Selection Key 를 return 해 준다. Reactor 는 이 Selection Key 를 보고 어떤 handler 로 넘겨줄 지를 결정한다.
  3. SelectionKeys : SelectionKey 는 Channel 과 Handler 에 대한 reference 를 가지고 있다. ServerSocketChannel 에 selector 를 등록하면 key 를 준다. 이 key 가 SelectionKey 이다.
  4. Acceptor : Acceptor 는 handler 다. connection 을 만들기 위한 special handler 이다.


final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ...
   
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }
    ...
}



Reactor 동작 : Acceptor 등록

Reactor 는 event 가 들어오면, ServerSocketChannel 을 만들기 위해 accept 를 해야 한다.

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk =
                serverSocket.register(selector,
                        SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

그러므로 SelectionKey 0 는 Acceptor 에 묶이게 된다.(Bind 된다.)
Acceptor 는 connection 을 만들기 위한 special handler 이다.
Reactor 가 SelectionKey 0 를 보고 Acceptor 에게 넘겨야 겠다는 것을 알 수 있다.
  • SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
이 부분을 통해서 selector 와 channel 을 함께 등록을 하고, return 되는 SelectionKey 가 OP_ACCEPT 에 반응하도록 설정한다.


Reactor 동작 : dispatch

public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey) (it.next());
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

Selector 는 IO event 를 계속해서 찾는다.

Selector.select() 를 호출한 후 selectedKeys() 를 통해 pending 된 event 가 있는 Channel 들의 SelectionKey 들을 가져올 수 있다.

이 때 SelectionKey 0 가 있으면, ServerSocketChannel 에 event 가 발생했다는 의미이고, Reactor 는 이 event 에 대한 connection 을 만들기 위해 Acceptor 에 보낸다.


Acceptor 동작

class Acceptor implements Runnable { // inner

        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }



final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ...
   
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }
    ...
}


그렇게 Acceptor 가 client 1 의 connection 에 대한 channel 인 ServerSocketChannel 1 을 만들게 된다. 그리고 나서 Selector 를 ServerSocketChannel 1 에 등록하고 SelectionKey 를 얻고, 이 key 에 handler 를 attach 한다.

여기서
  • interestOps(SelectionKey.OP_READ)
를 통해서 selector 가 이 selectionKey 를 READ event 가 발생했을 때만 return 하도록 설정해 준다. 쉽게 얘기하면, 이 selectionKey 와 관련된 event type 을 정해주는 것이다.

이제 client 는 이 ServerSocketChannel 1을 이용해서 read from server / write to server 를 할 수 있다. connection 은 Acceptor 를 통해 이미 했기 때문에 이제는 필요없다.

그러고나서 sel.wakeup()[ref. 7] 을 호출하면, select()(code 에서는 selector.select() 가 될 것이다.) 가 바로 return 할 수있는 channel 이 없어도 바로 return 해 주고 그로 인해 blocked 상태에서 벗어나서 thead 가 다시 깨어나게 된다.



정리

정리해서 다시 이야기 하면, Server 는 Selector.select() 를 호출한다.
  • 이 때 SelectionKey 1 이 return 된다면, 이 것은 ServerSocketChannel 1 이 event 를 가지고 있다는 이야기가 되고, Reactor 는 이 SelectionKey1 이 bind 한 Handler 1 에 dispatch 해주게 된다.
  • 만약 SelectionKey 0 이 return 된다면, 이것은 ServerSocketChannel 0 에서 event 가 발생한 것이고, 새로운 client 가 접속한 것이다. 이 SelectionKey 0 는 Acceptor 에 bind 되어 있기 때문에 이 event 는 Acceptor 로 dispatch 된다.

"특정 ServerSocketCannel" 에서 "특정 event" 가 발생했을 때, 해당하는 SelectionKey 가 selector 에게 return 된다. 그러면 "selector 에서 가져온 SelectionKey" 가 가지고 있는 handler 를 실행한다.



여기까지 구현으로는 Reactor 는 어차피 Handler 의 작업이 끝나기를 기다려야 한다. selectionKey 하나를 다 처리하고 나야 다음 selectionKey 를 처리하게 된다. 우리는 이것을 극복하기 위해 multi-thread 를 이용할 수 있다.



codes

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk =
                serverSocket.register(selector,SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

    /*
    Alternatively, use explicit SPI provider:
    SelectorProvider p = SelectorProvider.provider();
    selector = p.openSelector();
    serverSocket = p.openServerSocketChannel();
    */
    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey) (it.next());
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }

    class Acceptor implements Runnable { // inner

        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }
}



final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c)
            throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() { /* ... */ }

    boolean outputIsComplete() { /* ... */ }

    void process() { /* ... */ }


    // class Handler continued
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        int readCount = socket.read(input);
        if (readCount > 0) {
            process();
        }

        state = SENDING;
        // Normally also do first write now
        sk.interestOps(SelectionKey.OP_WRITE);
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel();
    }
}



Multi-threaded

Handler 부분을 thread pool 을 이용해서 동시에 여러개를 처리할 수 있도록 한다.

ref. 9 에서는 상속을 이용해서 구현하였지만, 여기서는 차이점을 잘 알아볼 수 있도록 변경사항을 기존의 Handler 에 추가했다.

아래를 보면 PROCESSING 이라는 state 를 더 추가했다.

final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    
    static ExecutorService pool = Executors.newFixedThreadPool(2);
    static final int PROCESSING = 2;
    

    Handler(Selector sel, SocketChannel c)
            throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() { /* ... */ }

    boolean outputIsComplete() { /* ... */ }

    
    synchronized void process() { /* ... */ }
    


    // class Handler continued
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    
    synchronized void read() throws IOException {
        int readCount = socketChannel.read(input);
        if (readCount > 0) {
            
            state = PROCESSING;
            pool.execute(new Processer(readCount));
            
        }
        //We are interested in writing back to the client soon after read processing is done.
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }
    

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel();
    }

    
    synchronized processAndChageState(int readCount){
        process(readCount)
        state = SENDING;
    }
    
    class Processer implements Runnable {
        int readCount;
        Processer(int readCount) {
            this.readCount =  readCount;
        }
        public void run() {
            processAndChageState(readCount);
        }
    }
    
}


See Also

  1. Proactor Pattern 발표 자료
  2. proactor design pattern (번역),2012년 5월 18일 금요일
  3. http://www.tornadoweb.org/en/stable/


Reference

  1. http://en.wikipedia.org/wiki/Reactor_pattern
  2. http://jeewanthad.blogspot.kr/2013/02/reactor-pattern-explained-part-1.html
  3. Scalable IO in Java, Doug Lea, State University of New York at Oswego
  4. SelectionKey (Java Platform SE 7 )
  5. Selector (Java Platform SE 7 )
  6. SelectableChannel (Java Platform SE 7 )
  7. Java NIO Selector By Jakob Jenkov
  8. http://jeewanthad.blogspot.kr/2013/03/reacter-pattern-explained-part-2.html
  9. http://jeewanthad.blogspot.kr/2013/03/reacter-pattern-explained-part-3.html


댓글 없음:

댓글 쓰기