Reactor pattern
reactor pattern 에 대해서 알아보자. Node.js 에서 Event-driven network programming 이 유명해 졌는데, 찾아보니 이미 여러개의 framework 들이 존재했다. 그런데 이런 pattern 을 Reactor pattern 이라고 해서, Reactor pattern 에 대해 찾아보고 설명을 몇자 적어놓는다.
정의
일단 정의를 보면,서비스를 처리하는 쪽(서버라고 생각하면 될 듯 하다.)으로 동시에 요청이 들어온 service request 를 처리하기 위한 event handling pattern 이라고 한다.
이 때 서버에서는 들어온 요청들을 분류(demux)해서 이 녀석들을 적절한 request handler 에 dispatch 한다. 이 때 동시에 여러개의 요청을 처리하는 것이 아니라, 하나에 대한 처리를 끝내고 그 다음 것을 처리하는 방식으로 하나씩 처리한다.
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.[ref. 1]
이런 Reactor pattern 을 이용한 여러 web server 가 있는데, Node.js 도 그중에 하나이다. 이외에 여러 개들이 있는데, ref. 1에서 확인하자.
기존의 Design
from : ref. 3 |
기존의 ThreadPool 을 이용하는 경우를 고려 해 보자.
- ServerSocket 으로 request A 가 들어오면 Thread 를 할당 해 준다.
- 그럼 이 Thread 는 그 socket 을 가지고 read, write 작업(IO) 등을 할 것이다. 그런데 이 와중에 ServerSocket 에 request B가 들어오면, context switching 이 일어난다.
- 그럼 새로 들어온 request 에 대해 Thread 를 배분해주고, 또 이 socket 으로 read()/write() 등의 작업을 할 것이다.
- 그러면서 A 의 작업을 하기 위해 중간에 다시 context switching 하고, 그러다 다시 B 작업하려고 context switching 하면서 IO 작업을 모두 처리할 것이다.
그런데 reqest A 가 들어와서 Thread 가 만들어진 후에 또다른 reqeust 가 들어오지 않는 상황을 가정 해 보자.
그럼 request A 를 처리하다가 중간에 request 가 들어왔는지 확인하기 위해 context switching 을 해야 한다. 만약 들어왔으면 Thread 를 만들어주고, 그렇지 않으면 다시 request A 를 처리하러 돌아갈 것이다.
그런데 이렇게 확인하는 시간이 아깝다.
codes
class Server implements Runnable { public void run() { try { ServerSocket ss = new ServerSocket(PORT); while (!Thread.interrupted()) new Thread(new Handler(ss.accept())).start(); // or, single-threaded, or a thread pool } catch (IOException ex) { /* ... */ } } static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; } public void run() { try { byte[] input = new byte[MAX_INPUT]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } } private byte[] process(byte[] cmd) { /* ... */ } } }
Reactor Pattern
그래서 나온 것이 Reactor Pattern 이라고 한다.[ref. 2]Reactor 는 우리나라 말로 하면 "반응로" 이다. 그러니까 event 가 발생하면 반응하는 녀석이다.
Reactor 는 event 가 발생하기를 기다리고, event 가 발생하면 Reactor 는 이 event 를 적절한 event handler 에 넘겨주는 역할을 하게 된다.
********
여기서 부터는 ref. 2 의 내용을 번역하고 각색했다.
ref. 3 에서는 java code 로 Reactor pattern 을 설명해 주는 ppt 이다. ref. 3 의 코드를 바탕으로 설명할 것이다. 참고로 이 java code 는 non blocking io library 인 nio package 를 사용한다.
Reactor pattern 에 중요한 2가지 요소가 있다. 하나는 event 를 받고 전달해주는 Reactor 와 다른 하나는 Reactor 가 보낸 event 를 실제로 처리하는 Handler 가 있다. ref. 3 에서 UI programming 과 연관지어서 설명한 것을 인용하면,
- Reactor 는 UI 를 처리하는 thread 이고,
- Handlers 는 UI의 ActionListeners 라고 생각하면 된다.
- Reactor 로 event 가 들어오면 알맞는 handler 로 dispatch 주는 역할을 담당하고,
- Handler 는 이 dispatch 된 event 를 받아서 처리하는 역할을 하게 된다.
Sigle-threaded codes
먼저, 하나의 thread 를 사용해서 동작하는 것만 고려하자.- Channels :non-blocking read 를 지원해 주는 녀석이다.
- Selector : Selector 는 어느 channel set 이 IO event 를 가지고 있는지를 알려준다. Selector.select() 는 I/O 가 발생한 channel 을 return 해주게 되는데, return 할 channel 이 없다면 계속 기다리게 된다.(block). 이 block 된 녀석을 바로 return 시켜주는 녀석이 Selector.wakeup() 이다.
Selector.selectedKeys() 는 Selection Key 를 return 해 준다. Reactor 는 이 Selection Key 를 보고 어떤 handler 로 넘겨줄 지를 결정한다. - SelectionKeys : SelectionKey 는 Channel 과 Handler 에 대한 reference 를 가지고 있다. ServerSocketChannel 에 selector 를 등록하면 key 를 준다. 이 key 가 SelectionKey 이다.
- Acceptor : Acceptor 는 handler 다. connection 을 만들기 위한 special handler 이다.
final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ... Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); } ... }
Reactor 동작 : Acceptor 등록
Reactor 는 event 가 들어오면, ServerSocketChannel 을 만들기 위해 accept 를 해야 한다.class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind( new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); }
그러므로 SelectionKey 0 는 Acceptor 에 묶이게 된다.(Bind 된다.)
Acceptor 는 connection 을 만들기 위한 special handler 이다.
Reactor 가 SelectionKey 0 를 보고 Acceptor 에게 넘겨야 겠다는 것을 알 수 있다.
- SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
Reactor 동작 : dispatch
public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey) (it.next()); selected.clear(); } } catch (IOException ex) { /* ... */ } }
Selector.select() 를 호출한 후 selectedKeys() 를 통해 pending 된 event 가 있는 Channel 들의 SelectionKey 들을 가져올 수 있다.
이 때 SelectionKey 0 가 있으면, ServerSocketChannel 에 event 가 발생했다는 의미이고, Reactor 는 이 event 에 대한 connection 을 만들기 위해 Acceptor 에 보낸다.
Acceptor 동작
class Acceptor implements Runnable { // inner public void run() { try { SocketChannel c = serverSocket.accept(); if (c != null) new Handler(selector, c); } catch (IOException ex) { /* ... */ } } }
final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ... Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); } ... }
그렇게 Acceptor 가 client 1 의 connection 에 대한 channel 인 ServerSocketChannel 1 을 만들게 된다. 그리고 나서 Selector 를 ServerSocketChannel 1 에 등록하고 SelectionKey 를 얻고, 이 key 에 handler 를 attach 한다.
여기서
- interestOps(SelectionKey.OP_READ)
이제 client 는 이 ServerSocketChannel 1을 이용해서 read from server / write to server 를 할 수 있다. connection 은 Acceptor 를 통해 이미 했기 때문에 이제는 필요없다.
그러고나서 sel.wakeup()[ref. 7] 을 호출하면, select()(code 에서는 selector.select() 가 될 것이다.) 가 바로 return 할 수있는 channel 이 없어도 바로 return 해 주고 그로 인해 blocked 상태에서 벗어나서 thead 가 다시 깨어나게 된다.
정리
정리해서 다시 이야기 하면, Server 는 Selector.select() 를 호출한다.- 이 때 SelectionKey 1 이 return 된다면, 이 것은 ServerSocketChannel 1 이 event 를 가지고 있다는 이야기가 되고, Reactor 는 이 SelectionKey1 이 bind 한 Handler 1 에 dispatch 해주게 된다.
- 만약 SelectionKey 0 이 return 된다면, 이것은 ServerSocketChannel 0 에서 event 가 발생한 것이고, 새로운 client 가 접속한 것이다. 이 SelectionKey 0 는 Acceptor 에 bind 되어 있기 때문에 이 event 는 Acceptor 로 dispatch 된다.
"특정 ServerSocketCannel" 에서 "특정 event" 가 발생했을 때, 해당하는 SelectionKey 가 selector 에게 return 된다. 그러면 "selector 에서 가져온 SelectionKey" 가 가지고 있는 handler 를 실행한다.
여기까지 구현으로는 Reactor 는 어차피 Handler 의 작업이 끝나기를 기다려야 한다. selectionKey 하나를 다 처리하고 나야 다음 selectionKey 를 처리하게 된다. 우리는 이것을 극복하기 위해 multi-thread 를 이용할 수 있다.
codes
class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind( new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); } /* Alternatively, use explicit SPI provider: SelectorProvider p = SelectorProvider.provider(); selector = p.openSelector(); serverSocket = p.openServerSocketChannel(); */ public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey) (it.next()); selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) r.run(); } class Acceptor implements Runnable { // inner public void run() { try { SocketChannel c = serverSocket.accept(); if (c != null) new Handler(selector, c); } catch (IOException ex) { /* ... */ } } } }
final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(MAXIN); ByteBuffer output = ByteBuffer.allocate(MAXOUT); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); } boolean inputIsComplete() { /* ... */ } boolean outputIsComplete() { /* ... */ } void process() { /* ... */ } // class Handler continued public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { int readCount = socket.read(input); if (readCount > 0) { process(); } state = SENDING; // Normally also do first write now sk.interestOps(SelectionKey.OP_WRITE); } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); } }
Multi-threaded
Handler 부분을 thread pool 을 이용해서 동시에 여러개를 처리할 수 있도록 한다.ref. 9 에서는 상속을 이용해서 구현하였지만, 여기서는 차이점을 잘 알아볼 수 있도록 변경사항을 기존의 Handler 에 추가했다.
아래를 보면 PROCESSING 이라는 state 를 더 추가했다.
final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(MAXIN); ByteBuffer output = ByteBuffer.allocate(MAXOUT); static final int READING = 0, SENDING = 1; int state = READING; static ExecutorService pool = Executors.newFixedThreadPool(2); static final int PROCESSING = 2; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); } boolean inputIsComplete() { /* ... */ } boolean outputIsComplete() { /* ... */ } synchronized void process() { /* ... */ } // class Handler continued public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } synchronized void read() throws IOException { int readCount = socketChannel.read(input); if (readCount > 0) { state = PROCESSING; pool.execute(new Processer(readCount)); } //We are interested in writing back to the client soon after read processing is done. selectionKey.interestOps(SelectionKey.OP_WRITE); } void send() throws IOException { socket.write(output); if (outputIsComplete()) sk.cancel(); } synchronized processAndChageState(int readCount){ process(readCount) state = SENDING; } class Processer implements Runnable { int readCount; Processer(int readCount) { this.readCount = readCount; } public void run() { processAndChageState(readCount); } } }
See Also
- Proactor Pattern 발표 자료
- proactor design pattern (번역),2012년 5월 18일 금요일
- http://www.tornadoweb.org/en/stable/
Reference
- http://en.wikipedia.org/wiki/Reactor_pattern
- http://jeewanthad.blogspot.kr/2013/02/reactor-pattern-explained-part-1.html
- Scalable IO in Java, Doug Lea, State University of New York at Oswego
- SelectionKey (Java Platform SE 7 )
- Selector (Java Platform SE 7 )
- SelectableChannel (Java Platform SE 7 )
- Java NIO Selector By Jakob Jenkov
- http://jeewanthad.blogspot.kr/2013/03/reacter-pattern-explained-part-2.html
- http://jeewanthad.blogspot.kr/2013/03/reacter-pattern-explained-part-3.html
댓글 없음:
댓글 쓰기