proto: basic peer wire protocol that can download
This commit is contained in:
450
src/net/session.rs
Normal file
450
src/net/session.rs
Normal file
@@ -0,0 +1,450 @@
|
||||
use std::cmp;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fs::File;
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::net::{Shutdown, TcpStream};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{self, Receiver, Sender};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use libc;
|
||||
use sha1::Sha1;
|
||||
|
||||
use metainfo::{Hash, Metainfo};
|
||||
use net::bitfield::BitField;
|
||||
use net::peer::{self, Packet};
|
||||
use tracker::http;
|
||||
|
||||
const FRAGMENT_SIZE: u32 = 16 * 1024;
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Signal {
|
||||
// == Torrent ==
|
||||
AddTorrent(Metainfo),
|
||||
RemoveTorrent(Hash),
|
||||
//ResumeTorrent(Hash),
|
||||
//StopTorrent(Hash),
|
||||
// == Peer ==
|
||||
ConnectionOpened {
|
||||
info_hash: Hash,
|
||||
peer_id: Hash,
|
||||
sock: TcpStream,
|
||||
},
|
||||
ConnectionClosed {
|
||||
info_hash: Hash,
|
||||
peer_id: Hash,
|
||||
},
|
||||
Packet {
|
||||
info_hash: Hash,
|
||||
peer_id: Hash,
|
||||
packet: Packet,
|
||||
},
|
||||
// == Other ==
|
||||
Stop,
|
||||
}
|
||||
|
||||
struct InnerSession {
|
||||
peer_id: Hash,
|
||||
port: u16,
|
||||
}
|
||||
|
||||
pub struct Session {
|
||||
// inner: Arc<InnerSession>,
|
||||
sender: Sender<Signal>,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
pub fn new() -> Session {
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
|
||||
let inner = Arc::new(InnerSession {
|
||||
peer_id: Hash::new([2u8; 20]),
|
||||
port: 6981,
|
||||
});
|
||||
|
||||
let thread = SessionNetworkThread {
|
||||
session: inner.clone(),
|
||||
sender: sender.clone(),
|
||||
torrents: HashMap::new(),
|
||||
seeds: 0,
|
||||
bytecount: 0,
|
||||
lastcount: Instant::now(),
|
||||
};
|
||||
|
||||
thread::spawn(move || {
|
||||
thread.run(receiver);
|
||||
});
|
||||
|
||||
Session {
|
||||
// inner: inner,
|
||||
sender: sender,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_torrent(&self, metainfo: Metainfo) {
|
||||
let _ = self.sender.send(Signal::AddTorrent(metainfo));
|
||||
}
|
||||
|
||||
pub fn remove_torrent(&self, id: Hash) {
|
||||
let _ = self.sender.send(Signal::RemoveTorrent(id));
|
||||
}
|
||||
|
||||
pub fn stop(&self) {
|
||||
let _ = self.sender.send(Signal::Stop);
|
||||
}
|
||||
}
|
||||
|
||||
struct PeerConnection {
|
||||
sock: TcpStream,
|
||||
bitfield: BitField,
|
||||
am_choking: bool,
|
||||
am_interested: bool,
|
||||
peer_choking: bool,
|
||||
peer_interested: bool,
|
||||
}
|
||||
|
||||
impl PeerConnection {
|
||||
pub fn new(sock: TcpStream, bitfield_len: u32) -> Self {
|
||||
PeerConnection {
|
||||
sock: sock,
|
||||
bitfield: BitField::with_capacity(bitfield_len),
|
||||
am_choking: true,
|
||||
am_interested: false,
|
||||
peer_choking: true,
|
||||
peer_interested: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
enum FragmentStatus {
|
||||
Available,
|
||||
Complete,
|
||||
Taken(Instant),
|
||||
}
|
||||
|
||||
pub struct SessionFragment {
|
||||
begin: u32,
|
||||
length: u32,
|
||||
status: FragmentStatus,
|
||||
}
|
||||
|
||||
struct SessionPiece {
|
||||
fragments: BTreeMap<u32, SessionFragment>,
|
||||
buffer: Vec<u8>,
|
||||
num_fragments: u32,
|
||||
total_fragments: u32,
|
||||
}
|
||||
|
||||
struct SessionTorrent {
|
||||
metainfo: Metainfo,
|
||||
own_bitfield: BitField,
|
||||
peers: HashMap<Hash, PeerConnection>,
|
||||
pieces: BTreeMap<u32, SessionPiece>,
|
||||
files: Vec<File>,
|
||||
}
|
||||
|
||||
impl SessionTorrent {
|
||||
pub fn new(metainfo: Metainfo, files: Vec<File>) -> Self {
|
||||
SessionTorrent {
|
||||
own_bitfield: BitField::with_capacity(metainfo.pieces.len() as u32),
|
||||
metainfo: metainfo,
|
||||
peers: HashMap::new(),
|
||||
pieces: BTreeMap::new(),
|
||||
files: files,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_peer(&mut self, peer_id: &Hash) -> &mut PeerConnection {
|
||||
self.peers.get_mut(peer_id).unwrap()
|
||||
}
|
||||
|
||||
// Get the requesting connection something to download. Check in the pieces currently
|
||||
// in flight, match with the peer's bitfield, and if nothing matches, fly a new piece.
|
||||
// (scheduler)
|
||||
pub fn get_fragment(&mut self, peer_id: &Hash) -> Option<(u32, u32, u32)> {
|
||||
let peer = &self.peers[peer_id];
|
||||
|
||||
// try to find a fragment from the pieces in flight
|
||||
for (&index, piece) in self.pieces.iter_mut() {
|
||||
if peer.bitfield.is_set(index) {
|
||||
for fragment in piece.fragments.values_mut() {
|
||||
match fragment.status {
|
||||
FragmentStatus::Available => {
|
||||
fragment.status = FragmentStatus::Taken(Instant::now());
|
||||
return Some((index, fragment.begin, fragment.length))
|
||||
}
|
||||
FragmentStatus::Complete => continue,
|
||||
FragmentStatus::Taken(timestamp) => {
|
||||
if timestamp.elapsed() > Duration::from_secs(10) {
|
||||
println!("retrying fragment");
|
||||
fragment.status = FragmentStatus::Taken(Instant::now());
|
||||
return Some((index, fragment.begin, fragment.length))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Collect the list of pieces currently in flight, to avoid re-fragmenting the same piece over and over.
|
||||
let keys: Vec<_> = self.pieces.keys().cloned().collect();
|
||||
|
||||
println!("fragmenting a new piece, in flight = {:?}", keys);
|
||||
|
||||
// TODO: optimize currently O(n). It's also ugly as hell
|
||||
for index in (0..self.own_bitfield.len()).filter(|index| !keys.contains(index)) {
|
||||
if !self.own_bitfield.is_set(index) && peer.bitfield.is_set(index) {
|
||||
|
||||
let total_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32;
|
||||
let mut fragments = BTreeMap::new();
|
||||
for idx in 0..total_fragments {
|
||||
let begin = idx * FRAGMENT_SIZE;
|
||||
|
||||
fragments.insert(begin ,SessionFragment {
|
||||
begin: begin,
|
||||
length: cmp::min(FRAGMENT_SIZE, self.metainfo.piece_length - (idx * FRAGMENT_SIZE)),
|
||||
status: FragmentStatus::Available,
|
||||
});
|
||||
}
|
||||
|
||||
let begin;
|
||||
let length;
|
||||
{
|
||||
let taken = fragments.get_mut(&0).unwrap();
|
||||
taken.status = FragmentStatus::Taken(Instant::now());
|
||||
begin = taken.begin;
|
||||
length = taken.length;
|
||||
}
|
||||
|
||||
let mut buffer = Vec::with_capacity(self.metainfo.piece_length as usize);
|
||||
unsafe {
|
||||
buffer.set_len(self.metainfo.piece_length as usize);
|
||||
}
|
||||
self.pieces.insert(index, SessionPiece {
|
||||
fragments: fragments,
|
||||
buffer: buffer,
|
||||
num_fragments: 0,
|
||||
total_fragments: total_fragments,
|
||||
});
|
||||
|
||||
return Some((index, begin, length));
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn requeue(&mut self, peer_id: &Hash) {
|
||||
if let Some((index, begin, length)) = self.get_fragment(peer_id) {
|
||||
// println!("onto piece {} {}..{}", index, begin, begin + length);
|
||||
peer::send_request(&mut self.get_peer(&peer_id).sock, index, begin,length);
|
||||
} else {
|
||||
println!("no fragment");
|
||||
}
|
||||
}
|
||||
|
||||
fn write_piece(&mut self, index: u32) {
|
||||
let mut start = 0;
|
||||
let mut end = 0;
|
||||
let mut pos: u64 = 0;
|
||||
let mut remaining = self.metainfo.piece_length as u64;
|
||||
let mut write = index as u64 * self.metainfo.piece_length as u64;
|
||||
|
||||
for (id, fileinfo) in self.metainfo.files.iter().enumerate() {
|
||||
end += fileinfo.length as u64;
|
||||
println!("in file {}, start={}, end={}, pos={}, remaining={}, write={}", fileinfo.path.display(), start, end, pos, remaining, write);
|
||||
if write < end {
|
||||
let len = cmp::min(remaining, fileinfo.length as u64);
|
||||
self.files[id].seek(SeekFrom::Start(write - start));
|
||||
self.files[id].write_all(&self.pieces[&index].buffer[pos as usize..(pos+len) as usize]);
|
||||
write += len;
|
||||
pos += len;
|
||||
remaining -= len;
|
||||
if remaining == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
start = end;
|
||||
}
|
||||
}
|
||||
|
||||
fn unchoke_reply(&mut self, peer_id: &Hash) {
|
||||
self.get_peer(&peer_id).peer_choking = false;
|
||||
self.requeue(peer_id);
|
||||
}
|
||||
|
||||
fn piece_reply(&mut self, peer_id: &Hash, index: u32, begin: u32, block: Vec<u8>) {
|
||||
let mut remove = false;
|
||||
{
|
||||
if let Some(piece) = self.pieces.get_mut(&index) {
|
||||
if let Some(fragment) = piece.fragments.get_mut(&begin) {
|
||||
fragment.status = FragmentStatus::Complete;
|
||||
|
||||
piece.buffer[begin as usize..(begin as usize)+block.len()].copy_from_slice(&block);
|
||||
piece.num_fragments += 1;
|
||||
if piece.num_fragments == piece.total_fragments {
|
||||
// TODO check hash
|
||||
println!("piece is done {}", index);
|
||||
let mut m = Sha1::new();
|
||||
m.update(&piece.buffer);
|
||||
if m.digest().bytes() == &self.metainfo.pieces[index as usize][..] {
|
||||
self.own_bitfield.set(index);
|
||||
println!("it's a match!");
|
||||
remove = true;
|
||||
} else {
|
||||
println!("no match");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("could not find fragment {}", begin);
|
||||
}
|
||||
} else {
|
||||
println!("could not find piece {}", index);
|
||||
}
|
||||
}
|
||||
if remove {
|
||||
self.write_piece(index);
|
||||
self.pieces.remove(&index);
|
||||
}
|
||||
self.requeue(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
struct SessionNetworkThread {
|
||||
session: Arc<InnerSession>,
|
||||
sender: Sender<Signal>,
|
||||
torrents: HashMap<Hash, SessionTorrent>,
|
||||
seeds: u32,
|
||||
bytecount: u64,
|
||||
lastcount: Instant,
|
||||
}
|
||||
|
||||
impl SessionNetworkThread {
|
||||
pub fn run(mut self, input: Receiver<Signal>) {
|
||||
for signal in input.iter() {
|
||||
match signal {
|
||||
Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent),
|
||||
Signal::RemoveTorrent(id) => {
|
||||
if let Some(mut sess_torrent) = self.torrents.remove(&id) {
|
||||
for conn in sess_torrent.peers.values_mut() {
|
||||
let _ = conn.sock.shutdown(Shutdown::Both);
|
||||
}
|
||||
}
|
||||
}
|
||||
Signal::ConnectionOpened { info_hash, peer_id, sock } => {
|
||||
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
|
||||
let mut peer = PeerConnection::new(sock, sess_torrent.metainfo.pieces.len() as u32);
|
||||
peer::send_interested(&mut peer.sock);
|
||||
peer.am_interested = true;
|
||||
sess_torrent.peers.insert(peer_id, peer);
|
||||
}
|
||||
}
|
||||
Signal::ConnectionClosed { info_hash, peer_id } => {
|
||||
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
|
||||
if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) {
|
||||
// if !peer.peer_choking && self.seeds > 1 {
|
||||
// self.seeds -= 1;
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
Signal::Packet { info_hash, peer_id, packet } => self.signal_packet(info_hash, peer_id, packet),
|
||||
Signal::Stop => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn signal_add_torrent(&mut self, metainfo: Metainfo) {
|
||||
let own_peer_id = self.session.peer_id;
|
||||
let info_hash = metainfo.info_hash;
|
||||
|
||||
let mut files = Vec::new();
|
||||
for fileinfo in metainfo.files.iter() {
|
||||
let file = File::create(&fileinfo.path).unwrap();
|
||||
unsafe {
|
||||
libc::ftruncate64(file.as_raw_fd(), fileinfo.length as i64);
|
||||
}
|
||||
files.push(file);
|
||||
}
|
||||
|
||||
if let Ok(resp) = http::get_peers(own_peer_id, self.session.port, &metainfo, 0, 0, 0) {
|
||||
self.torrents.insert(info_hash, SessionTorrent::new(metainfo, files));
|
||||
|
||||
println!("trying with {} peers", resp.peers.len());
|
||||
|
||||
for &peer in resp.peers.iter() {
|
||||
let sender = self.sender.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
if let Ok((mut sock, sock_clone, peer_id)) = peer::open_connection(peer, info_hash, own_peer_id) {
|
||||
let _ = sender.send(Signal::ConnectionOpened {
|
||||
info_hash: info_hash,
|
||||
peer_id: peer_id,
|
||||
sock: sock_clone,
|
||||
});
|
||||
|
||||
while let Ok(packet) = peer::read_packet(&mut sock) {
|
||||
let _ = sender.send(Signal::Packet {
|
||||
info_hash: info_hash,
|
||||
peer_id: peer_id,
|
||||
packet: packet,
|
||||
});
|
||||
}
|
||||
|
||||
let _ = sender.send(Signal::ConnectionClosed {
|
||||
info_hash: info_hash,
|
||||
peer_id: peer_id,
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn signal_packet(&mut self, info_hash: Hash, peer_id: Hash, packet: Packet) {
|
||||
if let Some(torrent) = self.torrents.get_mut(&info_hash) {
|
||||
match packet {
|
||||
Packet::Choke => {
|
||||
torrent.get_peer(&peer_id).peer_choking = true;
|
||||
}
|
||||
Packet::Unchoke => {
|
||||
self.seeds += 1;
|
||||
println!("seeds {}", self.seeds);
|
||||
torrent.unchoke_reply(&peer_id);
|
||||
}
|
||||
Packet::Interested => {
|
||||
torrent.get_peer(&peer_id).peer_interested = true;
|
||||
}
|
||||
Packet::NotInterested => {
|
||||
torrent.get_peer(&peer_id).peer_interested = false;
|
||||
}
|
||||
Packet::Have { index } => {
|
||||
if index < torrent.metainfo.num_pieces {
|
||||
torrent.get_peer(&peer_id).bitfield.set(index);
|
||||
}
|
||||
}
|
||||
Packet::Bitfield { bitfield } => {
|
||||
torrent.get_peer(&peer_id).bitfield = BitField::new(bitfield, torrent.metainfo.pieces.len() as u32);
|
||||
}
|
||||
Packet::Request { index, begin, length } => {
|
||||
// TODO
|
||||
}
|
||||
Packet::Piece { index, begin, block } => {
|
||||
self.bytecount += block.len() as u64;
|
||||
if self.lastcount.elapsed() >= Duration::from_secs(1) {
|
||||
println!("{:.2} KiB/s", self.bytecount as f64 / 1024.);
|
||||
self.bytecount = 0;
|
||||
self.lastcount = Instant::now();
|
||||
}
|
||||
torrent.piece_reply(&peer_id, index, begin, block);
|
||||
}
|
||||
Packet::Cancel { index, begin, length } => {
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user