레이블이 deep인 게시물을 표시합니다. 모든 게시물 표시
레이블이 deep인 게시물을 표시합니다. 모든 게시물 표시

[컴][아두이노] 아두이노의 bootloader

부트로더 / 붓로더 / 아두이노 부트로더



Arduino Bootloader

bootloader 를 보면 일단 microcontroller 에 의해 나뉜다. (쉽게 cpu 라고 생각하자.) 그래서 크게 2가지 소스가 있다.


여기서 용도에 따라 보드 종류에 따라 여러가지로 소스를 나눠놨다. 확인은 안해봤지만 대략적인 동작은 아래 ATmega8/Atmega168 bootloader 와 비슷할 듯 하다.(추측)

ref. 1 에서는 AVR bootloader 중 3가지에 대해 간략하게 설명을 해준다. 정리하면 다음과 같다.

ATmega8 bootloader(source)

  • 1KB 의 flash memory 만 사용한다.
  • 이것도 invalid data 를 받을 때 timeout 시키지 않는다. 
  • bootloader 가 동작할 때 6-8초 동안 data 가 보드로 전송되지 않게 해야 한다.

ATmega168 bootloader(source)

  • 현재 Arduino 0009 에 들어가 있는 bootloader 는  "Diecimila" 과  "NG" 버전(ATmega168이 장착된)과 거의 같다.
  • ATmega168에서 19200 baud 속도로 동작하고, flash memory 의 2 KB 를 사용한다.
  • Diecimila 과 NG 버전의 차이점
    • 시작시 초반에 새로운 프로그램이 동작하도록 기다려 주는 시간이 있다.(Diecimila 는 시간을 절약하기 위해 1초 미만을 기다리고, NG 는 6~8초를 기다린다.)
    • pin 13 LED 를 번쩍이는 횟수(Diecimila 은 한번 깜빡임, NG 는 3번)
  • NG 버전의 특징
    • Arduino NG 에 들어가있는 bootloader 는 약간 다르다.
    • pin 6 에 있는 internal pullup resistor 를 enable 한다.
    • RX pin 에 있는 internal pullup 는 enable 하지 않는다.
    • invalid data 를 받고 있는 동안에도 timeout 을 하지 않는다. 그래서 만약 reset 하자마자 data 를 upload 하면, 그 sketch 는 절대 동작하지 않을 것이다.

Arduino BT 의 bootloader(source)

  • Arduino BT 의 bootloader 는 블루투스 모듈에 대한 초기설정을 한다.



References

  1. Arduino - Bootloader > Versions of the bootloader


[컴][LLVM] ASTMatcher





Clang 의 AST 를 사용해서 code 등을 analyze 할 때 AST 의 특정부분을 찾는 것은 ASTMatcher 를 이용한다. "AST 에서 이러이러한 특징을 갖는 녀석을 찾고, 그것에 대해 무슨 일을 하고 싶다." 라면 이 ASTMatcher 를 사용하면 된다.

실제 Clang tool 을 작성하는 것에 대해서는 "Visual Studio 에서 Clang tool 빌드 하기" 를 참고하자.


AST Matcher

Clang 은 ASTMatcher library 를 제공하는데, 이녀석으로 Clang AST 를 좀 더 쉽게 이용할 수 있다.

아래문장에 ASTMatcher library 를 이용한 경우인데,

forStmt(hasLoopInit(declStmt(hasSingleDecl(varDecl(  )))))
  • for 문인데 : forStmt
  • loop 에 대한 init 을 가졌고 : hasLoopInit
  • 선언하는 문장이 있고(declare) : declStmt
  • 그 선언은 한개이고, : hasSingleDecl
  • 그 선언은 변수 : varDecl
위의 ASTMatcher 는 아래 for 문에 match 된다. (노란색)
for(int i = 0 ; i<10; i++){}

만약 loop 가 아래처럼 2개를 initialize 한다면 위의 조건에 맞지 않아서 match 되지 않는다.
for(int i = 0, j=0 ; i<10; i++){}



여기에 추가로 " '0'으로 초기화되는 녀석들만 가져오게 하고 싶다 "면,
hasInitializer(integerLiteral(equals(0)))
만 추가하면 된다.
forStmt(hasLoopInit(declStmt(hasSingleDecl(varDecl( hasInitializer(integerLiteral(equals(0))) )))))

실제로 선언을 할 때는 아래처럼 하게 된다. 아래에서 .bind 는 저 Matcher 에 ID 를 주는 것이다. 이 ID 를 이용해서 나중에 저걸 가지고 무엇인가를 작업할 때에 필요하다.
StatementMatcher LoopMatcher =
 forStmt(hasLoopInit(declStmt(hasSingleDecl(varDecl(
  hasInitializer(integerLiteral(equals(0)))))))).bind("forLoop");

이러면 아래같은 for 문이 있으면 match 된다.
for(int i = 0 ; i<10; i++){}


Callback 함수

이렇게 특정 녀석이 matching 됐을 때 callback 함수를 부를 수 있게 설정할 수 있다. 소스는 ref. 1 에서 가져왔다.

StatementMatcher LoopMatcher =
 forStmt(
  hasLoopInit(declStmt(
   hasSingleDecl(varDecl(hasInitializer(integerLiteral(equals(0))))
   .bind("initVarName")))),
  hasIncrement(unaryOperator(
   hasOperatorName("++"),
   hasUnaryOperand(declRefExpr(
    to(varDecl(hasType(isInteger())).bind("incVarName")))))),
  hasCondition(binaryOperator(
   hasOperatorName("<"),
   hasLHS(ignoringParenImpCasts(declRefExpr(
    to(varDecl(hasType(isInteger())).bind("condVarName"))))),
   hasRHS(expr(hasType(isInteger())))))).bind("forLoop");

 
class LoopPrinter : public MatchFinder::MatchCallback {
public:
 virtual void run(const MatchFinder::MatchResult &Result) {
  if (const ForStmt *FS = Result.Nodes.getNodeAs<clang::ForStmt>("forLoop")){
   FS->dump();
   llvm::outs() << "Potential array-based loop discovered.\n";
  }

  
 }
};


int main(int argc, const char **argv) {
  CommonOptionsParser OptionsParser(argc, argv, MyToolCategory);
  ClangTool Tool(OptionsParser.getCompilations(),
                 OptionsParser.getSourcePathList());

  LoopPrinter Printer; // callback
  MatchFinder Finder;
  Finder.addMatcher(LoopMatcher, &Printer);

  return Tool.run(newFrontendActionFactory(&Finder).get());
  // old code
  // return Tool.run(newFrontendActionFactory<clang::SyntaxOnlyAction>().get());
}




See Also

  1. Matching the Clang AST — Clang 5 documentation : Matcher 를 어떻게 사용하는지 알려준다.
  2. AST Matcher Reference : ASTMatcher library references


References

  1. Tutorial for building tools using LibTooling and LibASTMatchers — Clang 5 documentation



[컴][보안] 서버의 key 관리 어느것이 나을까?







DB 의 field 값 암호화

갑자기 서버가 사용하는 DB 에 user 의 어떤 key 를 저장할 때(예를 들면 특정사이트의 password 같은) 어떤 식으로 저장하는 것이 해킹에 안전할까에 대한 궁금증이 들었다.

그래서 조금 찾아보니 아래에 어느정도 잘 답변해준 자료가 있었다.


여기서 이야기 하려고 하는 것은 당연히 crypt/decrypt 가 되는 암호화를 이야기 하려 한다. hash 같은 방식은 여기서 사용하지 않는다. 왜냐하면 암호화 한 내용을 다시 복호화해서 사용해야 하는 상황이기 때문이다.

여하튼 짧은 보안에 대한 지식을 가지고, 어떤 식으로 key 를 관리하는 것이 좋은 지에 대해 한 번 생각해 보자.

키 관리 key management

symmetric / asymmetirc

개인적으로 symmetric 을 쓰려고 한다. asymmetric 은 key 를 주고받아야 하는 경우가 아니라면 굳이 사용할 이유는 없다고 생각된다. 암호화 수준도 대체로 symmetric 이 우수하다고 하며, key 를 따로 관리해야 하는 것도 부담이다.(혹시 이밖의 의견이 있다면 댓글을 부탁합니다.)

key 관리 key management

결국 이 암호화의 핵심은 key 를 hacker 에게 어떻게 노출시키지 않을 수 있는가 인 듯 하다. 그러기 위해서는 일단 key 를 어디에 숨겨야 한다. 그럼 key 를 숨기기위해 또 암호화를 할까? 그것은 그냥 절차(process)만 길게 만드는 것일 뿐, 보안에 더 도움이 되지 않는다.

보통은 이 key 를 hacker 가 알아내기 힘든 어떤 값으로 정하는 쪽을 택하는 듯 하다.

hacker 가 가져가기 힘든 것

hacker 가 알아내기 힘든 곳이란 것은 자신의 컴퓨터가 hacking 되었을 때를 가정하면 파악할 수 있다.

보통 자신의 server 의 admin 계정이 hacking 당하는 등의 일이 발생하면, hacker 는 계속 그 컴퓨터를 사용하기 보다는 서버의 정보를 자신의 local 로 옮기는 행위를 할 가능성이 크다.

왜냐하면 hacking 이 발각되면, admin 비밀번호를 당연히 바꿀 것이고, 다시 hacking 을 해서 정보를 가져오려면 또 오랜 수고가 필요할 테니 말이다.

그래서 결국 여기서 "hacker 가 알아내기 힘든 곳"이란 결국 hacker 가 가져갈 수 없는 어떤 것이 된다. 대부분은 network를 통해 가져갈 수 있으니, 그 이외의 것을 고려하면 된다.

Hardware

그래서 가장 좋은 것은 encryption key 로 h/w 정보를 이용하는 것이다. 물론 쉽게 알아낼 수 있는 것은 좋은 생각이 아니다.(예를 들면 mac address)

그래서 ref. 1 에서 이야기하는 것처럼 외부에 장착된 security module 같은 것은 더없이 좋다. 아니면 TPM chip 같은 것을 이용하는 것도 좋다.

물론 여기에도 한계는 있을 수 있다. hacker 들이 이것을 이미 알고 들어왔다면, security module 을 hacking 하는 시도를 하던지 TPM chip 의 정보등은 해킹을 시도할 수 있다.

login 정보

os 의 login 정보를 key 로 이용할 수 있다. 하지만 이것은 linux 에서 어떤 식으로 가능한지는 잘 모르겠다. windows 는 이런 API 를 제공한다. 이것을 활용한 예는 chrome 에서 볼 수 있다. 아래 글을 참고하자.

server 시작시 key 입력

개인적으로는 이녀석이 쉽게 구현해서 사용할 수 있고 꽤 괜찮은 안전 수준을 제공해 줄 듯 하다. ref. 1 에서 memory dump 로 가져갈 수 있다고 하지만, 입력한 key 의 hash 값을 사용한다면 더욱 안전한 상태가 가능할 듯 싶다.

하지만 관리자가 계속 머리에 key를 가지고 있어야 하는 것은 부담이다.

다른 서버에 저장

개인적으로 이것도 나쁜 생각은 아니라고 생각된다. 하지만, 그 key 를 저장하는 서버가 충분히 secure 할 필요는 있다. 쉽게 뚤리는 곳이라면 hacking 도 쉽게 이뤄질 수 있으니, 순간적으로 key 가 탈취당하지 않았다고 생각할 수 있지만, 바로 hacking 해서 가져갈 수도 있다.

하지만 일단 또다른 hacking 을 해야 한다는 부담감이 있어서 단념하게 될 가능성도 크다.

같은 서버에 다른 곳에 저장

가장 쉽게 생각할 수 있는 부분인 듯 하다. 그저 network 로 정보만 가져가는 경우라면 다행이겠지만, admin 계정등을 해킹해서 들어온 경우라면, 큰 의미없는 보관이 될 듯 하다.

개인적은 결론

일단 개인적으로 자금이나 machine 의 상황으로 h/w 를 이용하는 것은 쉬운일이 아니라고 생각된다. 그런면에서는 조금 번거롭지만, 암호를 잘 기억해서 server 를 띄울때 typing 하는 것이 가장 합리적인 방법이 아닐까 생각된다.

References

  1. key management - Where to store a key for encryption? - Information Security Stack Exchange

[컴][인터넷] warning.or.kr 이 뜨는 원리



Warning page 가 보여지는 원리

처음에 이것은 DNS server 에서 mapping table 을 조작해서 warning 페이지를 보여주는 줄 알았다. 그런데 요새 좀 바뀐 듯 하다. 관련해서 찾아본 내용을 대충 정리해 본다.


누가 차단하나?



누가 차단하는지를 확인하기 위해 "차단절차"를 한번 살펴보자. ref. 1 에 자세한 사항이 나와 있다. 여기서 정리하면 다음과 같다.

차단 절차
  1. 차단결정 : 방심위(방통심의 위원회) 심의를 통해 차단 결정이 나면 
  2. ISP 에 목록 전달 : 결정난 목록을 각 ISP(인터넷사업자)에게 전달
  3. ISP 에서 설정 : ISP들이 가지고 있는 ‘필터링 사이트 목록’에 새로 결정된 사이트 목록을 추가
  4. 사용자 중 누군가 이 목록에 들어가 있는 사이트에 접속 시도를 하면, 
  5. 필터링 장치가 동작해 접속을 끊고 대신 방심위의 warning.or.kr 페이지로 포워딩

결국 차단할 목록을 정하는 것은 "방통심의 위원회"이지만, 실제로 차단을 하는 곳은 "ISP 의 장비"인 것이다.


차단원리

위에서 보았듯이 결국 실질적인 행동은 ISP 에서 하게 된다.

DNS lookup table 수정

ISP 에서 예전에는 DNS server 를 조작해서 warning page 를 보여줬다.

좀 더 자세하게 이야기 하면, 원래 우리가 알고 있는 domain name(http://daum.net 같은 주소라고 보면 된다.) 는 그 domain name 에 대한 "ip 주소" 를 찾아서 그 ip 주소로 packet 을 보낸다. 이 때 domain name 에 대한 'ip 주소' 를 찾아주는 일을 DNS server 가 해준다.

  • Domain name ---> IP address

이 DNS server 이 mapping 해주는 작업은 간단하게 table 을 가지고 한다. 하나의 table 에 어떤 주소는 어떤 ip 주소로 가라고 표시되어 있다.

  • daum.net --> 129.0.2.1 

이 table 을 수정하는 것이다.

그래서 그냥 DNS server 주소를 다른 것으로 바꿔서 사용하는 것으로 warning 을 피할 수 있었다. (참고)


Http header 에서 Host 부분

이것이 이제는 바뀌어서 사용자가 보내는 packet 의 "Http header 에서 Host 부분을 확인"하는 듯 하다. 이것은 ref. 2 / ref. 3 / ref. 4 를 통해 어느정도 확인할 수 있다.

특히 ref. 4에서는 warning page 를 우회시켜주는 프로그램에서 나가는 packet 을 확인해서 좀 더 정확한 이야기를 해준다.

  • Dodge Chrome'(닷지 크롬) 
  • 'Dodge Web & Dodge Browser'(닷지 웹 & 브라우저)

좀 더 자세하게 이야기하면,

packet 의 http header 에는 아래 같이 요청하는 domain 주소가 들어가게 된다. 여기에 적힌 domain 주소를 확인해서 그 주소가 "방심위" 에서 보낸 list 에 있는 주소라면 warning page 로 돌리는 것이다.
Host: daum.net





References


  1. 웹의 자유 옥죄는 ‘방통심의위원회’…Warning.or.kr의 불편한 진실 - 경향신문, 2014-01-18
  2. SSL 페이지는 검열 불가능? - 일간워스트, 2014-05-21
  3. [warning.or.kr] 사이트 차단 및 우회 기법 분석 - 1
  4. [warning.or.kr] 사이트 차단 및 우회 기법 분석 - 2

[컴] Bit torrent - 정리안된 자료



정리안된 자료다. 일단 버리는 것보다 살려두면 혹시나 필요할 일이 있을 듯 하여일단 적어놓는다. 정리는 추후에...^^;;;

Building a BitTorrent client from the ground up in Go | Jesse Li : 이 자료를 보면 좋을 듯 하다.

Bit torrent


bencoding : bencoding - BitTorrentSpecification - Theory.org Wiki
go bencoding library : marksamman/bencode: Bencode implementation in Go


pure python code : Blender3D/torrent: Pure-Python torrent client.



pip install tornado
torrent/file

.torrent 파일을 parsing 해서 meta info 를 얻는다.
meta info 의 announce-list 가 최초 trackers 가 된다.
이 announce-list 의 url 을 보고 http/udp 인지를 판단한다.
아래의 정보를 tacker 에게 보낸다.
{'compact': 1, 'downloaded': 1000000, 'info_hash': '\xd5\xde\x04}\xcc\x...`\x17\xc6', 'left': 10000000, 'peer_id': "-PT0000-Q\x04\x11\x...2\xf9\xf8", 'port': 6881, 'uploaded': 1000000}


test torrent file



Bit torrent specification

비트토렌트에 관련한 스펙은 ref. 2, ref. 5 에 나와 있다. 한번 읽어보자.




main()

torrent = Torrent(options.torrent)

server = Server(
    torrent=torrent,
    download_path=options.path,
    storage_class=DiskStorage,
    peer_id=peer_id(),
    max_peers=options.max_peers
)
server.listen(options.port)
server.start()

...

IOLoop.instance().start()



Server 의 _init__ 에서 .torrent 의 info 정보에 'length' 가 있는데 이 정보를 보고 필요한 disk 용량을 잡아놓는다.(storage_class.from_torrent() )
info['length'] 또는 info['files']['length'] 에 size 정보가 있다.
그리고 info['piece length'] 가 block size 로 사용된다.
이 piece 에 대한 hash 도 .torrent 에 있는데 이녀석도 가지고 있는다.

 이 hash 는 나중에 bitfield 를 보내줄때 내가 어떤 block 을 가지고 있는지 확인한 후에 알려줘야 하는데, 이것을 위해서 요청한 index 의 blcok hash 값과 자신이 가지고 있는 block 의 hash 값을 만들어서 비교한다.

bitfield 도 내가 가진 block 들을 전부보면서 block hash 값을 가지고 있는지를 보고 가지고 있다면 그 block 이 존재하는 것으로 보고 1 을 set 한다.

이 block 하나가 index 하나가 된다. 즉 index 하나는 한개의 block 을 가리킨다.


일단 file 을 만든적이 없으면 file 을 만들어서 사이즈만큼 00 을 채운다. (utils.fill) 그리고 이 녀석을 다시 열어서 이 handle 을 가지고 있는다.


해당 peer 에서 have message 를 받으면, peer 마다 가지고 있는 array 에 have 에서 알려준 index 에 대한 표시를 해둔다. (Client.got_have)


bitfield message

처음 peer 와 접속하게 되면,
server.storage.to_bitfield 를 통해 현재가지고 있는 piece 들을 bitfield 로 만들어서 던져준다.

piece message

받는 경우
내가 piece message 를 받아서 그 piece message 에 있는 index, begin을 보고 file 의 offset 을 찾은후에

  • offset = self.block_size * index + begin

거기다가 piece message 에 있는 block 을 write 한다.
write 가 끝나고 piece 가 전부 다 write 가 됐는지 확인하는 방법으로 hash 를 비교한다.

보내는 경우
상대가 request message 를 보냈고, 해당 piece 가 있다면, 그 piece 를 실어서 이쪽에서는 piece message 를 보내주게 된다.



have message

piece message 를 받고 block 을 전부 write 했다면, 해당 index 에 대한 have message 를 server module 을 통해 다른 peer 에게 날리게 된다.(Client.got_piece)



reqeust message

이렇게 message 들을 주고받다가,
missing_piece / total_piece < 0.05 (endgame mode) 라면
모든 piece를 받았는지 확인해보고, 다 받았으면, IOLoop 을 stop 한다.(client.message_loop)
다 받지 않았다면, missing_piece 부분을 받기위해 Request message 를 만들어 보낸다.

이밖에도 piece message 를 받고나서,
또는 unchoke message 를 받고나서,
request message 를 보낸다.
request message 는 원하는 piece에 대한 index 를 보낸다. 물론 그 block 에 대한 offset, size 도 함께 보낸다.

받은 경우
request message 를 받으면, 해당하는 piece 가 있는지 확인하고 있으면 piece message 를 만들어서 보내게 된다.



Torrent class 를 new 하면서 tracker 들을 new 한다.
tracker 는 HttpTracker 를 new 하고, HttpTracker 는 AsyncHTTPClient 를 new 한다.
AsyncHTTPClient  는 내부에서 TCPClient 를 new 한다.


torrent = Torrent(options.torrent)
class Torrent(object):
    def __init__(self, handle=None):
        self.trackers = self._trackers()

    ...
    def _trackers(self):
        trackers = self.meta.get('announce-list', [[self.meta['announce']]])
        result = []

        for tier, urls in enumerate(trackers):
            for url in urls:
                tracker = Tracker(url, torrent=self, tier=tier)
                result.append(tracker)

        return result

def Tracker(url, torrent, tier=0):
    o = urlsplit(url)

    if o.scheme == 'http':
        return HTTPTracker(url, torrent, tier)
    ...

class HTTPTracker(object):
    def __init__(self, url, torrent, tier=0):
        ...
        self.client = AsyncHTTPClient()

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    ...
    def initialize(self, io_loop, max_clients=10,
                    hostname_mapping=None, max_buffer_size=104857600,
                    resolver=None, defaults=None, max_header_size=None,
                    max_body_size=None):
        ...
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)





Server.start

TCPServer.start 를 한다.
그리고 connect_to_peers() 한다.

connect_to_peers()

  • 여기서 scripe_trackers() 를 하는데
  • scripe_trackers
    • tracker 에 대한 정보를 얻는다.
      •  future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
      • 에서 tracker url 을 fetch 한다. 그러고 나서 response 를 얻는다.
      • 여기서 tornado.AsychHTTPClient를 사용하게 된다.
    • response 가 온 후에 할일을 정해준다. 
      • future.add_done_callback(lambda future: self.tracker_done(future, result))
    • tracker_done
      • response 가 제대로 오지 않으면 log 를 남기고, 제대로 왔으면, 결과를 가지고 peer 정보를 update 한다.
  • Server.connect(self.unconnected_peers.pop())
    • 접속되지 않은 peers 에게 접속한다.


Server.connect

@coroutine
@gen_debuggable
def connect(self, peer):
    logging.info('Connecting to %s', peer)

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    stream = IOStream(sock)
    yield Task(stream.connect, (peer.address, peer.port))

    self.handle_stream(stream, (peer.address, peer.port), peer)



여기서 이제 peer 에 접속해서 data 를 가져오게 된다.
socket 을 열고 peer.address 에 connect 한다.
tornado 의 IOStream 을 이용한다.
iostream 을 peer.address 에 대해 connect 한다.
그리고 연결이 되면, Server.handle_stream 을 호출한다.

Server.handle_stream

여기서 Client 를 new 한다. Client 는 new 하면, 주기적으로 keepalive message 를 IOStream 을 통해 보내게 된다.
client 를 new 하고 나서 stream(IOStream) 이 close 될 때 쓰일 callback 함수도 등록한다.
그리고 이 Client instance를 Server.connecting_peers 에 add 한다.
그리고  client.handshake 를 한다. 이 handshake 가 잘 되면, Server.connected_peers 에 client 를 등록해 놓는다.


Client.handshake

protocol + info_hash + peer_id 를 IOStream 을 이용해서 보낸다. 그러면 peer 에서도 같은 형식의 message 를 보낸다. 이것이 완료되면, client 는 message_loop 을 시작한다.

message_loop 에서는 peer message 들을 처리한다.(참고 : ref. 2 > peer message)

\x 는 16진수를 나타낸다.
message = chr(len(self.protocol))
message += self.protocol
message += '\0\0\0\0\0\0\0\0'
message += self.server.torrent.info_hash()
message += self.server.peer_id

"\x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00?\x19\xb1I\xf5:P\xe1O\xc0\xb7\x99&\xa3\x91\x89n\xab\xabo-PT0000-\x7f\xfc1\xf3\xe9\xdc\x802\x1bE\xbd\xa3"


응답 메시지







server start 를 하면 announce 를 하고, 이 때 future 를 넘긴다. 이 future 는 다른 작업을 하는 녀석에게도 전달되고, 그 다른 작업을 하는 녀석이 이 future 에 결과를 반영할 것이다.
future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
announce 에서 yield self.client.fetch(tracker_url) 한다. 이 fetch 안에서 future 를 만드는데, 자기도 reference 가지고 handle_response 라는 callback 에서 사용한다.(request 에 대한 결과를 여기서 set_result() 한다.)
즉, client.fetch() 가 끝나고 나서 control 을 넘긴다.

그러면, announce 는 @coroutine 이여서,
yield 를 해서 control 을 넘기면, Runner() 를 new하게 된다.(_make_coroutine_wrapper.wrapper)
이 때 Runner 를 run() 할지 안할지를 __init__ 에서 판단한다.(handle_yield)
현재 넘어온 yielded (self.future)가 아직 done() 이 아니면, 나중에 run() 을 하고.
self.io_loop.add_future(self.future, lambda f: self.run())
그렇지 않으면 바로 run() 을 하게 된다.


future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
의 다음 line 으로 넘어간다
future.add_done_callback(lambda future: self.tracker_done(future, result))






Stack trace

server.start()
Server.connect_to_peers() # server.py
Server.connect_to_peers(self) # server.py
Server.scrape_trackers # server.py
Server.tracker.announce # server.py
AsyncHTTPClient.fetch # httpclient.py
SimpleAsyncHTTPClient.fetch_impl #simple_httpclient.py
SimpleAsyncHTTPClient._process_queue #simple_httpclient.py
SimpleAsyncHTTPClient._handle_request #simple_httpclient.py
_HTTPConnection.__init__ #simple_httpclient.py
wrapper #gen.py  
TCPClient.connect # tcpclient.py
Runner(result, future, yielded) # gen.py



server = Server(
    torrent=torrent,
    download_path=options.path,
    storage_class=DiskStorage,
    peer_id=peer_id(),
    max_peers=options.max_peers
)
server.listen(options.port)
server.start()

def start(self, num_processes=1):
    TCPServer.start(self, num_processes)

    self.connect_to_peers()


def connect_to_peers(self):
    self.peer_stats()

    num_active = len(self.connected_peers) + len(self.connecting_peers)

    if num_active != self.max_peers and not self.unconnected_peers:
        logging.info('No peers to choose from. Scraping trackers..')
        yield self.scrape_trackers()

def scrape_trackers(self):
    result = Future()

    for tracker in self.torrent.trackers:
        logging.info('Announcing to tracker %s', tracker.url)

        future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
        future.add_done_callback(lambda future: self.tracker_done(future, result))

    return result

@coroutine
def announce(self, peer_id, port, event=None, num_wanted=None, compact=True, no_peer_id=None):
    params = {
        'info_hash': self.torrent.info_hash(),
        'peer_id': peer_id,
        'port': port,
        'uploaded': self.torrent.uploaded,
        'downloaded': self.torrent.downloaded,
        'left': self.torrent.remaining,
        'compact': int(compact)
    }

    ...
    tracker_url = url_concat(self.url, params)

    response = yield self.client.fetch(tracker_url)

class AsyncHTTPClient(Configurable):
    def fetch(self, request, callback=None, raise_error=True, **kwargs):
        ...
        def handle_response(response):
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)
        self.fetch_impl(request, handle_response)
        return future

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    ...
    def fetch_impl(self, request, callback):
        ...
        self._process_queue()
    ...
    def _process_queue(self):
        with stack_context.NullContext():
            while self.queue and len(self.active) < self.max_clients:
                ...
                self._handle_request(request, release_callback, callback)

    def _connection_class(self):
        return _HTTPConnection

    def _handle_request(self, request, release_callback, final_callback):
        self._connection_class()(
            self.io_loop, self, request, release_callback,
            final_callback, self.max_buffer_size, self.tcp_client,
            self.max_header_size, self.max_body_size)

class _HTTPConnection(httputil.HTTPMessageDelegate):

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size, max_body_size):
        ...
        self.tcp_client = tcp_client
        ...
        with stack_context.ExceptionStackContext(self._handle_exception):
            ...
            self.tcp_client.connect(host, port, af=af,
                                    ssl_options=ssl_options,
                                    max_buffer_size=self.max_buffer_size,
                                    callback=self._on_connect)
                                    
class TCPClient(object):
    ...
    @gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None):
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).
        """
        addrinfo = yield self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo, self.io_loop,
            functools.partial(self._create_stream, max_buffer_size))
        af, addr, stream = yield connector.start()
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is not None:
            stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                            server_hostname=host)
        raise gen.Return(stream)








torrent = Torrent(options.torrent)
class Torrent(object):
    def __init__(self, handle=None):
        self.trackers = self._trackers()

    ...
    def _trackers(self):
        trackers = self.meta.get('announce-list', [[self.meta['announce']]])
        result = []

        for tier, urls in enumerate(trackers):
            for url in urls:
                tracker = Tracker(url, torrent=self, tier=tier)
                result.append(tracker)

        return result


def Tracker(url, torrent, tier=0):
    o = urlsplit(url)

    if o.scheme == 'http':
        return HTTPTracker(url, torrent, tier)
    ...

class HTTPTracker(object):
    def __init__(self, url, torrent, tier=0):
        ...
        self.client = AsyncHTTPClient()

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    ...
    def initialize(self, io_loop, max_clients=10,
                    hostname_mapping=None, max_buffer_size=104857600,
                    resolver=None, defaults=None, max_header_size=None,
                    max_body_size=None):
        ...
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)


server = Server(
    torrent=torrent,
    download_path=options.path,
    storage_class=DiskStorage,
    peer_id=peer_id(),
    max_peers=options.max_peers
)
server.listen(options.port)
server.start()

def start(self, num_processes=1):
    TCPServer.start(self, num_processes)

    self.connect_to_peers()


def connect_to_peers(self):
    self.peer_stats()

    num_active = len(self.connected_peers) + len(self.connecting_peers)

    if num_active != self.max_peers and not self.unconnected_peers:
        logging.info('No peers to choose from. Scraping trackers..')
        yield self.scrape_trackers()

def scrape_trackers(self):
    result = Future()

    for tracker in self.torrent.trackers:
        logging.info('Announcing to tracker %s', tracker.url)

        future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
        future.add_done_callback(lambda future: self.tracker_done(future, result))

    return result

@coroutine
def announce(self, peer_id, port, event=None, num_wanted=None, compact=True, no_peer_id=None):
    params = {
        'info_hash': self.torrent.info_hash(),
        'peer_id': peer_id,
        'port': port,
        'uploaded': self.torrent.uploaded,
        'downloaded': self.torrent.downloaded,
        'left': self.torrent.remaining,
        'compact': int(compact)
    }

    ...
    tracker_url = url_concat(self.url, params)

    response = yield self.client.fetch(tracker_url)

class AsyncHTTPClient(Configurable):
    def fetch(self, request, callback=None, raise_error=True, **kwargs):
        ...
        def handle_response(response):
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)
        self.fetch_impl(request, handle_response)
        return future

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    ...
    def initialize(self, io_loop, max_clients=10,
                    hostname_mapping=None, max_buffer_size=104857600,
                    resolver=None, defaults=None, max_header_size=None,
                    max_body_size=None):
        ...
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)
 
    ...
    def fetch_impl(self, request, callback):
        ...
        self._process_queue()
    ...
    def _process_queue(self):
        with stack_context.NullContext():
            while self.queue and len(self.active) < self.max_clients:
                ...
                self._handle_request(request, release_callback, callback)

    def _connection_class(self):
        return _HTTPConnection

    def _handle_request(self, request, release_callback, final_callback):
        self._connection_class()(
            self.io_loop, self, request, release_callback,
            final_callback, self.max_buffer_size, self.tcp_client,
            self.max_header_size, self.max_body_size)

class _HTTPConnection(httputil.HTTPMessageDelegate):

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size, max_body_size):
        ...
        self.tcp_client = tcp_client
        ...
        with stack_context.ExceptionStackContext(self._handle_exception):
            ...
            self.tcp_client.connect(host, port, af=af,
                                    ssl_options=ssl_options,
                                    max_buffer_size=self.max_buffer_size,
                                    callback=self._on_connect)
                                 
class TCPClient(object):
    ...
    @gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None):
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).
        """
        addrinfo = yield self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo, self.io_loop,
            functools.partial(self._create_stream, max_buffer_size))
        af, addr, stream = yield connector.start()
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is not None:
            stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                            server_hostname=host)
        raise gen.Return(stream)







Reference

  1. 쿠...sal: [컴][파이썬] python 의 Future 가 동작하는 방법
  2. The BitTorrent Protocol Specification, www.bittorrent.org/beps/bep_0003.html
  3. BitTorrent 프로토콜의 동작원리 | NETMANIAS
  4. BitTorrent 공유 과정 실전 분석 | AhnLab
  5. BitTorrentSpecification - Theory.org Wiki
  6. JosephSalisbury/python-bittorrent: A simple, clean, and efficient BitTorrent library, written entirely in Python.
  7. jefflovejapan/drench: A simple BitTorrent client in Python
  8. Building a BitTorrent client from the ground up in Go | Jesse Li, 2020-01-04




[컴] Bittorent 의 peer message



아래 글은 ref. 1 > peer messages 에 대한 개인적인 이해에 바탕을 둔 번역입니다. 정확한 내용은 원문을 확인해 주세요.

peer messages


keepalive 메세지 이외(non-keepalive messages)에는 전부 첫 byte 에 type 정보를 가지고 있다.

type 의 종류는 아래와 같다.

  • 0 - choke
  • 1 - unchoke
  • 2 - interested
  • 3 - not interested
  • 4 - have
  • 5 - bitfield
  • 6 - request
  • 7 - piece
  • 8 - cancel

위의 type 중에 0~4 번(   색)은 payload 가 없다.


have

have message 의 payload 는 한개의 index 이다. downloader 가 막 완료하고 hash 를 check 한 index 이다.


bitfield

bitfield(5) 는 항상 처음 메시지(first message) 로 보내진다.
bitfield 의 payload 는 bitfield 인데 downloader 가 보낸 index 는 1이 되고, 나머지들은 0 으로 set 된다.

현재 아무것도 갖고 있지 않은 downloader 들은 bitfield message 를 skip 할 수 있다.

bitfield 의 첫 byte 의 highest bit 이 index 0 이 되고, lowest bit 이 index 7 이 된다. 다음 byte 는 index 8-15 가 된다. 끝에 남는 bit 는 0 으로 set 된다.


실제 packet



request

request message 는 index, begin, 길이(length) 를 갖는다.
begin 과 length 는 byte offset 으로 표현된다.
파일의 끝부분에 걸려서 truncated 되지 않는다면 length 는 일반적으로 "2의 제곱" 이다.
모든 현재의 구현들은 2^14(16 KB) 를 사용한다.
그리고 이 이상의 크기를 요청하는 connection 들은 close 한다.


cancel

cancel 메시지들은 request message 와 같은 payload 를 갖는다. cancel 메시지들은 endgame mode(missing_piece/total_piece < 0.05) 동안에는 일반적으로 download 의 끝에 보내진다.

download 가 거의 끝났을 때, 마지막 몇개의 piece 들이 단일 modem line 에서 떨어진 채로 시간을 많이 소비하면서 다운로드 되어지는 경향이 있다.

downloader 가 자신이 가지고 있지 않은 piece 에 대해 request 를 보내는데, 이 때 이 request 가 pending 되고 있으면, downloader 는 자기가 연결해 download 하고 있는 peer 들에게 piece 에 대한 request 를 보낸다. piece 가 여러개라면, 한 peer 에 여러개의 request 를 보낸다.

이것이 비효율적이 되는 것을 막기위해, downloader 는 piece 가 도착할 때마다 다른 사람들에게 cancel message 를 보낸다.


piece

piece message 는 index, begin, piece 를 포함한다.
암묵적으로 이 piece 는 request message 와 관련이 있다는 것을 명심하자.
만약 빠른 승계(quick succession) 상태에서 choke 와 unchoke message 들을 받았을때나 전송이 너무 느릴때는 예상치못한 piece 가 download 되는 것이 가능하다.

Downloader 들은 일반적으로 piece 들을 random 한 순서로 다운로드 한다.
random 한 순서는 downloader 가 자신이 연결한 어느 peer 의 piece 에 대한 superset 이나 subset 만을 갖게 되는 것을 상당히 막아준다.


interested

interested 는 peer 가 내가 원하는 piece 를 가지고 있어서, 내가 관심있다의 표현이다. downloader 는 peer 에 대한 각 연결마다 am_interested / peer_interested 에 대한 변수를 갖고 있다. 그래서 downloader 가 peer 에게 관심이 있으면 am_interested 를 peer 가 downloader 에게 관심이 있으면 peer_interested 를 set한다.


choking

만약 downloader 가 peer 에게 choke 를 받으면 peer 가 downloader 를 choke 했기때문에 아무런 message 를 주고 받지 않는다. 마찬가지로 downloader 가 peer 에게 choke 를 날리면 그 peer 에게서는 아무것도 요청하지 않고, 받지 않는것이다. 그래서 보통 downloader 가 peer 마다 연결을 가지고 있는데 그 연결마다, downloader 가 peer 를 choke 했는지(am_choking) , 아니면, peer 가 downloader 를 choke 했는지(peer_choking) 에 대한 변수를 가지고 있다.


Choking 은 여러가지 이유로 수행된다. TCP congestion control 은 여러 connection 들을 통해 한번에 보낼 때는 매우 좋지 않게 동작한다. 또한 choking 은 각각의 peer 가 일관된 download rate 을 가지고 있다는 것을 확신할 수 있도록 tit-for-tat-ish algorithm 을 사용하게 해준다.

아래 설명된 choking 알고리즘은 현재 배포된 녀석이다.
모든 새로운 알고리즘들은 "완전하게 그 알고리즘으로 이뤄진 network" 와 "대부분 그 알고리즘으로 이뤄진 network 양쪽 모두에서 잘 동작하는 것"은 매우 중요하다.

좋은 choking 알고리즘이 준수해야 하는 여러 기준이 있다.
  1. 그것은 좋은 TCP performance 를 위해 동시 upload 수의 상한을 둬야만 한다.
  2. fibrillation : choking 과 unchoking 을 빠르게 피해야만 한다.
  3. download 를 하게 해준 peer 들에게 화답해야한다.(reciprocate)
  4. optimistic unchoking : 때때로 현재 사용하고 있는 연결보다 더 나은 녀석들을 찾아내기 위해 사용되지 않는 연결들의 성능을 테스트 해 봐야 한다.


현재 배포된 choking 알고리즘


현재 배포된 choking 알고리즘은
단순히 매 10초마다 "한번이라도 choked 된 사람"을 변경하는 것으로 fibrillation 을 피한다.
가장 빠른 download rate 을 가지고 있고, interested 4 명의 peer 를 unchoking 하는 것으로 이 알고리즘은 화답과 upload 의 개수의 상한을 두는것을 한다.

참고로, 여기서 interested 는 peer 가 interested message 를 받은 상태이다. downloader 는 peer 에 자신에게 없는 새로운 piece 가 없다면 peer 에게 not interested message 를 보내게 된다.

4 개의 peer 들을 unchoking 하는 방법으로 화답(reciprocation) 과 upload 개수의 상한을 두는 것을 하는데, 이 4명의 peer 들은 가장 빠른 download rate 을 가지고 있고, interested 이다(are interested)
It does reciprocation and number of uploads capping by unchoking the four peers which it has the best download rates from and are interested.

좀더 나은 upload rate 을 가지고 있지만 not interested 한 peer 들은 unchoked 된다. 그리고 만약 그들이 interested 되면, 최악의 uploader 가 choked 된다.
만약에 downloader 가 complete file 을 가지고 있다면,  unchoke 될 대상을 결정할 때 download rate 보다는 upload rate 을 이용한다.

낙관적인 unchoking 을 위해, 그것의 upload rate 와는 상관없이 한 순간에 하나의 unchoked 된 peer 는 존재한다.(만약 interested 라면, 그것은 4명의 허락된 downloader 중 하나로 count 된다.)

peer 가 낙관적으로 unchoked 되는 녀석은 매 30초마다 rotate 한다.
upload 하기위한 complete piece를 얻을 적절한 기회를 주기위해서, rotation 의 어느곳에서나 현재의 낙관적인 unchoke 보다 새로운 연결들을 3배 더 많이 시작하는 경향이 있다.
For optimistic unchoking, at any one time there is a single peer which is unchoked regardless of its upload rate (if interested, it counts as one of the four allowed downloaders.) Which peer is optimistically unchoked rotates every 30 seconds. To give them a decent chance of getting a complete piece to upload, new connections are three times as likely to start as the current optimistic unchoke as anywhere else in the rotation.



Reference

  1. The BitTorrent Protocol Specification > peer messages





[컴][파이썬] yield 와 callback





yield 와 callback

yield 와 callback 은 많이 비슷하다. 실제로 작성하는 code 의 flow 는 많이 닮았다.

하지만, yield 는 직접 control(code 가 수행되는 순서로 보자) 을 제어할 수 있는 keyword 이기 때문에 이녀석이 좀 더 큰 범위에 있다고 보면 되겠다. 쉽게 이야기 하면, yield 로 callback 처럼 event 가 끝난 후에 다시 호출이 되는 구조의 code 를 구성할 수 있다.



callback 이 여러개인 경우

만약에 많은 callback 으로 표현해야 하는 경우, 예를 들면, network 에 대한 작업이 연속적으로 5번 일어나야 한다고 하면, callback 을 5번 이나 작성하게 되는데, 이러면 code 의 가독성이 떨어질 가능성이 많아진다.
하지만, yield 를 사용하면, 연속적으로 하나의 code 를 작성할 수 있어서 가독성도 좋고, 명확하게 하는 일을 보여줄 수 있다.


try/catch 를 한곳에

callback 에 비해 yield 를 이용할 때 또 하나의 장점은 같은 일을 하는 녀석을 하나의 code 처럼 한 function 안에 담을 수 있다. 이로 인해서 try/catch 를 이용할 때도 같은 종류의 exception 을 한 곳에서 try/catch 로 묶어놓을 수 있다.[ref. 1]



Reading a code

request  가 끝나면 doSomething() 을 해라
requests.get(callback=function(){
   doSomething()
})

http_request() 가 끝난 후 doSomething() 을 하겠다. http_request() 가 끝날동안 control 을 일단 양보(yield) 하마, 일단 다른일을 해라, 그 일이 끝나면 내가 doSomething() 을 하겠다.
yield http_request()
doSomething()


callback 을 yield 로 변경

ref. 3에 보면 callback 의 구조를 yield 를 사용하는 구조로 변경한 경우가 있다.(callback 을 yield 로 변경하는 법을 이야기하는 것은 아니다. 이 부분은 tornamdo.gen.coroutine 내부를 살펴보자.)

이 code 로 callback 과 yield 의 flow 의 차이를 느껴볼 수 있다.

class AsyncHandler(RequestHandler):
    @asynchronous
    def get(self):
        http_client = AsyncHTTPClient()
        http_client.fetch("http://example.com",
                          callback=self.on_fetch)

    def on_fetch(self, response):
        do_something_with_response(response)
        self.render("template.html")


class GenAsyncHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch("http://example.com")
        do_something_with_response(response)
        self.render("template.html")



Reference

  1. A Study on Solving Callbacks with JavaScript Generators
  2. 쿠...sal: [컴][파이썬] python 의 Future 가 동작하는 방법
  3. tornado.gen — Simplify asynchronous code — Tornado 4.4.2 documentation

[컴][파이썬] python 3 의 string 정리




python 3 의 string

python 3 의 string 을 정리 해 보자. python 2 에 대한 정리는 복잡하고, 요즘은 딱히 할 필요가 없다고 생각하니 ref. 1 을 보고 알아서 판단하자.

python2 에서 python3 string 사용하기

여기서 python 3 의 string 만을 이야기하려는 것은 python 2 에서도 python 3 처럼 string 을 사용할 수 있기 때문이다. module 의 가장 상단에 아래처럼 적어주면 된다. 이것을 사용하는 것이 나중에 python 3 로 convert 하기에도 편하다.
from __future__ import unicode_literals    # at top of module


기본 string 의 표현, unicode

python 3 에서 기본적으로 모든 string 은 unicode 이다. 그리고 이 unicode 의 default encoding 은 utf-8 이다. 하지만 그렇다고 해서 python 3 의 string 은 utf-8 이다라고 생각하면 안된다. 그냥 unicode 이다 라고 인식해야 한다. 그래야 나중에 string.encode 등을 할 때 혼란이 없다.


byte object, b'가'

python 3에서 제공하는 다른 형태의 string 은 b'가' 형태의 string 이 있다. 앞에 b 를 붙여서 byte object 인 것을 보여준다. byte object 는 그냥 c/c++ 에서 char 의 array 이다.
>>>'가'.encode()
b'\xea\xb0\x80'
>>>inint = struct.unpack('<bbb', '가'.encode())
None

 
그래서 아래 같은 표현은 성립이 안된다.
>>> b'가'
SyntaxError: bytes can only contain ASCII literal characters.

b'가' 는 '가' 를 char 변수에 넣으려는 시도인데, utf-8 에서 '가' 는 3byte 를 사용하기 때문이다.
참고로 실제로 '가' 를 byte object 에 넣으려면 아래처럼 해야 한다.
>>> b'\xea\xb0\x80'


string.encode --> byte object

python 3 의 string 에서 encode 를 호출하면, 그 string 에 대한 byte object 를 return 해준다. 이 때 parameter 로 encoding 을 넘겨주는데, 그 encoding 에 따른 값을 전달해 준다.(이래서 string 은 unicode 라고 기억해야 한다.)
>>> '가'.encode() # default 값이 utf8 이다.
b'\xea\xb0\x80'
>>> '가'.encode('utf-16')


b'\xff\xfe\x00\xac'
>>> '\x80abc'.encode("mbcs", "strict")  
Traceback (most recent call last):
    ...
UnicodeDecodeError: 'cp949' codec can't decode byte 0x80 in position 0:
  invalid start byte
>>> '\x80abc'.encode("mbcs", "replace")
'\ufffdabc'
>>> '\x80abc'.encode("mbcs", "backslashreplace")
'\\x80abc'
>>> '\x80abc'.encode("mbcs", "ignore")
'abc'


replace() / backslashreplace() / ignore() 는 ref.3을 참고하자.

byte object.decode --> string

반대로 byte object 에서 string 으로 변경할 때는 decode 를 사용하면 된다.(참고로 string 은 decode 라는 함수가 없다.)

>>> b'\xea\xb0\x80'.decode()
'가'
>>> b'\xff\xfe\x00\xac'.decode('utf-16')
'가'

>>> b'\x80abc'.decode("utf-8", "strict")  
Traceback (most recent call last):
    ...
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0:
  invalid start byte
>>> b'\x80abc'.decode("utf-8", "replace")
'\ufffdabc'
>>> b'\x80abc'.decode("utf-8", "backslashreplace")
'\\x80abc'
>>> b'\x80abc'.decode("utf-8", "ignore")
'abc'


file write

file 에 write 할 때도 python 3 에서는 기본적으로 unicode 이기 때문에 byte array 로 바꿔줘야 한다.
>> with open('nonlat.txt', 'wb') as f:
    f.write(nonlat.encode())


json.dump

ensure_ascii 를 false 로 줘야 file 에서 제대로 된 '한글'을 확인할 수 있다.
with codecs.open(configPath, 'w', encoding='utf-8') as fp:
    json.dump(lastConfig, fp, ensure_ascii=False)

datetime


(sdate.strftime('%m월 %d일(%a) %I:%M %p'.encode('unicode-escape').decode())).encode().decode('unicode-escape')  # 11월 1일(수) 오후 3시





Reference

  1. Python Unicode: Encode and Decode Strings (in Python 2.x) | Python Central
  2. Encoding and Decoding Strings (in Python 3.x) | Python Central
  3. Unicode HOWTO — Python 3.6.1rc1 documentation



[컴][go] c/c++ 의 pointer 와 go 의 pointer 의 차이

golang / pointer variable of golang


c/c++ 의 pointer 와 go 의 pointer


go 에도 pointer 가 존재한다. 개인적으로 c/c++ 언어 이외에 처음본다 ^^;;


처음에 이것은 heap 에서 allocate 하는 메모리를 줄이려고 한줄 알았는데, 그것은 아닌듯 하다.

ref.2 의 글의 번역을 읽어보면 아니라는 것을 알수 있다.
pointer type으로 구현하나 value type 으로 구현하나 그것이 heap 이나 stack 에 가는 것을 결정하지는 않는다. 이 부분은 Go compiler 가 알아서 변수가 함수내에서만 쓰이면 함수의 stack frame 에 넣어준다고 한다. 그러나 compiler 가 함수가 return 한 이후에도 변수가 계속 referenced 되는지 판단하지 못하는 경우라면, compiler 는 이것을 heap 에다 넣는다.
또한 local 변수가 너무 크면, stack 보다는 heap 에 넣는다고 한다.
current compiler 에서는 변수가 자신의 주소를 가지고 있으면, 이녀석은 heap 에 넣을 후보(candidate) 가 되지만, escape analysis 가 이 변수가 function 이 return 한 이후에 살아 있지 않고, stack 에 머무를 수 있다는 것을 파악한다.(이런 녀석은 heap 에 넣지 않는다. 란 뜻인듯)[ref. 2]
다른 가능성 있는 이유들은 아래 글에서 찾아보자.

여하튼 이녀석은 c/c++ 의 pointer 와 거의 유사하다. 그런데 한가지 다른 점은 사용할 때가 좀 다르다.


초기화

초기화는 c/c++ 과 크게 다를 것 없다.  아래처럼 pointer type에서의 intialization 과 value type에서의 intialization 은 다르다.


value 변수의 초기화


// Worker is job thread
type Worker struct {
 name      string
 readyChan chan int
 jobChan   chan interface{}
}


// Pool ...
type Pool struct {
 workers []Worker
}

func NewPool() *Pool {

    numWorkers := 3
    self := &Pool{workers: make([]Worker, numWorkers)}

    for i := 0; i < numWorkers; i++ {
        rc := make(chan int)
        jc := make(chan interface{})
        self.workers[i].readyChan = rc
        self.workers[i].jobChan = jc
    }

    return self
}


pointer 변수의 초기화


// Worker is job thread
type Worker struct {
 name      string
 readyChan chan int
 jobChan   chan interface{}
}


// Pool ...
type Pool struct {
 workers []*Worker
}

func NewPool() *Pool {

    numWorkers := 3
    self := &Pool{workers: make([]*Worker, numWorkers)}

    for i := 0; i < numWorkers; i++ {
        rc := make(chan int)
        jc := make(chan interface{})
        self.workers[i] = Worker{name: "n" + strconv.Itoa(i), readyChan: rc, jobChan: jc}
    }

    return self
}

사용

그런데 사용함에 있어서는 Go 는 좀 더 유연하다. 위의 두가지 initialization 모두 아래처럼 사용할 수 있다.
func main(){
  pool := NewPool()
  go func(){
    pool.workers[0].readyChan <- 1
  }()
  <-pool.workers[0].readyChan
}

그리고 pointer 로 initialization 을 한 code 에서는 아래처럼 code 를 작성해도 된다.
(*pool.workers[0]).readyChan <- 1

이것은 method 를 사용할 때도 마찬가지이다. 만약 아래 처럼 pointer type의 method 를 하나 만들었다고 해도 value type 변수를 사용해서도 사용할 수 있다.
func (v *Worker) do() {}
func main(){
  var w Worker
  w.do()
}


이유

이런 것이 가능한 이유는 ref. 1 에 잘 설명되어 있다. 대략적으로 이야기 하면, go compiler 가 알아서 변경을 해주거나 wrapper function 을 만들어 주기 때문이다.



Reference

  1. go - Golang pointers - Stack Overflow
  2. Frequently Asked Questions (FAQ) - The Go Programming Language





[컴][go] tunny goroutine 동작

thread pool / goroutine pool /routine pool 분석 / thread pool 분석 / 구현



Tunny goroutine pool




Channels of Channels




Source flow

  • unbuffered channel은 sender 나 receiver 가 준비가 돼서 communicate 가 될 때까지 block 된다. (wrapper.readyChan)[ref. 2]

pool, _ := tunny.CreatePool(numCPUs, func(object interface{}) interface{} {
    // this is job function
})

--> 
// tunny.CreatePool
for i := range pool.workers {
    newWorker := workerWrapper{
        worker: &(tunnyDefaultWorker{&job}),
    }
    pool.workers[i] = &newWorker
}

-->
pool.Open()
-->
// pool.Open()
for i, workerWrapper := range pool.workers {
    workerWrapper.Open()

    // create cases for select
    // case <- workerWrapper.readyChan : 
    pool.selects[i] = reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(workerWrapper.readyChan),
    }
-->
// workerWrapper.Open()
go wrapper.Loop()
-->
// workerWrapper.Loop()
wrapper.readyChan <- 1

-->
// main()
result, _ := pool.SendWork(input)

-->
// WorkPool.SendWork()
/**
  select{ 
    case <--worker.readyChan : {
*/
if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
    pool.workers[chosen].jobChan <- jobData
    result, open := <-pool.workers[chosen].outputChan

    if !open {
        return nil, ErrWorkerClosed
    }
    return result, nil
}

-->
// workerWrapper.Loop()
wrapper.readyChan <- 1 // block until the select finishes processing the readyChan

for data := range wrapper.jobChan {
    wrapper.outputChan <- wrapper.worker.TunnyJob(data)
    for !wrapper.worker.TunnyReady() {
        if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
            break
        }
        time.Sleep(tout * time.Millisecond)
    }
    wrapper.readyChan <- 1
}


Full source

func main() {
    numCPUs := runtime.NumCPU()
    runtime.GOMAXPROCS(numCPUs + 1) // numCPUs hot threads + one for async tasks.

    pool, _ := tunny.CreatePool(numCPUs, func(object interface{}) interface{} {
        input, _ := object.([]byte)

        // Do something that takes a lot of work
        output := input

        return output
    }).Open()

    defer pool.Close()

    http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
        input, err := ioutil.ReadAll(r.Body)
        if err != nil {
            http.Error(w, "Internal error", http.StatusInternalServerError)
        }

        // Send work to our pool
        result, _ := pool.SendWork(input)

        w.Write(result.([]byte))
    })

    http.ListenAndServe(":8080", nil)
}



func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool {
    pool := WorkPool{running: 0}

    pool.workers = make([]*workerWrapper, numWorkers)
    for i := range pool.workers {
        newWorker := workerWrapper{
            worker: &(tunnyDefaultWorker{&job}),
        }
        pool.workers[i] = &newWorker
    }

    return &pool
}

//--------------------------------------------- WorkPool

func (pool *WorkPool) Open() (*WorkPool, error) {
    pool.statusMutex.Lock()
    defer pool.statusMutex.Unlock()

    if !pool.isRunning() {

        pool.selects = make([]reflect.SelectCase, len(pool.workers))

        for i, workerWrapper := range pool.workers {
            workerWrapper.Open()

            // create cases for select
            // case <- workerWrapper.readyChan : 
            pool.selects[i] = reflect.SelectCase{
                Dir:  reflect.SelectRecv,
                Chan: reflect.ValueOf(workerWrapper.readyChan),
            }
        }

        pool.setRunning(true)
        return pool, nil

    }
    return nil, ErrPoolAlreadyRunning
}

func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) {
    pool.statusMutex.RLock()
    defer pool.statusMutex.RUnlock()

    if pool.isRunning() {
        /**
          select{ 
            case <--worker.readyChan : {
              worker.jobChan <- jobData
              result, open := <- worker.outputChan
              if !open { return nil, ErrWorkerClosed }
              return result, nil
            } 
          }
          return nil, ErrWorkerClosed
        */
        if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 {
            pool.workers[chosen].jobChan <- jobData
            result, open := <-pool.workers[chosen].outputChan

            if !open {
                return nil, ErrWorkerClosed
            }
            return result, nil
        }
        return nil, ErrWorkerClosed
    }
    return nil, ErrPoolNotRunning
}

//--------------------------------------------- workerWrapper

func (wrapper *workerWrapper) Open() {
    if extWorker, ok := wrapper.worker.(TunnyExtendedWorker); ok {
        extWorker.TunnyInitialize()
    }

    wrapper.readyChan = make(chan int)
    wrapper.jobChan = make(chan interface{})
    wrapper.outputChan = make(chan interface{})

    atomic.SwapUint32(&wrapper.poolOpen, uint32(1))

    go wrapper.Loop()
}

func (wrapper *workerWrapper) Loop() {

    // TODO: Configure?
    tout := time.Duration(5)

    for !wrapper.worker.TunnyReady() {
        // It's sad that we can't simply check if jobChan is closed here.
        if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
            break
        }
        time.Sleep(tout * time.Millisecond)
    }

    wrapper.readyChan <- 1 // block until the select process the readyChan

    for data := range wrapper.jobChan {
        wrapper.outputChan <- wrapper.worker.TunnyJob(data)
        for !wrapper.worker.TunnyReady() {
            if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
                break
            }
            time.Sleep(tout * time.Millisecond)
        }
        wrapper.readyChan <- 1
    }

    close(wrapper.readyChan)
    close(wrapper.outputChan)

}






References

  1. Jeffail/tunny: A goroutine pool for golang
  2. Golang channels tutorial | Alexander Guz's blog






[컴][go] 간단한 goroutine 을 사용한 pool - grpool

thread pool in golang / golang thread pool / simple thread pool of go /



grpool

Projects · golang/go Wiki 에 소개된 간단한 goroutine pool 을 한 번 봐보자.


Installation

c:\> go get github.com/ivpusic/grpool










Source flow 



/**

*/

// source from : https://github.com/ivpusic/grpool
func main() {
    // number of workers, and size of job queue
    pool := grpool.NewPool(100, 50)
    defer pool.Release()
    
    for i := 0; i < 10; i++ {
        count := i
        // enqueue
        pool.JobQueue <- func() {
            fmt.Printf("I am worker! Number %d\n", count)
        }
    }
}

// Will release resources used by pool
func (p *Pool) Release() {
    p.dispatcher.stop <- true
    <-p.dispatcher.stop
}

func NewPool(numWorkers int, jobQueueLen int) *Pool {
    // create 'queue' and 'worker pool'
    jobQueue := make(chan Job, jobQueueLen) // channel with buffer
    workerPool := make(chan *worker, numWorkers)

    // create a 'Pool' with new dispatcher
    pool := &Pool{
        JobQueue:   jobQueue,
        dispatcher: newDispatcher(workerPool, jobQueue),
    }

    return pool
}
    

// Accepts jobs from clients, and waits for first free worker to deliver job
type dispatcher struct {
    workerPool chan *worker
    jobQueue   chan Job  // channel with Job objects
    stop       chan bool // channel with bool value
}

func newDispatcher(workerPool chan *worker, jobQueue chan Job) *dispatcher {
    d := &dispatcher{
        workerPool: workerPool,
        jobQueue:   jobQueue,
        stop:       make(chan bool),
    }

    // create workers
    for i := 0; i < cap(d.workerPool); i++ {
        worker := newWorker(d.workerPool)
        worker.start()  // at this point, worker is inserted into the workerPool
    }

    go d.dispatch()
    return d
}

func (d *dispatcher) dispatch() {
    for {
        select {
        case job := <-d.jobQueue:
            worker := <-d.workerPool
            worker.jobChannel <- job
        case stop := <-d.stop:
            if stop {
                for i := 0; i < cap(d.workerPool); i++ {
                    worker := <-d.workerPool

                    worker.stop <- true
                    <-worker.stop
                }

                d.stop <- true
                return
            }
        }
    }
}

// Gorouting instance which can accept client jobs
type worker struct {
    workerPool chan *worker
    jobChannel chan Job
    stop       chan bool
}

func newWorker(pool chan *worker) *worker {
    // create a Job channel and stop channel
    return &worker{
        workerPool: pool,
        jobChannel: make(chan Job),
        stop:       make(chan bool),
    }
}

func (w *worker) start() {
    // new thread start
    go func() {
        var job Job
        for {
            // worker free, add it to pool
            w.workerPool <- w

            select {
            case job = <-w.jobChannel:
                job()
            case stop := <-w.stop:
                if stop {
                    w.stop <- true
                    return
                }
            }
        }
    }()
}




References

  1. ivpusic/grpool: Lightweight Goroutine pool

[컴][알고리즘] KNN

machine-learning / 머신러닝 / 알고리즘 / machine learning / ai /



k nearest neighbors classification(KNN)


어떻게 분류하는가?

이 KNN 을 우리말로 번역하면, "K개의 가장 가까운 이웃들" 정도가 되겠다.  일단 claaification 이라는 것은 "분류"를 이야기한다. 그러니까 KNN 은 분류를 하는 방법중 하나이다.

어떤 input(입력) 이 있을 때 KNN 이 어떻게 분류를 하냐면, "input 에 대해서 거리가 가장 가까운 이웃 k개의 모습을 확인하고, 그중에 가장많은 모습으로 input 의 모습을 결정(classify)"하는 것이다. "끼리끼리 노는 느낌"이라고 보면된다.

아래 그림에서 input (녹색) 을 보자. k=3 이면, input 은 "빨강" 이 되는 것이고, k=5 이면 input 은 "파랑" 이 되는 것이다.

  • k=3 일 때:  "빨강 2개" , "파랑 1개" --> input 은 빨강
  • k=5 일 때:  "빨강 2개" , "파랑 3개" --> input 은 파랑

from : ref. 2



거리, distance

이제 대략적으로 KNN 이 무엇인지는 알았다. 그럼 이제 좀 더 자세하게 보자.

위에서
"input 이 들어왔을 때 input 에서 거리가 가장 가까운 녀석 k개를 보고 분류(claasify) 를 한다"
고 했다. 그럼 여기서 문제는 "거리가 가장 가까운 녀석" 에서 "거리" 이다.

이 "거리" 를 측정하는 방법에 따라서 input 에 대해 3번째로 가까울 수도 있고, 1번째로 가까운 녀석이 될 수도 있다.

ref. 1 에 보면 이런 거리를 측정하는 방법 몇가지를 소개해 주고 있다.
from : ref. 1

위의 거리를 측정하는 방법 3개는 "연속적인 변수들"(continuous variables) 에서만 유효하다고 한다. categorical variables 에서는 Hamming distance 를 사용해야 한다고 한다.

이것은 또한 data set 에 숫자형 변수(numerical variables)들과 범주형 변수(categorical variables)들이 섞여있을 때 0과 1사이의 숫자형 변수들에 대한 표준화문제를 가져온다. (얘기가 어려운데 이 부분은 아래 "distance 의 표준화" 부분을 보자.)

from : ref. 1



적절한 K 값

이제 거리에 대한 정의도 되었다. 그러면 이제 k 값을 정해서 그냥 사용하면 된다. 여기서 누군가는 그럼 k 값은 얼마로 정해야 하지? 라고 물을지 모르겠다.

사실 이것은 상황에 따라 다르다. 그렇지 않으면 왜 굳이 K개를 선택하게 했겠는가? 그냥 값을 정해서 알고리즘을 만들었을 것이다.

하지만 어느정도 사람들이 사용을 해보니, 어느정도의 결론이 나왔다. 그 이야기가 ref. 1에 있다. 대략적으로 정리를 하면,

  • K값이 클수록 평균적인 잡음(noise) 를 제거해 준다면, 일반적으로 큰 K 값이 좀 더 정확한 결과를 가져다 준다.
  • cross-validation 이 좋은 K 값을 결정하는 다른 방법이다. (?)
  • 역사적으로 대부분의 dataset 에서 가장 적절한 K(optimal K) 는 3~10 사이의 값이다. 이 것이 k=1 보다는 훨씬 좋은 결과를 보여준다.



distance 의 표준화, standardized distance

거리를 계산할 때를 한 번 봐보자. 아래처럼 input(녹색) 에 대해 거리를 계산하게 될 텐데, 이 때 위에서 나온 함수중에 Euclidean(유클리디안) 을 이용한다고 생각해보자.

from : ref. 1
그럼 아래처럼 계산할 수 있다.
그런데 이때 문제는 '나이' 와 '대출' 의 범위가 다르다는 것이다. 그래서 숫자의 크기가 다르다. 위의 예제에서는 대출은 2십만이 넘어가는 숫자이지만, 나이는 70살이하이다. 숫자의 크기가 다르니, 거리값이 숫자가 큰 '대출' 의 값에 의해 결정될 여지가 많아지는 것이다.

그래서 이 문제를 해결하기 위해 값을 표준화(standardization, normalization) 을 해야 한다. 표준화는 그리 어렵지 않다. 아래처럼 해주면 된다.
무슨 뜻이냐 하면, 값 x 를 전체 값 대비해서 어느 위치에 있는 값인지로 변경하는 것이다.
이러면, 값의 범위(scale) 이 0 ~ 1 로 같아지게 되고, 어느 한 변수가 더 큰 영향을 미치는 경우를 막을 수 있다.


  • scaling : x / (max-min) <--- 이것을 scaling 이라 한다.
  • mean normalization : x - min  <---- 이것을 mean normalization 이라 한다.






References

  1. KNN Classification
  2. k-nearest neighbors algorithm - Wikipedia

[컴] flow graph 란?

example of flow graph / flow graph example


flow graph

플로우 그래프(flow graph) 가 뭔지 간략하게 알아보자.

ref. 1 에 나온 정의를 보면 아래와 같다.

계산(computation) 을 표현하는 그래프이다. 이 그래프에서 각 노드는 기본적인 계산과 값을 표현한다.

a flow graph is a graph representing a computation, in which each node represents an elementary computation and a value (the result of the computation, applied to the values at the children of that node). Consider the set of computations allowed in each node and possible graph structures and this defines a family of functions. Input nodes have no children. Output nodes have no parents.

ref. 1 에 나온

  • sin(a^2 + a/b)

를 아래처럼 그릴 수 있다.

보면 어느정도 파악이 될 것이다. child node 가 input 이 되고, parent 가 output 이 되는 것이다.


References


  1. Introduction to Deep Learning Algorithms — Notes de cours IFT6266 Hiver 2010
  2. Flow graph (mathematics) - Wikipedia

[컴][수학] 선형독립, 선형종속

선형 종속, 선형 독립

선형종속, 선형독립

선형종속, 선형독립에 대해 알아보자. 둘의 관계를 이야기 하자면 반대되는 관계이다.

  • 선형 종속(linear dependent) <---> 선형 독립(linear independence)

일단 말뜻을 좀 풀어보자면, 선형독립은 linear 한 값들이 독립적이다. 라는 뜻으로 해석해 볼 수 있을 듯 하다.(수학적 정의를 이야기하는 것이 아니니 오해하지 말자.)

그러니까 여기서 linear 한 값이 vector 들인데, 이 vector 들을 합해서 0을 만들 수 없다면, 이 vector 들은 독립적이라고 본 것이다.

반대로 선형종속은 vector 들을 합해서 0을 만들 수 있으면 서로 의존성이 있다고 본 것이다.

그리고 밑의 예제에서도 보면 알겠지만, 이것은 즉 1차 방정식이 성립하는 가를 보여준다. 즉, 2x + 1 = 0 같은 1차 방정식이 성립한다면, 그것은 곧 linear graph 를 그릴 수 있다는 뜻이 된다. 그러므로 이것을 linear dependent 라고 이야기하는 듯 하다.

예제

선형종속이 어떤 모습인지 좀 더 구체적으로 알아보기 위해 아래 동영상에 있는 예제를 하나 가져왔다.

아래처럼 vector 가 3개 있다. 이 vector 의 linear combination 이 0 이 되게 해주는 a1, a2, ... 가 0 이외에 있다면, 이녀석은 linear dependence (선형종속) 라고 이야기 한다.

여기서 0 을 만들어주는 a1, a2, a3 가 존재하는 지 알기 위해 아래처럼 방정식을 세운다.

그래서 이것을 "가우시안-조단 소거법" 으로 풀면 아래와 같이 풀 수 있다.

위의 수식을 만족시키는 (a1, a2, a3) 는 0 vector 를 만들어줄 것이다. 그러므로 위의 vector 3개는 linear dependent (선형종속) 하다.

그래프, 기하학 관점에서 생각해보기

이 '선형종속'을 그래프에서 생각해 보자. 3개의 vector 의 합으로 0 을 만들 수 있다는 이야기는 '2개 vector의 합' 을 이용해서 나머지 '1개 vector ' 를 만들 수 있다는 이야기가 된다.( 0 이 되기위해서는 방향이 정 반대인 같은 크기의 값을 가져야 하니까)

[컴][링크] Deep learning 자료

딥러닝 읽을 자료 / 딥러닝 파악하기



README   SanghyukChun's Blog




Deep-Learning-Papers-Reading-Roadmap

deep learning 을 위한 자료들이다. 처음에 어떤 자료를 읽어야 할지 막막한 사람들을 위해 songrotek 이라는 분이 정리해놨다. 참고로 영문자료다.



Deeplearning.net Reading list

Etc

[컴][알고리즘] Raft 알고리즘



Raft 알고리즘에 대한 설명은 ref. 2 를 먼저보자. 간결하게 그림을 이용해서 설명해 주기 때문에 이해하기가 더 쉽다. 아래 자료를 이해하는데에도 ref. 2 를 한 번 보고 보면 이해가 잘 될 것이다.

여기의 내용은 필자가 이해하려고 정리한 내용이라 내용이 부실할 수 있다. 그러니 되도록 Reference 에 적어놓은 원본을 확인하자.


State machine replication

분산에서 사용되는 Replication models(복제모델) 이 몇개 있는데(참고) , 이중에 State machine replication 이 있다. 이 모델이 distributed consensus 를 이용해서 만들어 졌다. 이 모델도 transactional replication model 처럼 많이 쓰인다.

이 모델은 replicated process 가 deterministic finite automation 이고 모든 event 에 대해 atomic broadcast 가 가능하다 는 것을 가정한다.

  • 가정
    1. replicated process 가 deterministic finite automation
    2. 모든 event 에 대해 atomic broadcast 가 가능하다


이 state machine replication 은 어떤 node(state machine) 가 동일한 순서로 input 을 처리한다면 동일한 순서의 output 이 나오게 된다. 그렇기 때문에 동일한 순서의 input 을 결정해서 그것을 모든 node 에 뿌려주는 작업을 해야 한다.



Distributed Consensus 는 뭔가?[ref. 1, 2]

이 동의(consensus)에 대한 이야기는 ref. 2 의 slide 를 보는 것이 효과적인 듯 하다.

consensus 동작을 설명하기 위해 "분산으로 구성된 database "를 보자. 이 db 는 각 node 가 같은 data 를 가지고 있어야 한다고 가정하자.

이 때 client 가 한 node 랑 통신을 할 것인데, 이 때 db 에 "x 라는 값을 5로 변경하라" 고 query 를 날렸다고 하자. 그러면 x 가 5으로 바뀌면 명령은 완료된다. 그런데 문제는 distributed system 이다. 여러개의 node 가 가지고 있는 x 라는 값이 다 똑같이 5로 바뀌어야 한다.

이렇게 여러개의 node 가 같은 명령(command) 을 수행해서 같은 동작을 하도록 하기 위해 필요한 것이 consensus(동의) 이다.

이 부분에 대해서는 여기서도 설명을 하겠지만 ref. 2 를 보는 것이 이해하기 좋다.

여하튼 모든 node 들이 같은 행동을 하기 위해서
  1. 일단 한 node (node A 라고 하자.)가 command 를 받으면 
  2. 그것을 바로 수행하지 않고, 그 command 를 log 에 적는다.
  3. 그리고 다른 node 들에게 그 command 를 날려준다.
  4. 그럼 다른 node 들도 그 command 를 받고 log 에 적은 후에
  5. log 를 적었다고 node A 에게 응답을 준다.
  6. 그럼 node A 가 응답들을 보고 과반수 이상이 command 를 log 에 적었다는 것을 확인하면,
  7. node A 는 log 에 적어놓은 그 command 를 실제로 수행 한다.(commit)
  8. 그리고 client 에게 응답을 준다.

    이 경우에 client 가 응답을 못받으면, command 는 실행되고, client 는 command 가 수행된지를 몰라서 같은 작업을 다시 반복하게 된다. 이렇게 계속 같은 작업을 하게 될 수 있기 때문에 command 에 unique serial number 를 포함시킨다.[ref. 4 > Client interaction]
  9. 그 다음 다른 node 들에게 자신(node A) 가 commit 했다고 알려준다.
  10. 그럼 다른 node 들도 log 에 적어놓은 comand 를 commit 하게 된다.



아래는 ref. 1 에서 이야기하는 Consensus 에 대한 설명이다.

----------------------------------------------------------------

동의(Consensus) 는 fault-tolerant distributed system(문제가 발생해도 계속 잘 동작하는 분산시스템이라고 보면된다.) 에 기본적인 문제이다.

consensus 는 여러개의 서버들이 값들(values)에 대해 동의하는 것을 포함한다. 그들이 어떤 값에 대한 결정에 도달하자마자, 그것은 최종결정이다. 일반적인 consensus algorithm 들은 다수의 서버들이 동작할 때 일을 진행한다. 예를 들면, 5개 서버로 된 cluster 는 심지어 2개의 서버에 문제가 생겨도 일을 계속해 나갈 수 있다.  만약 더 많은 서버에 문제가 생기면, 그들은 일을 진행하는 것을 멈춘다.(그러나 절대 틀린 결과를 return 하지는 않을 것이다.)

동의는 일반적으로 fault-tolerant system 들을 만드는 일반적인 방법인 replicated state machine들이 있는 상황안에서 발생한다.
각각의 서버들은
  1. state machine과
  2. log 

를 갖는다. state machine 은 우리가 fault-tolerant 하고 싶은 구성요소(component)다. 예를 들면 hash table 같은 것이 있다.

client 는 하나의 믿을 수(reliable) 있는 state machine 과 상호작용하게된다.

각 state machine 은 input command 들을 그들의 log 로 부터 받는다.
우리의 hash table 예제에서, log 는 'x 를 3으로 set 하라' 같은 command 들을 가지고 있을 것이다. 서버들의 log 가 갖고 있는 command 에 대해 동의하기 위해서 consensus algorithm 을 사용한다.
consensus algorithm 은 다음과 같은 것을 보장해야만 한다.
  • 만약 n번째 command 로 어떤 state machine 이  'x 를 3으로 set 하라' 는 명령을 적용한다면, 
  • 다른 state machine 은 항상 다른 n번째 command 를 적용해야 한다. 

결과적으로 각각의 state machine 은 같은 series of commands 를 처리한다. 그리고 그래서 같은 series of result 들을 생산하고 같은 series of states 에 도달하게 된다.

----------------------------------------------------------------



Raft algorithm[ref. 3]

제일 일반적인 동의알고리즘(consensus algorithm) 은 Paxos 인데 이녀석은 이해하기 어렵고, 정확히 구현하기도 어렵다고 알려져 있다.

Raft 는 새로운 동의알고리즘(consensus algorithm)
이해가능함을 위해 디자인됐다.

Raft 는
  1. 먼저 server 를 leader 로 선출한다.
  2. 그리고 모든 결정들을 leader 에게 모아준다.
이 2가지 과정이 상대적으로 독립적이며 Paxos 보다 나은 구조를 만들어준다.

Raft 는 아래 방법을 이용해서 리더를 선출한다.
  1. 투표(voting) 와
  2. 랜덤화한 타임아웃(randomized timeouts) 
선거(election)는 리더가 이미 필요한 모든 정보를 저장했다는 것을 보장 해 준다. 그래서 데이터는 오직 리더에서 다른 서버들로만 흘러간다.

다른 leader-based algorithm 들과 비교하면, 이것은 방법(mechanism) 을 줄이고 행동을 단순화한다. 리더가 선출되며, 리더는 복사된 log 를 관리한다.

Raft 는 로그들이 어떻게 커지는 지에 관한 간단한 불변(simple invariant)을 레버리지(leverage)해서 알고리즘의 state space 를 줄이고 적은 mechanism 으로 이 작업을 달성한다.

Raft 는 이전의 알고리즘들에 비해 실제 세계 구현에도 적합하다. 이것은 실질적인 배포(deployments) 에도 충분히 잘 동작한다.

이것은 "고객 상호작용을 어떻게 관리하는지", "cluster 의 membership 을 어떻게 바꾸는지", 그리고 "로그가 너무 커졌을 때 그것을 어떻게 컴팩트하게 하는지"를 포함한 모든 완전한 시스템을 만드는 모든 측면을 언급한다.

cluster membership 을 바꾸기 위해, Raft 는 한번에 하나의 서버를 더하거나 뺀다.
(복잡한 변경은 이 기본 과정들로 작성할 수 있다.)

그리고 cluster 는 변경을 하는 중에도 계속해서 request 들을 계속 처리할 수 있다.



리더 선출(leader election)

node 의 state

node 는 다음 3가지 state 를 갖는다.

  1. follower
  2. candidate
  3. leader
처음 시작할 때 node 는 모두 follower 이다. 만약 follower 가 leader 를 가지고 있지 않으면 그들은 candidate 가 될 수 있다.

또는, node 는 leader 로 부터 heartbeat 을 주기적으로 받는데, 이 heart beat 이 오지 않아서 heartbeat timeout 이 끝나면 leader 를 가지고 있던 node 이지만, 이 node 는 candidate 가 되는 것이다.


2개의 timout 세팅

node 는 다음 두종류의 timeout 을 갖게 된다. 
  1. election timeout:
    follower 가 candidate 가 되기까지 걸리는 시간이다. 150ms ~ 300ms 사이의 random 한 값으로 각 node 가 각자 setting 한다.

    그리고 만약 2개의 node 가 candidate 가 되는 경우에는 같은 election term 을 가진 2개의 election 이 발생한다. 이 경우 각 candidate 가 vote 를 해달라고 node 에게 message 를 보낸다.

    그러면 한 node 는 같은 election term 을 가진 vote request 를 2개 받을 것이다.
    이 경우에 node 는 먼저 온 vote request 에 대해서만 응답을 해주고, 나중에 온 vote request 에는 응답을 하지 않는다.(node 는 한 election term 에 대해서 한개의 vote 만 한다.)

    이 때 누군가 과반수를 얻는다면 그 candidate 가 leader 가 되지만, 과반수를 얻지 못하면 candidate 가 timeout 되고 새로운 election term 을 시작하게 된다.

    아래 "leader 선출 과정" 참고
  2. heartbeat timeout
    주기적으로 leader 가 message 를 보내는데 이 heartbeat timeout 시간안에 보내야 leader 가 살아있다고 판단한다. 만약 이 시간안에 message 가 오지 않으면 node 는 바로 candidate 가 된다.


초기상태

이제 막 node 들의 설치가 끝나고 node 들을 실행(run) 했다고 가정해보자.

각 node 는 각자 150ms ~ 300ms 사이의 random 한 값으로 election timeout 을 setting 하고, 그 시간이 끝나면 follower 에서 candidate 로 상태가 바뀐다.

각 node 가 random 하게 timeout 을 설정했기 때문에 각자 candidate 로 상태가 바뀌는 시간이 다를 것이다.


leader 선출 과정

일단 candidate 가 된 node 는,

  1. 새로운 선거기간(election term) 을 시작한다. (termCount: 1)
  2. 그리고 자기한테 vote 하고, (voteCount: 1)
  3. 다른 node 에게 vote 를 해달라고 request 를 보낸다.(Request Vote message)

    이 때 timeout 시작한다. 이 timeout 까지 과반의 vote 를 획득해야 한다.

    참고로 다른 node 가 leader 라면서 message 를 보낼 수 있는데 이때는 termCount 가  자신보다 높은지를 확인하고, 높다면 자신은 follower 가 된다.

    자세한 사항은  In Search of an Understandable Consensus Algorithm > 5.2 Leader election 을 참고하자.
  4. 그러면 request 를 받은 node 들 중에 이 선거기간(election term) 에 "vote 를 하지 않은 node" 들은 candidate 에게 vote 를 해준다. 
  5. 그리고 이 node 들은 자신들의 election timeout 을 reset한다. 
  6. 이렇게 해서 한 candidate 가 과반수의 vote(majority of votes) 를 얻으면 그 candidate 가 leader 가 된다. 이 과반수가 leader 가 단 하나만이 존재하도록 보장해 준다.

    여러개의 candidate 가 vote 를 해달라고 node 에게 message 를 보내게 되면, node 는 같은 termCount(election term) 을 가진 request 을 여러개 받게 된다. 이 경우에 node 는 먼저 온 request 에 vote 를 하고, 뒤이어 오는 다른 request 에는 vote 를 하지 않는다.(한 election term 에는 한개의 vote 만을 하는 것이다.)

    이래서 만약 어떤 candidate 도 '과반수' 를 얻지 못한다면, candidate 들은 timeout 되고, 새로운 election term 을 시작하게 된다.


leader 와 heart beat(Append Entries message)

node 가 leader 가 되면

  1. leader 는 Append Entries message 를 follower 들에게 보내기 시작한다.
  2. 이 message 들은 각 node 들이  가지고 있는 heart beat timeout 에 의해 정해진 시간(interval)안에 node 들에게 도착하게 된다.
  3. 이 message 를 받으면 node 들은 heart beat timeout 을 reset 하고 다시 시작한다.
  4. 그리고 leader 에게 응답을 보낸다.
  5. 이 상태가 유지되면 election term 은 계속 같은 id 를 유지하게 된다.
follower 가 heart beat 을 받는 것을 멈추고 candidate 가 될 때까지, 선거기간(election term) 은 계속 된다. 즉 새로운 candidate 가 생겨서 새로운 election term 이 시작되기 까지는 기존의 election term 이 유지되는 것이다.

만약 현재 leader node 가 어떤 이유로 멈추면, hear beat timeout  시간안에 Append Entries message 가 오지 않아서 heartbeat timeout 이 끝나게 된다. 그럼 node 는 바로 candidate 가 되고, "leader 선출 과정" 을 시작하게 된다.



Log Replication, 로그 복제

로그는 client 가 request 한 command 를 적어놓은 것이기 때문에 로그 복제는 command 의 복제라고 보면 된다. 즉 로그 복제를 한다는 것은 실제 동작을 복사해서 follower node 에게 전달하는 것이다. (이것은 state machine 이기 때문에, 같은 순서의 command 를 제공하기만 한다면, 같은 결과를 가져오기 때문에 가능한 것이다.)

이 log replication 은 리더에 의해서 시작된다. 이는 Raft 에서 client 가 leader 에만 접근가능하기 때문이다.

여하튼 leader 는 위에서 heartbeat 로 사용했던 Append Entries message 를 이용해서 변경사항을 node 들에 전달한다.

이것을 대략 정리하면 아래와 같다.
  1. 일단 leader node (node A 라고 하자.)가 command 를 받으면 
  2. 그것을 바로 수행하지 않고, 그 command 를 log 에 적는다.
  3. 그리고 다른 node 들에게 그 command 를 날려준다.
  4. 그럼 다른 node 들도 그 command 를 받고 log 에 적은 후에
  5. log 를 적었다고 node A 에게 응답을 준다.
  6. 그럼 node A 가 응답들을 보고 과반수 이상이 command 를 log 에 적었다는 것을 확인하면,
  7. node A 는 log 에 적어놓은 그 command 를 실제로 수행 한다.(commit)
  8. 그리고 client 에게 응답을 준다.

    이 경우에 client 가 응답을 못받으면, command 는 실행되고, client 는 command 가 수행된지를 몰라서 같은 작업을 다시 반복하게 된다. 이렇게 계속 같은 작업을 하게 될 수 있기 때문에 command 에 unique serial number 를 포함시킨다.[ref. 4 > Client interaction]
  9. 그 다음 다른 node 들에게 자신(node A) 가 commit 했다고 알려준다.
  10. 그럼 다른 node 들도 log 에 적어놓은 comand 를 commit 하게 된다.

관련해서 자세한 사항은 ref. 2 를 참고하자. ref.2 에는 추가적으로 partition 이 나눠져서 log 가 2개가 생기는 경우에 대한 예제도 있다.



Client

client 가 처음에 cluster 에 접근하게 되거나 leader 가 crash 되었거나 하면, 여러 node 중에 아무 node 에나 접근하게 된다. 이 때 node 는 이 client 를 무조건 reject 하고 leader 정보를 준다. 그럼 client 가 이 정보를 가지고 다시 leader 로 접근하게 된다.

원래 node 가 leader node 로 부터 AppendEntries message 를 주기적으로 받는데, 이 녀석에 leader 의 network 주소도 같이 있어서 redirect 시켜주는 것은 어렵지 않다.


Read-only operation

Read-only 명령은 log 에 굳이 명령을 적지 않고 수행해도 문제가 없다. 다만 이 경우에 return 해주는 data 가 오래된 data(stale data) 가 아니어야만 한다. 

그런데 client 가 통신하고 있는 도중에 leader 가 교체된 상태라면, data 는 최신 data 를 보장할 수 없다. client 에게 return 하는 data 가 아직 leader 의 최신 data 로 update되지 않은 data 일 수 있다.


stale data 의 보장

log 를 이용하지 않고 stale data 가 아닌것을 보장하기 위해 Raft 에서 2가지 예방책이 필요하다.
  1. 리더가 최근 정보를 가져야만 한다. 이정보의 내용들은 당연히 commit 이 이미 된 내용(committed entries)이어야 한다.(Leader Completeness Property )

    그러나 term 의 시작시점에서는 log 에 적혀있는 entry 중에 어떤 녀석이 commit 된 내용인지 알 수 없다.(ref. 4 > figure 8 참고)

    이것을 알기 위해서 Raft 에서는 각 리더가 term 의 시작시점에 blank no-op entry 를 log 에 commit 한다. (그러니까 no-op entry 에 대한 commit 까지 완료하면, 이전의 entry 들은 commit 이 제대로 됐다고 볼 수 있다?)
  2. 두번째로 리더 노드는 read-only request 를 처리하기 전에 자신이 리더에서 물러난 상태인지 여부를 확인해야만 한다.

    Raft 는 read-only reqeust 에 응답하기 전에 리더가 heartbeat message 를 cluster 의 과반수의 node 와 교환하게해서 이것을 처리한다.



User-group

궁금한 점은 아래 user-group 을 이용하면 될 듯 하다.

See Also


  1. Introducing Runway, a distributed systems design tool : Runway system 을 소개해 준다. 분산시스템 디자인(distributed system design) 을 위한 tool 이다.
  2. GitHub - salesforce/runway-browser: Interactive visualization framework for Runway models of distributed systems : Github of Runway system 


References

  1. Raft Consensus Algorithm: raft 알고리즘과 관련된 자료들이 전부 모아져 있다. 동영상, library 들의 link 도 있다.
  2. The Secret Lives of Data : Raft 설명, 자세하게 예제를 slide 를 이용해서 설명해준다.
  3. Diego Ongaro's PhD dissertation, Consensus: Bridging Theory and Practice, published by Stanford University in 2014 > Abstract
  4. In Search of an Understandable Consensus Algorithm









[컴][알고리즘] 얼굴 인식, 안면인식




Face recognition

face recognition (안면인식)에 대한 글이 있어서 정리를 좀 해본다. 대부분의 내용은 번역이 될 듯 하고, 개인적으로 이해한 대로 좀 각색하게 될 듯 하다.

동작

컴퓨터가 얼굴을 인식할 때 아래 4가지 과정을 거치게 된다.
  1. 먼저, 사진을 보고, 그곳에 있는 모든 얼굴을 찾는다.
  2. 2번째로 각각의 얼굴에 집중하고, 얼굴이 이상한 방향으로 되어 있고, 조명이 좋지 않아도 그것은 여전히 같은 사람이라는 것을 이해할 수 있어야 한다.
  3. 3번째로 다른 사람과 다른 고유한 특징(unique features)들을 뽑아낼 수 있어야 한다. 예를 들면 눈이 얼마나 큰지, 얼굴이 얼마나 긴지 등의 요소들
  4. 마지막으로 이 고유한 특징을 당신이 이름을 알고 있는 모든 사람들과 비교한다.

Step 1. 얼굴 찾기

얼굴찾기 알고리즘

Paul Viola 와 Michael Jones 가 값싼 카메라에서도 충분히 빠르게 얼굴을 찾는 방법을 발명했다. 그 덕분에 2000년대초에 얼굴인식 이 주류가 될 수 있었다.

요새는 좀 더 신뢰성있는 알고리즘을 사용한다. 그 알고리즘이 Histogram of Oriented Gradients(HOG) 이다. 2005년 만들어졌다.

HOG

HOG 에서는 SIFT 방법을 사용해서 이미지의 윤곽을 인식한다.

SIFT

정확한 SIFT 방법은 여기 를 참고하자. 여기서는 대략적으로 어떤 식으로 동작하는지 알아보는 정도 수준이라는 것만 명심하자.


color 정보가 필요없으니, 이미지를 흑백으로 만든다.

이 이미지를 한 픽셀(pixel) 씩 보기 시작한다. 그러면서 이 pixel 의 위아래옆에 있는 8개의 pixel 도 같이 본다.

목표는 현재 보는 pixel 이 주위 8개의 pixel 보다 얼마나 어두운지를 알아내는 것이다.
그래서 어두워지는 방향으로 화살표를 그리는 것이다. 예를 들면, 현재 pixel 보다 위가 더 어두우면 up-arrow 를 사용하고, 위와 오른쪽이 더 어둡다면 up-right arrow 를 사용하는 것이다. 이렇게 되면 모든 pixel 은 화살표로 채워진다.

그리고 여기에 더해서 그 정도의 차이를 구한다. 그러니까 현재 pixel 에 비해 얼마나 진한가등의 정보를 구한다. 그래서 이 "방향 + 진한정도(magnitude)" 가 한개의 vector 가 되는 것이다. 이 화살표 vector 를 gradient 라고 부른다.
(실제로 구현은, 각 방향에 대한 gradient 값(histogram) 을 저장하는 형식으로 구현되는 듯 하다.)

이렇게 하면 어떤 방향으로 갈수록 더 어두워지는지를 알 수 있게 된다. 이 방법의 장점은 빛의 방향(?)을 알 수 있게 돼서 너무 밝은 사진에서도, 너무 어둡게 찍힌 사진에서도 사람의 형체를 알아보기가 수월해진다. 다시 말하면, 윤곽을 어두운 정도로 찾아내기 때문에 명확한 구분이 되는 선이 보이지 않아도 가능하다.

그런데 이 gradient 는 너무 세세한 level (그러니까 pixel level) 로 되어 있다. 그래서 오히려 형체를 알아보기가 어렵다. 장님이 코끼리 만지는 느낌이랄까. 그래서 좀 더 higher level 로 이 gradient 를 만들어야 한다. 그래서 이것을 16x16 pixcel 단위로 나눈다. 이 16x16 단위를 cell 이라고 부르자.( Histogram of Oriented Gradients(HOG)  에서는 16x16, 8x8 로 쪼개는 것이 효율적이라고 이야기한다. 자세한 내용은 원문을 참고하자.)

그래서 그 안에서 각 direction 에 해당하는 magnitude 의 합을 구하면, 어느 방향이 가장 큰 magnitude 를 가지는지 알 수 있다. 그 방향이 그  단위의 major direction (dominant direction)이 되는 것이다.

각 방향별로 vector 가 있다. 긴 화살표가 더 큰 magitude 를 나타낸다.

그 cell 을 major direction 을 표현하는 image 로 바꿔 놓는다. 그러면 이제 우리는 대략적인 윤곽을 보여주는 image 를 갖게 된다. 이것을 HOG image 라고 하자.

컴퓨터에게 A에 대한 여러 사진을 주면, 컴퓨터가 HOG image 들을 생성하고, 이 image 를 이용해서 A 에 대한 HOG pattern 을 만든다.

이것을 새롭게 올라온 사진의 HOG image 가 어떤 HOG image 와 비슷한지를 봐서 누구인지를 판단하는 것이다.(machine learning)

python 과 dlib 를 이용해서 구현할 수 있다.



작성중....


References

  1. Machine Learning is Fun! Part 4: Modern Face Recognition with Deep Learning — Medium
  2. http://www.scholarpedia.org/article/Scale_Invariant_Feature_Transform
  3. http://www.inf.fu-berlin.de/lehre/SS09/CV/uebungen/uebung09/SIFT.pdf