Compare commits
9 Commits
tmp
...
new_sessio
| Author | SHA1 | Date | |
|---|---|---|---|
| 448f7cc69f | |||
| b9280a28dc | |||
| c01b732162 | |||
| 29599cfb02 | |||
| 2a1c5ec40e | |||
| d336c00079 | |||
| 18d64ecebd | |||
| 3860112f21 | |||
| 9b7f5c9874 |
35
design.md
35
design.md
@@ -1,8 +1,27 @@
|
||||
* A torrent should have multiple workers (seed vs download workers?).
|
||||
* These workers work with connections, if one break they find another one.
|
||||
* The connection broker manages connections, choking/unchoking.
|
||||
* Connection are linked to a peer & their bitfield.
|
||||
* Connections need to be listened to at all times for choke/unchoke/have/timeout/interested
|
||||
* A worker takes over a connection to send/receive.
|
||||
* The connection broker opens news connections if none are available for
|
||||
the workers
|
||||
The `Session` is the main object to download torrents. It manages multiple
|
||||
threads. When a torrent is added to it, the tracker is contacted to get a
|
||||
list of peers. Then, a thread is spawned for each peer, and a connection to
|
||||
the peer is opened.
|
||||
|
||||
The connections read packets and send them to the network thread via a channel.
|
||||
The network thread manages the choking and unchoking of connections and the
|
||||
scheduling of fragments. Fragments are 16KiB blocks that form a
|
||||
piece. For instance if a piece is of 512KiB, there are 32 fragments for that
|
||||
piece. When a connection is unchoked, the network thread will schedule
|
||||
fragments on this connection. When the fragment is received back, the
|
||||
network thread assembles the piece from the various fragments. When a piece
|
||||
is complete, it writes it to disk.
|
||||
|
||||
The scheduling algorithm is very simple at the moment. It tries to minimize
|
||||
the number of pieces that are being downloaded. Every fragment is kept
|
||||
track of. It has a status, Available (no peer connection is downloading
|
||||
this fragment), Taken(a peer is downloading this fragment) or Complete (this
|
||||
fragment has been received). The Taken status also has a timestamp. When
|
||||
looking for a fragment to schedule, the algorithm looks for fragments that
|
||||
are available or that have been taken, but ran out of time (5s). It also
|
||||
takes into account the peer's bitfield to find a piece the peer has. If no
|
||||
fragment matches these criterias, a new piece is fragmented.
|
||||
|
||||
_Note that there could be an explosion in the number of pieces that are being
|
||||
downloaded if the peers have very incomplete bitfields. In the future the
|
||||
algorithm will limit the number of pieces that are being downloaded._
|
||||
|
||||
4
docs.md
4
docs.md
@@ -5,5 +5,7 @@
|
||||
* [ ] udp [spec](http://www.bittorrent.org/beps/bep_0015.html)
|
||||
* [x] http [spec](http://www.bittorrent.org/beps/bep_0003.html#trackers)
|
||||
* Metainfo
|
||||
* [ ] torrent files [spec](https://wiki.theory.org/BitTorrentSpecification#Metainfo_File_Structure)
|
||||
* [x] torrent files [spec](https://wiki.theory.org/BitTorrentSpecification#Metainfo_File_Structure)
|
||||
* [ ] magnet links
|
||||
* [x] Peer wire protocol
|
||||
* [ ] uTP
|
||||
|
||||
@@ -107,6 +107,15 @@ pub struct MetainfoFile {
|
||||
pub path: PathBuf,
|
||||
}
|
||||
|
||||
impl MetainfoFile {
|
||||
pub fn new(length: u64, path: PathBuf) -> Self {
|
||||
MetainfoFile {
|
||||
length: length,
|
||||
path: path,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn sha1(bytes: &[u8]) -> Hash {
|
||||
let mut hasher = Sha1::new();
|
||||
hasher.update(bytes);
|
||||
|
||||
248
src/net/_session/disk.rs
Normal file
248
src/net/_session/disk.rs
Normal file
@@ -0,0 +1,248 @@
|
||||
use std::cmp;
|
||||
use std::fs::File;
|
||||
use std::io::{self, Read, Seek, SeekFrom, Write};
|
||||
use std::sync::Arc;
|
||||
|
||||
use net::_session::map::Map;
|
||||
use metainfo::{Hash, Metainfo};
|
||||
|
||||
pub struct DiskManager<F> where F: Read + Seek + Write {
|
||||
torrents: Map<Hash, Torrent<F>>,
|
||||
}
|
||||
|
||||
impl<F> DiskManager<F> where F: Read + Seek + Write {
|
||||
|
||||
pub fn write_piece(&mut self, info_hash: &Hash, index: u32, mut piece: Vec<u8>) -> io::Result<()> {
|
||||
let torrent = &mut self.torrents[info_hash];
|
||||
|
||||
torrent.iter_files_for_piece(index, &mut piece, |file, seek, buf| {
|
||||
file.seek(SeekFrom::Start(seek))?;
|
||||
file.write_all(&buf)?;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn read_piece(&mut self, info_hash: &Hash, index: u32) -> io::Result<Vec<u8>> {
|
||||
let torrent = &mut self.torrents[info_hash];
|
||||
|
||||
let len = torrent.metainfo.piece_length as usize;
|
||||
let mut piece = Vec::with_capacity(len);
|
||||
unsafe {
|
||||
piece.set_len(len);
|
||||
}
|
||||
|
||||
torrent.iter_files_for_piece(index, &mut piece, |file, seek, buf| {
|
||||
file.seek(SeekFrom::Start(seek))?;
|
||||
file.read_exact(buf)?;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(piece)
|
||||
}
|
||||
}
|
||||
|
||||
impl DiskManager<File> {
|
||||
pub fn add_torrent(&mut self, metainfo: Arc<Metainfo>) -> io::Result<()> {
|
||||
let mut files = Vec::new();
|
||||
for fileinfo in metainfo.files.iter() {
|
||||
let file = File::create(&fileinfo.path)?;
|
||||
file.set_len(fileinfo.length)?;
|
||||
files.push(file);
|
||||
}
|
||||
self.torrents.insert(metainfo.info_hash, Torrent {
|
||||
files: files,
|
||||
metainfo: metainfo,
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Torrent<F> where F: Read + Seek + Write {
|
||||
metainfo: Arc<Metainfo>,
|
||||
files: Vec<F>,
|
||||
}
|
||||
|
||||
impl<F> Torrent<F> where F: Read + Seek + Write {
|
||||
// This function iterates over the files that are spanned by a piece.
|
||||
fn iter_files_for_piece<C>(&mut self, index: u32, piece: &mut [u8], mut callback: C) -> io::Result<()>
|
||||
where C: FnMut(&mut F, u64, &mut [u8]) -> io::Result<()>
|
||||
{
|
||||
let buffer_len = piece.len();
|
||||
let mut buffer_pos: usize = 0;
|
||||
// All the files in a torrent can be seen as a single contiguous file. These
|
||||
// values are relative to this contiguous file.
|
||||
let mut abs_pos = index as u64 * self.metainfo.piece_length as u64;
|
||||
let mut abs_start = 0;
|
||||
let mut abs_end = 0;
|
||||
|
||||
for (idx, file) in self.metainfo.files.iter().enumerate() {
|
||||
abs_end += file.length;
|
||||
|
||||
if abs_start <= abs_pos && abs_pos < abs_end {
|
||||
// The amount of bytes to write is the remaining number of bytes in the buffer,
|
||||
// or the size of the file, whichever is smaller.
|
||||
let remaining = (buffer_len - buffer_pos) as u64;
|
||||
let write_len = cmp::min(remaining, abs_end - abs_pos) as usize;
|
||||
|
||||
// Seek in the file. The position given is relative to the start of the file.
|
||||
callback(&mut self.files[idx], abs_pos - abs_start, &mut piece[buffer_pos..buffer_pos + write_len])?;
|
||||
|
||||
abs_pos += write_len as u64;
|
||||
buffer_pos += write_len;
|
||||
|
||||
// If the buffer position if equal to the length, we're done writing.
|
||||
if buffer_pos == buffer_len {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// The start of the next file is the end of this one.
|
||||
abs_start = abs_end;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use std::io::{self, Read, Seek, SeekFrom, Write};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use net::_session::map::Map;
|
||||
use metainfo::{Hash, Metainfo, MetainfoFile};
|
||||
|
||||
struct MockFile {
|
||||
seek: SeekFrom,
|
||||
read_len: usize,
|
||||
write_len: usize,
|
||||
}
|
||||
|
||||
impl MockFile {
|
||||
pub fn new() -> Self {
|
||||
MockFile {
|
||||
seek: SeekFrom::Start(0),
|
||||
read_len: 0,
|
||||
write_len: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for MockFile {
|
||||
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
||||
self.read_len = buf.len();
|
||||
Ok(buf.len())
|
||||
}
|
||||
}
|
||||
|
||||
impl Seek for MockFile {
|
||||
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
|
||||
self.seek = pos;
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for MockFile {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.write_len = buf.len();
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn build_dummy() -> (DiskManager<MockFile>, Hash) {
|
||||
let info_hash = Hash::new([1u8; 20]);
|
||||
|
||||
let metainfo = Metainfo {
|
||||
announce: "".into(),
|
||||
announce_list: vec![],
|
||||
files: vec![MetainfoFile::new(12, PathBuf::new()),
|
||||
MetainfoFile::new(6, PathBuf::new()),
|
||||
MetainfoFile::new(2, PathBuf::new())],
|
||||
info_hash: info_hash,
|
||||
piece_length: 10,
|
||||
num_pieces: 2,
|
||||
pieces: vec![],
|
||||
length: 20,
|
||||
};
|
||||
|
||||
let mut torrents = Map::new();
|
||||
torrents.insert(info_hash, Torrent {
|
||||
metainfo: Arc::new(metainfo),
|
||||
files: vec![MockFile::new(), MockFile::new(), MockFile::new()],
|
||||
});
|
||||
|
||||
let dm = DiskManager {
|
||||
torrents: torrents,
|
||||
};
|
||||
|
||||
(dm, info_hash)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_piece() {
|
||||
let (mut dm, info_hash) = build_dummy();
|
||||
|
||||
{
|
||||
// Read a piece in a single file that is larger than the piece.
|
||||
dm.read_piece(&info_hash, 0).unwrap();
|
||||
let torrent = &dm.torrents[&info_hash];
|
||||
|
||||
assert_eq!(torrent.files[0].seek, SeekFrom::Start(0));
|
||||
assert_eq!(torrent.files[0].read_len, 10);
|
||||
}
|
||||
|
||||
{
|
||||
// Read a piece over multiple files that are smaller that a piece.
|
||||
dm.read_piece(&info_hash, 1).unwrap();
|
||||
let torrent = &dm.torrents[&info_hash];
|
||||
|
||||
assert_eq!(torrent.files[0].seek, SeekFrom::Start(10));
|
||||
assert_eq!(torrent.files[0].read_len, 2);
|
||||
|
||||
assert_eq!(torrent.files[1].seek, SeekFrom::Start(0));
|
||||
assert_eq!(torrent.files[1].read_len, 6);
|
||||
|
||||
assert_eq!(torrent.files[2].seek, SeekFrom::Start(0));
|
||||
assert_eq!(torrent.files[2].read_len, 2);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_write_piece() {
|
||||
let (mut dm, info_hash) = build_dummy();
|
||||
|
||||
{
|
||||
// Write a piece in a single file that is larger than the piece.
|
||||
dm.write_piece(&info_hash, 0, vec![2u8; 10]).unwrap();
|
||||
let torrent = &dm.torrents[&info_hash];
|
||||
|
||||
assert_eq!(torrent.files[0].seek, SeekFrom::Start(0));
|
||||
assert_eq!(torrent.files[0].write_len, 10);
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
// Write a piece over multiple files that are smaller that a piece.
|
||||
dm.write_piece(&info_hash, 1, vec![2u8; 10]).unwrap();
|
||||
let torrent = &dm.torrents[&info_hash];
|
||||
|
||||
assert_eq!(torrent.files[0].seek, SeekFrom::Start(10));
|
||||
assert_eq!(torrent.files[0].write_len, 2);
|
||||
|
||||
assert_eq!(torrent.files[1].seek, SeekFrom::Start(0));
|
||||
assert_eq!(torrent.files[1].write_len, 6);
|
||||
|
||||
assert_eq!(torrent.files[2].seek, SeekFrom::Start(0));
|
||||
assert_eq!(torrent.files[2].write_len, 2);
|
||||
}
|
||||
}
|
||||
}
|
||||
34
src/net/_session/map.rs
Normal file
34
src/net/_session/map.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
use std::borrow::Borrow;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::Hash;
|
||||
use std::ops::{Deref, DerefMut, Index, IndexMut};
|
||||
|
||||
pub struct Map<K, V>(HashMap<K, V>);
|
||||
|
||||
impl<K, V> Deref for Map<K, V> {
|
||||
type Target = HashMap<K, V>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V> DerefMut for Map<K, V> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, K, V> Index<I> for Map<K, V> where I: Borrow<K>, K: Eq + Hash {
|
||||
type Output = V;
|
||||
|
||||
fn index(&self, index: I) -> &V {
|
||||
self.0.get(index.borrow()).expect("index not found")
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, K, V> IndexMut<I> for Map<K, V> where I: Borrow<K>, K: Eq + Hash {
|
||||
fn index_mut(&mut self, index: I) -> &mut V {
|
||||
self.0.get_mut(index.borrow()).expect("index not found")
|
||||
}
|
||||
}
|
||||
2
src/net/_session/mod.rs
Normal file
2
src/net/_session/mod.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
pub mod disk;
|
||||
pub mod map;
|
||||
@@ -3,13 +3,16 @@ use std::fmt;
|
||||
pub struct BitField {
|
||||
bits: Vec<u8>,
|
||||
len: u32,
|
||||
num_set: u32,
|
||||
}
|
||||
|
||||
impl BitField {
|
||||
pub fn new(bits: Vec<u8>, len: u32) -> BitField {
|
||||
let num_set = bits.iter().map(|b| b.count_ones()).sum::<u32>();
|
||||
BitField {
|
||||
bits: bits,
|
||||
len: len,
|
||||
num_set: num_set,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,38 +20,58 @@ impl BitField {
|
||||
BitField {
|
||||
bits: vec![0u8; f64::ceil(len as f64 / 8.) as usize],
|
||||
len: len,
|
||||
num_set: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn calc_positions(&self, index: u32) -> (u32, u32) {
|
||||
(index / 8, index % 8)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn len(&self) -> u32 {
|
||||
self.len
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn num_set(&self) -> u32 {
|
||||
self.num_set
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn is_set(&self, index: u32) -> bool {
|
||||
let idx = index / 8;
|
||||
let offset = index % 8;
|
||||
let (idx, offset) = self.calc_positions(index);
|
||||
(self.bits[idx as usize] & (1 << 7 - offset)) != 0
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn set(&mut self, index: u32) {
|
||||
let idx = index / 8;
|
||||
let offset = index % 8;
|
||||
if !self.is_set(index) {
|
||||
let (idx, offset) = self.calc_positions(index);
|
||||
self.bits[idx as usize] |= 1 << 7 - offset;
|
||||
self.num_set += 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn unset(&mut self, index: u32) {
|
||||
let idx = index / 8;
|
||||
let offset = index % 8;
|
||||
if self.is_set(index) {
|
||||
let (idx, offset) = self.calc_positions(index);
|
||||
self.bits[idx as usize] ^= 1 << 7 - offset;
|
||||
self.num_set -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn as_bytes(&self) -> &[u8] {
|
||||
&self.bits
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn set_ratio(&self) -> f64 {
|
||||
self.num_set as f64 / self.len as f64
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for BitField {
|
||||
@@ -80,6 +103,8 @@ fn test_bitfield_is_set() {
|
||||
assert!(bf.is_set(0xd));
|
||||
assert!(bf.is_set(0xe));
|
||||
assert!(bf.is_set(0xf));
|
||||
|
||||
assert_eq!(bf.num_set(), 12);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -93,6 +118,8 @@ fn test_bitfield_set() {
|
||||
assert_eq!(bf.bits[1], 128);
|
||||
bf.set(15);
|
||||
assert_eq!(bf.bits[1], 129);
|
||||
|
||||
assert_eq!(bf.num_set(), 4);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -106,4 +133,13 @@ fn test_bitfield_unset() {
|
||||
assert_eq!(bf.bits[1], 255-128);
|
||||
bf.unset(15);
|
||||
assert_eq!(bf.bits[1], 255-128-1);
|
||||
|
||||
assert_eq!(bf.num_set(), 12);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_bitfield_set_ratio() {
|
||||
use std::f64;
|
||||
let mut bf = BitField::new(vec![255, 255], 16);
|
||||
assert!(f64::abs(bf.set_ratio() - 1.0) < f64::EPSILON);
|
||||
}
|
||||
|
||||
68
src/net/buffers.rs
Normal file
68
src/net/buffers.rs
Normal file
@@ -0,0 +1,68 @@
|
||||
use std::io::{self, Read};
|
||||
use std::ptr;
|
||||
|
||||
use sha1::Sha1;
|
||||
|
||||
use metainfo::Hash;
|
||||
|
||||
/// Allocate a new buffer of the given size and read_exact into it.
|
||||
pub fn read_exact<R>(mut reader: R, size: usize) -> io::Result<Vec<u8>> where R: Read {
|
||||
let mut buf = Vec::with_capacity(size);
|
||||
unsafe {
|
||||
buf.set_len(size);
|
||||
}
|
||||
reader.read_exact(&mut buf)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub struct PieceBuffer {
|
||||
fragment_count: u32,
|
||||
num_fragments: u32,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl PieceBuffer {
|
||||
pub fn new(num_fragments: u32, piece_length: u32) -> Self {
|
||||
let mut buffer = Vec::with_capacity(piece_length as usize);
|
||||
unsafe {
|
||||
buffer.set_len(piece_length as usize);
|
||||
}
|
||||
PieceBuffer {
|
||||
fragment_count: 0,
|
||||
num_fragments: num_fragments,
|
||||
buffer: buffer,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_fragment(&mut self, begin: u32, fragment: &[u8]) {
|
||||
let begin = begin as usize;
|
||||
assert!(begin + fragment.len() <= self.buffer.len());
|
||||
unsafe {
|
||||
ptr::copy_nonoverlapping(fragment.as_ptr(), self.buffer[begin..].as_mut_ptr(), fragment.len());
|
||||
}
|
||||
self.fragment_count += 1;
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn complete(&self) -> bool {
|
||||
self.fragment_count == self.num_fragments
|
||||
}
|
||||
|
||||
pub fn matches_hash(&self, info_hash: &Hash) -> bool {
|
||||
if !self.complete() {
|
||||
panic!("trying to hash piece buffer before it's complete");
|
||||
} else {
|
||||
let mut m = Sha1::new();
|
||||
m.update(&self.buffer);
|
||||
m.digest().bytes() == &info_hash[..]
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(self) -> Vec<u8> {
|
||||
if !self.complete() {
|
||||
panic!("trying to get piece buffer before it's complete");
|
||||
} else {
|
||||
self.buffer
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
pub mod bitfield;
|
||||
mod buffers;
|
||||
pub mod peer;
|
||||
pub mod session;
|
||||
pub mod _session;
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::net::{Shutdown, TcpStream};
|
||||
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
|
||||
|
||||
use metainfo::Hash;
|
||||
use net::buffers;
|
||||
use tracker::Peer;
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -68,11 +69,8 @@ impl PeerConnection {
|
||||
},
|
||||
5 => {
|
||||
let size = len as usize - 1;
|
||||
let mut bitfield = Vec::with_capacity(size);
|
||||
unsafe {
|
||||
bitfield.set_len(size);
|
||||
}
|
||||
self.sock.read_exact(&mut bitfield)?;
|
||||
let bitfield = buffers::read_exact(&mut self.sock, size)?;
|
||||
|
||||
Packet::Bitfield {
|
||||
bitfield: bitfield,
|
||||
}
|
||||
@@ -86,11 +84,7 @@ impl PeerConnection {
|
||||
let size = len as usize - 9;
|
||||
let index = self.sock.read_u32::<BigEndian>()?;
|
||||
let begin = self.sock.read_u32::<BigEndian>()?;
|
||||
let mut block = Vec::with_capacity(size);
|
||||
unsafe {
|
||||
block.set_len(size);
|
||||
}
|
||||
self.sock.read_exact(&mut block)?;
|
||||
let block = buffers::read_exact(&mut self.sock, size)?;
|
||||
|
||||
Packet::Piece {
|
||||
index: index,
|
||||
|
||||
@@ -2,7 +2,6 @@ use std::cmp;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fs::File;
|
||||
use std::io::{Seek, SeekFrom, Write};
|
||||
use std::net::{Shutdown, TcpStream};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::sync::Arc;
|
||||
use std::sync::mpsc::{self, Receiver, Sender};
|
||||
@@ -10,10 +9,10 @@ use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use libc;
|
||||
use sha1::Sha1;
|
||||
|
||||
use metainfo::{Hash, Metainfo};
|
||||
use net::bitfield::BitField;
|
||||
use net::buffers::PieceBuffer;
|
||||
use net::peer::{Packet, PeerConnection};
|
||||
use tracker::http;
|
||||
|
||||
@@ -50,6 +49,8 @@ struct InnerSession {
|
||||
port: u16,
|
||||
}
|
||||
|
||||
// == Session ==
|
||||
|
||||
pub struct Session {
|
||||
// inner: Arc<InnerSession>,
|
||||
sender: Sender<Signal>,
|
||||
@@ -96,6 +97,8 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
// == SessionPeer ==
|
||||
|
||||
struct SessionPeer {
|
||||
sock: PeerConnection,
|
||||
bitfield: BitField,
|
||||
@@ -118,6 +121,8 @@ impl SessionPeer {
|
||||
}
|
||||
}
|
||||
|
||||
// == FragmentStatus ==
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
enum FragmentStatus {
|
||||
Available,
|
||||
@@ -125,19 +130,23 @@ enum FragmentStatus {
|
||||
Taken(Instant),
|
||||
}
|
||||
|
||||
// == SessionFragment ==
|
||||
|
||||
pub struct SessionFragment {
|
||||
begin: u32,
|
||||
length: u32,
|
||||
status: FragmentStatus,
|
||||
}
|
||||
|
||||
// == SessionPiece ==
|
||||
|
||||
struct SessionPiece {
|
||||
fragments: BTreeMap<u32, SessionFragment>,
|
||||
buffer: Vec<u8>,
|
||||
num_fragments: u32,
|
||||
total_fragments: u32,
|
||||
buffer: PieceBuffer,
|
||||
}
|
||||
|
||||
// == SessionTorrent ==
|
||||
|
||||
struct SessionTorrent {
|
||||
metainfo: Metainfo,
|
||||
own_bitfield: BitField,
|
||||
@@ -198,9 +207,9 @@ impl SessionTorrent {
|
||||
for index in (0..self.own_bitfield.len()).filter(|index| !keys.contains(index)) {
|
||||
if !self.own_bitfield.is_set(index) && peer.bitfield.is_set(index) {
|
||||
|
||||
let total_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32;
|
||||
let num_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32;
|
||||
let mut fragments = BTreeMap::new();
|
||||
for idx in 0..total_fragments {
|
||||
for idx in 0..num_fragments {
|
||||
let begin = idx * FRAGMENT_SIZE;
|
||||
|
||||
fragments.insert(begin ,SessionFragment {
|
||||
@@ -219,15 +228,9 @@ impl SessionTorrent {
|
||||
length = taken.length;
|
||||
}
|
||||
|
||||
let mut buffer = Vec::with_capacity(self.metainfo.piece_length as usize);
|
||||
unsafe {
|
||||
buffer.set_len(self.metainfo.piece_length as usize);
|
||||
}
|
||||
self.pieces.insert(index, SessionPiece {
|
||||
fragments: fragments,
|
||||
buffer: buffer,
|
||||
num_fragments: 0,
|
||||
total_fragments: total_fragments,
|
||||
buffer: PieceBuffer::new(num_fragments, self.metainfo.piece_length),
|
||||
});
|
||||
|
||||
return Some((index, begin, length));
|
||||
@@ -247,7 +250,7 @@ impl SessionTorrent {
|
||||
}
|
||||
}
|
||||
|
||||
fn write_piece(&mut self, index: u32) {
|
||||
fn write_piece(&mut self, index: u32, buffer: Vec<u8>) {
|
||||
let mut start = 0;
|
||||
let mut end = 0;
|
||||
let mut pos: u64 = 0;
|
||||
@@ -260,7 +263,7 @@ impl SessionTorrent {
|
||||
if write < end {
|
||||
let len = cmp::min(remaining, fileinfo.length as u64);
|
||||
self.files[id].seek(SeekFrom::Start(write - start));
|
||||
self.files[id].write_all(&self.pieces[&index].buffer[pos as usize..(pos+len) as usize]);
|
||||
self.files[id].write_all(&buffer[pos as usize..(pos+len) as usize]);
|
||||
write += len;
|
||||
pos += len;
|
||||
remaining -= len;
|
||||
@@ -281,41 +284,46 @@ impl SessionTorrent {
|
||||
|
||||
fn piece_reply(&mut self, peer_id: &Hash, index: u32, begin: u32, block: Vec<u8>) {
|
||||
let mut remove = false;
|
||||
let mut reset = false;
|
||||
{
|
||||
if let Some(piece) = self.pieces.get_mut(&index) {
|
||||
if let Some(fragment) = piece.fragments.get_mut(&begin) {
|
||||
fragment.status = FragmentStatus::Complete;
|
||||
|
||||
piece.buffer[begin as usize..(begin as usize)+block.len()].copy_from_slice(&block);
|
||||
piece.num_fragments += 1;
|
||||
if piece.num_fragments == piece.total_fragments {
|
||||
// TODO check hash
|
||||
piece.buffer.add_fragment(begin, &block);
|
||||
if piece.buffer.complete() {
|
||||
println!("piece is done {}", index);
|
||||
let mut m = Sha1::new();
|
||||
m.update(&piece.buffer);
|
||||
if m.digest().bytes() == &self.metainfo.pieces[index as usize][..] {
|
||||
if piece.buffer.matches_hash(&self.metainfo.pieces[index as usize]) {
|
||||
self.own_bitfield.set(index);
|
||||
println!("it's a match!");
|
||||
remove = true;
|
||||
} else {
|
||||
reset = true;
|
||||
println!("no match");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("could not find fragment {}", begin);
|
||||
}
|
||||
if reset {
|
||||
for fragment in piece.fragments.values_mut() {
|
||||
fragment.status = FragmentStatus::Available;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("could not find piece {}", index);
|
||||
}
|
||||
}
|
||||
if remove {
|
||||
self.write_piece(index);
|
||||
self.pieces.remove(&index);
|
||||
let piece = self.pieces.remove(&index).expect("told to remove piece that doesn't exist");
|
||||
self.write_piece(index, piece.buffer.get());
|
||||
}
|
||||
self.requeue(peer_id);
|
||||
}
|
||||
}
|
||||
|
||||
// == SessionNetworkThread ==
|
||||
|
||||
struct SessionNetworkThread {
|
||||
session: Arc<InnerSession>,
|
||||
sender: Sender<Signal>,
|
||||
@@ -326,34 +334,13 @@ struct SessionNetworkThread {
|
||||
}
|
||||
|
||||
impl SessionNetworkThread {
|
||||
pub fn run(mut self, input: Receiver<Signal>) {
|
||||
for signal in input.iter() {
|
||||
pub fn run(mut self, signals: Receiver<Signal>) {
|
||||
for signal in signals.iter() {
|
||||
match signal {
|
||||
Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent),
|
||||
Signal::RemoveTorrent(id) => {
|
||||
if let Some(mut sess_torrent) = self.torrents.remove(&id) {
|
||||
for peer in sess_torrent.peers.values_mut() {
|
||||
peer.sock.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
Signal::ConnectionOpened { info_hash, peer_id, sock } => {
|
||||
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
|
||||
let mut peer = SessionPeer::new(sock, sess_torrent.metainfo.pieces.len() as u32);
|
||||
peer.sock.send_interested();
|
||||
peer.am_interested = true;
|
||||
sess_torrent.peers.insert(peer_id, peer);
|
||||
}
|
||||
}
|
||||
Signal::ConnectionClosed { info_hash, peer_id } => {
|
||||
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
|
||||
if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) {
|
||||
// if !peer.peer_choking && self.seeds > 1 {
|
||||
// self.seeds -= 1;
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
Signal::RemoveTorrent(id) => self.signal_remove_torrent(&id),
|
||||
Signal::ConnectionOpened { info_hash, peer_id, sock } => self.signal_connection_opened(&info_hash, &peer_id, sock),
|
||||
Signal::ConnectionClosed { info_hash, peer_id } => self.signal_connection_closed(&info_hash, &peer_id),
|
||||
Signal::Packet { info_hash, peer_id, packet } => self.signal_packet(info_hash, peer_id, packet),
|
||||
Signal::Stop => break,
|
||||
}
|
||||
@@ -405,6 +392,33 @@ impl SessionNetworkThread {
|
||||
}
|
||||
}
|
||||
|
||||
fn signal_remove_torrent(&mut self, id: &Hash) {
|
||||
if let Some(mut sess_torrent) = self.torrents.remove(id) {
|
||||
for peer in sess_torrent.peers.values_mut() {
|
||||
peer.sock.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn signal_connection_opened(&mut self, info_hash: &Hash, peer_id: &Hash, conn: PeerConnection) {
|
||||
if let Some(sess_torrent) = self.torrents.get_mut(info_hash) {
|
||||
let mut peer = SessionPeer::new(conn, sess_torrent.metainfo.pieces.len() as u32);
|
||||
peer.sock.send_interested();
|
||||
peer.am_interested = true;
|
||||
sess_torrent.peers.insert(*peer_id, peer);
|
||||
}
|
||||
}
|
||||
|
||||
fn signal_connection_closed(&mut self, info_hash: &Hash, peer_id: &Hash) {
|
||||
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
|
||||
if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) {
|
||||
// if !peer.peer_choking && self.seeds > 1 {
|
||||
// self.seeds -= 1;
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn signal_packet(&mut self, info_hash: Hash, peer_id: Hash, packet: Packet) {
|
||||
if let Some(torrent) = self.torrents.get_mut(&info_hash) {
|
||||
match packet {
|
||||
|
||||
Reference in New Issue
Block a user