9 Commits

11 changed files with 506 additions and 78 deletions

View File

@@ -1,8 +1,27 @@
* A torrent should have multiple workers (seed vs download workers?).
* These workers work with connections; if one breaks, they find another one.
* The connection broker manages connections, choking/unchoking.
* Connection are linked to a peer & their bitfield.
* Connections need to be listened to at all times for choke/unchoke/have/timeout/interested
* A worker takes over a connection to send/receive.
* The connection broker opens new connections if none are available for
the workers
The `Session` is the main object to download torrents. It manages multiple
threads. When a torrent is added to it, the tracker is contacted to get a
list of peers. Then, a thread is spawned for each peer, and a connection to
the peer is opened.
The connections read packets and send them to the network thread via a channel.
The network thread manages the choking and unchoking of connections and the
scheduling of fragments. Fragments are 16KiB blocks that form a
piece. For instance if a piece is of 512KiB, there are 32 fragments for that
piece. When a connection is unchoked, the network thread will schedule
fragments on this connection. When the fragment is received back, the
network thread assembles the piece from the various fragments. When a piece
is complete, it writes it to disk.
The scheduling algorithm is very simple at the moment. It tries to minimize
the number of pieces that are being downloaded. Every fragment is kept
track of. It has a status, Available (no peer connection is downloading
this fragment), Taken(a peer is downloading this fragment) or Complete (this
fragment has been received). The Taken status also has a timestamp. When
looking for a fragment to schedule, the algorithm looks for fragments that
are available or that have been taken, but ran out of time (5s). It also
takes into account the peer's bitfield to find a piece the peer has. If no
fragment matches these criteria, a new piece is fragmented.
_Note that there could be an explosion in the number of pieces that are being
downloaded if the peers have very incomplete bitfields. In the future the
algorithm will limit the number of pieces that are being downloaded._

View File

@@ -5,5 +5,7 @@
* [ ] udp [spec](http://www.bittorrent.org/beps/bep_0015.html)
* [x] http [spec](http://www.bittorrent.org/beps/bep_0003.html#trackers)
* Metainfo
* [ ] torrent files [spec](https://wiki.theory.org/BitTorrentSpecification#Metainfo_File_Structure)
* [x] torrent files [spec](https://wiki.theory.org/BitTorrentSpecification#Metainfo_File_Structure)
* [ ] magnet links
* [x] Peer wire protocol
* [ ] uTP

View File

@@ -107,6 +107,15 @@ pub struct MetainfoFile {
pub path: PathBuf,
}
impl MetainfoFile {
    /// Builds a file entry from its length in bytes and its on-disk path.
    pub fn new(length: u64, path: PathBuf) -> Self {
        // Field-init shorthand: the arguments already carry the field names.
        MetainfoFile { length, path }
    }
}
fn sha1(bytes: &[u8]) -> Hash {
let mut hasher = Sha1::new();
hasher.update(bytes);

248
src/net/_session/disk.rs Normal file
View File

@@ -0,0 +1,248 @@
use std::cmp;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::Arc;

use net::_session::map::Map;

use metainfo::{Hash, Metainfo};
// Owns the on-disk state of every registered torrent, keyed by info-hash.
// `F` is the backing file type: a real `File` in production, a mock in tests.
pub struct DiskManager<F> where F: Read + Seek + Write {
    torrents: Map<Hash, Torrent<F>>,
}
impl<F> DiskManager<F> where F: Read + Seek + Write {
    /// Writes an assembled piece to the file(s) it spans on disk.
    ///
    /// # Panics
    /// Panics (via `Map`'s `Index`) if `info_hash` is not a registered torrent.
    pub fn write_piece(&mut self, info_hash: &Hash, index: u32, mut piece: Vec<u8>) -> io::Result<()> {
        let torrent = &mut self.torrents[info_hash];
        torrent.iter_files_for_piece(index, &mut piece, |file, seek, buf| {
            file.seek(SeekFrom::Start(seek))?;
            file.write_all(buf)?;
            Ok(())
        })
    }

    /// Reads a whole piece back from the file(s) it spans.
    ///
    /// NOTE(review): the buffer is always `piece_length` bytes; if the last
    /// piece of a torrent is shorter this over-reads — confirm callers only
    /// request full-length pieces.
    ///
    /// # Panics
    /// Panics (via `Map`'s `Index`) if `info_hash` is not a registered torrent.
    pub fn read_piece(&mut self, info_hash: &Hash, index: u32) -> io::Result<Vec<u8>> {
        let torrent = &mut self.torrents[info_hash];
        // Zero-initialize instead of `set_len` on an uninitialized Vec:
        // exposing uninitialized bytes to an arbitrary `Read` impl is UB.
        let mut piece = vec![0u8; torrent.metainfo.piece_length as usize];
        torrent.iter_files_for_piece(index, &mut piece, |file, seek, buf| {
            file.seek(SeekFrom::Start(seek))?;
            file.read_exact(buf)?;
            Ok(())
        })?;
        Ok(piece)
    }
}
impl DiskManager<File> {
pub fn add_torrent(&mut self, metainfo: Arc<Metainfo>) -> io::Result<()> {
let mut files = Vec::new();
for fileinfo in metainfo.files.iter() {
let file = File::create(&fileinfo.path)?;
file.set_len(fileinfo.length)?;
files.push(file);
}
self.torrents.insert(metainfo.info_hash, Torrent {
files: files,
metainfo: metainfo,
});
Ok(())
}
}
// A torrent's metadata plus one open handle per file it contains,
// in the same order as `metainfo.files`.
pub struct Torrent<F> where F: Read + Seek + Write {
    metainfo: Arc<Metainfo>,
    files: Vec<F>,
}
impl<F> Torrent<F> where F: Read + Seek + Write {
    // Iterates over the files spanned by piece `index`, invoking `callback`
    // once per overlapping file with (file handle, seek offset within that
    // file, sub-slice of `piece` covering the overlap).
    fn iter_files_for_piece<C>(&mut self, index: u32, piece: &mut [u8], mut callback: C) -> io::Result<()>
        where C: FnMut(&mut F, u64, &mut [u8]) -> io::Result<()>
    {
        let buffer_len = piece.len();
        // How far into `piece` we've progressed so far.
        let mut buffer_pos: usize = 0;
        // All the files in a torrent can be seen as a single contiguous file.
        // These values are byte positions relative to this contiguous file:
        // `abs_pos` is the cursor, `abs_start`/`abs_end` bound the current file.
        let mut abs_pos = index as u64 * self.metainfo.piece_length as u64;
        let mut abs_start = 0;
        let mut abs_end = 0;
        for (idx, file) in self.metainfo.files.iter().enumerate() {
            abs_end += file.length;
            // Does the cursor fall inside this file's [abs_start, abs_end) span?
            if abs_start <= abs_pos && abs_pos < abs_end {
                // The amount of bytes to process is the remaining number of
                // bytes in the buffer, or what's left of this file, whichever
                // is smaller.
                let remaining = (buffer_len - buffer_pos) as u64;
                let write_len = cmp::min(remaining, abs_end - abs_pos) as usize;
                // The seek position handed to the callback is relative to the
                // start of the current file.
                callback(&mut self.files[idx], abs_pos - abs_start, &mut piece[buffer_pos..buffer_pos + write_len])?;
                abs_pos += write_len as u64;
                buffer_pos += write_len;
                // If the buffer position is equal to the length, we're done.
                if buffer_pos == buffer_len {
                    break
                }
            }
            // The start of the next file is the end of this one.
            abs_start = abs_end;
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::Arc;
use net::_session::map::Map;
use metainfo::{Hash, Metainfo, MetainfoFile};
// Test double for a torrent data file: instead of touching the filesystem
// it records the arguments of the most recent seek/read/write call.
struct MockFile {
    // Last position passed to `seek`.
    seek: SeekFrom,
    // Length of the buffer passed to the last `read`.
    read_len: usize,
    // Length of the buffer passed to the last `write`.
    write_len: usize,
}
impl MockFile {
    /// Creates a mock file with zeroed bookkeeping fields.
    pub fn new() -> Self {
        MockFile {
            seek: SeekFrom::Start(0),
            read_len: 0,
            write_len: 0,
        }
    }
}
impl Read for MockFile {
    // Pretends the whole buffer was filled: only the requested length is
    // recorded; no bytes are actually written into `buf`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.read_len = buf.len();
        Ok(buf.len())
    }
}
impl Seek for MockFile {
    // Records the requested position and always reports position 0; the
    // tests only assert on the recorded `SeekFrom`, not the return value.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.seek = pos;
        Ok(0)
    }
}
impl Write for MockFile {
    // Claims the whole buffer was written; only its length is recorded.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_len = buf.len();
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
// Builds a DiskManager with one registered torrent backed by mock files:
// three files of 12, 6 and 2 bytes (20 bytes total) split into two 10-byte
// pieces, so piece 1 spans all three files.
fn build_dummy() -> (DiskManager<MockFile>, Hash) {
    let info_hash = Hash::new([1u8; 20]);
    let metainfo = Metainfo {
        announce: "".into(),
        announce_list: vec![],
        files: vec![MetainfoFile::new(12, PathBuf::new()),
                    MetainfoFile::new(6, PathBuf::new()),
                    MetainfoFile::new(2, PathBuf::new())],
        info_hash: info_hash,
        piece_length: 10,
        num_pieces: 2,
        pieces: vec![],
        length: 20,
    };
    let mut torrents = Map::new();
    torrents.insert(info_hash, Torrent {
        metainfo: Arc::new(metainfo),
        files: vec![MockFile::new(), MockFile::new(), MockFile::new()],
    });
    let dm = DiskManager {
        torrents: torrents,
    };
    (dm, info_hash)
}
#[test]
fn test_read_piece() {
    let (mut dm, info_hash) = build_dummy();
    {
        // Read a piece in a single file that is larger than the piece.
        dm.read_piece(&info_hash, 0).unwrap();
        let torrent = &dm.torrents[&info_hash];
        assert_eq!(torrent.files[0].seek, SeekFrom::Start(0));
        assert_eq!(torrent.files[0].read_len, 10);
    }
    {
        // Read a piece over multiple files that are smaller than a piece:
        // piece 1 covers the last 2 bytes of file 0, all of files 1 and 2.
        dm.read_piece(&info_hash, 1).unwrap();
        let torrent = &dm.torrents[&info_hash];
        assert_eq!(torrent.files[0].seek, SeekFrom::Start(10));
        assert_eq!(torrent.files[0].read_len, 2);
        assert_eq!(torrent.files[1].seek, SeekFrom::Start(0));
        assert_eq!(torrent.files[1].read_len, 6);
        assert_eq!(torrent.files[2].seek, SeekFrom::Start(0));
        assert_eq!(torrent.files[2].read_len, 2);
    }
}
#[test]
fn test_write_piece() {
    let (mut dm, info_hash) = build_dummy();
    {
        // Write a piece in a single file that is larger than the piece.
        dm.write_piece(&info_hash, 0, vec![2u8; 10]).unwrap();
        let torrent = &dm.torrents[&info_hash];
        assert_eq!(torrent.files[0].seek, SeekFrom::Start(0));
        assert_eq!(torrent.files[0].write_len, 10);
    }
    {
        // Write a piece over multiple files that are smaller than a piece:
        // piece 1 covers the last 2 bytes of file 0, all of files 1 and 2.
        dm.write_piece(&info_hash, 1, vec![2u8; 10]).unwrap();
        let torrent = &dm.torrents[&info_hash];
        assert_eq!(torrent.files[0].seek, SeekFrom::Start(10));
        assert_eq!(torrent.files[0].write_len, 2);
        assert_eq!(torrent.files[1].seek, SeekFrom::Start(0));
        assert_eq!(torrent.files[1].write_len, 6);
        assert_eq!(torrent.files[2].seek, SeekFrom::Start(0));
        assert_eq!(torrent.files[2].write_len, 2);
    }
}
}

34
src/net/_session/map.rs Normal file
View File

@@ -0,0 +1,34 @@
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::{Deref, DerefMut, Index, IndexMut};
/// Thin newtype around `HashMap` that adds panicking `Index`/`IndexMut`
/// sugar (`map[key]`) on top of the usual map API (exposed via `Deref`).
pub struct Map<K, V>(HashMap<K, V>);

impl<K, V> Map<K, V> {
    /// Creates an empty map. Mirrors `HashMap::new`, which `Deref` cannot
    /// forward because associated functions don't auto-deref; callers
    /// (e.g. the disk-manager tests) use `Map::new()` directly.
    pub fn new() -> Self {
        Map(HashMap::new())
    }
}

impl<K, V> Default for Map<K, V> {
    fn default() -> Self {
        Map::new()
    }
}
impl<K, V> Deref for Map<K, V> {
    type Target = HashMap<K, V>;

    // Forwards all read-only `HashMap` methods to the wrapped map.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
impl<K, V> DerefMut for Map<K, V> {
    // Forwards all mutating `HashMap` methods (insert, remove, ...).
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
impl<I, K, V> Index<I> for Map<K, V> where I: Borrow<K>, K: Eq + Hash {
    type Output = V;

    // `map[key]` lookup; accepts anything borrowable as `K`.
    // Panics with "index not found" if the key is absent.
    fn index(&self, index: I) -> &V {
        self.0.get(index.borrow()).expect("index not found")
    }
}
impl<I, K, V> IndexMut<I> for Map<K, V> where I: Borrow<K>, K: Eq + Hash {
    // Mutable `map[key]` lookup; accepts anything borrowable as `K`.
    // Panics with "index not found" if the key is absent.
    fn index_mut(&mut self, index: I) -> &mut V {
        self.0.get_mut(index.borrow()).expect("index not found")
    }
}

2
src/net/_session/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod disk;
pub mod map;

View File

@@ -3,13 +3,16 @@ use std::fmt;
pub struct BitField {
bits: Vec<u8>,
len: u32,
num_set: u32,
}
impl BitField {
pub fn new(bits: Vec<u8>, len: u32) -> BitField {
let num_set = bits.iter().map(|b| b.count_ones()).sum::<u32>();
BitField {
bits: bits,
len: len,
num_set: num_set,
}
}
@@ -17,38 +20,58 @@ impl BitField {
BitField {
bits: vec![0u8; f64::ceil(len as f64 / 8.) as usize],
len: len,
num_set: 0,
}
}
// Splits a bit index into (byte index, bit offset within that byte).
#[inline]
fn calc_positions(&self, index: u32) -> (u32, u32) {
    (index / 8, index % 8)
}
/// Number of bits tracked by this bitfield (not bytes).
#[inline]
pub fn len(&self) -> u32 {
    self.len
}
/// Number of bits currently set, maintained incrementally by set/unset.
#[inline]
pub fn num_set(&self) -> u32 {
    self.num_set
}
#[inline]
pub fn is_set(&self, index: u32) -> bool {
let idx = index / 8;
let offset = index % 8;
let (idx, offset) = self.calc_positions(index);
(self.bits[idx as usize] & (1 << 7 - offset)) != 0
}
#[inline]
pub fn set(&mut self, index: u32) {
let idx = index / 8;
let offset = index % 8;
self.bits[idx as usize] |= 1 << 7 - offset;
if !self.is_set(index) {
let (idx, offset) = self.calc_positions(index);
self.bits[idx as usize] |= 1 << 7 - offset;
self.num_set += 1;
}
}
#[inline]
pub fn unset(&mut self, index: u32) {
let idx = index / 8;
let offset = index % 8;
self.bits[idx as usize] ^= 1 << 7 - offset;
if self.is_set(index) {
let (idx, offset) = self.calc_positions(index);
self.bits[idx as usize] ^= 1 << 7 - offset;
self.num_set -= 1;
}
}
/// Raw backing bytes, e.g. for serializing a BITFIELD message.
#[inline]
pub fn as_bytes(&self) -> &[u8] {
    &self.bits
}
/// Fraction of bits set, in [0.0, 1.0]. NaN if the bitfield is empty
/// (0/0) — assumed never constructed with len == 0; confirm at call sites.
#[inline]
pub fn set_ratio(&self) -> f64 {
    self.num_set as f64 / self.len as f64
}
}
impl fmt::Debug for BitField {
@@ -80,6 +103,8 @@ fn test_bitfield_is_set() {
assert!(bf.is_set(0xd));
assert!(bf.is_set(0xe));
assert!(bf.is_set(0xf));
assert_eq!(bf.num_set(), 12);
}
#[test]
@@ -93,6 +118,8 @@ fn test_bitfield_set() {
assert_eq!(bf.bits[1], 128);
bf.set(15);
assert_eq!(bf.bits[1], 129);
assert_eq!(bf.num_set(), 4);
}
#[test]
@@ -106,4 +133,13 @@ fn test_bitfield_unset() {
assert_eq!(bf.bits[1], 255-128);
bf.unset(15);
assert_eq!(bf.bits[1], 255-128-1);
assert_eq!(bf.num_set(), 12);
}
#[test]
fn test_bitfield_set_ratio() {
    use std::f64;
    // A fully-set 16-bit field must report a set ratio of exactly 1.0.
    // `bf` was needlessly `mut` before (never mutated), and the free-form
    // `f64::abs(..)` call is clearer as a method call.
    let bf = BitField::new(vec![255, 255], 16);
    assert!((bf.set_ratio() - 1.0).abs() < f64::EPSILON);
}

68
src/net/buffers.rs Normal file
View File

@@ -0,0 +1,68 @@
use std::io::{self, Read};
use std::ptr;
use sha1::Sha1;
use metainfo::Hash;
/// Allocate a new buffer of the given size and read_exact into it.
/// Allocate a new buffer of the given size and fill it completely from
/// `reader`; errors if the reader yields fewer than `size` bytes
/// (see `Read::read_exact`).
pub fn read_exact<R>(mut reader: R, size: usize) -> io::Result<Vec<u8>> where R: Read {
    // Zero-initialize instead of `set_len` on an uninitialized Vec:
    // handing uninitialized memory to an arbitrary `Read` impl is UB.
    let mut buf = vec![0u8; size];
    reader.read_exact(&mut buf)?;
    Ok(buf)
}
// Accumulates the fragments of a single piece until all of them arrive.
pub struct PieceBuffer {
    // How many fragments have been added so far.
    fragment_count: u32,
    // How many fragments the piece is expected to have.
    num_fragments: u32,
    // Piece bytes; fragments are copied in at their `begin` offset.
    buffer: Vec<u8>,
}
impl PieceBuffer {
    /// Creates a buffer for a piece of `piece_length` bytes that will be
    /// filled by `num_fragments` fragments.
    pub fn new(num_fragments: u32, piece_length: u32) -> Self {
        PieceBuffer {
            fragment_count: 0,
            num_fragments: num_fragments,
            // Zero-initialize instead of `set_len` on an uninitialized Vec:
            // reading those bytes before every fragment has landed would be
            // undefined behavior.
            buffer: vec![0u8; piece_length as usize],
        }
    }

    /// Copies `fragment` into the piece at byte offset `begin` and counts it.
    ///
    /// # Panics
    /// Panics if the fragment would overrun the piece buffer.
    ///
    /// NOTE(review): a fragment delivered twice is counted twice, which could
    /// mark the piece complete early — confirm callers schedule each fragment
    /// at most once.
    pub fn add_fragment(&mut self, begin: u32, fragment: &[u8]) {
        let begin = begin as usize;
        assert!(begin + fragment.len() <= self.buffer.len());
        // Safe slice copy replaces the old `ptr::copy_nonoverlapping`; the
        // assert above guarantees the destination range is in bounds.
        self.buffer[begin..begin + fragment.len()].copy_from_slice(fragment);
        self.fragment_count += 1;
    }

    /// True once every expected fragment has been added.
    #[inline]
    pub fn complete(&self) -> bool {
        self.fragment_count == self.num_fragments
    }

    /// Hashes the assembled piece and compares it to the expected piece hash.
    ///
    /// # Panics
    /// Panics if called before the piece is complete.
    pub fn matches_hash(&self, info_hash: &Hash) -> bool {
        assert!(self.complete(), "trying to hash piece buffer before it's complete");
        let mut m = Sha1::new();
        m.update(&self.buffer);
        m.digest().bytes() == &info_hash[..]
    }

    /// Consumes the buffer and returns the assembled piece bytes.
    ///
    /// # Panics
    /// Panics if called before the piece is complete.
    pub fn get(self) -> Vec<u8> {
        assert!(self.complete(), "trying to get piece buffer before it's complete");
        self.buffer
    }
}

View File

@@ -1,3 +1,5 @@
pub mod bitfield;
mod buffers;
pub mod peer;
pub mod session;
pub mod _session;

View File

@@ -4,6 +4,7 @@ use std::net::{Shutdown, TcpStream};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use metainfo::Hash;
use net::buffers;
use tracker::Peer;
#[derive(Debug)]
@@ -68,11 +69,8 @@ impl PeerConnection {
},
5 => {
let size = len as usize - 1;
let mut bitfield = Vec::with_capacity(size);
unsafe {
bitfield.set_len(size);
}
self.sock.read_exact(&mut bitfield)?;
let bitfield = buffers::read_exact(&mut self.sock, size)?;
Packet::Bitfield {
bitfield: bitfield,
}
@@ -86,11 +84,7 @@ impl PeerConnection {
let size = len as usize - 9;
let index = self.sock.read_u32::<BigEndian>()?;
let begin = self.sock.read_u32::<BigEndian>()?;
let mut block = Vec::with_capacity(size);
unsafe {
block.set_len(size);
}
self.sock.read_exact(&mut block)?;
let block = buffers::read_exact(&mut self.sock, size)?;
Packet::Piece {
index: index,

View File

@@ -2,7 +2,6 @@ use std::cmp;
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::net::{Shutdown, TcpStream};
use std::os::unix::io::AsRawFd;
use std::sync::Arc;
use std::sync::mpsc::{self, Receiver, Sender};
@@ -10,10 +9,10 @@ use std::thread;
use std::time::{Duration, Instant};
use libc;
use sha1::Sha1;
use metainfo::{Hash, Metainfo};
use net::bitfield::BitField;
use net::buffers::PieceBuffer;
use net::peer::{Packet, PeerConnection};
use tracker::http;
@@ -50,6 +49,8 @@ struct InnerSession {
port: u16,
}
// == Session ==
pub struct Session {
// inner: Arc<InnerSession>,
sender: Sender<Signal>,
@@ -96,6 +97,8 @@ impl Session {
}
}
// == SessionPeer ==
struct SessionPeer {
sock: PeerConnection,
bitfield: BitField,
@@ -118,6 +121,8 @@ impl SessionPeer {
}
}
// == FragmentStatus ==
#[derive(Copy, Clone, Debug)]
enum FragmentStatus {
Available,
@@ -125,19 +130,23 @@ enum FragmentStatus {
Taken(Instant),
}
// == SessionFragment ==
// A fixed-size slice of a piece, tracked individually by the scheduler.
pub struct SessionFragment {
    // Byte offset of the fragment within its piece (idx * FRAGMENT_SIZE).
    begin: u32,
    // Fragment length in bytes.
    length: u32,
    // Available / Taken(when) / Complete — drives (re)scheduling.
    status: FragmentStatus,
}
// == SessionPiece ==
struct SessionPiece {
fragments: BTreeMap<u32, SessionFragment>,
buffer: Vec<u8>,
num_fragments: u32,
total_fragments: u32,
buffer: PieceBuffer,
}
// == SessionTorrent ==
struct SessionTorrent {
metainfo: Metainfo,
own_bitfield: BitField,
@@ -198,9 +207,9 @@ impl SessionTorrent {
for index in (0..self.own_bitfield.len()).filter(|index| !keys.contains(index)) {
if !self.own_bitfield.is_set(index) && peer.bitfield.is_set(index) {
let total_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32;
let num_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32;
let mut fragments = BTreeMap::new();
for idx in 0..total_fragments {
for idx in 0..num_fragments {
let begin = idx * FRAGMENT_SIZE;
fragments.insert(begin ,SessionFragment {
@@ -219,15 +228,9 @@ impl SessionTorrent {
length = taken.length;
}
let mut buffer = Vec::with_capacity(self.metainfo.piece_length as usize);
unsafe {
buffer.set_len(self.metainfo.piece_length as usize);
}
self.pieces.insert(index, SessionPiece {
fragments: fragments,
buffer: buffer,
num_fragments: 0,
total_fragments: total_fragments,
buffer: PieceBuffer::new(num_fragments, self.metainfo.piece_length),
});
return Some((index, begin, length));
@@ -247,7 +250,7 @@ impl SessionTorrent {
}
}
fn write_piece(&mut self, index: u32) {
fn write_piece(&mut self, index: u32, buffer: Vec<u8>) {
let mut start = 0;
let mut end = 0;
let mut pos: u64 = 0;
@@ -260,7 +263,7 @@ impl SessionTorrent {
if write < end {
let len = cmp::min(remaining, fileinfo.length as u64);
self.files[id].seek(SeekFrom::Start(write - start));
self.files[id].write_all(&self.pieces[&index].buffer[pos as usize..(pos+len) as usize]);
self.files[id].write_all(&buffer[pos as usize..(pos+len) as usize]);
write += len;
pos += len;
remaining -= len;
@@ -281,41 +284,46 @@ impl SessionTorrent {
fn piece_reply(&mut self, peer_id: &Hash, index: u32, begin: u32, block: Vec<u8>) {
let mut remove = false;
let mut reset = false;
{
if let Some(piece) = self.pieces.get_mut(&index) {
if let Some(fragment) = piece.fragments.get_mut(&begin) {
fragment.status = FragmentStatus::Complete;
piece.buffer[begin as usize..(begin as usize)+block.len()].copy_from_slice(&block);
piece.num_fragments += 1;
if piece.num_fragments == piece.total_fragments {
// TODO check hash
piece.buffer.add_fragment(begin, &block);
if piece.buffer.complete() {
println!("piece is done {}", index);
let mut m = Sha1::new();
m.update(&piece.buffer);
if m.digest().bytes() == &self.metainfo.pieces[index as usize][..] {
if piece.buffer.matches_hash(&self.metainfo.pieces[index as usize]) {
self.own_bitfield.set(index);
println!("it's a match!");
remove = true;
} else {
reset = true;
println!("no match");
}
}
} else {
println!("could not find fragment {}", begin);
}
if reset {
for fragment in piece.fragments.values_mut() {
fragment.status = FragmentStatus::Available;
}
}
} else {
println!("could not find piece {}", index);
}
}
if remove {
self.write_piece(index);
self.pieces.remove(&index);
let piece = self.pieces.remove(&index).expect("told to remove piece that doesn't exist");
self.write_piece(index, piece.buffer.get());
}
self.requeue(peer_id);
}
}
// == SessionNetworkThread ==
struct SessionNetworkThread {
session: Arc<InnerSession>,
sender: Sender<Signal>,
@@ -326,34 +334,13 @@ struct SessionNetworkThread {
}
impl SessionNetworkThread {
pub fn run(mut self, input: Receiver<Signal>) {
for signal in input.iter() {
pub fn run(mut self, signals: Receiver<Signal>) {
for signal in signals.iter() {
match signal {
Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent),
Signal::RemoveTorrent(id) => {
if let Some(mut sess_torrent) = self.torrents.remove(&id) {
for peer in sess_torrent.peers.values_mut() {
peer.sock.shutdown();
}
}
}
Signal::ConnectionOpened { info_hash, peer_id, sock } => {
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
let mut peer = SessionPeer::new(sock, sess_torrent.metainfo.pieces.len() as u32);
peer.sock.send_interested();
peer.am_interested = true;
sess_torrent.peers.insert(peer_id, peer);
}
}
Signal::ConnectionClosed { info_hash, peer_id } => {
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) {
// if !peer.peer_choking && self.seeds > 1 {
// self.seeds -= 1;
// }
}
}
}
Signal::RemoveTorrent(id) => self.signal_remove_torrent(&id),
Signal::ConnectionOpened { info_hash, peer_id, sock } => self.signal_connection_opened(&info_hash, &peer_id, sock),
Signal::ConnectionClosed { info_hash, peer_id } => self.signal_connection_closed(&info_hash, &peer_id),
Signal::Packet { info_hash, peer_id, packet } => self.signal_packet(info_hash, peer_id, packet),
Signal::Stop => break,
}
@@ -405,6 +392,33 @@ impl SessionNetworkThread {
}
}
// Drops a torrent and shuts down the sockets of all its peers.
fn signal_remove_torrent(&mut self, id: &Hash) {
    if let Some(mut sess_torrent) = self.torrents.remove(id) {
        for peer in sess_torrent.peers.values_mut() {
            peer.sock.shutdown();
        }
    }
}
// Registers a freshly opened peer connection with its torrent and
// immediately declares interest in downloading from it. A connection for
// an unknown torrent is silently dropped.
fn signal_connection_opened(&mut self, info_hash: &Hash, peer_id: &Hash, conn: PeerConnection) {
    if let Some(sess_torrent) = self.torrents.get_mut(info_hash) {
        let mut peer = SessionPeer::new(conn, sess_torrent.metainfo.pieces.len() as u32);
        peer.sock.send_interested();
        peer.am_interested = true;
        sess_torrent.peers.insert(*peer_id, peer);
    }
}
// Forgets a peer whose connection closed.
// NOTE(review): the removed `peer` is currently unused — the commented-out
// block suggests seed bookkeeping is planned here; confirm before cleanup.
fn signal_connection_closed(&mut self, info_hash: &Hash, peer_id: &Hash) {
    if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
        if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) {
            // if !peer.peer_choking && self.seeds > 1 {
            //     self.seeds -= 1;
            // }
        }
    }
}
fn signal_packet(&mut self, info_hash: Hash, peer_id: Hash, packet: Packet) {
if let Some(torrent) = self.torrents.get_mut(&info_hash) {
match packet {