466 lines
15 KiB
Rust
466 lines
15 KiB
Rust
use std::cmp;
|
|
use std::collections::{BTreeMap, HashMap};
|
|
use std::fs::File;
|
|
use std::io::{Seek, SeekFrom, Write};
|
|
use std::os::unix::io::AsRawFd;
|
|
use std::sync::Arc;
|
|
use std::sync::mpsc::{self, Receiver, Sender};
|
|
use std::thread;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use libc;
|
|
|
|
use metainfo::{Hash, Metainfo};
|
|
use net::bitfield::BitField;
|
|
use net::buffers::PieceBuffer;
|
|
use net::peer::{Packet, PeerConnection};
|
|
use tracker::http;
|
|
|
|
/// Size in bytes (16 KiB) of the fragments a piece is split into when it is
/// requested from peers; the last fragment of a piece may be shorter.
const FRAGMENT_SIZE: u32 = 16 * 1024;
|
|
|
|
#[derive(Debug)]
|
|
enum Signal {
|
|
// == Torrent ==
|
|
AddTorrent(Metainfo),
|
|
RemoveTorrent(Hash),
|
|
//ResumeTorrent(Hash),
|
|
//StopTorrent(Hash),
|
|
// == Peer ==
|
|
ConnectionOpened {
|
|
info_hash: Hash,
|
|
peer_id: Hash,
|
|
sock: PeerConnection,
|
|
},
|
|
ConnectionClosed {
|
|
info_hash: Hash,
|
|
peer_id: Hash,
|
|
},
|
|
Packet {
|
|
info_hash: Hash,
|
|
peer_id: Hash,
|
|
packet: Packet,
|
|
},
|
|
// == Other ==
|
|
Stop,
|
|
}
|
|
|
|
struct InnerSession {
|
|
peer_id: Hash,
|
|
port: u16,
|
|
}
|
|
|
|
// == Session ==
|
|
|
|
pub struct Session {
|
|
// inner: Arc<InnerSession>,
|
|
sender: Sender<Signal>,
|
|
}
|
|
|
|
impl Session {
|
|
pub fn new() -> Session {
|
|
let (sender, receiver) = mpsc::channel();
|
|
|
|
let inner = Arc::new(InnerSession {
|
|
peer_id: Hash::new([2u8; 20]),
|
|
port: 6981,
|
|
});
|
|
|
|
let thread = SessionNetworkThread {
|
|
session: inner.clone(),
|
|
sender: sender.clone(),
|
|
torrents: HashMap::new(),
|
|
seeds: 0,
|
|
bytecount: 0,
|
|
lastcount: Instant::now(),
|
|
};
|
|
|
|
thread::spawn(move || {
|
|
thread.run(receiver);
|
|
});
|
|
|
|
Session {
|
|
// inner: inner,
|
|
sender: sender,
|
|
}
|
|
}
|
|
|
|
pub fn add_torrent(&self, metainfo: Metainfo) {
|
|
let _ = self.sender.send(Signal::AddTorrent(metainfo));
|
|
}
|
|
|
|
pub fn remove_torrent(&self, id: Hash) {
|
|
let _ = self.sender.send(Signal::RemoveTorrent(id));
|
|
}
|
|
|
|
pub fn stop(&self) {
|
|
let _ = self.sender.send(Signal::Stop);
|
|
}
|
|
}
|
|
|
|
// == SessionPeer ==
|
|
|
|
struct SessionPeer {
|
|
sock: PeerConnection,
|
|
bitfield: BitField,
|
|
am_choking: bool,
|
|
am_interested: bool,
|
|
peer_choking: bool,
|
|
peer_interested: bool,
|
|
}
|
|
|
|
impl SessionPeer {
|
|
pub fn new(sock: PeerConnection, bitfield_len: u32) -> Self {
|
|
SessionPeer {
|
|
sock: sock,
|
|
bitfield: BitField::with_capacity(bitfield_len),
|
|
am_choking: true,
|
|
am_interested: false,
|
|
peer_choking: true,
|
|
peer_interested: false,
|
|
}
|
|
}
|
|
}
|
|
|
|
// == FragmentStatus ==
|
|
|
|
/// Download state of a single fragment of a piece.
#[derive(Copy, Clone, Debug)]
enum FragmentStatus {
    /// Not requested from anyone yet (or released again for a retry).
    Available,
    /// The fragment's data has been received.
    Complete,
    /// Requested at the given instant; the scheduler hands the fragment out
    /// again once the request is more than five seconds old.
    Taken(Instant),
}
|
|
|
|
// == SessionFragment ==
|
|
|
|
pub struct SessionFragment {
|
|
begin: u32,
|
|
length: u32,
|
|
status: FragmentStatus,
|
|
}
|
|
|
|
// == SessionPiece ==
|
|
|
|
struct SessionPiece {
|
|
fragments: BTreeMap<u32, SessionFragment>,
|
|
buffer: PieceBuffer,
|
|
}
|
|
|
|
// == SessionTorrent ==
|
|
|
|
struct SessionTorrent {
|
|
metainfo: Metainfo,
|
|
own_bitfield: BitField,
|
|
peers: HashMap<Hash, SessionPeer>,
|
|
pieces: BTreeMap<u32, SessionPiece>,
|
|
files: Vec<File>,
|
|
}
|
|
|
|
impl SessionTorrent {
|
|
pub fn new(metainfo: Metainfo, files: Vec<File>) -> Self {
|
|
SessionTorrent {
|
|
own_bitfield: BitField::with_capacity(metainfo.pieces.len() as u32),
|
|
metainfo: metainfo,
|
|
peers: HashMap::new(),
|
|
pieces: BTreeMap::new(),
|
|
files: files,
|
|
}
|
|
}
|
|
|
|
pub fn get_peer(&mut self, peer_id: &Hash) -> &mut SessionPeer {
|
|
self.peers.get_mut(peer_id).unwrap()
|
|
}
|
|
|
|
// Get the requesting connection something to download. Check in the pieces currently
|
|
// in flight, match with the peer's bitfield, and if nothing matches, fly a new piece.
|
|
// (scheduler)
|
|
pub fn get_fragment(&mut self, peer_id: &Hash) -> Option<(u32, u32, u32)> {
|
|
let peer = &self.peers[peer_id];
|
|
|
|
// try to find a fragment from the pieces in flight
|
|
for (&index, piece) in self.pieces.iter_mut() {
|
|
if peer.bitfield.is_set(index) {
|
|
for fragment in piece.fragments.values_mut() {
|
|
match fragment.status {
|
|
FragmentStatus::Available => {
|
|
fragment.status = FragmentStatus::Taken(Instant::now());
|
|
return Some((index, fragment.begin, fragment.length))
|
|
}
|
|
FragmentStatus::Complete => continue,
|
|
FragmentStatus::Taken(timestamp) => {
|
|
if timestamp.elapsed() > Duration::from_secs(5) {
|
|
println!("retrying fragment");
|
|
fragment.status = FragmentStatus::Taken(Instant::now());
|
|
return Some((index, fragment.begin, fragment.length))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Collect the list of pieces currently in flight, to avoid re-fragmenting the same piece over and over.
|
|
let keys: Vec<_> = self.pieces.keys().cloned().collect();
|
|
|
|
println!("fragmenting a new piece, in flight = {:?}", keys);
|
|
|
|
// TODO: optimize currently O(n). It's also ugly as hell
|
|
for index in (0..self.own_bitfield.len()).filter(|index| !keys.contains(index)) {
|
|
if !self.own_bitfield.is_set(index) && peer.bitfield.is_set(index) {
|
|
|
|
let num_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32;
|
|
let mut fragments = BTreeMap::new();
|
|
for idx in 0..num_fragments {
|
|
let begin = idx * FRAGMENT_SIZE;
|
|
|
|
fragments.insert(begin ,SessionFragment {
|
|
begin: begin,
|
|
length: cmp::min(FRAGMENT_SIZE, self.metainfo.piece_length - (idx * FRAGMENT_SIZE)),
|
|
status: FragmentStatus::Available,
|
|
});
|
|
}
|
|
|
|
let begin;
|
|
let length;
|
|
{
|
|
let taken = fragments.get_mut(&0).unwrap();
|
|
taken.status = FragmentStatus::Taken(Instant::now());
|
|
begin = taken.begin;
|
|
length = taken.length;
|
|
}
|
|
|
|
self.pieces.insert(index, SessionPiece {
|
|
fragments: fragments,
|
|
buffer: PieceBuffer::new(num_fragments, self.metainfo.piece_length),
|
|
});
|
|
|
|
return Some((index, begin, length));
|
|
}
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
fn requeue(&mut self, peer_id: &Hash) {
|
|
if let Some((index, begin, length)) = self.get_fragment(peer_id) {
|
|
// println!("onto piece {} {}..{}", index, begin, begin + length);
|
|
self.get_peer(&peer_id).sock.send_request(index, begin,length);
|
|
} else {
|
|
self.get_peer(&peer_id).sock.send_not_interested();
|
|
println!("no fragment");
|
|
}
|
|
}
|
|
|
|
fn write_piece(&mut self, index: u32, buffer: Vec<u8>) {
|
|
let mut start = 0;
|
|
let mut end = 0;
|
|
let mut pos: u64 = 0;
|
|
let mut remaining = self.metainfo.piece_length as u64;
|
|
let mut write = index as u64 * self.metainfo.piece_length as u64;
|
|
|
|
for (id, fileinfo) in self.metainfo.files.iter().enumerate() {
|
|
end += fileinfo.length as u64;
|
|
println!("in file {}, start={}, end={}, pos={}, remaining={}, write={}", fileinfo.path.display(), start, end, pos, remaining, write);
|
|
if write < end {
|
|
let len = cmp::min(remaining, fileinfo.length as u64);
|
|
self.files[id].seek(SeekFrom::Start(write - start));
|
|
self.files[id].write_all(&buffer[pos as usize..(pos+len) as usize]);
|
|
write += len;
|
|
pos += len;
|
|
remaining -= len;
|
|
if remaining == 0 {
|
|
break;
|
|
}
|
|
}
|
|
start = end;
|
|
}
|
|
}
|
|
|
|
fn unchoke_reply(&mut self, peer_id: &Hash) {
|
|
self.get_peer(&peer_id).peer_choking = false;
|
|
self.requeue(peer_id);
|
|
self.requeue(peer_id);
|
|
self.requeue(peer_id);
|
|
}
|
|
|
|
fn piece_reply(&mut self, peer_id: &Hash, index: u32, begin: u32, block: Vec<u8>) {
|
|
let mut remove = false;
|
|
let mut reset = false;
|
|
{
|
|
if let Some(piece) = self.pieces.get_mut(&index) {
|
|
if let Some(fragment) = piece.fragments.get_mut(&begin) {
|
|
fragment.status = FragmentStatus::Complete;
|
|
|
|
piece.buffer.add_fragment(begin, &block);
|
|
if piece.buffer.complete() {
|
|
println!("piece is done {}", index);
|
|
if piece.buffer.matches_hash(&self.metainfo.pieces[index as usize]) {
|
|
self.own_bitfield.set(index);
|
|
println!("it's a match!");
|
|
remove = true;
|
|
} else {
|
|
reset = true;
|
|
println!("no match");
|
|
}
|
|
}
|
|
} else {
|
|
println!("could not find fragment {}", begin);
|
|
}
|
|
if reset {
|
|
for fragment in piece.fragments.values_mut() {
|
|
fragment.status = FragmentStatus::Available;
|
|
}
|
|
}
|
|
} else {
|
|
println!("could not find piece {}", index);
|
|
}
|
|
}
|
|
if remove {
|
|
let piece = self.pieces.remove(&index).expect("told to remove piece that doesn't exist");
|
|
self.write_piece(index, piece.buffer.get());
|
|
}
|
|
self.requeue(peer_id);
|
|
}
|
|
}
|
|
|
|
// == SessionNetworkThread ==
|
|
|
|
struct SessionNetworkThread {
|
|
session: Arc<InnerSession>,
|
|
sender: Sender<Signal>,
|
|
torrents: HashMap<Hash, SessionTorrent>,
|
|
seeds: u32,
|
|
bytecount: u64,
|
|
lastcount: Instant,
|
|
}
|
|
|
|
impl SessionNetworkThread {
|
|
pub fn run(mut self, signals: Receiver<Signal>) {
|
|
for signal in signals.iter() {
|
|
match signal {
|
|
Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent),
|
|
Signal::RemoveTorrent(id) => self.signal_remove_torrent(&id),
|
|
Signal::ConnectionOpened { info_hash, peer_id, sock } => self.signal_connection_opened(&info_hash, &peer_id, sock),
|
|
Signal::ConnectionClosed { info_hash, peer_id } => self.signal_connection_closed(&info_hash, &peer_id),
|
|
Signal::Packet { info_hash, peer_id, packet } => self.signal_packet(info_hash, peer_id, packet),
|
|
Signal::Stop => break,
|
|
}
|
|
}
|
|
}
|
|
|
|
fn signal_add_torrent(&mut self, metainfo: Metainfo) {
|
|
let own_peer_id = self.session.peer_id;
|
|
let info_hash = metainfo.info_hash;
|
|
|
|
let mut files = Vec::new();
|
|
for fileinfo in metainfo.files.iter() {
|
|
let file = File::create(&fileinfo.path).unwrap();
|
|
unsafe {
|
|
libc::ftruncate64(file.as_raw_fd(), fileinfo.length as i64);
|
|
}
|
|
files.push(file);
|
|
}
|
|
|
|
if let Ok(resp) = http::get_peers(own_peer_id, self.session.port, &metainfo, 0, 0, 0) {
|
|
self.torrents.insert(info_hash, SessionTorrent::new(metainfo, files));
|
|
|
|
for &peer in resp.peers.iter() {
|
|
let sender = self.sender.clone();
|
|
|
|
thread::spawn(move || {
|
|
if let Ok((mut sock, sock_clone, peer_id)) = PeerConnection::open_connection(peer, info_hash, own_peer_id) {
|
|
let _ = sender.send(Signal::ConnectionOpened {
|
|
info_hash: info_hash,
|
|
peer_id: peer_id,
|
|
sock: sock_clone,
|
|
});
|
|
|
|
while let Ok(packet) = sock.read_packet() {
|
|
let _ = sender.send(Signal::Packet {
|
|
info_hash: info_hash,
|
|
peer_id: peer_id,
|
|
packet: packet,
|
|
});
|
|
}
|
|
|
|
let _ = sender.send(Signal::ConnectionClosed {
|
|
info_hash: info_hash,
|
|
peer_id: peer_id,
|
|
});
|
|
}
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
fn signal_remove_torrent(&mut self, id: &Hash) {
|
|
if let Some(mut sess_torrent) = self.torrents.remove(id) {
|
|
for peer in sess_torrent.peers.values_mut() {
|
|
peer.sock.shutdown();
|
|
}
|
|
}
|
|
}
|
|
|
|
fn signal_connection_opened(&mut self, info_hash: &Hash, peer_id: &Hash, conn: PeerConnection) {
|
|
if let Some(sess_torrent) = self.torrents.get_mut(info_hash) {
|
|
let mut peer = SessionPeer::new(conn, sess_torrent.metainfo.pieces.len() as u32);
|
|
peer.sock.send_interested();
|
|
peer.am_interested = true;
|
|
sess_torrent.peers.insert(*peer_id, peer);
|
|
}
|
|
}
|
|
|
|
fn signal_connection_closed(&mut self, info_hash: &Hash, peer_id: &Hash) {
|
|
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
|
|
if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) {
|
|
// if !peer.peer_choking && self.seeds > 1 {
|
|
// self.seeds -= 1;
|
|
// }
|
|
}
|
|
}
|
|
}
|
|
|
|
fn signal_packet(&mut self, info_hash: Hash, peer_id: Hash, packet: Packet) {
|
|
if let Some(torrent) = self.torrents.get_mut(&info_hash) {
|
|
match packet {
|
|
Packet::Choke => {
|
|
torrent.get_peer(&peer_id).peer_choking = true;
|
|
}
|
|
Packet::Unchoke => {
|
|
self.seeds += 1;
|
|
println!("seeds {}", self.seeds);
|
|
torrent.unchoke_reply(&peer_id);
|
|
}
|
|
Packet::Interested => {
|
|
torrent.get_peer(&peer_id).peer_interested = true;
|
|
}
|
|
Packet::NotInterested => {
|
|
torrent.get_peer(&peer_id).peer_interested = false;
|
|
}
|
|
Packet::Have { index } => {
|
|
if index < torrent.metainfo.num_pieces {
|
|
torrent.get_peer(&peer_id).bitfield.set(index);
|
|
}
|
|
}
|
|
Packet::Bitfield { bitfield } => {
|
|
torrent.get_peer(&peer_id).bitfield = BitField::new(bitfield, torrent.metainfo.pieces.len() as u32);
|
|
}
|
|
Packet::Request { index, begin, length } => {
|
|
// TODO
|
|
}
|
|
Packet::Piece { index, begin, block } => {
|
|
self.bytecount += block.len() as u64;
|
|
if self.lastcount.elapsed() >= Duration::from_secs(1) {
|
|
println!("{:.2} KiB/s", self.bytecount as f64 / 1024.);
|
|
self.bytecount = 0;
|
|
self.lastcount = Instant::now();
|
|
}
|
|
torrent.piece_reply(&peer_id, index, begin, block);
|
|
}
|
|
Packet::Cancel { index, begin, length } => {
|
|
// TODO
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|