[컴] Bit torrent - 정리안된 자료



정리안된 자료다. 일단 버리는 것보다 살려두면 혹시나 필요할 일이 있을 듯 하여일단 적어놓는다. 정리는 추후에...^^;;;

Bit torrent


bencoding : bencoding - BitTorrentSpecification - Theory.org Wiki
go bencoding library : marksamman/bencode: Bencode implementation in Go


pure python code : Blender3D/torrent: Pure-Python torrent client.



pip install tornado
torrent/file

.torrent 파일을 parsing 해서 meta info 를 얻는다.
meta info 의 announce-list 가 최초 trackers 가 된다.
이 announce-list 의 url 을 보고 http/udp 인지를 판단한다.
아래의 정보를 tacker 에게 보낸다.
{'compact': 1, 'downloaded': 1000000, 'info_hash': '\xd5\xde\x04}\xcc\x...`\x17\xc6', 'left': 10000000, 'peer_id': "-PT0000-Q\x04\x11\x...2\xf9\xf8", 'port': 6881, 'uploaded': 1000000}


test torrent file



Bit torrent specification

비트토렌트에 관련한 스펙은 ref. 2, ref. 5 에 나와 있다. 한번 읽어보자.




main()

torrent = Torrent(options.torrent)

server = Server(
    torrent=torrent,
    download_path=options.path,
    storage_class=DiskStorage,
    peer_id=peer_id(),
    max_peers=options.max_peers
)
server.listen(options.port)
server.start()

...

IOLoop.instance().start()



Server 의 _init__ 에서 .torrent 의 info 정보에 'length' 가 있는데 이 정보를 보고 필요한 disk 용량을 잡아놓는다.(storage_class.from_torrent() )
info['length'] 또는 info['files']['length'] 에 size 정보가 있다.
그리고 info['piece length'] 가 block size 로 사용된다.
이 piece 에 대한 hash 도 .torrent 에 있는데 이녀석도 가지고 있는다.

 이 hash 는 나중에 bitfield 를 보내줄때 내가 어떤 block 을 가지고 있는지 확인한 후에 알려줘야 하는데, 이것을 위해서 요청한 index 의 blcok hash 값과 자신이 가지고 있는 block 의 hash 값을 만들어서 비교한다.

bitfield 도 내가 가진 block 들을 전부보면서 block hash 값을 가지고 있는지를 보고 가지고 있다면 그 block 이 존재하는 것으로 보고 1 을 set 한다.

이 block 하나가 index 하나가 된다. 즉 index 하나는 한개의 block 을 가리킨다.


일단 file 을 만든적이 없으면 file 을 만들어서 사이즈만큼 00 을 채운다. (utils.fill) 그리고 이 녀석을 다시 열어서 이 handle 을 가지고 있는다.


해당 peer 에서 have message 를 받으면, peer 마다 가지고 있는 array 에 have 에서 알려준 index 에 대한 표시를 해둔다. (Client.got_have)


bitfield message

처음 peer 와 접속하게 되면,
server.storage.to_bitfield 를 통해 현재가지고 있는 piece 들을 bitfield 로 만들어서 던져준다.

piece message

받는 경우
내가 piece message 를 받아서 그 piece message 에 있는 index, begin을 보고 file 의 offset 을 찾은후에

  • offset = self.block_size * index + begin

거기다가 piece message 에 있는 block 을 write 한다.
write 가 끝나고 piece 가 전부 다 write 가 됐는지 확인하는 방법으로 hash 를 비교한다.

보내는 경우
상대가 request message 를 보냈고, 해당 piece 가 있다면, 그 piece 를 실어서 이쪽에서는 piece message 를 보내주게 된다.



have message

piece message 를 받고 block 을 전부 write 했다면, 해당 index 에 대한 have message 를 server module 을 통해 다른 peer 에게 날리게 된다.(Client.got_piece)



reqeust message

이렇게 message 들을 주고받다가,
missing_piece / total_piece < 0.05 (endgame mode) 라면
모든 piece를 받았는지 확인해보고, 다 받았으면, IOLoop 을 stop 한다.(client.message_loop)
다 받지 않았다면, missing_piece 부분을 받기위해 Request message 를 만들어 보낸다.

이밖에도 piece message 를 받고나서,
또는 unchoke message 를 받고나서,
request message 를 보낸다.
request message 는 원하는 piece에 대한 index 를 보낸다. 물론 그 block 에 대한 offset, size 도 함께 보낸다.

받은 경우
request message 를 받으면, 해당하는 piece 가 있는지 확인하고 있으면 piece message 를 만들어서 보내게 된다.



Torrent class 를 new 하면서 tracker 들을 new 한다.
tracker 는 HttpTracker 를 new 하고, HttpTracker 는 AsyncHTTPClient 를 new 한다.
AsyncHTTPClient  는 내부에서 TCPClient 를 new 한다.


torrent = Torrent(options.torrent)
class Torrent(object):
    def __init__(self, handle=None):
        self.trackers = self._trackers()

    ...
    def _trackers(self):
        trackers = self.meta.get('announce-list', [[self.meta['announce']]])
        result = []

        for tier, urls in enumerate(trackers):
            for url in urls:
                tracker = Tracker(url, torrent=self, tier=tier)
                result.append(tracker)

        return result

def Tracker(url, torrent, tier=0):
    o = urlsplit(url)

    if o.scheme == 'http':
        return HTTPTracker(url, torrent, tier)
    ...

class HTTPTracker(object):
    def __init__(self, url, torrent, tier=0):
        ...
        self.client = AsyncHTTPClient()

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    ...
    def initialize(self, io_loop, max_clients=10,
                    hostname_mapping=None, max_buffer_size=104857600,
                    resolver=None, defaults=None, max_header_size=None,
                    max_body_size=None):
        ...
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)





Server.start

TCPServer.start 를 한다.
그리고 connect_to_peers() 한다.

connect_to_peers()

  • 여기서 scripe_trackers() 를 하는데
  • scripe_trackers
    • tracker 에 대한 정보를 얻는다.
      •  future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
      • 에서 tracker url 을 fetch 한다. 그러고 나서 response 를 얻는다.
      • 여기서 tornado.AsychHTTPClient를 사용하게 된다.
    • response 가 온 후에 할일을 정해준다. 
      • future.add_done_callback(lambda future: self.tracker_done(future, result))
    • tracker_done
      • response 가 제대로 오지 않으면 log 를 남기고, 제대로 왔으면, 결과를 가지고 peer 정보를 update 한다.
  • Server.connect(self.unconnected_peers.pop())
    • 접속되지 않은 peers 에게 접속한다.


Server.connect

@coroutine
@gen_debuggable
def connect(self, peer):
    logging.info('Connecting to %s', peer)

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    stream = IOStream(sock)
    yield Task(stream.connect, (peer.address, peer.port))

    self.handle_stream(stream, (peer.address, peer.port), peer)



여기서 이제 peer 에 접속해서 data 를 가져오게 된다.
socket 을 열고 peer.address 에 connect 한다.
tornado 의 IOStream 을 이용한다.
iostream 을 peer.address 에 대해 connect 한다.
그리고 연결이 되면, Server.handle_stream 을 호출한다.

Server.handle_stream

여기서 Client 를 new 한다. Client 는 new 하면, 주기적으로 keepalive message 를 IOStream 을 통해 보내게 된다.
client 를 new 하고 나서 stream(IOStream) 이 close 될 때 쓰일 callback 함수도 등록한다.
그리고 이 Client instance를 Server.connecting_peers 에 add 한다.
그리고  client.handshake 를 한다. 이 handshake 가 잘 되면, Server.connected_peers 에 client 를 등록해 놓는다.


Client.handshake

protocol + info_hash + peer_id 를 IOStream 을 이용해서 보낸다. 그러면 peer 에서도 같은 형식의 message 를 보낸다. 이것이 완료되면, client 는 message_loop 을 시작한다.

message_loop 에서는 peer message 들을 처리한다.(참고 : ref. 2 > peer message)

\x 는 16진수를 나타낸다.
message = chr(len(self.protocol))
message += self.protocol
message += '\0\0\0\0\0\0\0\0'
message += self.server.torrent.info_hash()
message += self.server.peer_id

"\x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00?\x19\xb1I\xf5:P\xe1O\xc0\xb7\x99&\xa3\x91\x89n\xab\xabo-PT0000-\x7f\xfc1\xf3\xe9\xdc\x802\x1bE\xbd\xa3"


응답 메시지







server start 를 하면 announce 를 하고, 이 때 future 를 넘긴다. 이 future 는 다른 작업을 하는 녀석에게도 전달되고, 그 다른 작업을 하는 녀석이 이 future 에 결과를 반영할 것이다.
future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
announce 에서 yield self.client.fetch(tracker_url) 한다. 이 fetch 안에서 future 를 만드는데, 자기도 reference 가지고 handle_response 라는 callback 에서 사용한다.(request 에 대한 결과를 여기서 set_result() 한다.)
즉, client.fetch() 가 끝나고 나서 control 을 넘긴다.

그러면, announce 는 @coroutine 이여서,
yield 를 해서 control 을 넘기면, Runner() 를 new하게 된다.(_make_coroutine_wrapper.wrapper)
이 때 Runner 를 run() 할지 안할지를 __init__ 에서 판단한다.(handle_yield)
현재 넘어온 yielded (self.future)가 아직 done() 이 아니면, 나중에 run() 을 하고.
self.io_loop.add_future(self.future, lambda f: self.run())
그렇지 않으면 바로 run() 을 하게 된다.


future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
의 다음 line 으로 넘어간다
future.add_done_callback(lambda future: self.tracker_done(future, result))






Stack trace

server.start()
Server.connect_to_peers() # server.py
Server.connect_to_peers(self) # server.py
Server.scrape_trackers # server.py
Server.tracker.announce # server.py
AsyncHTTPClient.fetch # httpclient.py
SimpleAsyncHTTPClient.fetch_impl #simple_httpclient.py
SimpleAsyncHTTPClient._process_queue #simple_httpclient.py
SimpleAsyncHTTPClient._handle_request #simple_httpclient.py
_HTTPConnection.__init__ #simple_httpclient.py
wrapper #gen.py  
TCPClient.connect # tcpclient.py
Runner(result, future, yielded) # gen.py



server = Server(
    torrent=torrent,
    download_path=options.path,
    storage_class=DiskStorage,
    peer_id=peer_id(),
    max_peers=options.max_peers
)
server.listen(options.port)
server.start()

def start(self, num_processes=1):
    TCPServer.start(self, num_processes)

    self.connect_to_peers()


def connect_to_peers(self):
    self.peer_stats()

    num_active = len(self.connected_peers) + len(self.connecting_peers)

    if num_active != self.max_peers and not self.unconnected_peers:
        logging.info('No peers to choose from. Scraping trackers..')
        yield self.scrape_trackers()

def scrape_trackers(self):
    result = Future()

    for tracker in self.torrent.trackers:
        logging.info('Announcing to tracker %s', tracker.url)

        future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
        future.add_done_callback(lambda future: self.tracker_done(future, result))

    return result

@coroutine
def announce(self, peer_id, port, event=None, num_wanted=None, compact=True, no_peer_id=None):
    params = {
        'info_hash': self.torrent.info_hash(),
        'peer_id': peer_id,
        'port': port,
        'uploaded': self.torrent.uploaded,
        'downloaded': self.torrent.downloaded,
        'left': self.torrent.remaining,
        'compact': int(compact)
    }

    ...
    tracker_url = url_concat(self.url, params)

    response = yield self.client.fetch(tracker_url)

class AsyncHTTPClient(Configurable):
    def fetch(self, request, callback=None, raise_error=True, **kwargs):
        ...
        def handle_response(response):
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)
        self.fetch_impl(request, handle_response)
        return future

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    ...
    def fetch_impl(self, request, callback):
        ...
        self._process_queue()
    ...
    def _process_queue(self):
        with stack_context.NullContext():
            while self.queue and len(self.active) < self.max_clients:
                ...
                self._handle_request(request, release_callback, callback)

    def _connection_class(self):
        return _HTTPConnection

    def _handle_request(self, request, release_callback, final_callback):
        self._connection_class()(
            self.io_loop, self, request, release_callback,
            final_callback, self.max_buffer_size, self.tcp_client,
            self.max_header_size, self.max_body_size)

class _HTTPConnection(httputil.HTTPMessageDelegate):

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size, max_body_size):
        ...
        self.tcp_client = tcp_client
        ...
        with stack_context.ExceptionStackContext(self._handle_exception):
            ...
            self.tcp_client.connect(host, port, af=af,
                                    ssl_options=ssl_options,
                                    max_buffer_size=self.max_buffer_size,
                                    callback=self._on_connect)
                                    
class TCPClient(object):
    ...
    @gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None):
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).
        """
        addrinfo = yield self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo, self.io_loop,
            functools.partial(self._create_stream, max_buffer_size))
        af, addr, stream = yield connector.start()
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is not None:
            stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                            server_hostname=host)
        raise gen.Return(stream)








torrent = Torrent(options.torrent)
class Torrent(object):
    def __init__(self, handle=None):
        self.trackers = self._trackers()

    ...
    def _trackers(self):
        trackers = self.meta.get('announce-list', [[self.meta['announce']]])
        result = []

        for tier, urls in enumerate(trackers):
            for url in urls:
                tracker = Tracker(url, torrent=self, tier=tier)
                result.append(tracker)

        return result


def Tracker(url, torrent, tier=0):
    o = urlsplit(url)

    if o.scheme == 'http':
        return HTTPTracker(url, torrent, tier)
    ...

class HTTPTracker(object):
    def __init__(self, url, torrent, tier=0):
        ...
        self.client = AsyncHTTPClient()

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    ...
    def initialize(self, io_loop, max_clients=10,
                    hostname_mapping=None, max_buffer_size=104857600,
                    resolver=None, defaults=None, max_header_size=None,
                    max_body_size=None):
        ...
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)


server = Server(
    torrent=torrent,
    download_path=options.path,
    storage_class=DiskStorage,
    peer_id=peer_id(),
    max_peers=options.max_peers
)
server.listen(options.port)
server.start()

def start(self, num_processes=1):
    TCPServer.start(self, num_processes)

    self.connect_to_peers()


def connect_to_peers(self):
    self.peer_stats()

    num_active = len(self.connected_peers) + len(self.connecting_peers)

    if num_active != self.max_peers and not self.unconnected_peers:
        logging.info('No peers to choose from. Scraping trackers..')
        yield self.scrape_trackers()

def scrape_trackers(self):
    result = Future()

    for tracker in self.torrent.trackers:
        logging.info('Announcing to tracker %s', tracker.url)

        future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
        future.add_done_callback(lambda future: self.tracker_done(future, result))

    return result

@coroutine
def announce(self, peer_id, port, event=None, num_wanted=None, compact=True, no_peer_id=None):
    params = {
        'info_hash': self.torrent.info_hash(),
        'peer_id': peer_id,
        'port': port,
        'uploaded': self.torrent.uploaded,
        'downloaded': self.torrent.downloaded,
        'left': self.torrent.remaining,
        'compact': int(compact)
    }

    ...
    tracker_url = url_concat(self.url, params)

    response = yield self.client.fetch(tracker_url)

class AsyncHTTPClient(Configurable):
    def fetch(self, request, callback=None, raise_error=True, **kwargs):
        ...
        def handle_response(response):
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)
        self.fetch_impl(request, handle_response)
        return future

class SimpleAsyncHTTPClient(AsyncHTTPClient):
    ...
    def initialize(self, io_loop, max_clients=10,
                    hostname_mapping=None, max_buffer_size=104857600,
                    resolver=None, defaults=None, max_header_size=None,
                    max_body_size=None):
        ...
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)
 
    ...
    def fetch_impl(self, request, callback):
        ...
        self._process_queue()
    ...
    def _process_queue(self):
        with stack_context.NullContext():
            while self.queue and len(self.active) < self.max_clients:
                ...
                self._handle_request(request, release_callback, callback)

    def _connection_class(self):
        return _HTTPConnection

    def _handle_request(self, request, release_callback, final_callback):
        self._connection_class()(
            self.io_loop, self, request, release_callback,
            final_callback, self.max_buffer_size, self.tcp_client,
            self.max_header_size, self.max_body_size)

class _HTTPConnection(httputil.HTTPMessageDelegate):

    def __init__(self, io_loop, client, request, release_callback,
                 final_callback, max_buffer_size, tcp_client,
                 max_header_size, max_body_size):
        ...
        self.tcp_client = tcp_client
        ...
        with stack_context.ExceptionStackContext(self._handle_exception):
            ...
            self.tcp_client.connect(host, port, af=af,
                                    ssl_options=ssl_options,
                                    max_buffer_size=self.max_buffer_size,
                                    callback=self._on_connect)
                                 
class TCPClient(object):
    ...
    @gen.coroutine
    def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
                max_buffer_size=None):
        """Connect to the given host and port.

        Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
        ``ssl_options`` is not None).
        """
        addrinfo = yield self.resolver.resolve(host, port, af)
        connector = _Connector(
            addrinfo, self.io_loop,
            functools.partial(self._create_stream, max_buffer_size))
        af, addr, stream = yield connector.start()
        # TODO: For better performance we could cache the (af, addr)
        # information here and re-use it on subsequent connections to
        # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
        if ssl_options is not None:
            stream = yield stream.start_tls(False, ssl_options=ssl_options,
                                            server_hostname=host)
        raise gen.Return(stream)







Reference

  1. 쿠...sal: [컴][파이썬] python 의 Future 가 동작하는 방법
  2. The BitTorrent Protocol Specification, www.bittorrent.org/beps/bep_0003.html
  3. BitTorrent 프로토콜의 동작원리 | NETMANIAS
  4. BitTorrent 공유 과정 실전 분석 | AhnLab
  5. BitTorrentSpecification - Theory.org Wiki
  6. JosephSalisbury/python-bittorrent: A simple, clean, and efficient BitTorrent library, written entirely in Python.
  7. jefflovejapan/drench: A simple BitTorrent client in Python




댓글 없음:

댓글 쓰기