We have a server at least #3

Merged
PlexSheep merged 14 commits from devel into master 2024-01-19 16:40:16 +01:00
5 changed files with 102 additions and 26 deletions
Showing only changes of commit de412de988 - Show all commits

View File

@ -18,6 +18,7 @@ clap = "4.4.18"
clap-num = "1.0.2" clap-num = "1.0.2"
clap-verbosity-flag = "2.1.2" clap-verbosity-flag = "2.1.2"
libpt = { version = "0.3.10", features = ["net"] } libpt = { version = "0.3.10", features = ["net"] }
thiserror = "1.0.56"
threadpool = { version = "1.8.1", optional = true } threadpool = { version = "1.8.1", optional = true }
tokio = { version = "1.35.1", features = ["net", "rt", "macros"] } tokio = { version = "1.35.1", features = ["net", "rt", "macros"] }

View File

@ -2,7 +2,7 @@ use crate::common::args::Cli;
use clap::ValueEnum; use clap::ValueEnum;
use std::{fmt::Display, time::Duration}; use std::{fmt::Display, time::Duration};
const DEFAULT_TIMEOUT_LEN: u64 = 2000; // ms const DEFAULT_TIMEOUT_LEN: u64 = 5000; // ms
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub enum Mode { pub enum Mode {

47
src/server/errors.rs Normal file
View File

@ -0,0 +1,47 @@
use std::{fmt::Display, str::Utf8Error};
use anyhow;
use thiserror::Error;
use tokio::time::error::Elapsed;
pub type Result<T> = std::result::Result<T, ServerError>;
#[derive(Debug, Error)]
pub enum ServerError {
Timeout(Elapsed),
CouldNotAccept,
RdFail,
Anyhow(anyhow::Error),
IO(std::io::Error),
Format(Utf8Error),
}
impl From<anyhow::Error> for ServerError {
fn from(value: anyhow::Error) -> Self {
Self::Anyhow(value)
}
}
impl From<std::io::Error> for ServerError {
fn from(value: std::io::Error) -> Self {
Self::IO(value)
}
}
impl From<Utf8Error> for ServerError {
fn from(value: Utf8Error) -> Self {
Self::Format(value)
}
}
impl From<Elapsed> for ServerError {
fn from(value: Elapsed) -> Self {
Self::Timeout(value)
}
}
impl Display for ServerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

View File

@ -1,6 +0,0 @@
use std::net::TcpListener;
pub trait Listener {}
pub trait Stream {}
impl Listener for TcpListener {}

View File

@ -1,10 +1,10 @@
#![cfg(feature = "server")] #![cfg(feature = "server")]
use std::{net::SocketAddr, time::Duration}; use std::{error::Error, net::SocketAddr, time::Duration};
use anyhow::{anyhow, Result}; use anyhow::anyhow;
use libpt::{ use libpt::{
bintols::display::humanbytes, bintols::display::humanbytes,
log::{debug, error, info, trace}, log::{debug, error, info, trace, warn},
}; };
use threadpool::ThreadPool; use threadpool::ThreadPool;
use tokio::{ use tokio::{
@ -15,7 +15,8 @@ use tokio::{
use crate::common::conf::Config; use crate::common::conf::Config;
pub mod listener; pub mod errors;
use errors::*;
pub struct Server { pub struct Server {
cfg: Config, cfg: Config,
@ -25,7 +26,7 @@ pub struct Server {
} }
impl Server { impl Server {
pub async fn build(cfg: Config) -> Result<Self> { pub async fn build(cfg: Config) -> anyhow::Result<Self> {
let server = TcpListener::bind(cfg.addr).await?; let server = TcpListener::bind(cfg.addr).await?;
let pool = ThreadPool::new(cfg.threads); let pool = ThreadPool::new(cfg.threads);
let timeout = Some(Duration::from_secs(5)); let timeout = Some(Duration::from_secs(5));
@ -36,36 +37,63 @@ impl Server {
server, server,
}) })
} }
pub async fn run(self) -> Result<()> { pub async fn run(self) -> anyhow::Result<()> {
loop { loop {
let stream = self.server.accept().await?; let (stream, addr) = match self.server.accept().await {
self.handle_stream(stream).await?; Ok(s) => s,
Err(err) => {
warn!("could not accept stream: {err:?}");
continue;
}
};
match self.handle_stream(stream).await {
Ok(_) => (),
Err(err) => {
match err {
ServerError::Timeout(_) => {
info!("stream {:?} timed out", addr)
}
_ => {
warn!("error while handling stream: {:?}", err)
}
}
continue;
}
};
} }
} }
async fn handle_stream(&self, stream: (TcpStream, SocketAddr)) -> Result<()> { async fn handle_stream(&self, stream: TcpStream) -> Result<()> {
const BUF_SIZE: usize = 1024; const BUF_SIZE: usize = 1024;
info!("start handling stream {:?}", stream.1); let addr = match stream.peer_addr() {
Ok(a) => a,
Err(err) => {
debug!("could not get peer address: {:?}", err);
return Err(err.into());
}
};
info!("new peer: {:?}", addr);
let mut buf = [0u8; BUF_SIZE]; let mut buf = [0u8; BUF_SIZE];
let mut reader = BufReader::new(stream.0); let mut reader = BufReader::new(stream);
let mut len; let mut len;
loop { loop {
trace!("reading anew"); trace!("reading anew");
len = self.read(&mut reader, &mut buf).await?; len = self.read(&mut reader, &mut buf).await?;
trace!("did read"); trace!("did read");
if len == 0 { if len == 0 {
trace!("len is apperently 0: {len:?}");
break; break;
} else { } else {
info!( info!(
"< {}\t({})\n{}", "< {:?}\t({})\n{}",
stream.1, addr,
humanbytes(len), humanbytes(len),
self.decode(&buf)? self.decode(&buf)?
); );
buf = [0; BUF_SIZE]; buf = [0; BUF_SIZE];
} }
} }
info!("stop handling stream {:?}", stream.1); info!("disconnected peer: {:?}", addr);
Ok(()) Ok(())
} }
@ -82,10 +110,15 @@ impl Server {
len = match timeout(self.cfg.timeout, reader.read(buf)).await { len = match timeout(self.cfg.timeout, reader.read(buf)).await {
Ok(inner) => { Ok(inner) => {
trace!("Read for len: {inner:?}"); trace!("Read for len: {inner:?}");
0 match inner {
Ok(len) => len,
Err(err) => {
error!("read failed: {err:?}");
return Err(ServerError::RdFail);
}
}
} }
Err(err) => { Err(err) => {
error!("timeout while reading: {err}");
return Err(err.into()); return Err(err.into());
} }
}; };
@ -96,7 +129,8 @@ impl Server {
failsafe += 1; failsafe += 1;
} }
} }
Err(anyhow!("read too often, so the failsafe activated")) // TODO: remove the failsafe when this stuff works
Err(anyhow!("read too often, so the failsafe activated").into())
} }
fn should_end(&self, buf: &[u8]) -> bool { fn should_end(&self, buf: &[u8]) -> bool {
@ -104,10 +138,10 @@ impl Server {
let mut iter = buf.iter().skip(1).peekable(); let mut iter = buf.iter().skip(1).peekable();
while let Some(b) = iter.next() { while let Some(b) = iter.next() {
if lb != 0 && *b == 0 && **iter.peek().unwrap() == 0 { if lb != 0 && *b == 0 && **iter.peek().unwrap() == 0 {
return true return true;
} }
lb = *b; lb = *b;
} }
return false return false;
} }
} }