diff --git a/.gitignore b/.gitignore index eb5a316..ecd64e4 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ target +Cargo.lock +*.torrent diff --git a/Cargo.toml b/Cargo.toml index db3612b..e31d883 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" authors = ["Simon Bernier St-Pierre "] [dependencies] +byteorder = "0.5" +libc = "0.2" sha1 = "0.2" url = "1.2" diff --git a/design.md b/design.md new file mode 100644 index 0000000..af00e71 --- /dev/null +++ b/design.md @@ -0,0 +1,8 @@ +* A torrent should have multiple workers (seed vs download workers?). +* These workers work with connections, if one break they find another one. + * The connection broker manages connections, choking/unchoking. + * Connection are linked to a peer & their bitfield. + * Connections need to be listened to at all times for choke/unchoke/have/timeout/interested + * A worker takes over a connection to send/receive. + * The connection broker opens news connections if none are available for + the workers diff --git a/examples/decode-torrent.rs b/examples/decode-torrent.rs index 8969b8f..01ca503 100644 --- a/examples/decode-torrent.rs +++ b/examples/decode-torrent.rs @@ -3,12 +3,14 @@ extern crate magnolia; use std::env; use std::fs::File; use std::io::{self, Read}; +use std::thread; +use std::time::Duration; use std::str; use magnolia::bencode::*; use magnolia::metainfo::Metainfo; -use magnolia::torrent::Torrent; use magnolia::tracker::http; +use magnolia::net::session::Session; fn load_file(path: &str) -> io::Result<()> { let mut buf = Vec::new(); @@ -18,16 +20,14 @@ fn load_file(path: &str) -> io::Result<()> { let obj = decode(&buf).unwrap(); let meta = Metainfo::from_bencode(obj).unwrap(); - let t = Torrent { - metainfo: meta, - uploaded: 0, - downloaded: 0, - left: 0, - }; + println!("{}", meta.pieces.len()); - let resp = http::get_peers([1u8; 20], 55555, &t).unwrap(); + let s = Session::new(); + s.add_torrent(meta); - print!("{:?}", resp); + loop { + thread::sleep(Duration::from_secs(1)); + } Ok(()) } diff --git a/src/buffer.rs b/src/bencode/buffer.rs similarity index 100% rename from src/buffer.rs rename to src/bencode/buffer.rs diff --git a/src/bencode/decode.rs b/src/bencode/decode.rs index 928bc1f..61a7ab9 100644 --- a/src/bencode/decode.rs +++ b/src/bencode/decode.rs @@ -3,7 +3,7 @@ use std::num::ParseIntError; use std::str::{self, Utf8Error}; use bencode::{Bytes, Object}; -use buffer::Buffer; +use bencode::buffer::Buffer; #[derive(Debug)] pub struct DecodeError; diff --git a/src/bencode/mod.rs b/src/bencode/mod.rs index 4b91daf..38d527e 100644 --- a/src/bencode/mod.rs +++ b/src/bencode/mod.rs @@ -1,3 +1,4 @@ +mod buffer; mod decode; mod encode; diff --git a/src/lib.rs b/src/lib.rs index 8421c35..7c9c340 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,10 @@ +extern crate byteorder; extern crate hyper; +extern crate libc; extern crate sha1; extern crate url; pub mod bencode; -pub mod buffer; pub mod metainfo; -pub mod torrent; +pub mod net; pub mod tracker; diff --git a/src/metainfo.rs b/src/metainfo.rs index 69879ba..f48f9e9 100644 --- a/src/metainfo.rs +++ b/src/metainfo.rs @@ -1,3 +1,5 @@ +use std::fmt; +use std::ops::{Deref, DerefMut}; use std::path::PathBuf; use sha1::Sha1; @@ -18,9 +20,11 @@ pub struct Metainfo { pub announce: String, pub announce_list: Vec>, pub files: Vec, - pub info_hash: [u8; 20], - pub piece_length: u64, - pub pieces: Vec<[u8; 20]>, + pub info_hash: Hash, + pub piece_length: u32, + pub num_pieces: u32, + pub pieces: Vec, + pub length: u64, } impl Metainfo { @@ -34,6 +38,11 @@ impl Metainfo { let name = ts!(info.get_str("name")); let piece_length = ts!(info.get_int("piece length")); let pieces = ts!(info.get_bytes("pieces")); + let mut total_length = 0; + + if pieces.len() % 20 != 0 { + return None; + } let mut announce_list = vec![]; if let Some(list) = metainfo.get_list("announce-list") { @@ -57,8 +66,11 @@ impl Metainfo { path.push(ts!(component.as_str())); } + let length = ts!(file.get_int("length")) as u64; + total_length += length; + files.push(MetainfoFile { - length: ts!(file.get_int("length")) as u64, + length: length, path: path, }); } @@ -66,19 +78,25 @@ impl Metainfo { let mut path = PathBuf::new(); path.push(name); + total_length = ts!(info.get_int("length")) as u64; + files.push(MetainfoFile{ - length: ts!(info.get_int("length")) as u64, + length: total_length, path: path, }) } + let pieces = pieces.chunks(20).map(Hash::from_slice).collect::>(); + Some(Metainfo { announce: announce, announce_list: announce_list, files: files, info_hash: info_hash, - piece_length: piece_length as u64, - pieces: pieces.chunks(20).map(|c| piece_from_slice(c)).collect::>(), + piece_length: piece_length as u32, + num_pieces: pieces.len() as u32, + pieces: pieces, + length: total_length, }) } } @@ -89,18 +107,54 @@ pub struct MetainfoFile { pub path: PathBuf, } -fn sha1(bytes: &[u8]) -> [u8; 20] { +fn sha1(bytes: &[u8]) -> Hash { let mut hasher = Sha1::new(); hasher.update(bytes); - hasher.digest().bytes() + Hash::new(hasher.digest().bytes()) } -fn piece_from_slice(src: &[u8]) -> [u8; 20] { - assert_eq!(src.len(), 20); - let mut dst = [0u8; 20]; - dst.copy_from_slice(src); - dst +#[derive(Clone, Copy, Eq, Hash, PartialEq)] +pub struct Hash([u8; 20]); + +impl Hash { + pub fn alloc() -> Hash { + Hash([0u8; 20]) + } + + pub fn new(inner: [u8; 20]) -> Hash { + Hash(inner) + } + + pub fn from_slice(bytes: &[u8]) -> Hash { + assert_eq!(bytes.len(), 20); + let mut hash = Hash::alloc(); + hash.copy_from_slice(bytes); + hash + } +} + +impl fmt::Debug for Hash { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + for &b in self.0.iter() { + write!(f, "{:x}", b)? + } + Ok(()) + } +} + +impl Deref for Hash { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + &self.0 + } +} + +impl DerefMut for Hash { + fn deref_mut(&mut self) -> &mut [u8] { + &mut self.0 + } } #[cfg(test)] diff --git a/src/net/bitfield.rs b/src/net/bitfield.rs new file mode 100644 index 0000000..24a6b6b --- /dev/null +++ b/src/net/bitfield.rs @@ -0,0 +1,109 @@ +use std::fmt; + +pub struct BitField { + bits: Vec, + len: u32, +} + +impl BitField { + pub fn new(bits: Vec, len: u32) -> BitField { + BitField { + bits: bits, + len: len, + } + } + + pub fn with_capacity(len: u32) -> BitField { + BitField { + bits: vec![0u8; f64::ceil(len as f64 / 8.) as usize], + len: len, + } + } + + #[inline] + pub fn len(&self) -> u32 { + self.len + } + + #[inline] + pub fn is_set(&self, index: u32) -> bool { + let idx = index / 8; + let offset = index % 8; + (self.bits[idx as usize] & (1 << 7 - offset)) != 0 + } + + #[inline] + pub fn set(&mut self, index: u32) { + let idx = index / 8; + let offset = index % 8; + self.bits[idx as usize] |= 1 << 7 - offset; + } + + #[inline] + pub fn unset(&mut self, index: u32) { + let idx = index / 8; + let offset = index % 8; + self.bits[idx as usize] ^= 1 << 7 - offset; + } + + pub fn as_bytes(&self) -> &[u8] { + &self.bits + } +} + +impl fmt::Debug for BitField { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "BitField {{ size: {} }}", self.bits.len() * 8) + } +} + +#[test] +fn test_bitfield_is_set() { + // 01234567 89abcdef + // 00111110 01111111 + let bf = BitField::new(vec![62, 127]); + + assert!(!bf.is_set(0)); + assert!(!bf.is_set(1)); + assert!(bf.is_set(2)); + assert!(bf.is_set(3)); + assert!(bf.is_set(4)); + assert!(bf.is_set(5)); + assert!(bf.is_set(6)); + assert!(!bf.is_set(7)); + + assert!(!bf.is_set(8)); + assert!(bf.is_set(9)); + assert!(bf.is_set(0xa)); + assert!(bf.is_set(0xb)); + assert!(bf.is_set(0xc)); + assert!(bf.is_set(0xd)); + assert!(bf.is_set(0xe)); + assert!(bf.is_set(0xf)); +} + +#[test] +fn test_bitfield_set() { + let mut bf = BitField::new(vec![0, 0]); + bf.set(0); + assert_eq!(bf.bits[0], 128); + bf.set(7); + assert_eq!(bf.bits[0], 129); + bf.set(8); + assert_eq!(bf.bits[1], 128); + bf.set(15); + assert_eq!(bf.bits[1], 129); +} + +#[test] +fn test_bitfield_unset() { + let mut bf = BitField::new(vec![255, 255]); + bf.unset(0); + assert_eq!(bf.bits[0], 255-128); + bf.unset(7); + assert_eq!(bf.bits[0], 255-128-1); + bf.unset(8); + assert_eq!(bf.bits[1], 255-128); + bf.unset(15); + assert_eq!(bf.bits[1], 255-128-1); +} diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 0000000..0eadd19 --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,3 @@ +pub mod bitfield; +pub mod peer; +pub mod session; diff --git a/src/net/peer.rs b/src/net/peer.rs new file mode 100644 index 0000000..be4e609 --- /dev/null +++ b/src/net/peer.rs @@ -0,0 +1,164 @@ +use std::io::{self, Read, Write}; +use std::net::TcpStream; + +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; + +use metainfo::Hash; +use net::bitfield::BitField; +use tracker::Peer; + +#[derive(Debug)] +pub enum Packet { + Choke, + Unchoke, + Interested, + NotInterested, + Have { + index: u32, + }, + Bitfield { bitfield: Vec, }, + Request { index: u32, begin: u32, length: u32, }, + Piece { index: u32, begin: u32, block: Vec, }, + Cancel { index: u32, begin: u32, length: u32, } +} + +pub fn open_connection(peer: Peer, own_info_hash: Hash, own_peer_id: Hash) -> io::Result<(TcpStream, TcpStream, Hash)> { + let mut sock = TcpStream::connect((peer.addr, peer.port))?; + + // send handshake + sock.write_u8(19)?; + sock.write(b"BitTorrent protocol")?; + sock.write_u64::(0)?; + sock.write(&own_info_hash)?; + sock.write(&own_peer_id)?; + sock.flush()?; + + // receive handshake + let mut buf = [0u8; 68]; + sock.read_exact(&mut buf)?; + + let peer_info_hash = Hash::from_slice(&buf[28..48]); + let peer_id = Hash::from_slice(&buf[48..68]); + + if buf[0] != 19 || &buf[1..20] != b"BitTorrent protocol" || own_info_hash != peer_info_hash { + return Err(io::Error::new(io::ErrorKind::Other, "invalid protocol")) + } + + Ok((sock.try_clone()?, sock, peer_id)) +} + +pub fn read_packet(sock: &mut TcpStream) -> io::Result { + let len = sock.read_u32::()?; + let id = sock.read_u8()?; + + Ok(match id { + 0 => Packet::Choke, + 1 => Packet::Unchoke, + 2 => Packet::Interested, + 3 => Packet::NotInterested, + 4 => Packet::Have { + index: sock.read_u32::()?, + }, + 5 => { + let size = len as usize - 1; + let mut bitfield = Vec::with_capacity(size); + unsafe { + bitfield.set_len(size); + } + sock.read_exact(&mut bitfield)?; + Packet::Bitfield { + bitfield: bitfield, + } + } + 6 => Packet::Request { + index: sock.read_u32::()?, + begin: sock.read_u32::()?, + length: sock.read_u32::()?, + }, + 7 => { + let size = len as usize - 9; + let index = sock.read_u32::()?; + let begin = sock.read_u32::()?; + let mut block = Vec::with_capacity(size); + unsafe { + block.set_len(size); + } + sock.read_exact(&mut block)?; + + Packet::Piece { + index: index, + begin: begin, + block: block, + } + } + 8 => Packet::Cancel { + index: sock.read_u32::()?, + begin: sock.read_u32::()?, + length: sock.read_u32::()?, + }, + _ => return Err(io::Error::new(io::ErrorKind::Other, "invalid packet")), + }) +} + +pub fn send_keepalive(sock: &mut TcpStream) -> io::Result<()> { + sock.write(b"")?; + Ok(()) +} + +pub fn send_choke(sock: &mut TcpStream) -> io::Result<()> { + sock.write_u32::(1)?; + sock.write_u8(0)?; + Ok(()) +} + +pub fn send_unchoke(sock: &mut TcpStream) -> io::Result<()> { + sock.write_u32::(1)?; + sock.write_u8(1)?; + Ok(()) +} + +pub fn send_interested(sock: &mut TcpStream) -> io::Result<()> { + sock.write_u32::(1)?; + sock.write_u8(2)?; + Ok(()) +} + +pub fn send_not_interested(sock: &mut TcpStream) -> io::Result<()> { + sock.write_u32::(1)?; + sock.write_u8(3)?; + Ok(()) +} + +pub fn send_have(sock: &mut TcpStream, piece: u32) -> io::Result<()> { + sock.write_u32::(5)?; + sock.write_u8(4)?; + sock.write_u32::(piece)?; + Ok(()) +} + +pub fn send_request(sock: &mut TcpStream, piece: u32, begin: u32, length: u32) -> io::Result<()> { + sock.write_u32::(13)?; + sock.write_u8(6)?; + sock.write_u32::(piece)?; + sock.write_u32::(begin)?; + sock.write_u32::(length)?; + Ok(()) +} + +pub fn send_piece(sock: &mut TcpStream, piece: u32, begin: u32, data: &[u8]) -> io::Result<()> { + sock.write_u32::(9 + data.len() as u32)?; + sock.write_u8(7)?; + sock.write_u32::(piece)?; + sock.write_u32::(begin)?; + sock.write(data)?; + Ok(()) +} + +pub fn send_cancel(sock: &mut TcpStream, piece: u32, begin: u32, length: u32) -> io::Result<()> { + sock.write_u32::(13)?; + sock.write_u8(8)?; + sock.write_u32::(piece)?; + sock.write_u32::(begin)?; + sock.write_u32::(length)?; + Ok(()) +} diff --git a/src/net/session.rs b/src/net/session.rs new file mode 100644 index 0000000..3f7e80f --- /dev/null +++ b/src/net/session.rs @@ -0,0 +1,450 @@ +use std::cmp; +use std::collections::{BTreeMap, HashMap}; +use std::fs::File; +use std::io::{Seek, SeekFrom, Write}; +use std::net::{Shutdown, TcpStream}; +use std::os::unix::io::AsRawFd; +use std::sync::Arc; +use std::sync::mpsc::{self, Receiver, Sender}; +use std::thread; +use std::time::{Duration, Instant}; + +use libc; +use sha1::Sha1; + +use metainfo::{Hash, Metainfo}; +use net::bitfield::BitField; +use net::peer::{self, Packet}; +use tracker::http; + +const FRAGMENT_SIZE: u32 = 16 * 1024; + +#[derive(Debug)] +enum Signal { + // == Torrent == + AddTorrent(Metainfo), + RemoveTorrent(Hash), + //ResumeTorrent(Hash), + //StopTorrent(Hash), + // == Peer == + ConnectionOpened { + info_hash: Hash, + peer_id: Hash, + sock: TcpStream, + }, + ConnectionClosed { + info_hash: Hash, + peer_id: Hash, + }, + Packet { + info_hash: Hash, + peer_id: Hash, + packet: Packet, + }, + // == Other == + Stop, +} + +struct InnerSession { + peer_id: Hash, + port: u16, +} + +pub struct Session { + // inner: Arc, + sender: Sender, +} + +impl Session { + pub fn new() -> Session { + let (sender, receiver) = mpsc::channel(); + + let inner = Arc::new(InnerSession { + peer_id: Hash::new([2u8; 20]), + port: 6981, + }); + + let thread = SessionNetworkThread { + session: inner.clone(), + sender: sender.clone(), + torrents: HashMap::new(), + seeds: 0, + bytecount: 0, + lastcount: Instant::now(), + }; + + thread::spawn(move || { + thread.run(receiver); + }); + + Session { + // inner: inner, + sender: sender, + } + } + + pub fn add_torrent(&self, metainfo: Metainfo) { + let _ = self.sender.send(Signal::AddTorrent(metainfo)); + } + + pub fn remove_torrent(&self, id: Hash) { + let _ = self.sender.send(Signal::RemoveTorrent(id)); + } + + pub fn stop(&self) { + let _ = self.sender.send(Signal::Stop); + } +} + +struct PeerConnection { + sock: TcpStream, + bitfield: BitField, + am_choking: bool, + am_interested: bool, + peer_choking: bool, + peer_interested: bool, +} + +impl PeerConnection { + pub fn new(sock: TcpStream, bitfield_len: u32) -> Self { + PeerConnection { + sock: sock, + bitfield: BitField::with_capacity(bitfield_len), + am_choking: true, + am_interested: false, + peer_choking: true, + peer_interested: false, + } + } +} + +#[derive(Copy, Clone, Debug)] +enum FragmentStatus { + Available, + Complete, + Taken(Instant), +} + +pub struct SessionFragment { + begin: u32, + length: u32, + status: FragmentStatus, +} + +struct SessionPiece { + fragments: BTreeMap, + buffer: Vec, + num_fragments: u32, + total_fragments: u32, +} + +struct SessionTorrent { + metainfo: Metainfo, + own_bitfield: BitField, + peers: HashMap, + pieces: BTreeMap, + files: Vec, +} + +impl SessionTorrent { + pub fn new(metainfo: Metainfo, files: Vec) -> Self { + SessionTorrent { + own_bitfield: BitField::with_capacity(metainfo.pieces.len() as u32), + metainfo: metainfo, + peers: HashMap::new(), + pieces: BTreeMap::new(), + files: files, + } + } + + pub fn get_peer(&mut self, peer_id: &Hash) -> &mut PeerConnection { + self.peers.get_mut(peer_id).unwrap() + } + + // Get the requesting connection something to download. Check in the pieces currently + // in flight, match with the peer's bitfield, and if nothing matches, fly a new piece. + // (scheduler) + pub fn get_fragment(&mut self, peer_id: &Hash) -> Option<(u32, u32, u32)> { + let peer = &self.peers[peer_id]; + + // try to find a fragment from the pieces in flight + for (&index, piece) in self.pieces.iter_mut() { + if peer.bitfield.is_set(index) { + for fragment in piece.fragments.values_mut() { + match fragment.status { + FragmentStatus::Available => { + fragment.status = FragmentStatus::Taken(Instant::now()); + return Some((index, fragment.begin, fragment.length)) + } + FragmentStatus::Complete => continue, + FragmentStatus::Taken(timestamp) => { + if timestamp.elapsed() > Duration::from_secs(10) { + println!("retrying fragment"); + fragment.status = FragmentStatus::Taken(Instant::now()); + return Some((index, fragment.begin, fragment.length)) + } + } + } + } + } + } + + // Collect the list of pieces currently in flight, to avoid re-fragmenting the same piece over and over. + let keys: Vec<_> = self.pieces.keys().cloned().collect(); + + println!("fragmenting a new piece, in flight = {:?}", keys); + + // TODO: optimize currently O(n). It's also ugly as hell + for index in (0..self.own_bitfield.len()).filter(|index| !keys.contains(index)) { + if !self.own_bitfield.is_set(index) && peer.bitfield.is_set(index) { + + let total_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32; + let mut fragments = BTreeMap::new(); + for idx in 0..total_fragments { + let begin = idx * FRAGMENT_SIZE; + + fragments.insert(begin ,SessionFragment { + begin: begin, + length: cmp::min(FRAGMENT_SIZE, self.metainfo.piece_length - (idx * FRAGMENT_SIZE)), + status: FragmentStatus::Available, + }); + } + + let begin; + let length; + { + let taken = fragments.get_mut(&0).unwrap(); + taken.status = FragmentStatus::Taken(Instant::now()); + begin = taken.begin; + length = taken.length; + } + + let mut buffer = Vec::with_capacity(self.metainfo.piece_length as usize); + unsafe { + buffer.set_len(self.metainfo.piece_length as usize); + } + self.pieces.insert(index, SessionPiece { + fragments: fragments, + buffer: buffer, + num_fragments: 0, + total_fragments: total_fragments, + }); + + return Some((index, begin, length)); + } + } + + None + } + + fn requeue(&mut self, peer_id: &Hash) { + if let Some((index, begin, length)) = self.get_fragment(peer_id) { + // println!("onto piece {} {}..{}", index, begin, begin + length); + peer::send_request(&mut self.get_peer(&peer_id).sock, index, begin,length); + } else { + println!("no fragment"); + } + } + + fn write_piece(&mut self, index: u32) { + let mut start = 0; + let mut end = 0; + let mut pos: u64 = 0; + let mut remaining = self.metainfo.piece_length as u64; + let mut write = index as u64 * self.metainfo.piece_length as u64; + + for (id, fileinfo) in self.metainfo.files.iter().enumerate() { + end += fileinfo.length as u64; + println!("in file {}, start={}, end={}, pos={}, remaining={}, write={}", fileinfo.path.display(), start, end, pos, remaining, write); + if write < end { + let len = cmp::min(remaining, fileinfo.length as u64); + self.files[id].seek(SeekFrom::Start(write - start)); + self.files[id].write_all(&self.pieces[&index].buffer[pos as usize..(pos+len) as usize]); + write += len; + pos += len; + remaining -= len; + if remaining == 0 { + break; + } + } + start = end; + } + } + + fn unchoke_reply(&mut self, peer_id: &Hash) { + self.get_peer(&peer_id).peer_choking = false; + self.requeue(peer_id); + } + + fn piece_reply(&mut self, peer_id: &Hash, index: u32, begin: u32, block: Vec) { + let mut remove = false; + { + if let Some(piece) = self.pieces.get_mut(&index) { + if let Some(fragment) = piece.fragments.get_mut(&begin) { + fragment.status = FragmentStatus::Complete; + + piece.buffer[begin as usize..(begin as usize)+block.len()].copy_from_slice(&block); + piece.num_fragments += 1; + if piece.num_fragments == piece.total_fragments { + // TODO check hash + println!("piece is done {}", index); + let mut m = Sha1::new(); + m.update(&piece.buffer); + if m.digest().bytes() == &self.metainfo.pieces[index as usize][..] { + self.own_bitfield.set(index); + println!("it's a match!"); + remove = true; + } else { + println!("no match"); + } + } + } else { + println!("could not find fragment {}", begin); + } + } else { + println!("could not find piece {}", index); + } + } + if remove { + self.write_piece(index); + self.pieces.remove(&index); + } + self.requeue(peer_id); + } +} + +struct SessionNetworkThread { + session: Arc, + sender: Sender, + torrents: HashMap, + seeds: u32, + bytecount: u64, + lastcount: Instant, +} + +impl SessionNetworkThread { + pub fn run(mut self, input: Receiver) { + for signal in input.iter() { + match signal { + Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent), + Signal::RemoveTorrent(id) => { + if let Some(mut sess_torrent) = self.torrents.remove(&id) { + for conn in sess_torrent.peers.values_mut() { + let _ = conn.sock.shutdown(Shutdown::Both); + } + } + } + Signal::ConnectionOpened { info_hash, peer_id, sock } => { + if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) { + let mut peer = PeerConnection::new(sock, sess_torrent.metainfo.pieces.len() as u32); + peer::send_interested(&mut peer.sock); + peer.am_interested = true; + sess_torrent.peers.insert(peer_id, peer); + } + } + Signal::ConnectionClosed { info_hash, peer_id } => { + if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) { + if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) { + // if !peer.peer_choking && self.seeds > 1 { + // self.seeds -= 1; + // } + } + } + } + Signal::Packet { info_hash, peer_id, packet } => self.signal_packet(info_hash, peer_id, packet), + Signal::Stop => break, + } + } + } + + fn signal_add_torrent(&mut self, metainfo: Metainfo) { + let own_peer_id = self.session.peer_id; + let info_hash = metainfo.info_hash; + + let mut files = Vec::new(); + for fileinfo in metainfo.files.iter() { + let file = File::create(&fileinfo.path).unwrap(); + unsafe { + libc::ftruncate64(file.as_raw_fd(), fileinfo.length as i64); + } + files.push(file); + } + + if let Ok(resp) = http::get_peers(own_peer_id, self.session.port, &metainfo, 0, 0, 0) { + self.torrents.insert(info_hash, SessionTorrent::new(metainfo, files)); + + println!("trying with {} peers", resp.peers.len()); + + for &peer in resp.peers.iter() { + let sender = self.sender.clone(); + + thread::spawn(move || { + if let Ok((mut sock, sock_clone, peer_id)) = peer::open_connection(peer, info_hash, own_peer_id) { + let _ = sender.send(Signal::ConnectionOpened { + info_hash: info_hash, + peer_id: peer_id, + sock: sock_clone, + }); + + while let Ok(packet) = peer::read_packet(&mut sock) { + let _ = sender.send(Signal::Packet { + info_hash: info_hash, + peer_id: peer_id, + packet: packet, + }); + } + + let _ = sender.send(Signal::ConnectionClosed { + info_hash: info_hash, + peer_id: peer_id, + }); + } + }); + } + } + } + + fn signal_packet(&mut self, info_hash: Hash, peer_id: Hash, packet: Packet) { + if let Some(torrent) = self.torrents.get_mut(&info_hash) { + match packet { + Packet::Choke => { + torrent.get_peer(&peer_id).peer_choking = true; + } + Packet::Unchoke => { + self.seeds += 1; + println!("seeds {}", self.seeds); + torrent.unchoke_reply(&peer_id); + } + Packet::Interested => { + torrent.get_peer(&peer_id).peer_interested = true; + } + Packet::NotInterested => { + torrent.get_peer(&peer_id).peer_interested = false; + } + Packet::Have { index } => { + if index < torrent.metainfo.num_pieces { + torrent.get_peer(&peer_id).bitfield.set(index); + } + } + Packet::Bitfield { bitfield } => { + torrent.get_peer(&peer_id).bitfield = BitField::new(bitfield, torrent.metainfo.pieces.len() as u32); + } + Packet::Request { index, begin, length } => { + // TODO + } + Packet::Piece { index, begin, block } => { + self.bytecount += block.len() as u64; + if self.lastcount.elapsed() >= Duration::from_secs(1) { + println!("{:.2} KiB/s", self.bytecount as f64 / 1024.); + self.bytecount = 0; + self.lastcount = Instant::now(); + } + torrent.piece_reply(&peer_id, index, begin, block); + } + Packet::Cancel { index, begin, length } => { + // TODO + } + } + } + } +} diff --git a/src/torrent.rs b/src/torrent.rs deleted file mode 100644 index 87171e9..0000000 --- a/src/torrent.rs +++ /dev/null @@ -1,8 +0,0 @@ -use metainfo::Metainfo; - -pub struct Torrent { - pub metainfo: Metainfo, - pub downloaded: u64, - pub uploaded: u64, - pub left: u64, -} diff --git a/src/tracker/http.rs b/src/tracker/http.rs index e5de564..cc39f4f 100644 --- a/src/tracker/http.rs +++ b/src/tracker/http.rs @@ -5,7 +5,7 @@ use hyper::Client; use url::form_urlencoded::byte_serialize; use bencode::{decode, Dict}; -use torrent::Torrent; +use metainfo::{Hash, Metainfo}; use tracker::{Peer, TrackerError, TrackerResponse, TrackerResult}; macro_rules! ts { @@ -17,16 +17,16 @@ macro_rules! ts { } } -pub fn get_peers(peer_id: [u8; 20], port: u16, torrent: &Torrent) -> TrackerResult { +pub fn get_peers(peer_id: Hash, port: u16, metainfo: &Metainfo, uploaded: u64, downloaded: u64, left: u64) -> TrackerResult { let url = format!("{}?info_hash={}&peer_id={}&port={}&uploaded={}\ &downloaded={}&left={}&compact=1", - torrent.metainfo.announce, - urlencode(&torrent.metainfo.info_hash), + metainfo.announce, + urlencode(&metainfo.info_hash), urlencode(&peer_id), port, - torrent.uploaded, - torrent.downloaded, - torrent.left); + uploaded, + downloaded, + left); let client = Client::new(); let mut resp = client.get(&url).send()?; diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index 3cd433b..17d9a0e 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -41,7 +41,7 @@ impl From for TrackerError { } } -#[derive(Debug)] +#[derive(Copy, Clone, Debug)] pub struct Peer { pub addr: Ipv4Addr, pub port: u16,