cleanup from v2 & debugging

This commit is contained in:
hendrik 2024-10-11 19:52:04 +02:00
parent efe20a06fd
commit 530af0c561
7 changed files with 16 additions and 443 deletions

View File

@ -2,10 +2,7 @@ use std::future::Future;
use crate::{
ouput_gen::{display_ips, display_peer},
tcp::{
delegate::SimpleTcP,
handshake::{self, Handshake},
},
tcp::{delegate::SimpleTcP, handshake::Handshake},
tracker::{exec::get_peers, request::TrackerRequest, response::Ip},
};
@ -36,12 +33,6 @@ pub async fn make_handshake<I: Into<Handshake> + Into<TrackerRequest> + Clone>(
pub async fn make_extended_handshake<I: Into<Handshake> + Into<TrackerRequest> + Clone>(
obh: I,
) -> Result<Handshake, SomeError> {
/*let ips = get_peers(obh.clone()).await.expect("Expected ips");
let handshake = obh.into();
let mut handler = TcpHandler::new(ips[0]).await?;
let msg = handler.extended_connection(&handshake).await?;
Ok(handshake)*/
handshake_(obh.clone(), obh, make_extended_handshake_try).await
}
@ -74,11 +65,6 @@ async fn make_handshake_try(handshake: Handshake, ip: Ip) -> Result<Handshake, S
.map_err(|e| SomeError::TcpError(e.to_string()))?;
tcp_handler.initial_handshake(handshake).await
// .map_err(|e| SomeError::HandshakeError(e.to_string()))?;
// Ok(tcp_handler)
// Ok(tcp_handler.parse_handshake_response().ok_or_else(|| {
// SomeError::HandshakeError("handshake response not parseable".into())
// })??)
}
async fn make_extended_handshake_try(handshake: Handshake, ip: Ip) -> Result<Handshake, SomeError> {

View File

@ -3,8 +3,6 @@ use crate::tracker::response::Ip;
use std::fmt::Debug;
use std::hash::Hash;
use std::thread::sleep;
use std::time::Duration;
use std::{io::Error, result};
use crate::ouput_gen::log_message;
@ -137,7 +135,6 @@ where
pub async fn run_default(&mut self, data: Handshake) -> Result<(), SomeError> {
self.initial_handshake(data).await?;
println!("Wait for response");
let resp = self.worker.read_msg().await?;
log_message(&format!(
"[{}][{}] Got response {:?} ",

View File

@ -18,38 +18,6 @@ use super::{
use anyhow::Result;
/*
#[derive(Error, Debug)]
pub enum SomeError {
#[error("io error: {0}")]
IoError(std::io::Error),
#[error("parse error:")]
ParseError,
#[error("Unexpected EOF")]
EOF,
#[error("Taskinput closed")]
TaskInputClosed,
#[error("Block did not validate")]
BlockValidationFailed,
#[error("Error transfering block")]
SendError,
#[error("Extendet handhsake failed")]
ExtHandshakeFailed,
#[error("Error converting message")]
MessageConversionError,
}
impl From<std::io::Error> for SomeError {
fn from(e: std::io::Error) -> Self {
SomeError::IoError(e)
}
}
*/
#[derive(Default)]
pub struct TcpWorker {
stream: Option<TcpStream>,
@ -113,20 +81,10 @@ impl TcpWorker {
while bytes_read < expected_data {
let dis_time = self.stream().read(&mut buf[bytes_read..]).await?;
if dis_time == 0 {
println!("unex eof");
return Err(SomeError::EOF);
/*return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("Response too short. Expected {expected_data} - got {bytes_read} so far",),
)
.into());*/
}
bytes_read += dis_time
}
println!("Read (<=13 bytes): {:?}", &buf[..bytes_read.min(13)]);
if bytes_read > 4 {
println!("Read taf: {:?}", buf[5]);
}
Ok(Message::from(&buf[..bytes_read]))
}

View File

@ -6,6 +6,7 @@ use std::{
use crate::{
encode::{bedecode::decode_bencoded_single_value, beencoder::bencode},
magnet::err::SomeError,
ouput_gen::log_message,
tracker::response::Ip,
};
use serde_bencode::value::Value;
@ -89,7 +90,6 @@ impl TryInto<ExtHandshakePayload> for Message {
let mut m_dict = HashMap::new();
let mut additional_data = HashMap::new();
let Value::Dict(full_m_dict) = decode_bencoded_single_value(&self.payload[1..]) else {
println!("m dict not found");
return Err(SomeError::MessageConversionError);
};
for (k, v) in full_m_dict {
@ -99,7 +99,6 @@ impl TryInto<ExtHandshakePayload> for Message {
m_dict.insert(k, v);
}
} else {
eprintln!("m is not a dict");
return Err(SomeError::MessageConversionError);
}
} else {
@ -149,7 +148,6 @@ impl From<MessageTag> for Message {
impl From<&[u8]> for Message {
fn from(value: &[u8]) -> Self {
if value.len() < 5 {
println!("tooo short: {value:?}");
return Message::msg(MessageTag::ExtHandshake);
}
let size = u32::from_be_bytes(value[..4].try_into().unwrap());
@ -341,13 +339,23 @@ impl Block {
pub fn validate(&self, request: &BlockRequestPayload) -> bool {
if self.index != request.index {
println!("index differ: {:?}, {:?}", self.index, request.index);
log_message(&format!(
"[BLOCK VALIDATION]: index differ: {:?}, {:?}",
self.index, request.index
));
false
} else if self.begin != request.begin {
println!("begin differ: {:?}, {:?}", self.begin, request.begin);
log_message(&format!(
"[BLOCK VALIDATION]: begin differ: {:?}, {:?}",
self.begin, request.begin
));
false
} else if self.block().len() != request.length as usize {
println!("len differ: {:?}, {:?}", self.block().len(), request.length);
log_message(&format!(
"[BLOCK VALIDATION]: len differ: {:?}, {:?}",
self.block().len(),
request.length
));
false
} else {
true

View File

@ -1,369 +0,0 @@
use crate::tracker::response::Ip;
use std::fmt::Debug;
use std::hash::Hash;
use std::time::Duration;
use std::{io::Error, result};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time::timeout;
use crate::ouput_gen::{addr_tag, log_message};
use crate::tcp::transport::MessageTag;
use super::transport::ExtHandshakePayload;
use super::{
handshake::Handshake,
parser::parse_handshake_response,
transport::{Block, BlockRequestPayload, Message},
};
pub struct TcpHandler {
worker: Worker<usize>,
}
impl TcpHandler {
pub async fn new(ip: Ip) -> result::Result<Self, Error> {
let worker = Worker::simple(ip).await?;
Ok(Self { worker })
}
pub async fn create_connection(&mut self, data: &Handshake) -> result::Result<(), Error> {
self.worker.create_connection(data).await.map(|_| ())
}
pub async fn extended_connection(
&mut self,
data: &Handshake,
) -> result::Result<Message, Error> {
self.worker.run_extended_hanshake_exchange(data).await
}
//pub fn get_extension_message(&mut self) -> Option<ExtHandshakePayload> {
// None // self.worker.handshake_extension.take()
//}
pub fn parse_handshake_response(&self) -> Option<Result<Handshake, Error>> {
self.worker.parse_handshake_response()
}
}
pub struct Worker<T> {
id: usize,
queue_rec: Receiver<(T, BlockRequestPayload)>,
block_sinc: Sender<(T, Block)>,
stream: TcpStream,
ip: Ip,
handshake_response_raw: Option<Vec<u8>>,
// handshake_extension: Option<ExtHandshakePayload>,
}
/* Establisched connection */
impl<T> Worker<T>
where
T: Eq + Hash + Debug + Copy,
{
pub async fn new(
id: usize,
queue_rec: Receiver<(T, BlockRequestPayload)>,
block_sinc: Sender<(T, Block)>,
ip: Ip,
) -> result::Result<Self, Error> {
let stream = TcpStream::connect(ip.socket_addr().clone()).await?;
log_message(&format!("[{ip}][{id}] Spawning worker"));
Ok(Self {
id,
queue_rec,
block_sinc,
stream,
ip,
handshake_response_raw: None,
// handshake_extension: None,
})
}
pub fn get_extension_message(&mut self) -> Option<ExtHandshakePayload> {
None //self.handshake_extension.take()
}
pub async fn simple(ip: Ip) -> result::Result<Self, Error> {
let stream = TcpStream::connect(ip.socket_addr().clone()).await?;
let (_, source) = mpsc::channel(1);
let (sink, _) = mpsc::channel(1);
Ok(Self {
id: 0,
queue_rec: source,
block_sinc: sink,
stream,
ip,
handshake_response_raw: None,
// handshake_extension: None,
})
}
pub async fn create_connection(
&mut self,
data: &Handshake,
) -> result::Result<Handshake, Error> {
log_message(&format!("[{}][{}] Sending handshake", self.id, self.ip));
self.stream.write_all(&data.handshake_data()).await?;
let mut buf = [0; 128];
let response = self.stream.read(&mut buf).await;
if let Err(e) = response {
log_message(&format!(
"Handshake failed on ip {:?}",
self.ip.socket_addr()
));
return Err(e);
} else {
log_message(&format!(
"[{}][{}] Got handshake response",
self.id, self.ip
));
let bytes_read = response?;
self.handshake_response_raw = Some(buf[..bytes_read].to_vec())
}
let handshake = self.parse_handshake_response().unwrap();
log_message(&format!(
"[{}][{}] Handshake parseable: {:?}",
self.id,
self.ip,
handshake.is_ok()
));
handshake
}
pub fn parse_handshake_response(&self) -> Option<Result<Handshake, Error>> {
self.handshake_response_raw
.as_ref()
.map(|x| &x[..])
.map(parse_handshake_response)
}
pub async fn run_extended_hanshake_exchange(
&mut self,
data: &Handshake,
) -> result::Result<Message, Error> {
if self.setup_connection(data).await? == 1 {
self.extended_msg_exchange().await
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("{} does not support extension", self.ip),
))
}
}
pub async fn setup_connection(&mut self, data: &Handshake) -> result::Result<usize, Error> {
log_message(&format!("[{}][{}] Sending handshake", self.id, self.ip));
let handshake_response = self.create_connection(data).await?;
log_message(&format!(
"[{}][{}] Established connection - can now wait for bitfield",
self.id, self.ip
));
match handshake_response.support_extension() {
true => {
log_message(&format!(
"[{}][{}] support extension handshake",
self.id, self.ip
));
Ok(1)
}
false => {
log_message(&format!(
"[{}][{}] does not support extension handshake",
self.id, self.ip
));
Ok(0)
}
}
}
async fn extended_msg_exchange(&mut self) -> result::Result<Message, Error> {
log_message(&format!("[{}][{}] Sending bitfield", self.id, self.ip));
self.send_msg(Message::msg(MessageTag::Bitfield)).await?;
log_message(&format!(
"[{}][{}] Sent bitfield - expecting bitfield back",
self.id, self.ip
));
let msg1 = self.read_timouted_msg().await?;
log_message(&format!(
"[{}][{}]: Got {:?} expected {:?}",
self.id,
self.ip,
&msg1.tag() as &MessageTag,
&MessageTag::Bitfield
));
log_message(&format!(
"[{}][{}] Sending extended handshake",
self.id, self.ip
));
log_message(&format!(
"[{}][{}] Sending extended handshake {}",
self.id,
self.ip,
ExtHandshakePayload::default()
));
self.send_msg(ExtHandshakePayload::default().into()).await?;
log_message(&format!(
"[{}][{}] Waiting for extended handshake response",
self.id, self.ip
));
let msg2 = self.read_as_msg().await?;
log_message(&format!(
"[{}][{}]: Got {:?} expected {:?}",
self.id,
self.ip,
&msg2.tag() as &MessageTag,
&MessageTag::ExtHandshake
));
println!("msg2: {:?}", msg2);
if msg2.tag() == MessageTag::ExtHandshake {
Ok(msg2)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("{} ext hand res didnt come", self.ip),
))
}
}
async fn read_timouted_msg(&mut self) -> result::Result<Message, Error> {
let msg1 = timeout(Duration::from_secs(5), self.read_as_msg()).await?;
let msg =
msg1.unwrap_or_else(|_| panic!("[{}][{}] some inner msg1 error", self.id, self.ip));
Ok(msg)
}
async fn default_msg_exchange(&mut self) -> result::Result<(), Error> {
let msg1 = self.read_timouted_msg().await?;
log_message(&format!(
"[{}][{}]: Got {:?} expected {:?}",
self.id,
self.ip,
&msg1.tag() as &MessageTag,
&MessageTag::Bitfield
));
log_message(&format!("[{}][{}] Sending interested", self.id, self.ip));
self.send_msg(Message::msg(MessageTag::Interested)).await?;
log_message(&format!("[{}][{}] Sendt interested", self.id, self.ip));
let msg2 = self.read_as_msg().await?;
log_message(&format!(
"[{}][{}]: Got {:?} expected {:?}",
self.id,
self.ip,
&msg2.tag() as &MessageTag,
&MessageTag::Unchoke
));
Ok(())
}
pub async fn prepare_default_connection(
&mut self,
data: &Handshake,
) -> result::Result<(), Error> {
self.setup_connection(data).await?;
self.default_msg_exchange().await
}
async fn send_msg(&mut self, msg: Message) -> Result<(), Error> {
let write_buf: Vec<u8> = msg.into();
self.stream.write_all(&write_buf).await
}
async fn read_as_msg(&mut self) -> result::Result<Message, Error> {
let mut buf = [0; 128];
let mut bytes_read = 0;
while bytes_read < 5 {
let dis_time = self.stream.read(&mut buf[bytes_read..]).await?;
if dis_time == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!(
"{}: Response too short. Expected 5 - got {bytes_read} so far",
addr_tag(&self.ip.socket_addr())
),
));
}
bytes_read += dis_time
}
Ok(buf[..bytes_read].into())
}
pub async fn run(&mut self, data: Handshake) -> result::Result<(), Error> {
let connection_result = self.prepare_default_connection(&data).await;
log_message(&format!(
"[{}][{}] set up connection - result {:?}",
self.id, self.ip, connection_result
));
connection_result?;
loop {
log_message(&format!("[{}][{}] Checking for tasks", self.id, self.ip));
let Some((id, requesr)) = self.queue_rec.recv().await else {
break;
};
log_message(&format!("[{}][{}] Found task {id:?}", self.id, self.ip));
let block = self.get_block(&requesr).await?;
log_message(&format!(
"[{}][{}][{id:?}] Found block {}",
self.id, self.ip, block
));
log_message(&format!(
"[{}][{}] Sending block data: {:?} ",
self.id,
self.ip,
self.block_sinc.send((id, block)).await
));
}
log_message(&format!(
"[{}][{}] Jumped out of the Loop!",
self.id, self.ip
));
Ok(())
}
pub async fn get_block(&mut self, block_request: &BlockRequestPayload) -> Result<Block, Error> {
let request: Message = (*block_request).into();
self.send_msg(request).await?;
log_message(&format!(
"[{}][{}]: Sent Message {:?}",
self.id,
self.ip,
&MessageTag::Request
));
let mut block_data = vec![0_u8; block_request.expected_response_len()];
let mut bytes_read = 0;
while bytes_read < block_request.expected_response_len() {
let read_now = self.stream.read(&mut block_data[bytes_read..]).await?;
if read_now == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("Connection closed by peer (thrown by me) - got {bytes_read} so far"),
));
}
bytes_read += read_now;
}
log_message(&format!(
"[{}][{}]: Got {:?} expected: {:?}",
self.id,
self.ip,
&block_data[4].into() as &MessageTag,
&MessageTag::Piece
));
let block: Block = block_data.into();
if !block.validate(block_request) {
log_message(&format!(
"block (idx, start) = {:?} does not seem to be valid - dont care",
block.meta()
));
}
Ok(block)
}
}

View File

@ -1,6 +1,5 @@
use std::fmt::Debug;
use std::hash::Hash;
use std::io::Error;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, Sender};
use tokio::task::JoinHandle;

View File

@ -1,12 +1,6 @@
#[cfg(test)]
mod tests {
use bittorrent_starter_rust::{
encode::hash::HashValue,
magnet::{
err::SomeError,
parse::{parse_dn, parse_url, parse_xt},
},
};
/*
#[test]
fn test_parse_xt_valid() {