diff --git a/src/net/peer.rs b/src/net/peer.rs index be4e609..e964950 100644 --- a/src/net/peer.rs +++ b/src/net/peer.rs @@ -1,10 +1,9 @@ use std::io::{self, Read, Write}; -use std::net::TcpStream; +use std::net::{Shutdown, TcpStream}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use metainfo::Hash; -use net::bitfield::BitField; use tracker::Peer; #[derive(Debug)] @@ -22,143 +21,162 @@ pub enum Packet { Cancel { index: u32, begin: u32, length: u32, } } -pub fn open_connection(peer: Peer, own_info_hash: Hash, own_peer_id: Hash) -> io::Result<(TcpStream, TcpStream, Hash)> { - let mut sock = TcpStream::connect((peer.addr, peer.port))?; +#[derive(Debug)] +pub struct PeerConnection { + sock: TcpStream, +} - // send handshake - sock.write_u8(19)?; - sock.write(b"BitTorrent protocol")?; - sock.write_u64::(0)?; - sock.write(&own_info_hash)?; - sock.write(&own_peer_id)?; - sock.flush()?; +impl PeerConnection { + pub fn open_connection(peer: Peer, own_info_hash: Hash, own_peer_id: Hash) -> io::Result<(PeerConnection, PeerConnection, Hash)> { + let mut sock = TcpStream::connect((peer.addr, peer.port))?; - // receive handshake - let mut buf = [0u8; 68]; - sock.read_exact(&mut buf)?; + // send handshake + sock.write_u8(19)?; + sock.write(b"BitTorrent protocol")?; + sock.write_u64::(0)?; + sock.write(&own_info_hash)?; + sock.write(&own_peer_id)?; + sock.flush()?; - let peer_info_hash = Hash::from_slice(&buf[28..48]); - let peer_id = Hash::from_slice(&buf[48..68]); + // receive handshake + let mut buf = [0u8; 68]; + sock.read_exact(&mut buf)?; - if buf[0] != 19 || &buf[1..20] != b"BitTorrent protocol" || own_info_hash != peer_info_hash { - return Err(io::Error::new(io::ErrorKind::Other, "invalid protocol")) + let peer_info_hash = Hash::from_slice(&buf[28..48]); + let peer_id = Hash::from_slice(&buf[48..68]); + + if buf[0] != 19 || &buf[1..20] != b"BitTorrent protocol" || own_info_hash != peer_info_hash { + return Err(io::Error::new(io::ErrorKind::Other, "invalid protocol")) + } + + let mut conn = PeerConnection { sock: sock }; + + Ok((conn.try_clone()?, conn, peer_id)) } - Ok((sock.try_clone()?, sock, peer_id)) -} + pub fn read_packet(&mut self) -> io::Result { + let len = self.sock.read_u32::()?; + let id = self.sock.read_u8()?; -pub fn read_packet(sock: &mut TcpStream) -> io::Result { - let len = sock.read_u32::()?; - let id = sock.read_u8()?; - - Ok(match id { - 0 => Packet::Choke, - 1 => Packet::Unchoke, - 2 => Packet::Interested, - 3 => Packet::NotInterested, - 4 => Packet::Have { - index: sock.read_u32::()?, - }, - 5 => { - let size = len as usize - 1; - let mut bitfield = Vec::with_capacity(size); - unsafe { - bitfield.set_len(size); + Ok(match id { + 0 => Packet::Choke, + 1 => Packet::Unchoke, + 2 => Packet::Interested, + 3 => Packet::NotInterested, + 4 => Packet::Have { + index: self.sock.read_u32::()?, + }, + 5 => { + let size = len as usize - 1; + let mut bitfield = Vec::with_capacity(size); + unsafe { + bitfield.set_len(size); + } + self.sock.read_exact(&mut bitfield)?; + Packet::Bitfield { + bitfield: bitfield, + } } - sock.read_exact(&mut bitfield)?; - Packet::Bitfield { - bitfield: bitfield, + 6 => Packet::Request { + index: self.sock.read_u32::()?, + begin: self.sock.read_u32::()?, + length: self.sock.read_u32::()?, + }, + 7 => { + let size = len as usize - 9; + let index = self.sock.read_u32::()?; + let begin = self.sock.read_u32::()?; + let mut block = Vec::with_capacity(size); + unsafe { + block.set_len(size); + } + self.sock.read_exact(&mut block)?; + + Packet::Piece { + index: index, + begin: begin, + block: block, + } } - } - 6 => Packet::Request { - index: sock.read_u32::()?, - begin: sock.read_u32::()?, - length: sock.read_u32::()?, - }, - 7 => { - let size = len as usize - 9; - let index = sock.read_u32::()?; - let begin = sock.read_u32::()?; - let mut block = Vec::with_capacity(size); - unsafe { - block.set_len(size); - } - sock.read_exact(&mut block)?; + 8 => Packet::Cancel { + index: self.sock.read_u32::()?, + begin: self.sock.read_u32::()?, + length: self.sock.read_u32::()?, + }, + _ => return Err(io::Error::new(io::ErrorKind::Other, "invalid packet")), + }) + } - Packet::Piece { - index: index, - begin: begin, - block: block, - } - } - 8 => Packet::Cancel { - index: sock.read_u32::()?, - begin: sock.read_u32::()?, - length: sock.read_u32::()?, - }, - _ => return Err(io::Error::new(io::ErrorKind::Other, "invalid packet")), - }) -} + pub fn send_keepalive(&mut self) -> io::Result<()> { + self.sock.write(b"")?; + Ok(()) + } -pub fn send_keepalive(sock: &mut TcpStream) -> io::Result<()> { - sock.write(b"")?; - Ok(()) -} + pub fn send_choke(&mut self) -> io::Result<()> { + self.sock.write_u32::(1)?; + self.sock.write_u8(0)?; + Ok(()) + } -pub fn send_choke(sock: &mut TcpStream) -> io::Result<()> { - sock.write_u32::(1)?; - sock.write_u8(0)?; - Ok(()) -} + pub fn send_unchoke(&mut self) -> io::Result<()> { + self.sock.write_u32::(1)?; + self.sock.write_u8(1)?; + Ok(()) + } -pub fn send_unchoke(sock: &mut TcpStream) -> io::Result<()> { - sock.write_u32::(1)?; - sock.write_u8(1)?; - Ok(()) -} + pub fn send_interested(&mut self) -> io::Result<()> { + self.sock.write_u32::(1)?; + self.sock.write_u8(2)?; + Ok(()) + } -pub fn send_interested(sock: &mut TcpStream) -> io::Result<()> { - sock.write_u32::(1)?; - sock.write_u8(2)?; - Ok(()) -} + pub fn send_not_interested(&mut self) -> io::Result<()> { + self.sock.write_u32::(1)?; + self.sock.write_u8(3)?; + Ok(()) + } -pub fn send_not_interested(sock: &mut TcpStream) -> io::Result<()> { - sock.write_u32::(1)?; - sock.write_u8(3)?; - Ok(()) -} + pub fn send_have(&mut self, piece: u32) -> io::Result<()> { + self.sock.write_u32::(5)?; + self.sock.write_u8(4)?; + self.sock.write_u32::(piece)?; + Ok(()) + } -pub fn send_have(sock: &mut TcpStream, piece: u32) -> io::Result<()> { - sock.write_u32::(5)?; - sock.write_u8(4)?; - sock.write_u32::(piece)?; - Ok(()) -} + pub fn send_request(&mut self, piece: u32, begin: u32, length: u32) -> io::Result<()> { + self.sock.write_u32::(13)?; + self.sock.write_u8(6)?; + self.sock.write_u32::(piece)?; + self.sock.write_u32::(begin)?; + self.sock.write_u32::(length)?; + Ok(()) + } -pub fn send_request(sock: &mut TcpStream, piece: u32, begin: u32, length: u32) -> io::Result<()> { - sock.write_u32::(13)?; - sock.write_u8(6)?; - sock.write_u32::(piece)?; - sock.write_u32::(begin)?; - sock.write_u32::(length)?; - Ok(()) -} + pub fn send_piece(&mut self, piece: u32, begin: u32, data: &[u8]) -> io::Result<()> { + self.sock.write_u32::(9 + data.len() as u32)?; + self.sock.write_u8(7)?; + self.sock.write_u32::(piece)?; + self.sock.write_u32::(begin)?; + self.sock.write(data)?; + Ok(()) + } -pub fn send_piece(sock: &mut TcpStream, piece: u32, begin: u32, data: &[u8]) -> io::Result<()> { - sock.write_u32::(9 + data.len() as u32)?; - sock.write_u8(7)?; - sock.write_u32::(piece)?; - sock.write_u32::(begin)?; - sock.write(data)?; - Ok(()) -} + pub fn send_cancel(&mut self, piece: u32, begin: u32, length: u32) -> io::Result<()> { + self.sock.write_u32::(13)?; + self.sock.write_u8(8)?; + self.sock.write_u32::(piece)?; + self.sock.write_u32::(begin)?; + self.sock.write_u32::(length)?; + Ok(()) + } -pub fn send_cancel(sock: &mut TcpStream, piece: u32, begin: u32, length: u32) -> io::Result<()> { - sock.write_u32::(13)?; - sock.write_u8(8)?; - sock.write_u32::(piece)?; - sock.write_u32::(begin)?; - sock.write_u32::(length)?; - Ok(()) + pub fn try_clone(&mut self) -> io::Result { + Ok(PeerConnection { + sock: self.sock.try_clone()?, + }) + } + + pub fn shutdown(&mut self) { + let _ = self.sock.shutdown(Shutdown::Both); + } } diff --git a/src/net/session.rs b/src/net/session.rs index 0ff26e4..cf23bca 100644 --- a/src/net/session.rs +++ b/src/net/session.rs @@ -14,7 +14,7 @@ use sha1::Sha1; use metainfo::{Hash, Metainfo}; use net::bitfield::BitField; -use net::peer::{self, Packet}; +use net::peer::{Packet, PeerConnection}; use tracker::http; const FRAGMENT_SIZE: u32 = 16 * 1024; @@ -30,7 +30,7 @@ enum Signal { ConnectionOpened { info_hash: Hash, peer_id: Hash, - sock: TcpStream, + sock: PeerConnection, }, ConnectionClosed { info_hash: Hash, @@ -96,8 +96,8 @@ impl Session { } } -struct PeerConnection { - sock: TcpStream, +struct SessionPeer { + sock: PeerConnection, bitfield: BitField, am_choking: bool, am_interested: bool, @@ -105,9 +105,9 @@ struct PeerConnection { peer_interested: bool, } -impl PeerConnection { - pub fn new(sock: TcpStream, bitfield_len: u32) -> Self { - PeerConnection { +impl SessionPeer { + pub fn new(sock: PeerConnection, bitfield_len: u32) -> Self { + SessionPeer { sock: sock, bitfield: BitField::with_capacity(bitfield_len), am_choking: true, @@ -141,7 +141,7 @@ struct SessionPiece { struct SessionTorrent { metainfo: Metainfo, own_bitfield: BitField, - peers: HashMap, + peers: HashMap, pieces: BTreeMap, files: Vec, } @@ -157,7 +157,7 @@ impl SessionTorrent { } } - pub fn get_peer(&mut self, peer_id: &Hash) -> &mut PeerConnection { + pub fn get_peer(&mut self, peer_id: &Hash) -> &mut SessionPeer { self.peers.get_mut(peer_id).unwrap() } @@ -240,9 +240,9 @@ impl SessionTorrent { fn requeue(&mut self, peer_id: &Hash) { if let Some((index, begin, length)) = self.get_fragment(peer_id) { // println!("onto piece {} {}..{}", index, begin, begin + length); - peer::send_request(&mut self.get_peer(&peer_id).sock, index, begin,length); + self.get_peer(&peer_id).sock.send_request(index, begin,length); } else { - peer::send_not_interested(&mut self.get_peer(&peer_id).sock); + self.get_peer(&peer_id).sock.send_not_interested(); println!("no fragment"); } } @@ -332,15 +332,15 @@ impl SessionNetworkThread { Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent), Signal::RemoveTorrent(id) => { if let Some(mut sess_torrent) = self.torrents.remove(&id) { - for conn in sess_torrent.peers.values_mut() { - let _ = conn.sock.shutdown(Shutdown::Both); + for peer in sess_torrent.peers.values_mut() { + peer.sock.shutdown(); } } } Signal::ConnectionOpened { info_hash, peer_id, sock } => { if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) { - let mut peer = PeerConnection::new(sock, sess_torrent.metainfo.pieces.len() as u32); - peer::send_interested(&mut peer.sock); + let mut peer = SessionPeer::new(sock, sess_torrent.metainfo.pieces.len() as u32); + peer.sock.send_interested(); peer.am_interested = true; sess_torrent.peers.insert(peer_id, peer); } @@ -376,20 +376,18 @@ impl SessionNetworkThread { if let Ok(resp) = http::get_peers(own_peer_id, self.session.port, &metainfo, 0, 0, 0) { self.torrents.insert(info_hash, SessionTorrent::new(metainfo, files)); - println!("trying with {} peers", resp.peers.len()); - for &peer in resp.peers.iter() { let sender = self.sender.clone(); thread::spawn(move || { - if let Ok((mut sock, sock_clone, peer_id)) = peer::open_connection(peer, info_hash, own_peer_id) { + if let Ok((mut sock, sock_clone, peer_id)) = PeerConnection::open_connection(peer, info_hash, own_peer_id) { let _ = sender.send(Signal::ConnectionOpened { info_hash: info_hash, peer_id: peer_id, sock: sock_clone, }); - while let Ok(packet) = peer::read_packet(&mut sock) { + while let Ok(packet) = sock.read_packet() { let _ = sender.send(Signal::Packet { info_hash: info_hash, peer_id: peer_id,