rework peer connection

This commit is contained in:
2016-12-14 14:47:53 -05:00
parent 75065741dc
commit 9ff00f848f
2 changed files with 157 additions and 141 deletions

View File

@@ -1,10 +1,9 @@
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::net::TcpStream; use std::net::{Shutdown, TcpStream};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use metainfo::Hash; use metainfo::Hash;
use net::bitfield::BitField;
use tracker::Peer; use tracker::Peer;
#[derive(Debug)] #[derive(Debug)]
@@ -22,7 +21,13 @@ pub enum Packet {
Cancel { index: u32, begin: u32, length: u32, } Cancel { index: u32, begin: u32, length: u32, }
} }
pub fn open_connection(peer: Peer, own_info_hash: Hash, own_peer_id: Hash) -> io::Result<(TcpStream, TcpStream, Hash)> { #[derive(Debug)]
pub struct PeerConnection {
sock: TcpStream,
}
impl PeerConnection {
pub fn open_connection(peer: Peer, own_info_hash: Hash, own_peer_id: Hash) -> io::Result<(PeerConnection, PeerConnection, Hash)> {
let mut sock = TcpStream::connect((peer.addr, peer.port))?; let mut sock = TcpStream::connect((peer.addr, peer.port))?;
// send handshake // send handshake
@@ -44,12 +49,14 @@ pub fn open_connection(peer: Peer, own_info_hash: Hash, own_peer_id: Hash) -> io
return Err(io::Error::new(io::ErrorKind::Other, "invalid protocol")) return Err(io::Error::new(io::ErrorKind::Other, "invalid protocol"))
} }
Ok((sock.try_clone()?, sock, peer_id)) let mut conn = PeerConnection { sock: sock };
Ok((conn.try_clone()?, conn, peer_id))
} }
pub fn read_packet(sock: &mut TcpStream) -> io::Result<Packet> { pub fn read_packet(&mut self) -> io::Result<Packet> {
let len = sock.read_u32::<BigEndian>()?; let len = self.sock.read_u32::<BigEndian>()?;
let id = sock.read_u8()?; let id = self.sock.read_u8()?;
Ok(match id { Ok(match id {
0 => Packet::Choke, 0 => Packet::Choke,
@@ -57,7 +64,7 @@ pub fn read_packet(sock: &mut TcpStream) -> io::Result<Packet> {
2 => Packet::Interested, 2 => Packet::Interested,
3 => Packet::NotInterested, 3 => Packet::NotInterested,
4 => Packet::Have { 4 => Packet::Have {
index: sock.read_u32::<BigEndian>()?, index: self.sock.read_u32::<BigEndian>()?,
}, },
5 => { 5 => {
let size = len as usize - 1; let size = len as usize - 1;
@@ -65,25 +72,25 @@ pub fn read_packet(sock: &mut TcpStream) -> io::Result<Packet> {
unsafe { unsafe {
bitfield.set_len(size); bitfield.set_len(size);
} }
sock.read_exact(&mut bitfield)?; self.sock.read_exact(&mut bitfield)?;
Packet::Bitfield { Packet::Bitfield {
bitfield: bitfield, bitfield: bitfield,
} }
} }
6 => Packet::Request { 6 => Packet::Request {
index: sock.read_u32::<BigEndian>()?, index: self.sock.read_u32::<BigEndian>()?,
begin: sock.read_u32::<BigEndian>()?, begin: self.sock.read_u32::<BigEndian>()?,
length: sock.read_u32::<BigEndian>()?, length: self.sock.read_u32::<BigEndian>()?,
}, },
7 => { 7 => {
let size = len as usize - 9; let size = len as usize - 9;
let index = sock.read_u32::<BigEndian>()?; let index = self.sock.read_u32::<BigEndian>()?;
let begin = sock.read_u32::<BigEndian>()?; let begin = self.sock.read_u32::<BigEndian>()?;
let mut block = Vec::with_capacity(size); let mut block = Vec::with_capacity(size);
unsafe { unsafe {
block.set_len(size); block.set_len(size);
} }
sock.read_exact(&mut block)?; self.sock.read_exact(&mut block)?;
Packet::Piece { Packet::Piece {
index: index, index: index,
@@ -92,73 +99,84 @@ pub fn read_packet(sock: &mut TcpStream) -> io::Result<Packet> {
} }
} }
8 => Packet::Cancel { 8 => Packet::Cancel {
index: sock.read_u32::<BigEndian>()?, index: self.sock.read_u32::<BigEndian>()?,
begin: sock.read_u32::<BigEndian>()?, begin: self.sock.read_u32::<BigEndian>()?,
length: sock.read_u32::<BigEndian>()?, length: self.sock.read_u32::<BigEndian>()?,
}, },
_ => return Err(io::Error::new(io::ErrorKind::Other, "invalid packet")), _ => return Err(io::Error::new(io::ErrorKind::Other, "invalid packet")),
}) })
} }
pub fn send_keepalive(sock: &mut TcpStream) -> io::Result<()> { pub fn send_keepalive(&mut self) -> io::Result<()> {
sock.write(b"")?; self.sock.write(b"")?;
Ok(()) Ok(())
} }
pub fn send_choke(sock: &mut TcpStream) -> io::Result<()> { pub fn send_choke(&mut self) -> io::Result<()> {
sock.write_u32::<BigEndian>(1)?; self.sock.write_u32::<BigEndian>(1)?;
sock.write_u8(0)?; self.sock.write_u8(0)?;
Ok(()) Ok(())
} }
pub fn send_unchoke(sock: &mut TcpStream) -> io::Result<()> { pub fn send_unchoke(&mut self) -> io::Result<()> {
sock.write_u32::<BigEndian>(1)?; self.sock.write_u32::<BigEndian>(1)?;
sock.write_u8(1)?; self.sock.write_u8(1)?;
Ok(()) Ok(())
} }
pub fn send_interested(sock: &mut TcpStream) -> io::Result<()> { pub fn send_interested(&mut self) -> io::Result<()> {
sock.write_u32::<BigEndian>(1)?; self.sock.write_u32::<BigEndian>(1)?;
sock.write_u8(2)?; self.sock.write_u8(2)?;
Ok(()) Ok(())
} }
pub fn send_not_interested(sock: &mut TcpStream) -> io::Result<()> { pub fn send_not_interested(&mut self) -> io::Result<()> {
sock.write_u32::<BigEndian>(1)?; self.sock.write_u32::<BigEndian>(1)?;
sock.write_u8(3)?; self.sock.write_u8(3)?;
Ok(()) Ok(())
} }
pub fn send_have(sock: &mut TcpStream, piece: u32) -> io::Result<()> { pub fn send_have(&mut self, piece: u32) -> io::Result<()> {
sock.write_u32::<BigEndian>(5)?; self.sock.write_u32::<BigEndian>(5)?;
sock.write_u8(4)?; self.sock.write_u8(4)?;
sock.write_u32::<BigEndian>(piece)?; self.sock.write_u32::<BigEndian>(piece)?;
Ok(()) Ok(())
} }
pub fn send_request(sock: &mut TcpStream, piece: u32, begin: u32, length: u32) -> io::Result<()> { pub fn send_request(&mut self, piece: u32, begin: u32, length: u32) -> io::Result<()> {
sock.write_u32::<BigEndian>(13)?; self.sock.write_u32::<BigEndian>(13)?;
sock.write_u8(6)?; self.sock.write_u8(6)?;
sock.write_u32::<BigEndian>(piece)?; self.sock.write_u32::<BigEndian>(piece)?;
sock.write_u32::<BigEndian>(begin)?; self.sock.write_u32::<BigEndian>(begin)?;
sock.write_u32::<BigEndian>(length)?; self.sock.write_u32::<BigEndian>(length)?;
Ok(()) Ok(())
} }
pub fn send_piece(sock: &mut TcpStream, piece: u32, begin: u32, data: &[u8]) -> io::Result<()> { pub fn send_piece(&mut self, piece: u32, begin: u32, data: &[u8]) -> io::Result<()> {
sock.write_u32::<BigEndian>(9 + data.len() as u32)?; self.sock.write_u32::<BigEndian>(9 + data.len() as u32)?;
sock.write_u8(7)?; self.sock.write_u8(7)?;
sock.write_u32::<BigEndian>(piece)?; self.sock.write_u32::<BigEndian>(piece)?;
sock.write_u32::<BigEndian>(begin)?; self.sock.write_u32::<BigEndian>(begin)?;
sock.write(data)?; self.sock.write(data)?;
Ok(()) Ok(())
} }
pub fn send_cancel(sock: &mut TcpStream, piece: u32, begin: u32, length: u32) -> io::Result<()> { pub fn send_cancel(&mut self, piece: u32, begin: u32, length: u32) -> io::Result<()> {
sock.write_u32::<BigEndian>(13)?; self.sock.write_u32::<BigEndian>(13)?;
sock.write_u8(8)?; self.sock.write_u8(8)?;
sock.write_u32::<BigEndian>(piece)?; self.sock.write_u32::<BigEndian>(piece)?;
sock.write_u32::<BigEndian>(begin)?; self.sock.write_u32::<BigEndian>(begin)?;
sock.write_u32::<BigEndian>(length)?; self.sock.write_u32::<BigEndian>(length)?;
Ok(()) Ok(())
} }
pub fn try_clone(&mut self) -> io::Result<PeerConnection> {
Ok(PeerConnection {
sock: self.sock.try_clone()?,
})
}
pub fn shutdown(&mut self) {
let _ = self.sock.shutdown(Shutdown::Both);
}
}

View File

@@ -14,7 +14,7 @@ use sha1::Sha1;
use metainfo::{Hash, Metainfo}; use metainfo::{Hash, Metainfo};
use net::bitfield::BitField; use net::bitfield::BitField;
use net::peer::{self, Packet}; use net::peer::{Packet, PeerConnection};
use tracker::http; use tracker::http;
const FRAGMENT_SIZE: u32 = 16 * 1024; const FRAGMENT_SIZE: u32 = 16 * 1024;
@@ -30,7 +30,7 @@ enum Signal {
ConnectionOpened { ConnectionOpened {
info_hash: Hash, info_hash: Hash,
peer_id: Hash, peer_id: Hash,
sock: TcpStream, sock: PeerConnection,
}, },
ConnectionClosed { ConnectionClosed {
info_hash: Hash, info_hash: Hash,
@@ -96,8 +96,8 @@ impl Session {
} }
} }
struct PeerConnection { struct SessionPeer {
sock: TcpStream, sock: PeerConnection,
bitfield: BitField, bitfield: BitField,
am_choking: bool, am_choking: bool,
am_interested: bool, am_interested: bool,
@@ -105,9 +105,9 @@ struct PeerConnection {
peer_interested: bool, peer_interested: bool,
} }
impl PeerConnection { impl SessionPeer {
pub fn new(sock: TcpStream, bitfield_len: u32) -> Self { pub fn new(sock: PeerConnection, bitfield_len: u32) -> Self {
PeerConnection { SessionPeer {
sock: sock, sock: sock,
bitfield: BitField::with_capacity(bitfield_len), bitfield: BitField::with_capacity(bitfield_len),
am_choking: true, am_choking: true,
@@ -141,7 +141,7 @@ struct SessionPiece {
struct SessionTorrent { struct SessionTorrent {
metainfo: Metainfo, metainfo: Metainfo,
own_bitfield: BitField, own_bitfield: BitField,
peers: HashMap<Hash, PeerConnection>, peers: HashMap<Hash, SessionPeer>,
pieces: BTreeMap<u32, SessionPiece>, pieces: BTreeMap<u32, SessionPiece>,
files: Vec<File>, files: Vec<File>,
} }
@@ -157,7 +157,7 @@ impl SessionTorrent {
} }
} }
pub fn get_peer(&mut self, peer_id: &Hash) -> &mut PeerConnection { pub fn get_peer(&mut self, peer_id: &Hash) -> &mut SessionPeer {
self.peers.get_mut(peer_id).unwrap() self.peers.get_mut(peer_id).unwrap()
} }
@@ -240,9 +240,9 @@ impl SessionTorrent {
fn requeue(&mut self, peer_id: &Hash) { fn requeue(&mut self, peer_id: &Hash) {
if let Some((index, begin, length)) = self.get_fragment(peer_id) { if let Some((index, begin, length)) = self.get_fragment(peer_id) {
// println!("onto piece {} {}..{}", index, begin, begin + length); // println!("onto piece {} {}..{}", index, begin, begin + length);
peer::send_request(&mut self.get_peer(&peer_id).sock, index, begin,length); self.get_peer(&peer_id).sock.send_request(index, begin,length);
} else { } else {
peer::send_not_interested(&mut self.get_peer(&peer_id).sock); self.get_peer(&peer_id).sock.send_not_interested();
println!("no fragment"); println!("no fragment");
} }
} }
@@ -332,15 +332,15 @@ impl SessionNetworkThread {
Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent), Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent),
Signal::RemoveTorrent(id) => { Signal::RemoveTorrent(id) => {
if let Some(mut sess_torrent) = self.torrents.remove(&id) { if let Some(mut sess_torrent) = self.torrents.remove(&id) {
for conn in sess_torrent.peers.values_mut() { for peer in sess_torrent.peers.values_mut() {
let _ = conn.sock.shutdown(Shutdown::Both); peer.sock.shutdown();
} }
} }
} }
Signal::ConnectionOpened { info_hash, peer_id, sock } => { Signal::ConnectionOpened { info_hash, peer_id, sock } => {
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) { if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
let mut peer = PeerConnection::new(sock, sess_torrent.metainfo.pieces.len() as u32); let mut peer = SessionPeer::new(sock, sess_torrent.metainfo.pieces.len() as u32);
peer::send_interested(&mut peer.sock); peer.sock.send_interested();
peer.am_interested = true; peer.am_interested = true;
sess_torrent.peers.insert(peer_id, peer); sess_torrent.peers.insert(peer_id, peer);
} }
@@ -376,20 +376,18 @@ impl SessionNetworkThread {
if let Ok(resp) = http::get_peers(own_peer_id, self.session.port, &metainfo, 0, 0, 0) { if let Ok(resp) = http::get_peers(own_peer_id, self.session.port, &metainfo, 0, 0, 0) {
self.torrents.insert(info_hash, SessionTorrent::new(metainfo, files)); self.torrents.insert(info_hash, SessionTorrent::new(metainfo, files));
println!("trying with {} peers", resp.peers.len());
for &peer in resp.peers.iter() { for &peer in resp.peers.iter() {
let sender = self.sender.clone(); let sender = self.sender.clone();
thread::spawn(move || { thread::spawn(move || {
if let Ok((mut sock, sock_clone, peer_id)) = peer::open_connection(peer, info_hash, own_peer_id) { if let Ok((mut sock, sock_clone, peer_id)) = PeerConnection::open_connection(peer, info_hash, own_peer_id) {
let _ = sender.send(Signal::ConnectionOpened { let _ = sender.send(Signal::ConnectionOpened {
info_hash: info_hash, info_hash: info_hash,
peer_id: peer_id, peer_id: peer_id,
sock: sock_clone, sock: sock_clone,
}); });
while let Ok(packet) = peer::read_packet(&mut sock) { while let Ok(packet) = sock.read_packet() {
let _ = sender.send(Signal::Packet { let _ = sender.send(Signal::Packet {
info_hash: info_hash, info_hash: info_hash,
peer_id: peer_id, peer_id: peer_id,