use std::cmp; use std::collections::{BTreeMap, HashMap}; use std::fs::File; use std::io::{Seek, SeekFrom, Write}; use std::os::unix::io::AsRawFd; use std::sync::Arc; use std::sync::mpsc::{self, Receiver, Sender}; use std::thread; use std::time::{Duration, Instant}; use libc; use metainfo::{Hash, Metainfo}; use net::bitfield::BitField; use net::buffers::PieceBuffer; use net::peer::{Packet, PeerConnection}; use tracker::http; const FRAGMENT_SIZE: u32 = 16 * 1024; #[derive(Debug)] enum Signal { // == Torrent == AddTorrent(Metainfo), RemoveTorrent(Hash), //ResumeTorrent(Hash), //StopTorrent(Hash), // == Peer == ConnectionOpened { info_hash: Hash, peer_id: Hash, sock: PeerConnection, }, ConnectionClosed { info_hash: Hash, peer_id: Hash, }, Packet { info_hash: Hash, peer_id: Hash, packet: Packet, }, // == Other == Stop, } struct InnerSession { peer_id: Hash, port: u16, } // == Session == pub struct Session { // inner: Arc, sender: Sender, } impl Session { pub fn new() -> Session { let (sender, receiver) = mpsc::channel(); let inner = Arc::new(InnerSession { peer_id: Hash::new([2u8; 20]), port: 6981, }); let thread = SessionNetworkThread { session: inner.clone(), sender: sender.clone(), torrents: HashMap::new(), seeds: 0, bytecount: 0, lastcount: Instant::now(), }; thread::spawn(move || { thread.run(receiver); }); Session { // inner: inner, sender: sender, } } pub fn add_torrent(&self, metainfo: Metainfo) { let _ = self.sender.send(Signal::AddTorrent(metainfo)); } pub fn remove_torrent(&self, id: Hash) { let _ = self.sender.send(Signal::RemoveTorrent(id)); } pub fn stop(&self) { let _ = self.sender.send(Signal::Stop); } } // == SessionPeer == struct SessionPeer { sock: PeerConnection, bitfield: BitField, am_choking: bool, am_interested: bool, peer_choking: bool, peer_interested: bool, } impl SessionPeer { pub fn new(sock: PeerConnection, bitfield_len: u32) -> Self { SessionPeer { sock: sock, bitfield: BitField::with_capacity(bitfield_len), am_choking: true, am_interested: false, peer_choking: true, peer_interested: false, } } } // == FragmentStatus == #[derive(Copy, Clone, Debug)] enum FragmentStatus { Available, Complete, Taken(Instant), } // == SessionFragment == pub struct SessionFragment { begin: u32, length: u32, status: FragmentStatus, } // == SessionPiece == struct SessionPiece { fragments: BTreeMap, buffer: PieceBuffer, } // == SessionTorrent == struct SessionTorrent { metainfo: Metainfo, own_bitfield: BitField, peers: HashMap, pieces: BTreeMap, files: Vec, } impl SessionTorrent { pub fn new(metainfo: Metainfo, files: Vec) -> Self { SessionTorrent { own_bitfield: BitField::with_capacity(metainfo.pieces.len() as u32), metainfo: metainfo, peers: HashMap::new(), pieces: BTreeMap::new(), files: files, } } pub fn get_peer(&mut self, peer_id: &Hash) -> &mut SessionPeer { self.peers.get_mut(peer_id).unwrap() } // Get the requesting connection something to download. Check in the pieces currently // in flight, match with the peer's bitfield, and if nothing matches, fly a new piece. // (scheduler) pub fn get_fragment(&mut self, peer_id: &Hash) -> Option<(u32, u32, u32)> { let peer = &self.peers[peer_id]; // try to find a fragment from the pieces in flight for (&index, piece) in self.pieces.iter_mut() { if peer.bitfield.is_set(index) { for fragment in piece.fragments.values_mut() { match fragment.status { FragmentStatus::Available => { fragment.status = FragmentStatus::Taken(Instant::now()); return Some((index, fragment.begin, fragment.length)) } FragmentStatus::Complete => continue, FragmentStatus::Taken(timestamp) => { if timestamp.elapsed() > Duration::from_secs(5) { println!("retrying fragment"); fragment.status = FragmentStatus::Taken(Instant::now()); return Some((index, fragment.begin, fragment.length)) } } } } } } // Collect the list of pieces currently in flight, to avoid re-fragmenting the same piece over and over. let keys: Vec<_> = self.pieces.keys().cloned().collect(); println!("fragmenting a new piece, in flight = {:?}", keys); // TODO: optimize currently O(n). It's also ugly as hell for index in (0..self.own_bitfield.len()).filter(|index| !keys.contains(index)) { if !self.own_bitfield.is_set(index) && peer.bitfield.is_set(index) { let num_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32; let mut fragments = BTreeMap::new(); for idx in 0..num_fragments { let begin = idx * FRAGMENT_SIZE; fragments.insert(begin ,SessionFragment { begin: begin, length: cmp::min(FRAGMENT_SIZE, self.metainfo.piece_length - (idx * FRAGMENT_SIZE)), status: FragmentStatus::Available, }); } let begin; let length; { let taken = fragments.get_mut(&0).unwrap(); taken.status = FragmentStatus::Taken(Instant::now()); begin = taken.begin; length = taken.length; } self.pieces.insert(index, SessionPiece { fragments: fragments, buffer: PieceBuffer::new(num_fragments, self.metainfo.piece_length), }); return Some((index, begin, length)); } } None } fn requeue(&mut self, peer_id: &Hash) { if let Some((index, begin, length)) = self.get_fragment(peer_id) { // println!("onto piece {} {}..{}", index, begin, begin + length); self.get_peer(&peer_id).sock.send_request(index, begin,length); } else { self.get_peer(&peer_id).sock.send_not_interested(); println!("no fragment"); } } fn write_piece(&mut self, index: u32, buffer: Vec) { let mut start = 0; let mut end = 0; let mut pos: u64 = 0; let mut remaining = self.metainfo.piece_length as u64; let mut write = index as u64 * self.metainfo.piece_length as u64; for (id, fileinfo) in self.metainfo.files.iter().enumerate() { end += fileinfo.length as u64; println!("in file {}, start={}, end={}, pos={}, remaining={}, write={}", fileinfo.path.display(), start, end, pos, remaining, write); if write < end { let len = cmp::min(remaining, fileinfo.length as u64); self.files[id].seek(SeekFrom::Start(write - start)); self.files[id].write_all(&buffer[pos as usize..(pos+len) as usize]); write += len; pos += len; remaining -= len; if remaining == 0 { break; } } start = end; } } fn unchoke_reply(&mut self, peer_id: &Hash) { self.get_peer(&peer_id).peer_choking = false; self.requeue(peer_id); self.requeue(peer_id); self.requeue(peer_id); } fn piece_reply(&mut self, peer_id: &Hash, index: u32, begin: u32, block: Vec) { let mut remove = false; let mut reset = false; { if let Some(piece) = self.pieces.get_mut(&index) { if let Some(fragment) = piece.fragments.get_mut(&begin) { fragment.status = FragmentStatus::Complete; piece.buffer.add_fragment(begin, &block); if piece.buffer.complete() { println!("piece is done {}", index); if piece.buffer.matches_hash(&self.metainfo.pieces[index as usize]) { self.own_bitfield.set(index); println!("it's a match!"); remove = true; } else { reset = true; println!("no match"); } } } else { println!("could not find fragment {}", begin); } if reset { for fragment in piece.fragments.values_mut() { fragment.status = FragmentStatus::Available; } } } else { println!("could not find piece {}", index); } } if remove { let piece = self.pieces.remove(&index).expect("told to remove piece that doesn't exist"); self.write_piece(index, piece.buffer.get()); } self.requeue(peer_id); } } // == SessionNetworkThread == struct SessionNetworkThread { session: Arc, sender: Sender, torrents: HashMap, seeds: u32, bytecount: u64, lastcount: Instant, } impl SessionNetworkThread { pub fn run(mut self, signals: Receiver) { for signal in signals.iter() { match signal { Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent), Signal::RemoveTorrent(id) => self.signal_remove_torrent(&id), Signal::ConnectionOpened { info_hash, peer_id, sock } => self.signal_connection_opened(&info_hash, &peer_id, sock), Signal::ConnectionClosed { info_hash, peer_id } => self.signal_connection_closed(&info_hash, &peer_id), Signal::Packet { info_hash, peer_id, packet } => self.signal_packet(info_hash, peer_id, packet), Signal::Stop => break, } } } fn signal_add_torrent(&mut self, metainfo: Metainfo) { let own_peer_id = self.session.peer_id; let info_hash = metainfo.info_hash; let mut files = Vec::new(); for fileinfo in metainfo.files.iter() { let file = File::create(&fileinfo.path).unwrap(); unsafe { libc::ftruncate64(file.as_raw_fd(), fileinfo.length as i64); } files.push(file); } if let Ok(resp) = http::get_peers(own_peer_id, self.session.port, &metainfo, 0, 0, 0) { self.torrents.insert(info_hash, SessionTorrent::new(metainfo, files)); for &peer in resp.peers.iter() { let sender = self.sender.clone(); thread::spawn(move || { if let Ok((mut sock, sock_clone, peer_id)) = PeerConnection::open_connection(peer, info_hash, own_peer_id) { let _ = sender.send(Signal::ConnectionOpened { info_hash: info_hash, peer_id: peer_id, sock: sock_clone, }); while let Ok(packet) = sock.read_packet() { let _ = sender.send(Signal::Packet { info_hash: info_hash, peer_id: peer_id, packet: packet, }); } let _ = sender.send(Signal::ConnectionClosed { info_hash: info_hash, peer_id: peer_id, }); } }); } } } fn signal_remove_torrent(&mut self, id: &Hash) { if let Some(mut sess_torrent) = self.torrents.remove(id) { for peer in sess_torrent.peers.values_mut() { peer.sock.shutdown(); } } } fn signal_connection_opened(&mut self, info_hash: &Hash, peer_id: &Hash, conn: PeerConnection) { if let Some(sess_torrent) = self.torrents.get_mut(info_hash) { let mut peer = SessionPeer::new(conn, sess_torrent.metainfo.pieces.len() as u32); peer.sock.send_interested(); peer.am_interested = true; sess_torrent.peers.insert(*peer_id, peer); } } fn signal_connection_closed(&mut self, info_hash: &Hash, peer_id: &Hash) { if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) { if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) { // if !peer.peer_choking && self.seeds > 1 { // self.seeds -= 1; // } } } } fn signal_packet(&mut self, info_hash: Hash, peer_id: Hash, packet: Packet) { if let Some(torrent) = self.torrents.get_mut(&info_hash) { match packet { Packet::Choke => { torrent.get_peer(&peer_id).peer_choking = true; } Packet::Unchoke => { self.seeds += 1; println!("seeds {}", self.seeds); torrent.unchoke_reply(&peer_id); } Packet::Interested => { torrent.get_peer(&peer_id).peer_interested = true; } Packet::NotInterested => { torrent.get_peer(&peer_id).peer_interested = false; } Packet::Have { index } => { if index < torrent.metainfo.num_pieces { torrent.get_peer(&peer_id).bitfield.set(index); } } Packet::Bitfield { bitfield } => { torrent.get_peer(&peer_id).bitfield = BitField::new(bitfield, torrent.metainfo.pieces.len() as u32); } Packet::Request { index, begin, length } => { // TODO } Packet::Piece { index, begin, block } => { self.bytecount += block.len() as u64; if self.lastcount.elapsed() >= Duration::from_secs(1) { println!("{:.2} KiB/s", self.bytecount as f64 / 1024.); self.bytecount = 0; self.lastcount = Instant::now(); } torrent.piece_reply(&peer_id, index, begin, block); } Packet::Cancel { index, begin, length } => { // TODO } } } } }