diff --git a/Cargo.toml b/Cargo.toml index f1046ee..557b49a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ clap = "4.4.18" clap-num = "1.0.2" clap-verbosity-flag = "2.1.2" libpt = { version = "0.3.10", features = ["net"] } +thiserror = "1.0.56" threadpool = { version = "1.8.1", optional = true } tokio = { version = "1.35.1", features = ["net", "rt", "macros"] } diff --git a/src/common/conf.rs b/src/common/conf.rs index 0bc94fd..a06645a 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -2,7 +2,7 @@ use crate::common::args::Cli; use clap::ValueEnum; use std::{fmt::Display, time::Duration}; -const DEFAULT_TIMEOUT_LEN: u64 = 2000; // ms +const DEFAULT_TIMEOUT_LEN: u64 = 5000; // ms #[derive(Debug, Clone, Copy)] pub enum Mode { diff --git a/src/server/errors.rs b/src/server/errors.rs new file mode 100644 index 0000000..239eb79 --- /dev/null +++ b/src/server/errors.rs @@ -0,0 +1,47 @@ +use std::{fmt::Display, str::Utf8Error}; + +use anyhow; +use thiserror::Error; +use tokio::time::error::Elapsed; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum ServerError { + Timeout(Elapsed), + CouldNotAccept, + RdFail, + Anyhow(anyhow::Error), + IO(std::io::Error), + Format(Utf8Error), +} + +impl From for ServerError { + fn from(value: anyhow::Error) -> Self { + Self::Anyhow(value) + } +} + +impl From for ServerError { + fn from(value: std::io::Error) -> Self { + Self::IO(value) + } +} + +impl From for ServerError { + fn from(value: Utf8Error) -> Self { + Self::Format(value) + } +} + +impl From for ServerError { + fn from(value: Elapsed) -> Self { + Self::Timeout(value) + } +} + +impl Display for ServerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/src/server/listener.rs b/src/server/listener.rs deleted file mode 100644 index 0abbcae..0000000 --- a/src/server/listener.rs +++ /dev/null @@ -1,6 +0,0 @@ -use std::net::TcpListener; - -pub trait Listener {} -pub trait Stream {} - -impl Listener for TcpListener {} diff --git a/src/server/mod.rs b/src/server/mod.rs index 54b9201..dec1ccd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,10 +1,10 @@ #![cfg(feature = "server")] -use std::{net::SocketAddr, time::Duration}; +use std::{error::Error, net::SocketAddr, time::Duration}; -use anyhow::{anyhow, Result}; +use anyhow::anyhow; use libpt::{ bintols::display::humanbytes, - log::{debug, error, info, trace}, + log::{debug, error, info, trace, warn}, }; use threadpool::ThreadPool; use tokio::{ @@ -15,7 +15,8 @@ use tokio::{ use crate::common::conf::Config; -pub mod listener; +pub mod errors; +use errors::*; pub struct Server { cfg: Config, @@ -25,7 +26,7 @@ pub struct Server { } impl Server { - pub async fn build(cfg: Config) -> Result { + pub async fn build(cfg: Config) -> anyhow::Result { let server = TcpListener::bind(cfg.addr).await?; let pool = ThreadPool::new(cfg.threads); let timeout = Some(Duration::from_secs(5)); @@ -36,36 +37,63 @@ impl Server { server, }) } - pub async fn run(self) -> Result<()> { + pub async fn run(self) -> anyhow::Result<()> { loop { - let stream = self.server.accept().await?; - self.handle_stream(stream).await?; + let (stream, addr) = match self.server.accept().await { + Ok(s) => s, + Err(err) => { + warn!("could not accept stream: {err:?}"); + continue; + } + }; + match self.handle_stream(stream).await { + Ok(_) => (), + Err(err) => { + match err { + ServerError::Timeout(_) => { + info!("stream {:?} timed out", addr) + } + _ => { + warn!("error while handling stream: {:?}", err) + } + } + continue; + } + }; } } - async fn handle_stream(&self, stream: (TcpStream, SocketAddr)) -> Result<()> { + async fn handle_stream(&self, stream: TcpStream) -> Result<()> { const BUF_SIZE: usize = 1024; - info!("start handling stream {:?}", stream.1); + let addr = match stream.peer_addr() { + Ok(a) => a, + Err(err) => { + debug!("could not get peer address: {:?}", err); + return Err(err.into()); + } + }; + info!("new peer: {:?}", addr); let mut buf = [0u8; BUF_SIZE]; - let mut reader = BufReader::new(stream.0); + let mut reader = BufReader::new(stream); let mut len; loop { trace!("reading anew"); len = self.read(&mut reader, &mut buf).await?; trace!("did read"); if len == 0 { + trace!("len is apperently 0: {len:?}"); break; } else { info!( - "< {}\t({})\n{}", - stream.1, + "< {:?}\t({})\n{}", + addr, humanbytes(len), self.decode(&buf)? ); buf = [0; BUF_SIZE]; } } - info!("stop handling stream {:?}", stream.1); + info!("disconnected peer: {:?}", addr); Ok(()) } @@ -82,10 +110,15 @@ impl Server { len = match timeout(self.cfg.timeout, reader.read(buf)).await { Ok(inner) => { trace!("Read for len: {inner:?}"); - 0 + match inner { + Ok(len) => len, + Err(err) => { + error!("read failed: {err:?}"); + return Err(ServerError::RdFail); + } + } } Err(err) => { - error!("timeout while reading: {err}"); return Err(err.into()); } }; @@ -96,7 +129,8 @@ impl Server { failsafe += 1; } } - Err(anyhow!("read too often, so the failsafe activated")) + // TODO: remove the failsafe when this stuff works + Err(anyhow!("read too often, so the failsafe activated").into()) } fn should_end(&self, buf: &[u8]) -> bool { @@ -104,10 +138,10 @@ impl Server { let mut iter = buf.iter().skip(1).peekable(); while let Some(b) = iter.next() { if lb != 0 && *b == 0 && **iter.peek().unwrap() == 0 { - return true + return true; } lb = *b; } - return false + return false; } }