10 Commits
tmp ... master

28 changed files with 1337 additions and 1045 deletions

View File

@@ -5,9 +5,15 @@ authors = ["Simon Bernier St-Pierre <sbernierstpierre@gmail.com>"]
[dependencies] [dependencies]
byteorder = "0.5" byteorder = "0.5"
libc = "0.2" failure = "0.1"
rand = "0.3"
reqwest = "0.8"
serde = "1.0"
serde_bencode = "0.2.0"
serde_derive = "1.0"
serde_bytes = "0.10"
sha1 = "0.2" sha1 = "0.2"
url = "1.2" url = "1.6"
[dependencies.hyper] [dependencies.hyper]
version = "0.9" version = "0.9"

View File

@@ -1,8 +1,27 @@
* A torrent should have multiple workers (seed vs download workers?). The `Session` is the main object to download torrents. It manages multiple
* These workers work with connections, if one break they find another one. threads. When a torrent is added to it, the tracker is contacted to get a
* The connection broker manages connections, choking/unchoking. list of peers. Then, a thread is spawned for each peer, and a connection to
* Connection are linked to a peer & their bitfield. the peer is opened.
* Connections need to be listened to at all times for choke/unchoke/have/timeout/interested
* A worker takes over a connection to send/receive. The connections read packets and send them to the network thread via a channel.
* The connection broker opens news connections if none are available for The network thread manages the choking and unchoking of connections and the
the workers scheduling of fragments. Fragments are 16KiB blocks that form a
piece. For instance if a piece is of 512KiB, there are 32 fragments for that
piece. When a connection is unchoked, the network thread will schedule
fragments on this connection. When the fragment is received back, the
network thread assembles the piece from the various fragments. When a piece
is complete, it writes it to disk.
The scheduling algorithm is very simple at the moment. It tries to minimize
the number of pieces that are being downloaded. Every fragment is kept
track of. It has a status, Available (no peer connection is downloading
this fragment), Taken(a peer is downloading this fragment) or Complete (this
fragment has been received). The Taken status also has a timestamp. When
looking for a fragment to schedule, the algorithm looks for fragments that
are available or that have been taken, but ran out of time (5s). It also
takes into account the peer's bitfield to find a piece the peer has. If no
fragment matches these criterias, a new piece is fragmented.
_Note that there could be an explosion in the number of pieces that are being
downloaded if the peers have very incomplete bitfields. In the future the
algorithm will limit the number of pieces that are being downloaded._

View File

@@ -5,5 +5,7 @@
* [ ] udp [spec](http://www.bittorrent.org/beps/bep_0015.html) * [ ] udp [spec](http://www.bittorrent.org/beps/bep_0015.html)
* [x] http [spec](http://www.bittorrent.org/beps/bep_0003.html#trackers) * [x] http [spec](http://www.bittorrent.org/beps/bep_0003.html#trackers)
* Metainfo * Metainfo
* [ ] torrent files [spec](https://wiki.theory.org/BitTorrentSpecification#Metainfo_File_Structure) * [x] torrent files [spec](https://wiki.theory.org/BitTorrentSpecification#Metainfo_File_Structure)
* [ ] magnet links * [ ] magnet links
* [x] Peer wire protocol
* [ ] uTP

View File

@@ -1,38 +0,0 @@
extern crate magnolia;
use std::env;
use std::fs::File;
use std::io::{self, Read};
use std::thread;
use std::time::Duration;
use std::str;
use magnolia::bencode::*;
use magnolia::metainfo::Metainfo;
use magnolia::tracker::http;
use magnolia::net::session::Session;
fn load_file(path: &str) -> io::Result<()> {
let mut buf = Vec::new();
let mut f = File::open(path)?;
f.read_to_end(&mut buf)?;
let obj = decode(&buf).unwrap();
let meta = Metainfo::from_bencode(obj).unwrap();
println!("{}", meta.pieces.len());
let s = Session::new();
s.add_torrent(meta);
loop {
thread::sleep(Duration::from_secs(1));
}
Ok(())
}
fn main() {
let path = env::args().nth(1).expect("need path to .torrent file");
load_file(&path).unwrap();
}

View File

@@ -0,0 +1,19 @@
extern crate failure;
extern crate magnolia;
use std::env;
use failure::Fail;
use magnolia::metainfo::Metainfo;
fn main() {
let args: Vec<String> = env::args().collect();
match Metainfo::open(&args[1]) {
Ok(meta) => println!("{:?}", meta),
Err(e) => {
for cause in e.causes() {
println!("{}", cause);
}
}
}
}

6
rustfmt.toml Normal file
View File

@@ -0,0 +1,6 @@
same_line_attributes = false
reorder_imports = true
reorder_imports_in_group = true
reorder_extern_crates = true
reorder_extern_crates_in_group = true
indent_style = "Visual"

View File

@@ -1,96 +0,0 @@
use std::ops::{Index, Range, RangeFrom, RangeFull, RangeTo};
pub struct Buffer<'b> {
inner: &'b [u8],
pos: usize,
}
impl<'b> Buffer<'b> {
pub fn new(inner: &[u8]) -> Buffer {
Buffer {
inner: inner,
pos: 0,
}
}
pub fn advance(&mut self, amt: usize) {
self.pos += amt
}
pub fn find(&self, byte: u8) -> Option<usize> {
self[..].iter().position(|&b| b == byte)
}
pub fn get(&self, idx: usize) -> Option<u8> {
self[..].get(idx).map(|&b| b)
}
pub fn pos(&self) -> usize {
self.pos
}
}
impl<'b> Index<Range<usize>> for Buffer<'b> {
type Output = [u8];
fn index(&self, r: Range<usize>) -> &[u8] {
&self.inner[self.pos + r.start..self.pos + r.end]
}
}
impl<'b> Index<RangeFrom<usize>> for Buffer<'b> {
type Output = [u8];
fn index(&self, r: RangeFrom<usize>) -> &[u8] {
&self.inner[self.pos + r.start..]
}
}
impl<'b> Index<RangeTo<usize>> for Buffer<'b> {
type Output = [u8];
fn index(&self, r: RangeTo<usize>) -> &[u8] {
&self.inner[self.pos..self.pos + r.end]
}
}
impl<'b> Index<RangeFull> for Buffer<'b> {
type Output = [u8];
fn index(&self, _: RangeFull) -> &[u8] {
&self.inner[self.pos..]
}
}
#[test]
fn test_advance() {
let mut b = Buffer::new(b"hello");
b.advance(2);
assert_eq!(b.pos(), 2);
}
#[test]
fn test_find() {
let b = Buffer::new(b"hello");
assert_eq!(b.find(b'l'), Some(2));
assert_eq!(b.find(b'a'), None);
}
#[test]
fn test_get() {
let mut b = Buffer::new(b"hello");
b.advance(2);
assert_eq!(b.get(0), Some(b'l'));
assert_eq!(b.get(1), Some(b'l'));
assert_eq!(b.get(2), Some(b'o'));
}
#[test]
fn test_range() {
let mut b = Buffer::new(b"hello");
b.advance(2);
assert_eq!(b[1..2], b"l"[..]);
assert_eq!(b[1..], b"lo"[..]);
assert_eq!(b[..2], b"ll"[..]);
assert_eq!(b[..], b"llo"[..]);
}

View File

@@ -1,145 +0,0 @@
use std::collections::BTreeMap;
use std::num::ParseIntError;
use std::str::{self, Utf8Error};
use bencode::{Bytes, Object};
use bencode::buffer::Buffer;
#[derive(Debug)]
pub struct DecodeError;
impl From<Utf8Error> for DecodeError {
fn from(_: Utf8Error) -> Self {
DecodeError
}
}
impl From<ParseIntError> for DecodeError {
fn from(_: ParseIntError) -> Self {
DecodeError
}
}
pub type DecodeResult<T> = Result<T, DecodeError>;
pub fn decode(data: &[u8]) -> DecodeResult<Object> {
let mut buf = Buffer::new(data);
decode_object(&mut buf)
}
fn decode_object(buf: &mut Buffer) -> DecodeResult<Object> {
match buf.get(0) {
Some(b'i') => {
buf.advance(1);
decode_int(buf, b'e')
},
Some(b'0' ... b'9') => {
decode_bytes(buf)
}
Some(b'l') => {
buf.advance(1);
let mut list = Vec::new();
while let Some(term) = buf.get(0) {
if term == b'e' {
buf.advance(1);
break;
}
list.push(decode_object(buf)?);
}
Ok(list.into())
}
Some(b'd') => {
buf.advance(1);
let mut dict = BTreeMap::new();
while let Some(term) = buf.get(0) {
if term == b'e' {
buf.advance(1);
break;
}
let key = _decode_bytes(buf)?;
let val = decode_object(buf)?;
dict.insert(Bytes(key), val);
}
Ok(dict.into())
}
_ => Err(DecodeError),
}
}
fn decode_int(buf: &mut Buffer, term: u8) -> DecodeResult<Object> {
_decode_int(buf, term).map(|num| Object::Int(num))
}
fn _decode_int(buf: &mut Buffer, term: u8) -> DecodeResult<i64> {
if let Some(end) = buf.find(term) {
let obj = {
let num = str::from_utf8(&buf[..end])?;
num.parse()?
};
buf.advance(end + 1);
Ok(obj)
} else {
Err(DecodeError)
}
}
fn decode_bytes(buf: &mut Buffer) -> DecodeResult<Object> {
_decode_bytes(buf).map(Into::into)
}
fn _decode_bytes(buf: &mut Buffer) -> Result<Vec<u8>, DecodeError> {
let size = _decode_int(buf, b':')? as usize;
let bytes = buf[..size].to_vec();
buf.advance(size);
Ok(bytes)
}
#[test]
fn test_int_pos() {
let mut buf = Buffer::new(b"i1337e");
buf.advance(1);
assert_eq!(decode_int(&mut buf, b'e').unwrap(), Object::Int(1337));
assert_eq!(buf.pos(), 6);
}
#[test]
fn test_int_neg() {
let mut buf = Buffer::new(b"i-1337e");
buf.advance(1);
assert_eq!(decode_int(&mut buf, b'e').unwrap(), Object::Int(-1337));
assert_eq!(buf.pos(), 7);
}
#[test]
fn test_bytes() {
let mut buf = Buffer::new(b"5:hello");
assert_eq!(decode_bytes(&mut buf).unwrap(), "hello".into());
assert_eq!(buf.pos(), 7);
}
#[test]
fn test_list() {
use bencode::List;
let mut buf = Buffer::new(b"li1ei2ei3ee");
let obj = decode_object(&mut buf).unwrap();
let list = obj.as_list().unwrap();
assert_eq!(list, &List(vec![Object::Int(1), Object::Int(2), Object::Int(3)]));
assert_eq!(buf.pos(), 11);
}
#[test]
fn test_dict() {
let mut buf = Buffer::new(b"d5:helloi1337ee");
let obj = decode_object(&mut buf).unwrap();
let dict = obj.as_dict().unwrap();
assert_eq!(dict.get_int("hello").unwrap(), 1337);
assert_eq!(buf.pos(), 15);
}

View File

@@ -1,92 +0,0 @@
use std::borrow::Cow;
use bencode::{Dict, Object};
/// Allows the encode function to take something that can be converted into an object,
/// or a reference to an object.
pub trait Encodable<'a> {
fn get_object(self) -> Cow<'a, Object>;
}
impl<'a, T> Encodable<'a> for T where T: Into<Object> {
fn get_object(self) -> Cow<'a, Object> {
Cow::Owned(self.into())
}
}
impl<'a> Encodable<'a> for &'a Object {
fn get_object(self) -> Cow<'a, Object> {
Cow::Borrowed(self)
}
}
pub fn encode<'a, O>(obj: O) -> Vec<u8> where O: Encodable<'a> {
let mut buff = Vec::new();
encode_object(&mut buff, &obj.get_object());
buff
}
fn encode_object(buff: &mut Vec<u8>, obj: &Object) {
match *obj {
Object::Int(num) => encode_int(buff, num),
Object::Bytes(ref bytes) => encode_bytes(buff, bytes),
Object::List(ref list) => encode_list(buff, list),
Object::Dict(ref dict) => encode_dict(buff, dict),
}
}
fn encode_int(buff: &mut Vec<u8>, num: i64) {
buff.extend_from_slice(format!("i{}e", num).as_bytes());
}
fn encode_bytes(buff: &mut Vec<u8>, bytes: &[u8]) {
buff.extend_from_slice(format!("{}:", bytes.len()).as_bytes());
buff.extend_from_slice(bytes);
}
fn encode_list(buff: &mut Vec<u8>, list: &[Object]) {
buff.push(b'l');
for obj in list {
encode_object(buff, obj);
}
buff.push(b'e');
}
fn encode_dict(buff: &mut Vec<u8>, dict: &Dict) {
buff.push(b'd');
for (key, val) in dict.iter() {
encode_bytes(buff, key);
encode_object(buff, val);
}
buff.push(b'e');
}
#[test]
fn test_int_pos() {
assert_eq!(encode(1337), b"i1337e");
}
#[test]
fn test_int_neg() {
assert_eq!(encode(-1337), b"i-1337e");
}
#[test]
fn test_bytes() {
assert_eq!(encode("hello"), b"5:hello");
}
#[test]
fn test_list() {
let list = vec![Object::Int(1), Object::Int(2), Object::Int(3)];
assert_eq!(encode(list), b"li1ei2ei3ee");
}
#[test]
fn test_dict() {
use bencode::Dict;
let mut dict = Dict::new();
dict.insert("hello", 1337);
assert_eq!(encode(dict), b"d5:helloi1337ee")
}

View File

@@ -1,305 +0,0 @@
mod buffer;
mod decode;
mod encode;
use std::collections::BTreeMap;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::str;
pub use self::decode::{decode, DecodeError, DecodeResult};
pub use self::encode::{encode, Encodable};
#[derive(Clone, Eq, PartialEq)]
pub enum Object {
Int(i64),
Bytes(Bytes),
List(List),
Dict(Dict),
}
impl Object {
pub fn as_int(&self) -> Option<i64> {
match *self {
Object::Int(num) => Some(num),
_ => None,
}
}
pub fn as_bytes(&self) -> Option<&Bytes> {
match *self {
Object::Bytes(ref bytes) => Some(bytes),
_ => None,
}
}
pub fn as_str(&self) -> Option<&str> {
self.as_bytes().and_then(|b| str::from_utf8(b).ok())
}
pub fn as_string(&self) -> Option<String> {
self.as_str().map(|s| s.to_string())
}
pub fn as_list(&self) -> Option<&List> {
match *self {
Object::List(ref list) => Some(list),
_ => None,
}
}
pub fn as_dict(&self) -> Option<&Dict> {
match *self {
Object::Dict(ref dict) => Some(dict),
_ => None,
}
}
}
impl fmt::Debug for Object {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Object::Int(num) => write!(f, "{}", num),
Object::Bytes(ref bytes) => bytes.fmt(f),
Object::List(ref list) => list.fmt(f),
Object::Dict(ref dict) => dict.fmt(f),
}
}
}
#[derive(Clone, Eq, Ord, PartialEq, PartialOrd)]
pub struct Bytes(Vec<u8>);
impl Bytes {
pub fn new<A>(bytes: A) -> Bytes where A: Into<Vec<u8>> {
Bytes(bytes.into())
}
pub fn str(&self) -> Option<&str> {
str::from_utf8(&self.0).ok()
}
pub fn string(&self) -> Option<String> {
self.str().map(|s| s.to_string())
}
}
impl Deref for Bytes {
type Target = Vec<u8>;
fn deref(&self) -> &Vec<u8> {
&self.0
}
}
impl DerefMut for Bytes {
fn deref_mut(&mut self) -> &mut Vec<u8> {
&mut self.0
}
}
impl fmt::Debug for Bytes {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match str::from_utf8(&self.0) {
Ok(s) => write!(f, "\"{}\"", s),
Err(_) => {
write!(f, "\"")?;
for &b in self.0.iter() {
write!(f, "{:x}", b)?;
}
write!(f, "\"")?;
Ok(())
}
}
}
}
#[derive(Clone, Eq, PartialEq)]
pub struct List(Vec<Object>);
impl List {
pub fn new() -> List {
List(Vec::new())
}
pub fn push<V>(&mut self, val: V) where V: Into<Object> {
self.0.push(val.into())
}
}
impl Deref for List {
type Target = Vec<Object>;
fn deref(&self) -> &Vec<Object> {
&self.0
}
}
impl DerefMut for List {
fn deref_mut(&mut self) -> &mut Vec<Object> {
&mut self.0
}
}
impl fmt::Debug for List {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_list().entries(self.iter()).finish()
}
}
#[derive(Clone, Eq, PartialEq)]
pub struct Dict(BTreeMap<Bytes, Object>);
impl Dict {
pub fn new() -> Dict {
Dict(BTreeMap::new())
}
pub fn insert<K, V>(&mut self, key: K, val: V) where K: Into<Bytes>, V: Into<Object> {
self.0.insert(key.into(), val.into());
}
pub fn get_object<A>(&self, key: A) -> Option<&Object> where A: Into<Bytes> {
self.0.get(&key.into())
}
pub fn get_int<A>(&self, key: A) -> Option<i64> where A: Into<Bytes> {
self.0.get(&key.into()).and_then(|o| o.as_int())
}
pub fn get_bytes<A>(&self, key: A) -> Option<&Bytes> where A: Into<Bytes> {
self.0.get(&key.into()).and_then(|o| o.as_bytes())
}
pub fn get_str<A>(&self, key: A) -> Option<&str> where A: Into<Bytes> {
self.0.get(&key.into()).and_then(|o| o.as_bytes().and_then(|b| str::from_utf8(b).ok()))
}
pub fn get_string<A>(&self, key: A) -> Option<String> where A: Into<Bytes> {
self.get_str(key).map(|s| s.to_string())
}
pub fn get_list<A>(&self, key: A) -> Option<&List> where A: Into<Bytes> {
self.0.get(&key.into()).and_then(|o| o.as_list())
}
pub fn get_dict<A>(&self, key: A) -> Option<&Dict> where A: Into<Bytes> {
self.0.get(&key.into()).and_then(|o| o.as_dict())
}
}
impl Deref for Dict {
type Target = BTreeMap<Bytes, Object>;
fn deref(&self) -> &BTreeMap<Bytes, Object> {
&self.0
}
}
impl DerefMut for Dict {
fn deref_mut(&mut self) -> &mut BTreeMap<Bytes, Object> {
&mut self.0
}
}
impl fmt::Debug for Dict {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_map()
.entries(self.iter())
.finish()
}
}
// convertion utilities
// int
impl From<i64> for Object {
fn from(from: i64) -> Self {
Object::Int(from)
}
}
// bytes
impl<'a> From<&'a [u8]> for Bytes {
fn from(from: &'a [u8]) -> Self {
Bytes(from.to_vec())
}
}
impl<'a> From<&'a str> for Bytes {
fn from(from: &'a str) -> Self {
Bytes(from.as_bytes().to_vec())
}
}
impl From<Vec<u8>> for Bytes {
fn from(from: Vec<u8>) -> Self {
Bytes(from)
}
}
impl<'a> From<&'a [u8]> for Object {
fn from(from: &'a [u8]) -> Self {
Object::Bytes(Bytes(from.to_vec()))
}
}
impl<'a> From<&'a str> for Object {
fn from(from: &'a str) -> Self {
Object::Bytes(Bytes(from.as_bytes().to_vec()))
}
}
impl From<Vec<u8>> for Object {
fn from(from: Vec<u8>) -> Self {
Object::Bytes(Bytes(from))
}
}
impl From<Bytes> for Object {
fn from(from: Bytes) -> Self {
Object::Bytes(from)
}
}
// list
impl<T> From<Vec<T>> for List where T: Into<Object> {
fn from(from: Vec<T>) -> Self {
List(from.into_iter().map(Into::into).collect())
}
}
impl<T> From<Vec<T>> for Object where T: Into<Object> {
fn from(from: Vec<T>) -> Self {
Object::List(List(from.into_iter().map(Into::into).collect()))
}
}
impl From<List> for Object {
fn from(from: List) -> Self {
Object::List(from)
}
}
// dict
impl From<BTreeMap<Bytes, Object>> for Dict {
fn from(from: BTreeMap<Bytes, Object>) -> Self {
Dict(from)
}
}
impl From<BTreeMap<Bytes, Object>> for Object {
fn from(from: BTreeMap<Bytes, Object>) -> Self {
Object::Dict(Dict(from))
}
}
impl From<Dict> for Object {
fn from(from: Dict) -> Self {
Object::Dict(from)
}
}

183
src/bitfield.rs Normal file
View File

@@ -0,0 +1,183 @@
use std::u32;
pub struct Bitfield {
inner: Vec<u32>,
len: usize,
}
impl Bitfield {
pub fn new(len: usize) -> Bitfield {
Bitfield {
inner: vec![0u32; f64::ceil(len as f64 / 32.) as usize],
len: len,
}
}
#[inline]
fn calc_size(&self, idx: usize) -> (usize, usize) {
(idx / 32, idx % 32)
}
#[inline]
fn check_index(&self, idx: usize) {
if idx >= self.len() {
panic!("index out of range");
}
}
#[inline]
pub fn len(&self) -> usize {
self.len
}
#[inline]
pub fn get(&self, idx: usize) -> bool {
self.check_index(idx);
let (index, offset) = self.calc_size(idx);
unsafe { self.inner.get_unchecked(index) & (1 << offset) != 0 }
}
#[inline]
pub fn set(&mut self, idx: usize, val: bool) {
self.check_index(idx);
let (index, offset) = self.calc_size(idx);
let word = unsafe { self.inner.get_unchecked_mut(index) };
if val {
*word |= 1 << offset;
} else {
*word &= !(1 << offset);
}
}
#[inline]
pub fn iter(&self) -> Iter {
Iter {
bitfield: self,
index: 0,
}
}
#[inline]
pub fn num_set(&self) -> usize {
self.inner
.iter()
.map(|&x| x.count_ones() as usize)
.sum::<usize>()
}
#[inline]
pub fn ratio(&self) -> f64 {
self.num_set() as f64 / self.len() as f64
}
#[inline]
pub fn all(&self) -> bool {
self.num_set() == self.len()
}
}
pub struct Iter<'a> {
bitfield: &'a Bitfield,
index: usize,
}
impl<'a> Iterator for Iter<'a> {
type Item = bool;
fn next(&mut self) -> Option<bool> {
if self.index < self.bitfield.len() {
let bit = self.bitfield.get(self.index);
self.index += 1;
Some(bit)
} else {
None
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.bitfield.len(), Some(self.bitfield.len()))
}
}
#[test]
fn test_new_len() {
let b = Bitfield::new(50);
assert_eq!(b.len, 50);
assert_eq!(b.inner.len(), 2);
assert_eq!(b.len(), 50);
// make sure every bit is set to 0
for i in 0..b.len() {
assert_eq!(b.get(i), false);
}
}
#[test]
fn test_get_set() {
let mut b = Bitfield::new(150);
b.set(0, true);
b.set(45, true);
b.set(140, true);
assert_eq!(b.get(0), true);
assert_eq!(b.get(45), true);
assert_eq!(b.get(140), true);
b.set(140, false);
assert_eq!(b.get(140), false);
}
#[test]
#[should_panic]
fn test_get_outside() {
let b = Bitfield::new(2);
b.get(2);
}
#[test]
#[should_panic]
fn test_set_outside() {
let mut b = Bitfield::new(2);
b.set(2, true);
}
#[test]
fn test_iter() {
let mut b = Bitfield::new(5);
b.set(0, true);
b.set(2, true);
b.set(4, true);
assert_eq!(
b.iter().collect::<Vec<_>>(),
vec![true, false, true, false, true]
);
}
#[test]
fn test_num_set() {
let mut b = Bitfield::new(70);
b.set(0, true);
b.set(45, true);
b.set(68, true);
assert_eq!(b.num_set(), 3);
b.set(45, false);
assert_eq!(b.num_set(), 2);
}
#[test]
fn test_ratio() {
let mut b = Bitfield::new(4);
b.set(0, true);
b.set(1, true);
assert_eq!(b.ratio(), 0.5);
}
#[test]
fn test_all() {
let mut b = Bitfield::new(2);
b.set(0, true);
assert_eq!(b.all(), false);
b.set(1, true);
assert_eq!(b.all(), true);
}

69
src/error.rs Normal file
View File

@@ -0,0 +1,69 @@
use std::io;
use failure;
use reqwest;
use serde_bencode;
use url;
#[derive(Debug, Fail)]
pub enum Error {
#[fail(display = "Bencode Error {}", _0)]
BencodeError(#[cause]
serde_bencode::Error),
#[fail(display = "Http Error {}", _0)]
HttpError(#[cause]
reqwest::Error),
#[fail(display = "IO Error {}", _0)]
IoError(#[cause]
io::Error),
#[fail(display = "Tracker Error {}", _0)]
Tracker(String),
#[fail(display = "URL Error {}", _0)]
UrlError(#[cause]
url::ParseError),
#[fail(display = "Error {}", _0)]
Custom(failure::Error),
}
impl Error {
pub fn tracker<M>(msg: M) -> Self
where M: Into<String>
{
Error::Tracker(msg.into())
}
}
impl From<serde_bencode::Error> for Error {
fn from(err: serde_bencode::Error) -> Error {
Error::BencodeError(err)
}
}
impl From<reqwest::Error> for Error {
fn from(err: reqwest::Error) -> Error {
Error::HttpError(err)
}
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::IoError(err)
}
}
impl From<url::ParseError> for Error {
fn from(err: url::ParseError) -> Error {
Error::UrlError(err)
}
}
impl From<failure::Error> for Error {
fn from(err: failure::Error) -> Error {
Error::Custom(err)
}
}

View File

@@ -1,10 +1,21 @@
extern crate byteorder; extern crate byteorder;
extern crate hyper; #[macro_use]
extern crate libc; extern crate failure;
extern crate rand;
extern crate reqwest;
extern crate serde;
extern crate serde_bencode;
extern crate serde_bytes;
#[macro_use]
extern crate serde_derive;
extern crate sha1; extern crate sha1;
extern crate url; extern crate url;
pub mod bencode; #[macro_use]
mod macros;
pub mod bitfield;
pub mod error;
pub mod metainfo; pub mod metainfo;
pub mod net; // pub mod net;
pub mod torrent;
pub mod tracker; pub mod tracker;

23
src/macros.rs Normal file
View File

@@ -0,0 +1,23 @@
#[macro_export]
macro_rules! bail2 {
($e:expr) => {
return Err(::failure::err_msg($e).into());
};
($fmt:expr, $($arg:tt)+) => {
return Err(::failure::err_msg(format!($fmt, $($arg)+)).into());
};
}
#[macro_export]
macro_rules! ensure2 {
($cond:expr, $e:expr) => {
if !($cond) {
bail2!($e);
}
};
($cond:expr, $fmt:expr, $($arg:tt)+) => {
if !($cond) {
bail2!($fmt, $($arg)+);
}
};
}

View File

@@ -1,145 +1,130 @@
use std::fmt; use std::fmt::{self, Write};
use std::fs;
use std::io::Read;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::path::PathBuf; use std::path::Path;
use rand::{thread_rng, Rng};
use serde_bencode;
use serde_bytes::ByteBuf;
use sha1::Sha1; use sha1::Sha1;
use bencode::{encode, Object}; use error::Error;
macro_rules! ts { #[derive(Debug, Deserialize, Serialize)]
( $e:expr ) => {
match $e {
Some(x) => x,
None => return None,
}
}
}
#[derive(Debug)]
pub struct Metainfo { pub struct Metainfo {
pub info: Info,
pub announce: String, pub announce: String,
pub announce_list: Vec<Vec<String>>, #[serde(rename = "announce-list")]
pub files: Vec<MetainfoFile>, pub announce_list: Option<Vec<Vec<String>>>,
pub info_hash: Hash, #[serde(rename = "creation date")]
pub piece_length: u32, pub creation_date: Option<i64>,
pub num_pieces: u32, pub comment: Option<String>,
pub pieces: Vec<Hash>, #[serde(rename = "created by")]
pub created_by: Option<String>,
#[serde(default)]
pub encoding: Option<String>,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub enum Info {
Single {
#[serde(rename = "piece length")]
piece_length: u64,
pieces: ByteBuf,
private: Option<u8>,
name: String,
length: u64,
md5sum: Option<String>,
},
Multiple {
#[serde(rename = "piece length")]
piece_length: u64,
pieces: ByteBuf,
private: Option<bool>,
name: String,
files: Vec<File>,
},
}
#[derive(Debug, Deserialize, Serialize)]
pub struct File {
pub length: u64, pub length: u64,
pub md5sum: Option<String>,
pub path: Vec<String>,
} }
impl Metainfo { impl Metainfo {
pub fn from_bencode<O>(obj: O) -> Option<Metainfo> where O: Into<Object> { /// Create a Metainfo structure from reading a file.
let obj = obj.into(); pub fn open<P: AsRef<Path>>(path: P) -> Result<Metainfo, Error> {
let metainfo = ts!(obj.as_dict()); let mut f = fs::File::open(path.as_ref())?;
let info = ts!(metainfo.get_dict("info")); let mut buf = Vec::new();
f.read_to_end(&mut buf)?;
let announce = ts!(metainfo.get_string("announce")); let meta = serde_bencode::de::from_bytes(&buf)?;
let info_hash = sha1(&encode(ts!(metainfo.get_object("info")))); Ok(meta)
let name = ts!(info.get_str("name"));
let piece_length = ts!(info.get_int("piece length"));
let pieces = ts!(info.get_bytes("pieces"));
let mut total_length = 0;
if pieces.len() % 20 != 0 {
return None;
}
let mut announce_list = vec![];
if let Some(list) = metainfo.get_list("announce-list") {
for tier in list.iter() {
let mut urls = vec![];
for url in ts!(tier.as_list()).iter() {
urls.push(ts!(url.as_string()));
}
announce_list.push(urls);
} }
} }
let mut files = vec![];
if let Some(list) = info.get_list("files") {
for obj in list.iter() {
let file = ts!(obj.as_dict());
let mut path = PathBuf::new();
path.push(name);
for component in ts!(file.get_list("path")).iter() {
path.push(ts!(component.as_str()));
}
let length = ts!(file.get_int("length")) as u64;
total_length += length;
files.push(MetainfoFile {
length: length,
path: path,
});
}
} else {
let mut path = PathBuf::new();
path.push(name);
total_length = ts!(info.get_int("length")) as u64;
files.push(MetainfoFile{
length: total_length,
path: path,
})
}
let pieces = pieces.chunks(20).map(Hash::from_slice).collect::<Vec<_>>();
Some(Metainfo {
announce: announce,
announce_list: announce_list,
files: files,
info_hash: info_hash,
piece_length: piece_length as u32,
num_pieces: pieces.len() as u32,
pieces: pieces,
length: total_length,
})
}
}
#[derive(Debug)]
pub struct MetainfoFile {
pub length: u64,
pub path: PathBuf,
}
fn sha1(bytes: &[u8]) -> Hash {
let mut hasher = Sha1::new();
hasher.update(bytes);
Hash::new(hasher.digest().bytes())
}
#[derive(Clone, Copy, Eq, Hash, PartialEq)] #[derive(Clone, Copy, Eq, Hash, PartialEq)]
pub struct Hash([u8; 20]); pub struct Hash([u8; 20]);
impl Hash { impl Hash {
/// Create a Hash filled with zeroes.
pub fn alloc() -> Hash { pub fn alloc() -> Hash {
Hash([0u8; 20]) Hash([0u8; 20])
} }
/// Create a Hash from a 20 byte array.
pub fn new(inner: [u8; 20]) -> Hash { pub fn new(inner: [u8; 20]) -> Hash {
Hash(inner) Hash(inner)
} }
/// Create a Hash from a 20 byte slice.
///
/// # Panics
/// If the slice is not of length 20, this function panics.
pub fn from_slice(bytes: &[u8]) -> Hash { pub fn from_slice(bytes: &[u8]) -> Hash {
assert_eq!(bytes.len(), 20); assert_eq!(bytes.len(), 20);
let mut hash = Hash::alloc(); let mut hash = Hash::alloc();
hash.copy_from_slice(bytes); hash.copy_from_slice(bytes);
hash hash
} }
/// Create a Hash filled with random bytes.
pub fn random() -> Hash {
let mut hash = Hash::alloc();
thread_rng().fill_bytes(&mut hash);
hash
}
/// Compute the SHA-1 hash of the slice.
pub fn sha1(bytes: &[u8]) -> Hash {
let mut hasher = Sha1::new();
hasher.update(bytes);
Hash::new(hasher.digest().bytes())
}
/// Compute the info hash of a Metainfo structure.
pub fn info_hash(metainfo: &Metainfo) -> Hash {
let buf = serde_bencode::to_bytes(&metainfo.info).expect("unable to compute info hash");
Hash::sha1(&buf)
}
/// Create a hexadecimal representation of this Hash.
pub fn hex(&self) -> String {
let mut buf = String::with_capacity(40);
for byte in self.0.iter() {
let _ = write!(buf, "{:02x}", byte);
}
buf
}
} }
impl fmt::Debug for Hash { impl fmt::Debug for Hash {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
for &b in self.0.iter() { write!(f, "Hash({})", self.hex())
write!(f, "{:x}", b)?
}
Ok(())
} }
} }
@@ -157,68 +142,9 @@ impl DerefMut for Hash {
} }
} }
#[cfg(test)]
mod tests {
use std::path::Path;
use bencode::{Dict, List, Object};
use metainfo::Metainfo;
#[test] #[test]
fn test_single_file() { fn test_hash() {
let mut meta = Dict::new(); let metainfo = Metainfo::open("samples/debian-9.1.0-amd64-netinst.iso.torrent").unwrap();
meta.insert("announce", "http://ubuntu.com/tracker:6969"); let info_hash = Hash::info_hash(&metainfo);
assert_eq!(info_hash.hex(), "fd5fdf21aef4505451861da97aa39000ed852988");
let mut info = Dict::new();
info.insert("name", "ubuntu-16.04-desktop.iso");
info.insert("length", 1024 * 1024 * 1024);
info.insert("piece length", 1024 * 512);
info.insert("pieces", "");
meta.insert("info", info);
let metainfo = Metainfo::from_bencode(meta).unwrap();
assert_eq!(metainfo.announce, "http://ubuntu.com/tracker:6969");
assert_eq!(metainfo.piece_length, 1024 * 512);
let file = &metainfo.files[0];
assert_eq!(file.path.to_str().unwrap(), "ubuntu-16.04-desktop.iso");
assert_eq!(file.length, 1024 * 1024 * 1024);
}
#[test]
fn test_multiple_files() {
let mut meta = Dict::new();
meta.insert("announce", "http://ubuntu.com/tracker:6969");
let mut info = Dict::new();
info.insert("name", "base_folder");
info.insert("piece length", 1024 * 512);
info.insert("pieces", "");
let mut files = List::new();
let mut file1 = Dict::new();
file1.insert("path", vec!["folder2", "image.txt"]);
file1.insert("length", 512);
files.push(file1);
let mut file2 = Dict::new();
file2.insert("path", vec!["txt.jpg"]);
file2.insert("length", 64);
files.push(file2);
info.insert("files", Object::List(files));
meta.insert("info", info);
let metainfo = Metainfo::from_bencode(meta).unwrap();
assert_eq!(metainfo.announce, "http://ubuntu.com/tracker:6969");
assert_eq!(metainfo.piece_length, 1024 * 512);
let file1 = &metainfo.files[0];
assert_eq!(file1.path, Path::new("base_folder/folder2/image.txt"));
assert_eq!(file1.length, 512);
let file2 = &metainfo.files[1];
assert_eq!(file2.path, Path::new("base_folder/txt.jpg"));
assert_eq!(file2.length, 64);
}
} }

248
src/net/_session/disk.rs Normal file
View File

@@ -0,0 +1,248 @@
use std::cmp;
use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::Arc;
use net::_session::map::Map;
use metainfo::{Hash, Metainfo};
pub struct DiskManager<F> where F: Read + Seek + Write {
torrents: Map<Hash, Torrent<F>>,
}
impl<F> DiskManager<F> where F: Read + Seek + Write {
pub fn write_piece(&mut self, info_hash: &Hash, index: u32, mut piece: Vec<u8>) -> io::Result<()> {
let torrent = &mut self.torrents[info_hash];
torrent.iter_files_for_piece(index, &mut piece, |file, seek, buf| {
file.seek(SeekFrom::Start(seek))?;
file.write_all(&buf)?;
Ok(())
})?;
Ok(())
}
pub fn read_piece(&mut self, info_hash: &Hash, index: u32) -> io::Result<Vec<u8>> {
let torrent = &mut self.torrents[info_hash];
let len = torrent.metainfo.piece_length as usize;
let mut piece = Vec::with_capacity(len);
unsafe {
piece.set_len(len);
}
torrent.iter_files_for_piece(index, &mut piece, |file, seek, buf| {
file.seek(SeekFrom::Start(seek))?;
file.read_exact(buf)?;
Ok(())
})?;
Ok(piece)
}
}
impl DiskManager<File> {
pub fn add_torrent(&mut self, metainfo: Arc<Metainfo>) -> io::Result<()> {
let mut files = Vec::new();
for fileinfo in metainfo.files.iter() {
let file = File::create(&fileinfo.path)?;
file.set_len(fileinfo.length)?;
files.push(file);
}
self.torrents.insert(metainfo.info_hash, Torrent {
files: files,
metainfo: metainfo,
});
Ok(())
}
}
pub struct Torrent<F> where F: Read + Seek + Write {
metainfo: Arc<Metainfo>,
files: Vec<F>,
}
impl<F> Torrent<F> where F: Read + Seek + Write {
// This function iterates over the files that are spanned by a piece.
fn iter_files_for_piece<C>(&mut self, index: u32, piece: &mut [u8], mut callback: C) -> io::Result<()>
where C: FnMut(&mut F, u64, &mut [u8]) -> io::Result<()>
{
let buffer_len = piece.len();
let mut buffer_pos: usize = 0;
// All the files in a torrent can be seen as a single contiguous file. These
// values are relative to this contiguous file.
let mut abs_pos = index as u64 * self.metainfo.piece_length as u64;
let mut abs_start = 0;
let mut abs_end = 0;
for (idx, file) in self.metainfo.files.iter().enumerate() {
abs_end += file.length;
if abs_start <= abs_pos && abs_pos < abs_end {
// The amount of bytes to write is the remaining number of bytes in the buffer,
// or the size of the file, whichever is smaller.
let remaining = (buffer_len - buffer_pos) as u64;
let write_len = cmp::min(remaining, abs_end - abs_pos) as usize;
// Seek in the file. The position given is relative to the start of the file.
callback(&mut self.files[idx], abs_pos - abs_start, &mut piece[buffer_pos..buffer_pos + write_len])?;
abs_pos += write_len as u64;
buffer_pos += write_len;
// If the buffer position if equal to the length, we're done writing.
if buffer_pos == buffer_len {
break
}
}
// The start of the next file is the end of this one.
abs_start = abs_end;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::Arc;
use net::_session::map::Map;
use metainfo::{Hash, Metainfo, MetainfoFile};
struct MockFile {
seek: SeekFrom,
read_len: usize,
write_len: usize,
}
impl MockFile {
pub fn new() -> Self {
MockFile {
seek: SeekFrom::Start(0),
read_len: 0,
write_len: 0,
}
}
}
impl Read for MockFile {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.read_len = buf.len();
Ok(buf.len())
}
}
impl Seek for MockFile {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.seek = pos;
Ok(0)
}
}
impl Write for MockFile {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_len = buf.len();
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
fn build_dummy() -> (DiskManager<MockFile>, Hash) {
let info_hash = Hash::new([1u8; 20]);
let metainfo = Metainfo {
announce: "".into(),
announce_list: vec![],
files: vec![MetainfoFile::new(12, PathBuf::new()),
MetainfoFile::new(6, PathBuf::new()),
MetainfoFile::new(2, PathBuf::new())],
info_hash: info_hash,
piece_length: 10,
num_pieces: 2,
pieces: vec![],
length: 20,
};
let mut torrents = Map::new();
torrents.insert(info_hash, Torrent {
metainfo: Arc::new(metainfo),
files: vec![MockFile::new(), MockFile::new(), MockFile::new()],
});
let dm = DiskManager {
torrents: torrents,
};
(dm, info_hash)
}
#[test]
fn test_read_piece() {
let (mut dm, info_hash) = build_dummy();
{
// Read a piece in a single file that is larger than the piece.
dm.read_piece(&info_hash, 0).unwrap();
let torrent = &dm.torrents[&info_hash];
assert_eq!(torrent.files[0].seek, SeekFrom::Start(0));
assert_eq!(torrent.files[0].read_len, 10);
}
{
// Read a piece over multiple files that are smaller that a piece.
dm.read_piece(&info_hash, 1).unwrap();
let torrent = &dm.torrents[&info_hash];
assert_eq!(torrent.files[0].seek, SeekFrom::Start(10));
assert_eq!(torrent.files[0].read_len, 2);
assert_eq!(torrent.files[1].seek, SeekFrom::Start(0));
assert_eq!(torrent.files[1].read_len, 6);
assert_eq!(torrent.files[2].seek, SeekFrom::Start(0));
assert_eq!(torrent.files[2].read_len, 2);
}
}
#[test]
fn test_write_piece() {
let (mut dm, info_hash) = build_dummy();
{
// Write a piece in a single file that is larger than the piece.
dm.write_piece(&info_hash, 0, vec![2u8; 10]).unwrap();
let torrent = &dm.torrents[&info_hash];
assert_eq!(torrent.files[0].seek, SeekFrom::Start(0));
assert_eq!(torrent.files[0].write_len, 10);
}
{
// Write a piece over multiple files that are smaller that a piece.
dm.write_piece(&info_hash, 1, vec![2u8; 10]).unwrap();
let torrent = &dm.torrents[&info_hash];
assert_eq!(torrent.files[0].seek, SeekFrom::Start(10));
assert_eq!(torrent.files[0].write_len, 2);
assert_eq!(torrent.files[1].seek, SeekFrom::Start(0));
assert_eq!(torrent.files[1].write_len, 6);
assert_eq!(torrent.files[2].seek, SeekFrom::Start(0));
assert_eq!(torrent.files[2].write_len, 2);
}
}
}

34
src/net/_session/map.rs Normal file
View File

@@ -0,0 +1,34 @@
use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::{Deref, DerefMut, Index, IndexMut};
pub struct Map<K, V>(HashMap<K, V>);
impl<K, V> Deref for Map<K, V> {
type Target = HashMap<K, V>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<K, V> DerefMut for Map<K, V> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<I, K, V> Index<I> for Map<K, V> where I: Borrow<K>, K: Eq + Hash {
type Output = V;
fn index(&self, index: I) -> &V {
self.0.get(index.borrow()).expect("index not found")
}
}
impl<I, K, V> IndexMut<I> for Map<K, V> where I: Borrow<K>, K: Eq + Hash {
fn index_mut(&mut self, index: I) -> &mut V {
self.0.get_mut(index.borrow()).expect("index not found")
}
}

2
src/net/_session/mod.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod disk;
pub mod map;

View File

@@ -3,13 +3,16 @@ use std::fmt;
pub struct BitField { pub struct BitField {
bits: Vec<u8>, bits: Vec<u8>,
len: u32, len: u32,
num_set: u32,
} }
impl BitField { impl BitField {
pub fn new(bits: Vec<u8>, len: u32) -> BitField { pub fn new(bits: Vec<u8>, len: u32) -> BitField {
let num_set = bits.iter().map(|b| b.count_ones()).sum::<u32>();
BitField { BitField {
bits: bits, bits: bits,
len: len, len: len,
num_set: num_set,
} }
} }
@@ -17,38 +20,58 @@ impl BitField {
BitField { BitField {
bits: vec![0u8; f64::ceil(len as f64 / 8.) as usize], bits: vec![0u8; f64::ceil(len as f64 / 8.) as usize],
len: len, len: len,
num_set: 0,
} }
} }
#[inline]
fn calc_positions(&self, index: u32) -> (u32, u32) {
(index / 8, index % 8)
}
#[inline] #[inline]
pub fn len(&self) -> u32 { pub fn len(&self) -> u32 {
self.len self.len
} }
#[inline]
pub fn num_set(&self) -> u32 {
self.num_set
}
#[inline] #[inline]
pub fn is_set(&self, index: u32) -> bool { pub fn is_set(&self, index: u32) -> bool {
let idx = index / 8; let (idx, offset) = self.calc_positions(index);
let offset = index % 8;
(self.bits[idx as usize] & (1 << 7 - offset)) != 0 (self.bits[idx as usize] & (1 << 7 - offset)) != 0
} }
#[inline] #[inline]
pub fn set(&mut self, index: u32) { pub fn set(&mut self, index: u32) {
let idx = index / 8; if !self.is_set(index) {
let offset = index % 8; let (idx, offset) = self.calc_positions(index);
self.bits[idx as usize] |= 1 << 7 - offset; self.bits[idx as usize] |= 1 << 7 - offset;
self.num_set += 1;
}
} }
#[inline] #[inline]
pub fn unset(&mut self, index: u32) { pub fn unset(&mut self, index: u32) {
let idx = index / 8; if self.is_set(index) {
let offset = index % 8; let (idx, offset) = self.calc_positions(index);
self.bits[idx as usize] ^= 1 << 7 - offset; self.bits[idx as usize] ^= 1 << 7 - offset;
self.num_set -= 1;
}
} }
#[inline]
pub fn as_bytes(&self) -> &[u8] { pub fn as_bytes(&self) -> &[u8] {
&self.bits &self.bits
} }
#[inline]
pub fn set_ratio(&self) -> f64 {
self.num_set as f64 / self.len as f64
}
} }
impl fmt::Debug for BitField { impl fmt::Debug for BitField {
@@ -80,6 +103,8 @@ fn test_bitfield_is_set() {
assert!(bf.is_set(0xd)); assert!(bf.is_set(0xd));
assert!(bf.is_set(0xe)); assert!(bf.is_set(0xe));
assert!(bf.is_set(0xf)); assert!(bf.is_set(0xf));
assert_eq!(bf.num_set(), 12);
} }
#[test] #[test]
@@ -93,6 +118,8 @@ fn test_bitfield_set() {
assert_eq!(bf.bits[1], 128); assert_eq!(bf.bits[1], 128);
bf.set(15); bf.set(15);
assert_eq!(bf.bits[1], 129); assert_eq!(bf.bits[1], 129);
assert_eq!(bf.num_set(), 4);
} }
#[test] #[test]
@@ -106,4 +133,13 @@ fn test_bitfield_unset() {
assert_eq!(bf.bits[1], 255-128); assert_eq!(bf.bits[1], 255-128);
bf.unset(15); bf.unset(15);
assert_eq!(bf.bits[1], 255-128-1); assert_eq!(bf.bits[1], 255-128-1);
assert_eq!(bf.num_set(), 12);
}
#[test]
fn test_bitfield_set_ratio() {
use std::f64;
let mut bf = BitField::new(vec![255, 255], 16);
assert!(f64::abs(bf.set_ratio() - 1.0) < f64::EPSILON);
} }

68
src/net/buffers.rs Normal file
View File

@@ -0,0 +1,68 @@
use std::io::{self, Read};
use std::ptr;
use sha1::Sha1;
use metainfo::Hash;
/// Allocate a new buffer of the given size and read_exact into it.
pub fn read_exact<R>(mut reader: R, size: usize) -> io::Result<Vec<u8>> where R: Read {
let mut buf = Vec::with_capacity(size);
unsafe {
buf.set_len(size);
}
reader.read_exact(&mut buf)?;
Ok(buf)
}
pub struct PieceBuffer {
fragment_count: u32,
num_fragments: u32,
buffer: Vec<u8>,
}
impl PieceBuffer {
pub fn new(num_fragments: u32, piece_length: u32) -> Self {
let mut buffer = Vec::with_capacity(piece_length as usize);
unsafe {
buffer.set_len(piece_length as usize);
}
PieceBuffer {
fragment_count: 0,
num_fragments: num_fragments,
buffer: buffer,
}
}
pub fn add_fragment(&mut self, begin: u32, fragment: &[u8]) {
let begin = begin as usize;
assert!(begin + fragment.len() <= self.buffer.len());
unsafe {
ptr::copy_nonoverlapping(fragment.as_ptr(), self.buffer[begin..].as_mut_ptr(), fragment.len());
}
self.fragment_count += 1;
}
#[inline]
pub fn complete(&self) -> bool {
self.fragment_count == self.num_fragments
}
pub fn matches_hash(&self, info_hash: &Hash) -> bool {
if !self.complete() {
panic!("trying to hash piece buffer before it's complete");
} else {
let mut m = Sha1::new();
m.update(&self.buffer);
m.digest().bytes() == &info_hash[..]
}
}
pub fn get(self) -> Vec<u8> {
if !self.complete() {
panic!("trying to get piece buffer before it's complete");
} else {
self.buffer
}
}
}

View File

@@ -1,3 +1,5 @@
pub mod bitfield; pub mod bitfield;
mod buffers;
pub mod peer; pub mod peer;
pub mod session; pub mod session;
pub mod _session;

View File

@@ -4,6 +4,7 @@ use std::net::{Shutdown, TcpStream};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use metainfo::Hash; use metainfo::Hash;
use net::buffers;
use tracker::Peer; use tracker::Peer;
#[derive(Debug)] #[derive(Debug)]
@@ -68,11 +69,8 @@ impl PeerConnection {
}, },
5 => { 5 => {
let size = len as usize - 1; let size = len as usize - 1;
let mut bitfield = Vec::with_capacity(size); let bitfield = buffers::read_exact(&mut self.sock, size)?;
unsafe {
bitfield.set_len(size);
}
self.sock.read_exact(&mut bitfield)?;
Packet::Bitfield { Packet::Bitfield {
bitfield: bitfield, bitfield: bitfield,
} }
@@ -86,11 +84,7 @@ impl PeerConnection {
let size = len as usize - 9; let size = len as usize - 9;
let index = self.sock.read_u32::<BigEndian>()?; let index = self.sock.read_u32::<BigEndian>()?;
let begin = self.sock.read_u32::<BigEndian>()?; let begin = self.sock.read_u32::<BigEndian>()?;
let mut block = Vec::with_capacity(size); let block = buffers::read_exact(&mut self.sock, size)?;
unsafe {
block.set_len(size);
}
self.sock.read_exact(&mut block)?;
Packet::Piece { Packet::Piece {
index: index, index: index,

View File

@@ -2,7 +2,6 @@ use std::cmp;
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::fs::File; use std::fs::File;
use std::io::{Seek, SeekFrom, Write}; use std::io::{Seek, SeekFrom, Write};
use std::net::{Shutdown, TcpStream};
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use std::sync::Arc; use std::sync::Arc;
use std::sync::mpsc::{self, Receiver, Sender}; use std::sync::mpsc::{self, Receiver, Sender};
@@ -10,10 +9,10 @@ use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use libc; use libc;
use sha1::Sha1;
use metainfo::{Hash, Metainfo}; use metainfo::{Hash, Metainfo};
use net::bitfield::BitField; use net::bitfield::BitField;
use net::buffers::PieceBuffer;
use net::peer::{Packet, PeerConnection}; use net::peer::{Packet, PeerConnection};
use tracker::http; use tracker::http;
@@ -50,6 +49,8 @@ struct InnerSession {
port: u16, port: u16,
} }
// == Session ==
pub struct Session { pub struct Session {
// inner: Arc<InnerSession>, // inner: Arc<InnerSession>,
sender: Sender<Signal>, sender: Sender<Signal>,
@@ -96,6 +97,8 @@ impl Session {
} }
} }
// == SessionPeer ==
struct SessionPeer { struct SessionPeer {
sock: PeerConnection, sock: PeerConnection,
bitfield: BitField, bitfield: BitField,
@@ -118,6 +121,8 @@ impl SessionPeer {
} }
} }
// == FragmentStatus ==
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
enum FragmentStatus { enum FragmentStatus {
Available, Available,
@@ -125,19 +130,23 @@ enum FragmentStatus {
Taken(Instant), Taken(Instant),
} }
// == SessionFragment ==
pub struct SessionFragment { pub struct SessionFragment {
begin: u32, begin: u32,
length: u32, length: u32,
status: FragmentStatus, status: FragmentStatus,
} }
// == SessionPiece ==
struct SessionPiece { struct SessionPiece {
fragments: BTreeMap<u32, SessionFragment>, fragments: BTreeMap<u32, SessionFragment>,
buffer: Vec<u8>, buffer: PieceBuffer,
num_fragments: u32,
total_fragments: u32,
} }
// == SessionTorrent ==
struct SessionTorrent { struct SessionTorrent {
metainfo: Metainfo, metainfo: Metainfo,
own_bitfield: BitField, own_bitfield: BitField,
@@ -198,9 +207,9 @@ impl SessionTorrent {
for index in (0..self.own_bitfield.len()).filter(|index| !keys.contains(index)) { for index in (0..self.own_bitfield.len()).filter(|index| !keys.contains(index)) {
if !self.own_bitfield.is_set(index) && peer.bitfield.is_set(index) { if !self.own_bitfield.is_set(index) && peer.bitfield.is_set(index) {
let total_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32; let num_fragments = f64::ceil(self.metainfo.piece_length as f64 / FRAGMENT_SIZE as f64) as u32;
let mut fragments = BTreeMap::new(); let mut fragments = BTreeMap::new();
for idx in 0..total_fragments { for idx in 0..num_fragments {
let begin = idx * FRAGMENT_SIZE; let begin = idx * FRAGMENT_SIZE;
fragments.insert(begin ,SessionFragment { fragments.insert(begin ,SessionFragment {
@@ -219,15 +228,9 @@ impl SessionTorrent {
length = taken.length; length = taken.length;
} }
let mut buffer = Vec::with_capacity(self.metainfo.piece_length as usize);
unsafe {
buffer.set_len(self.metainfo.piece_length as usize);
}
self.pieces.insert(index, SessionPiece { self.pieces.insert(index, SessionPiece {
fragments: fragments, fragments: fragments,
buffer: buffer, buffer: PieceBuffer::new(num_fragments, self.metainfo.piece_length),
num_fragments: 0,
total_fragments: total_fragments,
}); });
return Some((index, begin, length)); return Some((index, begin, length));
@@ -247,7 +250,7 @@ impl SessionTorrent {
} }
} }
fn write_piece(&mut self, index: u32) { fn write_piece(&mut self, index: u32, buffer: Vec<u8>) {
let mut start = 0; let mut start = 0;
let mut end = 0; let mut end = 0;
let mut pos: u64 = 0; let mut pos: u64 = 0;
@@ -260,7 +263,7 @@ impl SessionTorrent {
if write < end { if write < end {
let len = cmp::min(remaining, fileinfo.length as u64); let len = cmp::min(remaining, fileinfo.length as u64);
self.files[id].seek(SeekFrom::Start(write - start)); self.files[id].seek(SeekFrom::Start(write - start));
self.files[id].write_all(&self.pieces[&index].buffer[pos as usize..(pos+len) as usize]); self.files[id].write_all(&buffer[pos as usize..(pos+len) as usize]);
write += len; write += len;
pos += len; pos += len;
remaining -= len; remaining -= len;
@@ -281,41 +284,46 @@ impl SessionTorrent {
fn piece_reply(&mut self, peer_id: &Hash, index: u32, begin: u32, block: Vec<u8>) { fn piece_reply(&mut self, peer_id: &Hash, index: u32, begin: u32, block: Vec<u8>) {
let mut remove = false; let mut remove = false;
let mut reset = false;
{ {
if let Some(piece) = self.pieces.get_mut(&index) { if let Some(piece) = self.pieces.get_mut(&index) {
if let Some(fragment) = piece.fragments.get_mut(&begin) { if let Some(fragment) = piece.fragments.get_mut(&begin) {
fragment.status = FragmentStatus::Complete; fragment.status = FragmentStatus::Complete;
piece.buffer[begin as usize..(begin as usize)+block.len()].copy_from_slice(&block); piece.buffer.add_fragment(begin, &block);
piece.num_fragments += 1; if piece.buffer.complete() {
if piece.num_fragments == piece.total_fragments {
// TODO check hash
println!("piece is done {}", index); println!("piece is done {}", index);
let mut m = Sha1::new(); if piece.buffer.matches_hash(&self.metainfo.pieces[index as usize]) {
m.update(&piece.buffer);
if m.digest().bytes() == &self.metainfo.pieces[index as usize][..] {
self.own_bitfield.set(index); self.own_bitfield.set(index);
println!("it's a match!"); println!("it's a match!");
remove = true; remove = true;
} else { } else {
reset = true;
println!("no match"); println!("no match");
} }
} }
} else { } else {
println!("could not find fragment {}", begin); println!("could not find fragment {}", begin);
} }
if reset {
for fragment in piece.fragments.values_mut() {
fragment.status = FragmentStatus::Available;
}
}
} else { } else {
println!("could not find piece {}", index); println!("could not find piece {}", index);
} }
} }
if remove { if remove {
self.write_piece(index); let piece = self.pieces.remove(&index).expect("told to remove piece that doesn't exist");
self.pieces.remove(&index); self.write_piece(index, piece.buffer.get());
} }
self.requeue(peer_id); self.requeue(peer_id);
} }
} }
// == SessionNetworkThread ==
struct SessionNetworkThread { struct SessionNetworkThread {
session: Arc<InnerSession>, session: Arc<InnerSession>,
sender: Sender<Signal>, sender: Sender<Signal>,
@@ -326,34 +334,13 @@ struct SessionNetworkThread {
} }
impl SessionNetworkThread { impl SessionNetworkThread {
pub fn run(mut self, input: Receiver<Signal>) { pub fn run(mut self, signals: Receiver<Signal>) {
for signal in input.iter() { for signal in signals.iter() {
match signal { match signal {
Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent), Signal::AddTorrent(torrent) => self.signal_add_torrent(torrent),
Signal::RemoveTorrent(id) => { Signal::RemoveTorrent(id) => self.signal_remove_torrent(&id),
if let Some(mut sess_torrent) = self.torrents.remove(&id) { Signal::ConnectionOpened { info_hash, peer_id, sock } => self.signal_connection_opened(&info_hash, &peer_id, sock),
for peer in sess_torrent.peers.values_mut() { Signal::ConnectionClosed { info_hash, peer_id } => self.signal_connection_closed(&info_hash, &peer_id),
peer.sock.shutdown();
}
}
}
Signal::ConnectionOpened { info_hash, peer_id, sock } => {
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
let mut peer = SessionPeer::new(sock, sess_torrent.metainfo.pieces.len() as u32);
peer.sock.send_interested();
peer.am_interested = true;
sess_torrent.peers.insert(peer_id, peer);
}
}
Signal::ConnectionClosed { info_hash, peer_id } => {
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) {
// if !peer.peer_choking && self.seeds > 1 {
// self.seeds -= 1;
// }
}
}
}
Signal::Packet { info_hash, peer_id, packet } => self.signal_packet(info_hash, peer_id, packet), Signal::Packet { info_hash, peer_id, packet } => self.signal_packet(info_hash, peer_id, packet),
Signal::Stop => break, Signal::Stop => break,
} }
@@ -405,6 +392,33 @@ impl SessionNetworkThread {
} }
} }
fn signal_remove_torrent(&mut self, id: &Hash) {
if let Some(mut sess_torrent) = self.torrents.remove(id) {
for peer in sess_torrent.peers.values_mut() {
peer.sock.shutdown();
}
}
}
fn signal_connection_opened(&mut self, info_hash: &Hash, peer_id: &Hash, conn: PeerConnection) {
if let Some(sess_torrent) = self.torrents.get_mut(info_hash) {
let mut peer = SessionPeer::new(conn, sess_torrent.metainfo.pieces.len() as u32);
peer.sock.send_interested();
peer.am_interested = true;
sess_torrent.peers.insert(*peer_id, peer);
}
}
fn signal_connection_closed(&mut self, info_hash: &Hash, peer_id: &Hash) {
if let Some(sess_torrent) = self.torrents.get_mut(&info_hash) {
if let Some(mut peer) = sess_torrent.peers.remove(&peer_id) {
// if !peer.peer_choking && self.seeds > 1 {
// self.seeds -= 1;
// }
}
}
}
fn signal_packet(&mut self, info_hash: Hash, peer_id: Hash, packet: Packet) { fn signal_packet(&mut self, info_hash: Hash, peer_id: Hash, packet: Packet) {
if let Some(torrent) = self.torrents.get_mut(&info_hash) { if let Some(torrent) = self.torrents.get_mut(&info_hash) {
match packet { match packet {

32
src/torrent.rs Normal file
View File

@@ -0,0 +1,32 @@
use metainfo::{Hash, Info, Metainfo};
pub struct Torrent {
pub info_hash: Hash,
pub metainfo: Metainfo,
pub uploaded: u64,
pub downloaded: u64,
pub left: u64,
pub size: u64,
}
impl Torrent {
pub fn new(metainfo: Metainfo) -> Self {
let size = match metainfo.info {
Info::Single { length, .. } => length,
Info::Multiple { ref files, .. } => {
let mut size = 0;
for file in files.iter() {
size += file.length;
}
size
}
};
Torrent { info_hash: Hash::info_hash(&metainfo),
metainfo: metainfo,
uploaded: 0,
downloaded: 0,
left: size,
size: size, }
}
}

184
src/tracker/bin.rs Normal file
View File

@@ -0,0 +1,184 @@
use std::error;
use std::fmt::{self, Display};
use std::io::{self, Write};
use std::marker::PhantomData;
use byteorder::{ByteOrder, WriteBytesExt};
use serde::ser::{self, Impossible};
#[derive(Debug)]
pub enum Error {
Custom(String),
Io(io::Error),
NotImplemented,
}
impl error::Error for Error {
fn description(&self) -> &str {
match *self {
Error::Custom(_) => "custom",
Error::Io(ref err) => err.description(),
Error::NotImplemented => "not implemented",
}
}
}
impl From<io::Error> for Error {
fn from(err: io::Error) -> Error {
Error::Io(err)
}
}
impl Display for Error {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
Ok(())
}
}
impl ser::Error for Error {
fn custom<T>(msg: T) -> Self
where T: Display
{
Error::Custom(format!("{}", msg))
}
}
pub struct Serializer<'a, W: Write + 'a, B: ByteOrder> {
buf: &'a mut W,
_order: PhantomData<B>,
}
impl<'a, W: Write + 'a, B: ByteOrder> ser::Serializer for Serializer<'a, W, B> {
type Ok = ();
type Error = Error;
type SerializeSeq = Impossible<(), Error>;
type SerializeTuple = Impossible<(), Error>;
type SerializeTupleStruct = Impossible<(), Error>;
type SerializeTupleVariant = Impossible<(), Error>;
type SerializeMap = Impossible<(), Error>;
type SerializeStruct = Impossible<(), Error>;
type SerializeStructVariant = Impossible<(), Error>;
fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
self.buf.write_u8(v as u8)?;
Ok(())
}
fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
self.buf.write_i8(v)?;
Ok(())
}
fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_char(self, v: char) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_some<T: ?Sized>(self, value: &T) -> Result<Self::Ok, Self::Error>
where T: ser::Serialize
{
Err(Error::NotImplemented)
}
fn serialize_unit(self) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_unit_struct(self, name: &'static str) -> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_unit_variant(self,
name: &'static str,
variant_index: u32,
variant: &'static str)
-> Result<Self::Ok, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_newtype_struct<T: ?Sized>(self,
name: &'static str,
value: &T)
-> Result<Self::Ok, Self::Error>
where T: ser::Serialize
{
Err(Error::NotImplemented)
}
fn serialize_newtype_variant<T: ?Sized>(self,
name: &'static str,
variant_index: u32,
variant: &'static str,
value: &T)
-> Result<Self::Ok, Self::Error>
where T: ser::Serialize
{
Err(Error::NotImplemented)
}
fn serialize_seq(self, len: Option<usize>) -> Result<Self::SerializeSeq, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_tuple(self, len: usize) -> Result<Self::SerializeTuple, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_tuple_struct(self,
name: &'static str,
len: usize)
-> Result<Self::SerializeTupleStruct, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_tuple_variant(self,
name: &'static str,
variant_index: u32,
variant: &'static str,
len: usize)
-> Result<Self::SerializeTupleVariant, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_map(self, len: Option<usize>) -> Result<Self::SerializeMap, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_struct(self,
name: &'static str,
len: usize)
-> Result<Self::SerializeStruct, Self::Error> {
Err(Error::NotImplemented)
}
fn serialize_struct_variant(self,
name: &'static str,
variant_index: u32,
variant: &'static str,
len: usize)
-> Result<Self::SerializeStructVariant, Self::Error> {
Err(Error::NotImplemented)
}
}

View File

@@ -1,96 +1,100 @@
use std::fmt::Display;
use std::io::Read; use std::io::Read;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use hyper::Client; use reqwest;
use url::form_urlencoded::byte_serialize; use serde_bencode;
use serde_bytes::ByteBuf;
use url::Url;
use bencode::{decode, Dict}; use super::{Event, Peer};
use metainfo::{Hash, Metainfo}; use error::Error;
use tracker::{Peer, TrackerError, TrackerResponse, TrackerResult}; use metainfo::Hash;
use torrent::Torrent;
macro_rules! ts { pub struct QueryBuilder {
( $e:expr ) => { params: Vec<(&'static str, String)>,
match $e { }
Some(x) => x,
None => return Err(TrackerError::InvalidResponse), impl QueryBuilder {
pub fn new() -> Self {
QueryBuilder { params: Vec::new() }
}
pub fn add(&mut self, name: &'static str, val: &Display) {
self.params.push((name, format!("{}", val)));
}
pub fn apply_to(&self, url: &mut Url) {
let mut query_mut = url.query_pairs_mut();
for &(key, ref val) in self.params.iter() {
query_mut.append_pair(key, &val);
} }
} }
} }
pub fn get_peers(peer_id: Hash, port: u16, metainfo: &Metainfo, uploaded: u64, downloaded: u64, left: u64) -> TrackerResult { fn peer_from_bytes(chunk: &[u8]) -> Peer {
let url = format!("{}?info_hash={}&peer_id={}&port={}&uploaded={}\ assert!(chunk.len() == 6);
&downloaded={}&left={}&compact=1", Peer { addr: Ipv4Addr::new(chunk[0], chunk[1], chunk[2], chunk[3]),
metainfo.announce, port: (chunk[4] as u16) << 8 | (chunk[5] as u16) << 0, }
urlencode(&metainfo.info_hash),
urlencode(&peer_id),
port,
uploaded,
downloaded,
left);
let client = Client::new();
let mut resp = client.get(&url).send()?;
let mut buff = Vec::new();
resp.read_to_end(&mut buff)?;
let obj = decode(&buff)?;
let info = ts!(obj.as_dict());
parse_object(info)
}
fn parse_object(info: &Dict) -> TrackerResult {
if let Some(reason) = info.get_bytes("failure reason") {
Err(TrackerError::TrackerFailure(ts!(reason.string())))
} else {
let interval = ts!(info.get_int("interval"));
let peerstring = ts!(info.get_bytes("peers"));
if peerstring.len() % 6 != 0 {
return Err(TrackerError::InvalidResponse)
}
let peers = peerstring.chunks(6).map(|chunk| {
Peer {
addr: Ipv4Addr::new(chunk[0], chunk[1], chunk[2], chunk[3]),
port: (chunk[4] as u16) << 8 | (chunk[5] as u16) << 0,
}
}).collect();
Ok(TrackerResponse {
interval: interval as u64,
peers: peers,
})
}
}
fn urlencode(bytes: &[u8]) -> String {
byte_serialize(bytes).collect::<Vec<_>>().join("")
}
#[test]
fn test_failure() {
let mut info = Dict::new();
info.insert("failure reason", "unknown hash");
assert!(parse_object(&info).is_err());
}
#[test]
fn test_invalid_peer_list() {
let mut info = Dict::new();
info.insert("interval", 900);
info.insert("peers", vec![1u8, 1, 1, 1]);
assert!(parse_object(&info).is_err());
} }
#[test] pub fn fetch_peers(mut url: Url,
fn test_success() { event: Option<Event>,
let mut info = Dict::new(); torrent: &Torrent,
info.insert("interval", 900); peer_id: &Hash,
info.insert("peers", vec![1u8, 1, 1, 1, 1, 1]); port: u16)
let resp = parse_object(&info).unwrap(); -> Result<Vec<Peer>, Error> {
assert_eq!(resp.interval, 900); let mut query = QueryBuilder::new();
assert_eq!(resp.peers.len(), 1); query.add("info_hash", &torrent.info_hash.hex());
assert_eq!(resp.peers[0].addr.octets(), [1, 1, 1, 1]); query.add("peer_id", &peer_id.hex());
assert_eq!(resp.peers[0].port, 257); query.add("port", &port);
query.add("uploaded", &torrent.uploaded);
query.add("downloaded", &torrent.downloaded);
query.add("left", &torrent.left);
query.add("compact", &1);
if let Some(event) = event {
query.add("event", &event.as_str());
}
query.apply_to(&mut url);
let mut resp = reqwest::get(url)?;
ensure2!(resp.status().is_success(),
"Tracker failed, http code {}",
resp.status().as_u16());
let mut buf = Vec::new();
resp.read_to_end(&mut buf)?;
match serde_bencode::from_bytes(&buf)? {
Response::Failure { reason } => bail2!("Tracker failure, {}", reason),
Response::Ok { peers, .. } => {
ensure2!(peers.len() % 6 == 0, "Tracker blob is invalid");
let peers = peers.chunks(6).map(peer_from_bytes).collect();
Ok(peers)
}
}
}
#[derive(Deserialize, Serialize)]
pub enum Response {
Failure {
#[serde(rename = "failure reason")]
reason: String,
},
Ok {
#[serde(rename = "warning message")]
warning: Option<String>,
interval: i64,
#[serde(rename = "min interval")]
min_interval: Option<i64>,
tracker_id: String,
complete: u64,
incomplete: u64,
peers: ByteBuf, // only supports the compact format for now
},
} }

View File

@@ -1,48 +1,58 @@
pub mod http;
use std::io;
use std::net::Ipv4Addr; use std::net::Ipv4Addr;
use hyper; use url::Url;
use bencode::DecodeError; mod bin;
mod http;
mod udp;
pub type TrackerResult = Result<TrackerResponse, TrackerError>; use failure;
#[derive(Debug)] use error::Error;
pub struct TrackerResponse { use metainfo::Hash;
pub interval: u64, use torrent::Torrent;
pub peers: Vec<Peer>,
}
#[derive(Debug)]
pub enum TrackerError {
IoError(io::Error),
HttpError(hyper::Error),
InvalidResponse,
TrackerFailure(String),
}
impl From<io::Error> for TrackerError {
fn from(from: io::Error) -> Self {
TrackerError::IoError(from)
}
}
impl From<hyper::Error> for TrackerError {
fn from(from: hyper::Error) -> Self {
TrackerError::HttpError(from)
}
}
impl From<DecodeError> for TrackerError {
fn from(_: DecodeError) -> Self {
TrackerError::InvalidResponse
}
}
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub struct Peer { pub struct Peer {
pub addr: Ipv4Addr, pub addr: Ipv4Addr,
pub port: u16, pub port: u16,
} }
pub fn fetch_peers(url: &str,
event: Option<Event>,
torrent: &Torrent,
peer_id: &Hash,
port: u16)
-> Result<Vec<Peer>, Error> {
let url = Url::parse(url)?;
match url.scheme() {
"http" | "https" => http::fetch_peers(url, event, torrent, peer_id, port),
"udp" => udp::fetch_peers(url, event, torrent, peer_id, port),
_ => bail2!("Unknown tracker scheme: %s {}", url.scheme()),
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Event {
Completed,
Started,
Stopped,
}
impl Event {
pub fn as_str(&self) -> &'static str {
match *self {
Event::Completed => "completed",
Event::Started => "started",
Event::Stopped => "stopped",
}
}
pub fn as_int(&self) -> u32 {
match *self {
Event::Completed => 1,
Event::Started => 2,
Event::Stopped => 3,
}
}
}

76
src/tracker/udp.rs Normal file
View File

@@ -0,0 +1,76 @@
use std::io::{self, Cursor, Read, Write};
use std::net::UdpSocket;
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
use rand::{thread_rng, Rng};
use serde::ser;
use url::Url;
use super::{Event, Peer};
use error::Error;
use metainfo::Hash;
use torrent::Torrent;
const CONNECT: u32 = 0;
const ANNOUNCE: u32 = 1;
pub fn fetch_peers(mut url: Url,
event: Option<Event>,
torrent: &Torrent,
peer_id: &Hash,
port: u16)
-> Result<Vec<Peer>, Error> {
let host = url.host_str().ok_or_else(|| Error::tracker("invalid host"))?;
let port = url.port().ok_or_else(|| Error::tracker("invalid port"))?;
let mut sock = UdpSocket::bind("0.0.0.0:0")?;
let trans = send_connect(&mut sock)?;
let conn = recv_connect(&mut sock, trans);
Ok(Vec::new())
}
fn send_connect(sock: &mut UdpSocket) -> io::Result<u32> {
let trans = thread_rng().next_u32();
let mut buf = [0u8; 128];
let mut cur = Cursor::new(&mut buf[..]);
cur.write_u64::<BigEndian>(0x41727101980);
cur.write_u32::<BigEndian>(CONNECT);
cur.write_u32::<BigEndian>(trans);
Ok(trans)
}
fn recv_connect(sock: &mut UdpSocket, own_trans: u32) -> io::Result<u64> {
let mut buf = [0u8; 128];
let _ = sock.recv_from(&mut buf);
let mut cur = Cursor::new(&mut buf[..]);
let action = cur.read_u32::<BigEndian>()?;
let trans = cur.read_u32::<BigEndian>()?;
let conn = cur.read_u64::<BigEndian>()?;
assert!(action == CONNECT); // TODO error
assert!(trans == own_trans); // TODO error
Ok(conn)
}
fn send_announce(sock: &mut UdpSocket,
conn: u64,
event: Option<Event>,
torrent: &Torrent,
peer_id: &Hash,
port: u16)
-> io::Result<(u32)> {
let trans = thread_rng().next_u32();
let mut buf = [0u8; 256];
let mut cur = Cursor::new(&mut buf[..]);
cur.write_u64::<BigEndian>(conn);
cur.write_u32::<BigEndian>(ANNOUNCE);
Ok(trans)
}