정리안된 자료다. 일단 버리는 것보다 살려두면 혹시나 필요할 일이 있을 듯 하여일단 적어놓는다. 정리는 추후에...^^;;;
Building a BitTorrent client from the ground up in Go | Jesse Li : 이 자료를 보면 좋을 듯 하다.
Bit torrent
bencoding : bencoding - BitTorrentSpecification - Theory.org Wiki
go bencoding library : marksamman/bencode: Bencode implementation in Go
pure python code : Blender3D/torrent: Pure-Python torrent client.
pip install tornado
torrent/file
.torrent 파일을 parsing 해서 meta info 를 얻는다.
meta info 의 announce-list 가 최초 trackers 가 된다.
이 announce-list 의 url 을 보고 http/udp 인지를 판단한다.
아래의 정보를 tacker 에게 보낸다.
{'compact': 1, 'downloaded': 1000000, 'info_hash': '\xd5\xde\x04}\xcc\x...`\x17\xc6', 'left': 10000000, 'peer_id': "-PT0000-Q\x04\x11\x...2\xf9\xf8", 'port': 6881, 'uploaded': 1000000}
test torrent file
- https://btguard.com/test.torrent
- http://torrenteditor.com/edit.php?url=http%3A%2F%2Fbtguard.com%2Ftest.torrent
Bit torrent specification
비트토렌트에 관련한 스펙은 ref. 2, ref. 5 에 나와 있다. 한번 읽어보자.main()
torrent = Torrent(options.torrent) server = Server( torrent=torrent, download_path=options.path, storage_class=DiskStorage, peer_id=peer_id(), max_peers=options.max_peers ) server.listen(options.port) server.start() ... IOLoop.instance().start()
Server 의 _init__ 에서 .torrent 의 info 정보에 'length' 가 있는데 이 정보를 보고 필요한 disk 용량을 잡아놓는다.(storage_class.from_torrent() )
info['length'] 또는 info['files']['length'] 에 size 정보가 있다.
그리고 info['piece length'] 가 block size 로 사용된다.
이 piece 에 대한 hash 도 .torrent 에 있는데 이녀석도 가지고 있는다.
이 hash 는 나중에 bitfield 를 보내줄때 내가 어떤 block 을 가지고 있는지 확인한 후에 알려줘야 하는데, 이것을 위해서 요청한 index 의 blcok hash 값과 자신이 가지고 있는 block 의 hash 값을 만들어서 비교한다.
bitfield 도 내가 가진 block 들을 전부보면서 block hash 값을 가지고 있는지를 보고 가지고 있다면 그 block 이 존재하는 것으로 보고 1 을 set 한다.
이 block 하나가 index 하나가 된다. 즉 index 하나는 한개의 block 을 가리킨다.
일단 file 을 만든적이 없으면 file 을 만들어서 사이즈만큼 00 을 채운다. (utils.fill) 그리고 이 녀석을 다시 열어서 이 handle 을 가지고 있는다.
해당 peer 에서 have message 를 받으면, peer 마다 가지고 있는 array 에 have 에서 알려준 index 에 대한 표시를 해둔다. (Client.got_have)
bitfield message
처음 peer 와 접속하게 되면,server.storage.to_bitfield 를 통해 현재가지고 있는 piece 들을 bitfield 로 만들어서 던져준다.
piece message
받는 경우
내가 piece message 를 받아서 그 piece message 에 있는 index, begin을 보고 file 의 offset 을 찾은후에- offset = self.block_size * index + begin
거기다가 piece message 에 있는 block 을 write 한다.
write 가 끝나고 piece 가 전부 다 write 가 됐는지 확인하는 방법으로 hash 를 비교한다.
보내는 경우
상대가 request message 를 보냈고, 해당 piece 가 있다면, 그 piece 를 실어서 이쪽에서는 piece message 를 보내주게 된다.have message
piece message 를 받고 block 을 전부 write 했다면, 해당 index 에 대한 have message 를 server module 을 통해 다른 peer 에게 날리게 된다.(Client.got_piece)reqeust message
이렇게 message 들을 주고받다가,missing_piece / total_piece < 0.05 (endgame mode) 라면
모든 piece를 받았는지 확인해보고, 다 받았으면, IOLoop 을 stop 한다.(client.message_loop)
다 받지 않았다면, missing_piece 부분을 받기위해 Request message 를 만들어 보낸다.
이밖에도 piece message 를 받고나서,
또는 unchoke message 를 받고나서,
request message 를 보낸다.
request message 는 원하는 piece에 대한 index 를 보낸다. 물론 그 block 에 대한 offset, size 도 함께 보낸다.
받은 경우
request message 를 받으면, 해당하는 piece 가 있는지 확인하고 있으면 piece message 를 만들어서 보내게 된다.Torrent class 를 new 하면서 tracker 들을 new 한다.
tracker 는 HttpTracker 를 new 하고, HttpTracker 는 AsyncHTTPClient 를 new 한다.
AsyncHTTPClient 는 내부에서 TCPClient 를 new 한다.
torrent = Torrent(options.torrent) class Torrent(object): def __init__(self, handle=None): self.trackers = self._trackers() ... def _trackers(self): trackers = self.meta.get('announce-list', [[self.meta['announce']]]) result = [] for tier, urls in enumerate(trackers): for url in urls: tracker = Tracker(url, torrent=self, tier=tier) result.append(tracker) return result def Tracker(url, torrent, tier=0): o = urlsplit(url) if o.scheme == 'http': return HTTPTracker(url, torrent, tier) ... class HTTPTracker(object): def __init__(self, url, torrent, tier=0): ... self.client = AsyncHTTPClient() class SimpleAsyncHTTPClient(AsyncHTTPClient): ... def initialize(self, io_loop, max_clients=10, hostname_mapping=None, max_buffer_size=104857600, resolver=None, defaults=None, max_header_size=None, max_body_size=None): ... self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)
Server.start
TCPServer.start 를 한다.그리고 connect_to_peers() 한다.
connect_to_peers()
- 여기서 scripe_trackers() 를 하는데
- scripe_trackers
- tracker 에 대한 정보를 얻는다.
- future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
- 에서 tracker url 을 fetch 한다. 그러고 나서 response 를 얻는다.
- 여기서 tornado.AsychHTTPClient를 사용하게 된다.
- response 가 온 후에 할일을 정해준다.
- future.add_done_callback(lambda future: self.tracker_done(future, result))
- tracker_done
- response 가 제대로 오지 않으면 log 를 남기고, 제대로 왔으면, 결과를 가지고 peer 정보를 update 한다.
- Server.connect(self.unconnected_peers.pop())
- 접속되지 않은 peers 에게 접속한다.
Server.connect
@coroutine @gen_debuggable def connect(self, peer): logging.info('Connecting to %s', peer) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) stream = IOStream(sock) yield Task(stream.connect, (peer.address, peer.port)) self.handle_stream(stream, (peer.address, peer.port), peer)
여기서 이제 peer 에 접속해서 data 를 가져오게 된다.
socket 을 열고 peer.address 에 connect 한다.
tornado 의 IOStream 을 이용한다.
iostream 을 peer.address 에 대해 connect 한다.
그리고 연결이 되면, Server.handle_stream 을 호출한다.
Server.handle_stream
여기서 Client 를 new 한다. Client 는 new 하면, 주기적으로 keepalive message 를 IOStream 을 통해 보내게 된다.client 를 new 하고 나서 stream(IOStream) 이 close 될 때 쓰일 callback 함수도 등록한다.
그리고 이 Client instance를 Server.connecting_peers 에 add 한다.
그리고 client.handshake 를 한다. 이 handshake 가 잘 되면, Server.connected_peers 에 client 를 등록해 놓는다.
Client.handshake
protocol + info_hash + peer_id 를 IOStream 을 이용해서 보낸다. 그러면 peer 에서도 같은 형식의 message 를 보낸다. 이것이 완료되면, client 는 message_loop 을 시작한다.message_loop 에서는 peer message 들을 처리한다.(참고 : ref. 2 > peer message)
\x 는 16진수를 나타낸다.
message = chr(len(self.protocol)) message += self.protocol message += '\0\0\0\0\0\0\0\0' message += self.server.torrent.info_hash() message += self.server.peer_id "\x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00?\x19\xb1I\xf5:P\xe1O\xc0\xb7\x99&\xa3\x91\x89n\xab\xabo-PT0000-\x7f\xfc1\xf3\xe9\xdc\x802\x1bE\xbd\xa3"
응답 메시지
server start 를 하면 announce 를 하고, 이 때 future 를 넘긴다. 이 future 는 다른 작업을 하는 녀석에게도 전달되고, 그 다른 작업을 하는 녀석이 이 future 에 결과를 반영할 것이다.
future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
announce 에서 yield self.client.fetch(tracker_url) 한다. 이 fetch 안에서 future 를 만드는데, 자기도 reference 가지고 handle_response 라는 callback 에서 사용한다.(request 에 대한 결과를 여기서 set_result() 한다.)
즉, client.fetch() 가 끝나고 나서 control 을 넘긴다.
그러면, announce 는 @coroutine 이여서,
yield 를 해서 control 을 넘기면, Runner() 를 new하게 된다.(_make_coroutine_wrapper.wrapper)
이 때 Runner 를 run() 할지 안할지를 __init__ 에서 판단한다.(handle_yield)
현재 넘어온 yielded (self.future)가 아직 done() 이 아니면, 나중에 run() 을 하고.
self.io_loop.add_future(self.future, lambda f: self.run())
그렇지 않으면 바로 run() 을 하게 된다.
future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
의 다음 line 으로 넘어간다
future.add_done_callback(lambda future: self.tracker_done(future, result))
Stack trace
server.start() Server.connect_to_peers() # server.py Server.connect_to_peers(self) # server.py Server.scrape_trackers # server.py Server.tracker.announce # server.py AsyncHTTPClient.fetch # httpclient.py SimpleAsyncHTTPClient.fetch_impl #simple_httpclient.py SimpleAsyncHTTPClient._process_queue #simple_httpclient.py SimpleAsyncHTTPClient._handle_request #simple_httpclient.py _HTTPConnection.__init__ #simple_httpclient.py wrapper #gen.py TCPClient.connect # tcpclient.py Runner(result, future, yielded) # gen.py server = Server( torrent=torrent, download_path=options.path, storage_class=DiskStorage, peer_id=peer_id(), max_peers=options.max_peers ) server.listen(options.port) server.start() def start(self, num_processes=1): TCPServer.start(self, num_processes) self.connect_to_peers() def connect_to_peers(self): self.peer_stats() num_active = len(self.connected_peers) + len(self.connecting_peers) if num_active != self.max_peers and not self.unconnected_peers: logging.info('No peers to choose from. Scraping trackers..') yield self.scrape_trackers() def scrape_trackers(self): result = Future() for tracker in self.torrent.trackers: logging.info('Announcing to tracker %s', tracker.url) future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50) future.add_done_callback(lambda future: self.tracker_done(future, result)) return result @coroutine def announce(self, peer_id, port, event=None, num_wanted=None, compact=True, no_peer_id=None): params = { 'info_hash': self.torrent.info_hash(), 'peer_id': peer_id, 'port': port, 'uploaded': self.torrent.uploaded, 'downloaded': self.torrent.downloaded, 'left': self.torrent.remaining, 'compact': int(compact) } ... tracker_url = url_concat(self.url, params) response = yield self.client.fetch(tracker_url) class AsyncHTTPClient(Configurable): def fetch(self, request, callback=None, raise_error=True, **kwargs): ... def handle_response(response): if raise_error and response.error: future.set_exception(response.error) else: future.set_result(response) self.fetch_impl(request, handle_response) return future class SimpleAsyncHTTPClient(AsyncHTTPClient): ... def fetch_impl(self, request, callback): ... self._process_queue() ... def _process_queue(self): with stack_context.NullContext(): while self.queue and len(self.active) < self.max_clients: ... self._handle_request(request, release_callback, callback) def _connection_class(self): return _HTTPConnection def _handle_request(self, request, release_callback, final_callback): self._connection_class()( self.io_loop, self, request, release_callback, final_callback, self.max_buffer_size, self.tcp_client, self.max_header_size, self.max_body_size) class _HTTPConnection(httputil.HTTPMessageDelegate): def __init__(self, io_loop, client, request, release_callback, final_callback, max_buffer_size, tcp_client, max_header_size, max_body_size): ... self.tcp_client = tcp_client ... with stack_context.ExceptionStackContext(self._handle_exception): ... self.tcp_client.connect(host, port, af=af, ssl_options=ssl_options, max_buffer_size=self.max_buffer_size, callback=self._on_connect) class TCPClient(object): ... @gen.coroutine def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None, max_buffer_size=None): """Connect to the given host and port. Asynchronously returns an `.IOStream` (or `.SSLIOStream` if ``ssl_options`` is not None). """ addrinfo = yield self.resolver.resolve(host, port, af) connector = _Connector( addrinfo, self.io_loop, functools.partial(self._create_stream, max_buffer_size)) af, addr, stream = yield connector.start() # TODO: For better performance we could cache the (af, addr) # information here and re-use it on subsequent connections to # the same host. (http://tools.ietf.org/html/rfc6555#section-4.2) if ssl_options is not None: stream = yield stream.start_tls(False, ssl_options=ssl_options, server_hostname=host) raise gen.Return(stream)
torrent = Torrent(options.torrent)
class Torrent(object):
def __init__(self, handle=None):
self.trackers = self._trackers()
...
def _trackers(self):
trackers = self.meta.get('announce-list', [[self.meta['announce']]])
result = []
for tier, urls in enumerate(trackers):
for url in urls:
tracker = Tracker(url, torrent=self, tier=tier)
result.append(tracker)
return result
def Tracker(url, torrent, tier=0):
o = urlsplit(url)
if o.scheme == 'http':
return HTTPTracker(url, torrent, tier)
...
class HTTPTracker(object):
def __init__(self, url, torrent, tier=0):
...
self.client = AsyncHTTPClient()
class SimpleAsyncHTTPClient(AsyncHTTPClient):
...
def initialize(self, io_loop, max_clients=10,
hostname_mapping=None, max_buffer_size=104857600,
resolver=None, defaults=None, max_header_size=None,
max_body_size=None):
...
self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)
server = Server(
torrent=torrent,
download_path=options.path,
storage_class=DiskStorage,
peer_id=peer_id(),
max_peers=options.max_peers
)
server.listen(options.port)
server.start()
def start(self, num_processes=1):
TCPServer.start(self, num_processes)
self.connect_to_peers()
def connect_to_peers(self):
self.peer_stats()
num_active = len(self.connected_peers) + len(self.connecting_peers)
if num_active != self.max_peers and not self.unconnected_peers:
logging.info('No peers to choose from. Scraping trackers..')
yield self.scrape_trackers()
def scrape_trackers(self):
result = Future()
for tracker in self.torrent.trackers:
logging.info('Announcing to tracker %s', tracker.url)
future = tracker.announce(self.peer_id, self.port, event='started', num_wanted=50)
future.add_done_callback(lambda future: self.tracker_done(future, result))
return result
@coroutine
def announce(self, peer_id, port, event=None, num_wanted=None, compact=True, no_peer_id=None):
params = {
'info_hash': self.torrent.info_hash(),
'peer_id': peer_id,
'port': port,
'uploaded': self.torrent.uploaded,
'downloaded': self.torrent.downloaded,
'left': self.torrent.remaining,
'compact': int(compact)
}
...
tracker_url = url_concat(self.url, params)
response = yield self.client.fetch(tracker_url)
class AsyncHTTPClient(Configurable):
def fetch(self, request, callback=None, raise_error=True, **kwargs):
...
def handle_response(response):
if raise_error and response.error:
future.set_exception(response.error)
else:
future.set_result(response)
self.fetch_impl(request, handle_response)
return future
class SimpleAsyncHTTPClient(AsyncHTTPClient):
...
def initialize(self, io_loop, max_clients=10,
hostname_mapping=None, max_buffer_size=104857600,
resolver=None, defaults=None, max_header_size=None,
max_body_size=None):
...
self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)
...
def fetch_impl(self, request, callback):
...
self._process_queue()
...
def _process_queue(self):
with stack_context.NullContext():
while self.queue and len(self.active) < self.max_clients:
...
self._handle_request(request, release_callback, callback)
def _connection_class(self):
return _HTTPConnection
def _handle_request(self, request, release_callback, final_callback):
self._connection_class()(
self.io_loop, self, request, release_callback,
final_callback, self.max_buffer_size, self.tcp_client,
self.max_header_size, self.max_body_size)
class _HTTPConnection(httputil.HTTPMessageDelegate):
def __init__(self, io_loop, client, request, release_callback,
final_callback, max_buffer_size, tcp_client,
max_header_size, max_body_size):
...
self.tcp_client = tcp_client
...
with stack_context.ExceptionStackContext(self._handle_exception):
...
self.tcp_client.connect(host, port, af=af,
ssl_options=ssl_options,
max_buffer_size=self.max_buffer_size,
callback=self._on_connect)
class TCPClient(object):
...
@gen.coroutine
def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
max_buffer_size=None):
"""Connect to the given host and port.
Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
``ssl_options`` is not None).
"""
addrinfo = yield self.resolver.resolve(host, port, af)
connector = _Connector(
addrinfo, self.io_loop,
functools.partial(self._create_stream, max_buffer_size))
af, addr, stream = yield connector.start()
# TODO: For better performance we could cache the (af, addr)
# information here and re-use it on subsequent connections to
# the same host. (http://tools.ietf.org/html/rfc6555#section-4.2)
if ssl_options is not None:
stream = yield stream.start_tls(False, ssl_options=ssl_options,
server_hostname=host)
raise gen.Return(stream)
Reference
- 쿠...sal: [컴][파이썬] python 의 Future 가 동작하는 방법
- The BitTorrent Protocol Specification, www.bittorrent.org/beps/bep_0003.html
- BitTorrent 프로토콜의 동작원리 | NETMANIAS
- BitTorrent 공유 과정 실전 분석 | AhnLab
- BitTorrentSpecification - Theory.org Wiki
- JosephSalisbury/python-bittorrent: A simple, clean, and efficient BitTorrent library, written entirely in Python.
- jefflovejapan/drench: A simple BitTorrent client in Python
- Building a BitTorrent client from the ground up in Go | Jesse Li, 2020-01-04
댓글 없음:
댓글 쓰기