From 6f58b5c6a0220e999dd700a8ca6b529d23faae60 Mon Sep 17 00:00:00 2001 From: PlexSheep Date: Thu, 18 Jan 2024 23:13:18 +0100 Subject: [PATCH 01/13] early cli interface --- Cargo.toml | 22 +++++++++++---- src/main.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0543e1f..c9c6905 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,26 @@ [package] -name = "template" +name = "netpong" version = "0.1.0" edition = "2021" -publish = false +publish = true authors = ["Christoph J. Scherr "] license = "MIT" -description = "No description yet" +description = "let your servers play ping pong" readme = "README.md" -homepage = "https://git.cscherr.de/PlexSheep/rs-base" -repository = "https://git.cscherr.de/PlexSheep/rs-base" -keywords = ["template"] +homepage = "https://git.cscherr.de/PlexSheep/netpong" +repository = "https://git.cscherr.de/PlexSheep/netpong" +keywords = ["networking"] [dependencies] +anyhow = "1.0.79" +clap = "4.4.18" +clap-num = "1.0.2" +clap-verbosity-flag = "2.1.2" +libpt = { version = "0.3.10", features = ["net"] } +threadpool = "1.8.1" +[features] +default = ["server", "client"] +client = [] +server = [] diff --git a/src/main.rs b/src/main.rs index e7a11a9..3de5b45 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,84 @@ +//! # netpong +//! This is pretty silly and only let's your client send "ping" +//! over a connection, while letting a server reply pong to +//! every connection. But I had to make it for some reason. + +use libpt::log::*; + +use clap::Parser; +use clap_verbosity_flag::{InfoLevel, Verbosity}; + +/// Chose a mode for the application to run in, server needs more than client. +enum Mode { + #[cfg(feature = "server")] + Server, + #[cfg(feature = "client")] + Client, +} + +/// short about section displayed in help +const ABOUT_ROOT: &'static str = r##" +Let your hosts play ping pong over the network +"##; +/// longer about section displayed in help, is combined with [the short help](ABOUT_ROOT) +static LONG_ABOUT_ROOT: &'static str = r##" + + Connect to a ping pong server and periodically send a ping. The server will reply with a pong. + That's it really. You can also host your own netpong server (server feature). +"##; + +#[derive(Debug, Clone, Parser)] +#[command( + author, + version, + about = ABOUT_ROOT, + long_about = format!("{}{}", ABOUT_ROOT ,LONG_ABOUT_ROOT), + help_template = +r#"{about-section} +{usage-heading} {usage} +{all-args}{tab} + +libpt: {version} +Author: {author-with-newline} +"# + )] +pub struct Cli { + // clap_verbosity_flag seems to make this a global option implicitly + /// set a verbosity, multiple allowed (f.e. -vvv) + #[command(flatten)] + pub verbose: Verbosity, + + /// show additional logging meta data + #[arg(long)] + pub meta: bool, + + /// Address of the server + pub addr: std::net::SocketAddr, +} + +fn cli_parse() -> Cli { + let cli = Cli::parse(); + let ll: Level = match cli.verbose.log_level().unwrap().as_str() { + "TRACE" => Level::TRACE, + "DEBUG" => Level::DEBUG, + "INFO" => Level::INFO, + "WARN" => Level::WARN, + "ERROR" => Level::ERROR, + _ => { + unreachable!(); + } + }; + if cli.meta { + Logger::init(None, Some(ll)).expect("could not initialize Logger"); + } else { + // less verbose version + Logger::init_mini(Some(ll)).expect("could not initialize Logger"); + } + return cli; +} + fn main() { + let mut cli = cli_parse(); + debug!("dumping cli args:\n{:#?}", cli); println!("Hello, world!"); } -- 2.40.1 From 716c02da5d50ca46ac70117d38794b49d699b13f Mon Sep 17 00:00:00 2001 From: PlexSheep Date: Thu, 18 Jan 2024 22:15:21 +0000 Subject: [PATCH 02/13] automatic cargo CI changes --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 3de5b45..cee0018 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,7 +78,7 @@ fn cli_parse() -> Cli { } fn main() { - let mut cli = cli_parse(); + let cli = cli_parse(); debug!("dumping cli args:\n{:#?}", cli); println!("Hello, world!"); } -- 2.40.1 From 085223d0a6cc0c0a86bd0052463d79359808de61 Mon Sep 17 00:00:00 2001 From: PlexSheep Date: Thu, 18 Jan 2024 23:45:50 +0100 Subject: [PATCH 03/13] continues cli interface, mode enum --- src/client/mod.rs | 0 src/common/mod.rs | 38 ++++++++++++++++++++++++++++++++++++++ src/main.rs | 36 ++++++++++++++++++++++++------------ src/server/mod.rs | 0 4 files changed, 62 insertions(+), 12 deletions(-) create mode 100644 src/client/mod.rs create mode 100644 src/common/mod.rs create mode 100644 src/server/mod.rs diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/common/mod.rs b/src/common/mod.rs new file mode 100644 index 0000000..00da770 --- /dev/null +++ b/src/common/mod.rs @@ -0,0 +1,38 @@ +use std::fmt::Display; + +use clap::ValueEnum; +#[derive(Debug, Clone, Copy)] +pub(crate) enum Mode { + Tcp, +} + +impl ValueEnum for Mode { + fn to_possible_value(&self) -> Option { + Some(match self { + Self::Tcp => clap::builder::PossibleValue::new("tcp"), + }) + } + fn value_variants<'a>() -> &'a [Self] { + &[Self::Tcp] + } + fn from_str(input: &str, ignore_case: bool) -> Result { + let comp: String = if ignore_case { + input.to_lowercase() + } else { + input.to_string() + }; + match comp.as_str() { + "tcp" => return Ok(Self::Tcp), + _ => return Err(format!("\"{input}\" is not a valid mode")), + } + } +} + +impl Display for Mode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let repr: String = match self{ + Self::Tcp => format!("tcp") + }; + write!(f, "{}", repr) + } +} diff --git a/src/main.rs b/src/main.rs index 3de5b45..cd27c7b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,13 +8,11 @@ use libpt::log::*; use clap::Parser; use clap_verbosity_flag::{InfoLevel, Verbosity}; -/// Chose a mode for the application to run in, server needs more than client. -enum Mode { - #[cfg(feature = "server")] - Server, - #[cfg(feature = "client")] - Client, -} +mod client; +mod common; +mod server; + +use common::Mode; /// short about section displayed in help const ABOUT_ROOT: &'static str = r##" @@ -42,18 +40,25 @@ libpt: {version} Author: {author-with-newline} "# )] -pub struct Cli { +pub(crate) struct Cli { // clap_verbosity_flag seems to make this a global option implicitly /// set a verbosity, multiple allowed (f.e. -vvv) #[command(flatten)] - pub verbose: Verbosity, + pub(crate) verbose: Verbosity, /// show additional logging meta data #[arg(long)] - pub meta: bool, + pub(crate) meta: bool, + + #[cfg(feature = "server")] + #[arg(short, long, default_value_t = false)] + pub(crate) server: bool, + + #[arg(short, long, default_value_t = Mode::Tcp, ignore_case = true)] + pub(crate) mode: Mode, /// Address of the server - pub addr: std::net::SocketAddr, + pub(crate) addr: std::net::SocketAddr, } fn cli_parse() -> Cli { @@ -80,5 +85,12 @@ fn cli_parse() -> Cli { fn main() { let mut cli = cli_parse(); debug!("dumping cli args:\n{:#?}", cli); - println!("Hello, world!"); + #[cfg(feature = "server")] + if cli.server { + info!("starting server"); + + return; + } + // implicit else, so we can work without the server feature + info!("starting client"); } diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..e69de29 -- 2.40.1 From a212cfff83a3be5e24afb2b104fc670297634460 Mon Sep 17 00:00:00 2001 From: PlexSheep Date: Thu, 18 Jan 2024 23:55:02 +0100 Subject: [PATCH 04/13] split more stuff into modules --- src/common/args.rs | 76 ++++++++++++++++++++++++++++++++++++++++++++++ src/common/conf.rs | 49 ++++++++++++++++++++++++++++++ src/common/mod.rs | 40 +++--------------------- src/main.rs | 75 ++------------------------------------------- 4 files changed, 131 insertions(+), 109 deletions(-) create mode 100644 src/common/args.rs create mode 100644 src/common/conf.rs diff --git a/src/common/args.rs b/src/common/args.rs new file mode 100644 index 0000000..65a1085 --- /dev/null +++ b/src/common/args.rs @@ -0,0 +1,76 @@ +use libpt::log::{Level, Logger}; + +use clap::Parser; +use clap_verbosity_flag::{InfoLevel, Verbosity}; + +use crate::common::conf::Mode; + +/// short about section displayed in help +const ABOUT_ROOT: &'static str = r##" +Let your hosts play ping pong over the network +"##; +/// longer about section displayed in help, is combined with [the short help](ABOUT_ROOT) +static LONG_ABOUT_ROOT: &'static str = r##" + + Connect to a ping pong server and periodically send a ping. The server will reply with a pong. + That's it really. You can also host your own netpong server (server feature). +"##; + +#[derive(Debug, Clone, Parser)] +#[command( + author, + version, + about = ABOUT_ROOT, + long_about = format!("{}{}", ABOUT_ROOT ,LONG_ABOUT_ROOT), + help_template = +r#"{about-section} +{usage-heading} {usage} +{all-args}{tab} + +libpt: {version} +Author: {author-with-newline} +"# + )] +pub(crate) struct Cli { + // clap_verbosity_flag seems to make this a global option implicitly + /// set a verbosity, multiple allowed (f.e. -vvv) + #[command(flatten)] + pub(crate) verbose: Verbosity, + + /// show additional logging meta data + #[arg(long)] + pub(crate) meta: bool, + + #[cfg(feature = "server")] + #[arg(short, long, default_value_t = false)] + pub(crate) server: bool, + + #[arg(short, long, default_value_t = Mode::Tcp, ignore_case = true)] + pub(crate) mode: Mode, + + /// Address of the server + pub(crate) addr: std::net::SocketAddr, +} + +impl Cli { + pub fn cli_parse() -> Self { + let cli = Self::parse(); + let ll: Level = match cli.verbose.log_level().unwrap().as_str() { + "TRACE" => Level::TRACE, + "DEBUG" => Level::DEBUG, + "INFO" => Level::INFO, + "WARN" => Level::WARN, + "ERROR" => Level::ERROR, + _ => { + unreachable!(); + } + }; + if cli.meta { + Logger::init(None, Some(ll)).expect("could not initialize Logger"); + } else { + // less verbose version + Logger::init_mini(Some(ll)).expect("could not initialize Logger"); + } + return cli; + } +} diff --git a/src/common/conf.rs b/src/common/conf.rs new file mode 100644 index 0000000..514f588 --- /dev/null +++ b/src/common/conf.rs @@ -0,0 +1,49 @@ +use std::fmt::Display; +use clap::ValueEnum; +use crate::common::args::Cli; +#[derive(Debug, Clone, Copy)] +pub enum Mode { + Tcp, +} + +impl ValueEnum for Mode { + fn to_possible_value(&self) -> Option { + Some(match self { + Self::Tcp => clap::builder::PossibleValue::new("tcp"), + }) + } + fn value_variants<'a>() -> &'a [Self] { + &[Self::Tcp] + } + fn from_str(input: &str, ignore_case: bool) -> Result { + let comp: String = if ignore_case { + input.to_lowercase() + } else { + input.to_string() + }; + match comp.as_str() { + "tcp" => return Ok(Self::Tcp), + _ => return Err(format!("\"{input}\" is not a valid mode")), + } + } +} + +impl Display for Mode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let repr: String = match self { + Self::Tcp => format!("tcp"), + }; + write!(f, "{}", repr) + } +} + +pub(crate) struct Config { + cli: Cli, + addr: std::net::SocketAddr, +} + +impl Config { + pub fn new(cli: Cli, addr: std::net::SocketAddr) -> Self { + Config { cli, addr } + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs index 00da770..8d2493b 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,38 +1,6 @@ -use std::fmt::Display; -use clap::ValueEnum; -#[derive(Debug, Clone, Copy)] -pub(crate) enum Mode { - Tcp, -} +pub mod args; +use args::*; -impl ValueEnum for Mode { - fn to_possible_value(&self) -> Option { - Some(match self { - Self::Tcp => clap::builder::PossibleValue::new("tcp"), - }) - } - fn value_variants<'a>() -> &'a [Self] { - &[Self::Tcp] - } - fn from_str(input: &str, ignore_case: bool) -> Result { - let comp: String = if ignore_case { - input.to_lowercase() - } else { - input.to_string() - }; - match comp.as_str() { - "tcp" => return Ok(Self::Tcp), - _ => return Err(format!("\"{input}\" is not a valid mode")), - } - } -} - -impl Display for Mode { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let repr: String = match self{ - Self::Tcp => format!("tcp") - }; - write!(f, "{}", repr) - } -} +pub mod conf; +use conf::*; diff --git a/src/main.rs b/src/main.rs index 83d938c..3b1d87d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,85 +5,14 @@ use libpt::log::*; -use clap::Parser; -use clap_verbosity_flag::{InfoLevel, Verbosity}; - mod client; mod common; mod server; -use common::Mode; - -/// short about section displayed in help -const ABOUT_ROOT: &'static str = r##" -Let your hosts play ping pong over the network -"##; -/// longer about section displayed in help, is combined with [the short help](ABOUT_ROOT) -static LONG_ABOUT_ROOT: &'static str = r##" - - Connect to a ping pong server and periodically send a ping. The server will reply with a pong. - That's it really. You can also host your own netpong server (server feature). -"##; - -#[derive(Debug, Clone, Parser)] -#[command( - author, - version, - about = ABOUT_ROOT, - long_about = format!("{}{}", ABOUT_ROOT ,LONG_ABOUT_ROOT), - help_template = -r#"{about-section} -{usage-heading} {usage} -{all-args}{tab} - -libpt: {version} -Author: {author-with-newline} -"# - )] -pub(crate) struct Cli { - // clap_verbosity_flag seems to make this a global option implicitly - /// set a verbosity, multiple allowed (f.e. -vvv) - #[command(flatten)] - pub(crate) verbose: Verbosity, - - /// show additional logging meta data - #[arg(long)] - pub(crate) meta: bool, - - #[cfg(feature = "server")] - #[arg(short, long, default_value_t = false)] - pub(crate) server: bool, - - #[arg(short, long, default_value_t = Mode::Tcp, ignore_case = true)] - pub(crate) mode: Mode, - - /// Address of the server - pub(crate) addr: std::net::SocketAddr, -} - -fn cli_parse() -> Cli { - let cli = Cli::parse(); - let ll: Level = match cli.verbose.log_level().unwrap().as_str() { - "TRACE" => Level::TRACE, - "DEBUG" => Level::DEBUG, - "INFO" => Level::INFO, - "WARN" => Level::WARN, - "ERROR" => Level::ERROR, - _ => { - unreachable!(); - } - }; - if cli.meta { - Logger::init(None, Some(ll)).expect("could not initialize Logger"); - } else { - // less verbose version - Logger::init_mini(Some(ll)).expect("could not initialize Logger"); - } - return cli; -} +use common::{args::Cli, conf::*}; fn main() { - let cli = cli_parse(); + let cli = Cli::cli_parse(); debug!("dumping cli args:\n{:#?}", cli); #[cfg(feature = "server")] if cli.server { -- 2.40.1 From 78340a23d2d6c15f155bb62d4461aa35b49e1d55 Mon Sep 17 00:00:00 2001 From: PlexSheep Date: Fri, 19 Jan 2024 00:59:16 +0100 Subject: [PATCH 05/13] a very primitive "server" --- Cargo.toml | 8 ++--- src/common/args.rs | 7 +++++ src/common/conf.rs | 19 +++++++----- src/common/mod.rs | 4 --- src/main.rs | 13 +++++++-- src/server/listener.rs | 6 ++++ src/server/mod.rs | 66 ++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 105 insertions(+), 18 deletions(-) create mode 100644 src/server/listener.rs diff --git a/Cargo.toml b/Cargo.toml index c9c6905..246973f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,9 +18,9 @@ clap = "4.4.18" clap-num = "1.0.2" clap-verbosity-flag = "2.1.2" libpt = { version = "0.3.10", features = ["net"] } -threadpool = "1.8.1" +mio = { version = "0.8.10", features = ["net", "os-poll"] } +threadpool = { version = "1.8.1", optional = true } [features] -default = ["server", "client"] -client = [] -server = [] +default = ["server"] +server = ["dep:threadpool"] diff --git a/src/common/args.rs b/src/common/args.rs index 65a1085..9b39244 100644 --- a/src/common/args.rs +++ b/src/common/args.rs @@ -41,10 +41,17 @@ pub(crate) struct Cli { #[arg(long)] pub(crate) meta: bool, + // host a server instead of connecting to one #[cfg(feature = "server")] #[arg(short, long, default_value_t = false)] pub(crate) server: bool, + + // how much threads the server should use + #[cfg(feature = "server")] + #[arg(short, long, default_value_t = 4)] + pub(crate) threads: usize, + #[arg(short, long, default_value_t = Mode::Tcp, ignore_case = true)] pub(crate) mode: Mode, diff --git a/src/common/conf.rs b/src/common/conf.rs index 514f588..0756095 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -1,6 +1,6 @@ -use std::fmt::Display; -use clap::ValueEnum; use crate::common::args::Cli; +use clap::ValueEnum; +use std::fmt::Display; #[derive(Debug, Clone, Copy)] pub enum Mode { Tcp, @@ -37,13 +37,18 @@ impl Display for Mode { } } -pub(crate) struct Config { - cli: Cli, - addr: std::net::SocketAddr, +pub struct Config { + pub addr: std::net::SocketAddr, + pub mode: Mode, + pub threads: usize } impl Config { - pub fn new(cli: Cli, addr: std::net::SocketAddr) -> Self { - Config { cli, addr } + pub fn new(cli: &Cli) -> Self { + Config { + addr: cli.addr.clone(), + mode: cli.mode.clone(), + threads: cli.threads + } } } diff --git a/src/common/mod.rs b/src/common/mod.rs index 8d2493b..f4200ec 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,6 +1,2 @@ - pub mod args; -use args::*; - pub mod conf; -use conf::*; diff --git a/src/main.rs b/src/main.rs index 3b1d87d..0b621d9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ //! over a connection, while letting a server reply pong to //! every connection. But I had to make it for some reason. +use anyhow::Result; use libpt::log::*; mod client; @@ -11,15 +12,21 @@ mod server; use common::{args::Cli, conf::*}; -fn main() { +use crate::server::Server; + +fn main() -> Result<()> { let cli = Cli::cli_parse(); debug!("dumping cli args:\n{:#?}", cli); + + let cfg = Config::new(&cli); + #[cfg(feature = "server")] if cli.server { info!("starting server"); - - return; + Server::build(cfg)?.run()?; + return Ok(()); } // implicit else, so we can work without the server feature info!("starting client"); + Ok(()) } diff --git a/src/server/listener.rs b/src/server/listener.rs new file mode 100644 index 0000000..0abbcae --- /dev/null +++ b/src/server/listener.rs @@ -0,0 +1,6 @@ +use std::net::TcpListener; + +pub trait Listener {} +pub trait Stream {} + +impl Listener for TcpListener {} diff --git a/src/server/mod.rs b/src/server/mod.rs index e69de29..9aaf850 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -0,0 +1,66 @@ +#![cfg(feature = "server")] +use anyhow::Result; +use libpt::log::{error, info}; +use mio::{self, event::Event, net::TcpListener, Events, Interest, Poll, Token}; +use std::{ + io::{prelude::*, BufReader}, + time::Duration, +}; +use threadpool::ThreadPool; + +use crate::common::conf::{Config, Mode}; + +pub mod listener; +use listener::*; + +const EVENT_CAPACITY: usize = 128; +const SERVER: Token = Token(0); + +pub struct Server { + cfg: Config, + work: Poll, + events: Events, + pool: ThreadPool, + pub timeout: Option, + server: TcpListener, +} + +impl Server { + pub fn build(cfg: Config) -> Result { + let mut server = TcpListener::bind(cfg.addr)?; + let poll: Poll = Poll::new()?; + poll.registry() + .register(&mut server, SERVER, Interest::READABLE | Interest::WRITABLE); + let events: Events = Events::with_capacity(EVENT_CAPACITY); + let pool = ThreadPool::new(cfg.threads); + let timeout = Some(Duration::from_secs(5)); + Ok(Server { + cfg, + work: poll, + events, + pool, + timeout, + server, + }) + } + pub fn run(&mut self) -> Result<()> { + loop { + self.work.poll(&mut self.events, self.timeout)?; + for event in self.events.iter() { + // self.pool.execute(|| { + let result = self.handle_event(event); + if let Err(err) = result { + error!("Error while handling server event {:?}: {:?}", event, err); + } + // }); + } + } + } + + fn handle_event(&self, event: &Event) -> Result<()> { + dbg!(event); + let connection = self.server.accept()?; + info!("received a connection!"); + Ok(()) + } +} -- 2.40.1 From 12a60eb189a1f0cabb1f8d17de9904c99dd0ae68 Mon Sep 17 00:00:00 2001 From: PlexSheep Date: Fri, 19 Jan 2024 00:01:29 +0000 Subject: [PATCH 06/13] automatic cargo CI changes --- src/client/mod.rs | 1 + src/common/args.rs | 1 - src/common/conf.rs | 4 ++-- src/server/mod.rs | 10 +++------- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index e69de29..8b13789 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -0,0 +1 @@ + diff --git a/src/common/args.rs b/src/common/args.rs index 9b39244..9a53635 100644 --- a/src/common/args.rs +++ b/src/common/args.rs @@ -46,7 +46,6 @@ pub(crate) struct Cli { #[arg(short, long, default_value_t = false)] pub(crate) server: bool, - // how much threads the server should use #[cfg(feature = "server")] #[arg(short, long, default_value_t = 4)] diff --git a/src/common/conf.rs b/src/common/conf.rs index 0756095..7d44a8a 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -40,7 +40,7 @@ impl Display for Mode { pub struct Config { pub addr: std::net::SocketAddr, pub mode: Mode, - pub threads: usize + pub threads: usize, } impl Config { @@ -48,7 +48,7 @@ impl Config { Config { addr: cli.addr.clone(), mode: cli.mode.clone(), - threads: cli.threads + threads: cli.threads, } } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 9aaf850..b2eabe5 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,16 +2,12 @@ use anyhow::Result; use libpt::log::{error, info}; use mio::{self, event::Event, net::TcpListener, Events, Interest, Poll, Token}; -use std::{ - io::{prelude::*, BufReader}, - time::Duration, -}; +use std::time::Duration; use threadpool::ThreadPool; -use crate::common::conf::{Config, Mode}; +use crate::common::conf::Config; pub mod listener; -use listener::*; const EVENT_CAPACITY: usize = 128; const SERVER: Token = Token(0); @@ -59,7 +55,7 @@ impl Server { fn handle_event(&self, event: &Event) -> Result<()> { dbg!(event); - let connection = self.server.accept()?; + let _connection = self.server.accept()?; info!("received a connection!"); Ok(()) } -- 2.40.1 From a11618add47def2fd247b8bc9d285e7970a85718 Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Fri, 19 Jan 2024 09:36:17 +0100 Subject: [PATCH 07/13] use thread pool to process connections --- src/server/mod.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index b2eabe5..4bc9f97 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -43,20 +43,27 @@ impl Server { loop { self.work.poll(&mut self.events, self.timeout)?; for event in self.events.iter() { - // self.pool.execute(|| { let result = self.handle_event(event); if let Err(err) = result { error!("Error while handling server event {:?}: {:?}", event, err); } - // }); } } } fn handle_event(&self, event: &Event) -> Result<()> { dbg!(event); - let _connection = self.server.accept()?; - info!("received a connection!"); + let connection = match self.server.accept() { + Ok(c) => c, + Err(err) => { + error!("error while handling connection: {:?}", err); + return Err(err.into()); + } + }; + self.pool.execute(move|| { + dbg!(connection); + info!("received a connection!"); + }); Ok(()) } } -- 2.40.1 From 13745e96b8f013825a3f61b134902e934375bd80 Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Fri, 19 Jan 2024 12:56:25 +0100 Subject: [PATCH 08/13] tokio is hard work --- Cargo.toml | 2 +- src/common/conf.rs | 7 ++- src/main.rs | 7 +-- src/server/mod.rs | 111 ++++++++++++++++++++++++++++++--------------- 4 files changed, 85 insertions(+), 42 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 246973f..f1046ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,8 @@ clap = "4.4.18" clap-num = "1.0.2" clap-verbosity-flag = "2.1.2" libpt = { version = "0.3.10", features = ["net"] } -mio = { version = "0.8.10", features = ["net", "os-poll"] } threadpool = { version = "1.8.1", optional = true } +tokio = { version = "1.35.1", features = ["net", "rt", "macros"] } [features] default = ["server"] diff --git a/src/common/conf.rs b/src/common/conf.rs index 7d44a8a..9c43509 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -1,6 +1,9 @@ use crate::common::args::Cli; use clap::ValueEnum; -use std::fmt::Display; +use std::{fmt::Display, time::Duration}; + +const DEFAULT_TIMEOUT_LEN: u64 = 2000; // ms + #[derive(Debug, Clone, Copy)] pub enum Mode { Tcp, @@ -41,6 +44,7 @@ pub struct Config { pub addr: std::net::SocketAddr, pub mode: Mode, pub threads: usize, + pub timeout: Duration, } impl Config { @@ -49,6 +53,7 @@ impl Config { addr: cli.addr.clone(), mode: cli.mode.clone(), threads: cli.threads, + timeout: Duration::from_millis(DEFAULT_TIMEOUT_LEN) } } } diff --git a/src/main.rs b/src/main.rs index 0b621d9..786a4ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use anyhow::Result; use libpt::log::*; +use tokio; mod client; mod common; @@ -14,7 +15,8 @@ use common::{args::Cli, conf::*}; use crate::server::Server; -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { let cli = Cli::cli_parse(); debug!("dumping cli args:\n{:#?}", cli); @@ -23,8 +25,7 @@ fn main() -> Result<()> { #[cfg(feature = "server")] if cli.server { info!("starting server"); - Server::build(cfg)?.run()?; - return Ok(()); + return Server::build(cfg).await?.run().await } // implicit else, so we can work without the server feature info!("starting client"); diff --git a/src/server/mod.rs b/src/server/mod.rs index 4bc9f97..f53d857 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,69 +1,106 @@ #![cfg(feature = "server")] -use anyhow::Result; -use libpt::log::{error, info}; -use mio::{self, event::Event, net::TcpListener, Events, Interest, Poll, Token}; -use std::time::Duration; +use std::{net::SocketAddr, time::Duration}; + +use anyhow::{Result, anyhow}; +use libpt::{ + bintols::display::humanbytes, + log::{error, info, trace, warn, debug}, +}; use threadpool::ThreadPool; +use tokio::{ + io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, Interest}, + net::{self, TcpListener, TcpSocket, TcpStream}, + select, + time::timeout, +}; use crate::common::conf::Config; pub mod listener; -const EVENT_CAPACITY: usize = 128; -const SERVER: Token = Token(0); - pub struct Server { cfg: Config, - work: Poll, - events: Events, pool: ThreadPool, pub timeout: Option, server: TcpListener, } impl Server { - pub fn build(cfg: Config) -> Result { - let mut server = TcpListener::bind(cfg.addr)?; - let poll: Poll = Poll::new()?; - poll.registry() - .register(&mut server, SERVER, Interest::READABLE | Interest::WRITABLE); - let events: Events = Events::with_capacity(EVENT_CAPACITY); + pub async fn build(cfg: Config) -> Result { + let server = TcpListener::bind(cfg.addr).await?; let pool = ThreadPool::new(cfg.threads); let timeout = Some(Duration::from_secs(5)); Ok(Server { cfg, - work: poll, - events, pool, timeout, server, }) } - pub fn run(&mut self) -> Result<()> { + pub async fn run(self) -> Result<()> { loop { - self.work.poll(&mut self.events, self.timeout)?; - for event in self.events.iter() { - let result = self.handle_event(event); - if let Err(err) = result { - error!("Error while handling server event {:?}: {:?}", event, err); - } - } + let stream = self.server.accept().await?; + self.handle_stream(stream).await?; } } - fn handle_event(&self, event: &Event) -> Result<()> { - dbg!(event); - let connection = match self.server.accept() { - Ok(c) => c, - Err(err) => { - error!("error while handling connection: {:?}", err); - return Err(err.into()); + async fn handle_stream(&self, mut stream: (TcpStream, SocketAddr)) -> Result<()> { + info!("start handling stream {:?}", stream.1); + let mut buf = Vec::new(); + let mut reader = BufReader::new(stream.0); + let mut len; + loop { + trace!("reading anew"); + len = self.read(&mut reader, &mut buf).await?; + trace!("did read"); + if len == 0 { + break; + } else { + info!( + "< {}\t({})\n{}", + stream.1, + humanbytes(len), + self.decode(&buf)? + ); + buf.clear(); } - }; - self.pool.execute(move|| { - dbg!(connection); - info!("received a connection!"); - }); + } + info!("stop handling stream {:?}", stream.1); Ok(()) } + + #[inline] + fn decode(&self, buf: &Vec) -> Result { + Ok(String::from_utf8(buf.clone())?) + } + + async fn read(&self, reader: &mut BufReader, buf: &mut Vec) -> Result { + let mut len: usize; + let mut failsafe: u8 = 0; + while failsafe < u8::MAX { + trace!("loop"); + len = match timeout(self.cfg.timeout, reader.read(buf)).await { + Ok(inner) => { + trace!("Read for len: {inner:?}"); 0 + }, + Err(err) => { + error!("timeout while reading: {err}"); + return Err(err.into()); + } + }; + if self.should_end(&buf) { + return Ok(len); + } + else { + // ????????? + failsafe += 1; + } + } + Err(anyhow!("read too often, so the failsafe activated")) + } + + fn should_end(&self, buf: &Vec) -> bool { + debug!("eval should end: {:?}", buf); + buf.contains(&0x00) + } } -- 2.40.1 From 9e492adeb359011330899a8c9889dde0ddc4ff6c Mon Sep 17 00:00:00 2001 From: cscherrNT Date: Fri, 19 Jan 2024 11:58:53 +0000 Subject: [PATCH 09/13] automatic cargo CI changes --- src/common/conf.rs | 2 +- src/main.rs | 2 +- src/server/mod.rs | 19 +++++++++---------- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/common/conf.rs b/src/common/conf.rs index 9c43509..0bc94fd 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -53,7 +53,7 @@ impl Config { addr: cli.addr.clone(), mode: cli.mode.clone(), threads: cli.threads, - timeout: Duration::from_millis(DEFAULT_TIMEOUT_LEN) + timeout: Duration::from_millis(DEFAULT_TIMEOUT_LEN), } } } diff --git a/src/main.rs b/src/main.rs index 786a4ae..f0f5acc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,7 +25,7 @@ async fn main() -> Result<()> { #[cfg(feature = "server")] if cli.server { info!("starting server"); - return Server::build(cfg).await?.run().await + return Server::build(cfg).await?.run().await; } // implicit else, so we can work without the server feature info!("starting client"); diff --git a/src/server/mod.rs b/src/server/mod.rs index f53d857..5508799 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,16 +1,15 @@ #![cfg(feature = "server")] use std::{net::SocketAddr, time::Duration}; -use anyhow::{Result, anyhow}; +use anyhow::{anyhow, Result}; use libpt::{ bintols::display::humanbytes, - log::{error, info, trace, warn, debug}, + log::{debug, error, info, trace}, }; use threadpool::ThreadPool; use tokio::{ - io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, Interest}, - net::{self, TcpListener, TcpSocket, TcpStream}, - select, + io::{AsyncReadExt, BufReader}, + net::{TcpListener, TcpStream}, time::timeout, }; @@ -44,7 +43,7 @@ impl Server { } } - async fn handle_stream(&self, mut stream: (TcpStream, SocketAddr)) -> Result<()> { + async fn handle_stream(&self, stream: (TcpStream, SocketAddr)) -> Result<()> { info!("start handling stream {:?}", stream.1); let mut buf = Vec::new(); let mut reader = BufReader::new(stream.0); @@ -81,8 +80,9 @@ impl Server { trace!("loop"); len = match timeout(self.cfg.timeout, reader.read(buf)).await { Ok(inner) => { - trace!("Read for len: {inner:?}"); 0 - }, + trace!("Read for len: {inner:?}"); + 0 + } Err(err) => { error!("timeout while reading: {err}"); return Err(err.into()); @@ -90,8 +90,7 @@ impl Server { }; if self.should_end(&buf) { return Ok(len); - } - else { + } else { // ????????? failsafe += 1; } -- 2.40.1 From 956dbf375a4f553032f382c0c846b26815881265 Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Fri, 19 Jan 2024 14:21:19 +0100 Subject: [PATCH 10/13] progress with the reads --- src/server/mod.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 5508799..54b9201 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -44,8 +44,9 @@ impl Server { } async fn handle_stream(&self, stream: (TcpStream, SocketAddr)) -> Result<()> { + const BUF_SIZE: usize = 1024; info!("start handling stream {:?}", stream.1); - let mut buf = Vec::new(); + let mut buf = [0u8; BUF_SIZE]; let mut reader = BufReader::new(stream.0); let mut len; loop { @@ -61,7 +62,7 @@ impl Server { humanbytes(len), self.decode(&buf)? ); - buf.clear(); + buf = [0; BUF_SIZE]; } } info!("stop handling stream {:?}", stream.1); @@ -69,14 +70,14 @@ impl Server { } #[inline] - fn decode(&self, buf: &Vec) -> Result { - Ok(String::from_utf8(buf.clone())?) + fn decode(&self, buf: &[u8]) -> Result { + Ok(std::str::from_utf8(buf)?.to_string()) } - async fn read(&self, reader: &mut BufReader, buf: &mut Vec) -> Result { + async fn read(&self, reader: &mut BufReader, buf: &mut [u8]) -> Result { let mut len: usize; let mut failsafe: u8 = 0; - while failsafe < u8::MAX { + while failsafe < 5 { trace!("loop"); len = match timeout(self.cfg.timeout, reader.read(buf)).await { Ok(inner) => { @@ -98,8 +99,15 @@ impl Server { Err(anyhow!("read too often, so the failsafe activated")) } - fn should_end(&self, buf: &Vec) -> bool { - debug!("eval should end: {:?}", buf); - buf.contains(&0x00) + fn should_end(&self, buf: &[u8]) -> bool { + let mut lb: u8 = buf[0]; + let mut iter = buf.iter().skip(1).peekable(); + while let Some(b) = iter.next() { + if lb != 0 && *b == 0 && **iter.peek().unwrap() == 0 { + return true + } + lb = *b; + } + return false } } -- 2.40.1 From de412de988ce3f36416457d6560dd994d003079b Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Fri, 19 Jan 2024 15:26:30 +0100 Subject: [PATCH 11/13] better errors --- Cargo.toml | 1 + src/common/conf.rs | 2 +- src/server/errors.rs | 47 +++++++++++++++++++++++++++ src/server/listener.rs | 6 ---- src/server/mod.rs | 72 +++++++++++++++++++++++++++++++----------- 5 files changed, 102 insertions(+), 26 deletions(-) create mode 100644 src/server/errors.rs delete mode 100644 src/server/listener.rs diff --git a/Cargo.toml b/Cargo.toml index f1046ee..557b49a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ clap = "4.4.18" clap-num = "1.0.2" clap-verbosity-flag = "2.1.2" libpt = { version = "0.3.10", features = ["net"] } +thiserror = "1.0.56" threadpool = { version = "1.8.1", optional = true } tokio = { version = "1.35.1", features = ["net", "rt", "macros"] } diff --git a/src/common/conf.rs b/src/common/conf.rs index 0bc94fd..a06645a 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -2,7 +2,7 @@ use crate::common::args::Cli; use clap::ValueEnum; use std::{fmt::Display, time::Duration}; -const DEFAULT_TIMEOUT_LEN: u64 = 2000; // ms +const DEFAULT_TIMEOUT_LEN: u64 = 5000; // ms #[derive(Debug, Clone, Copy)] pub enum Mode { diff --git a/src/server/errors.rs b/src/server/errors.rs new file mode 100644 index 0000000..239eb79 --- /dev/null +++ b/src/server/errors.rs @@ -0,0 +1,47 @@ +use std::{fmt::Display, str::Utf8Error}; + +use anyhow; +use thiserror::Error; +use tokio::time::error::Elapsed; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum ServerError { + Timeout(Elapsed), + CouldNotAccept, + RdFail, + Anyhow(anyhow::Error), + IO(std::io::Error), + Format(Utf8Error), +} + +impl From for ServerError { + fn from(value: anyhow::Error) -> Self { + Self::Anyhow(value) + } +} + +impl From for ServerError { + fn from(value: std::io::Error) -> Self { + Self::IO(value) + } +} + +impl From for ServerError { + fn from(value: Utf8Error) -> Self { + Self::Format(value) + } +} + +impl From for ServerError { + fn from(value: Elapsed) -> Self { + Self::Timeout(value) + } +} + +impl Display for ServerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/src/server/listener.rs b/src/server/listener.rs deleted file mode 100644 index 0abbcae..0000000 --- a/src/server/listener.rs +++ /dev/null @@ -1,6 +0,0 @@ -use std::net::TcpListener; - -pub trait Listener {} -pub trait Stream {} - -impl Listener for TcpListener {} diff --git a/src/server/mod.rs b/src/server/mod.rs index 54b9201..dec1ccd 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,10 +1,10 @@ #![cfg(feature = "server")] -use std::{net::SocketAddr, time::Duration}; +use std::{error::Error, net::SocketAddr, time::Duration}; -use anyhow::{anyhow, Result}; +use anyhow::anyhow; use libpt::{ bintols::display::humanbytes, - log::{debug, error, info, trace}, + log::{debug, error, info, trace, warn}, }; use threadpool::ThreadPool; use tokio::{ @@ -15,7 +15,8 @@ use tokio::{ use crate::common::conf::Config; -pub mod listener; +pub mod errors; +use errors::*; pub struct Server { cfg: Config, @@ -25,7 +26,7 @@ pub struct Server { } impl Server { - pub async fn build(cfg: Config) -> Result { + pub async fn build(cfg: Config) -> anyhow::Result { let server = TcpListener::bind(cfg.addr).await?; let pool = ThreadPool::new(cfg.threads); let timeout = Some(Duration::from_secs(5)); @@ -36,36 +37,63 @@ impl Server { server, }) } - pub async fn run(self) -> Result<()> { + pub async fn run(self) -> anyhow::Result<()> { loop { - let stream = self.server.accept().await?; - self.handle_stream(stream).await?; + let (stream, addr) = match self.server.accept().await { + Ok(s) => s, + Err(err) => { + warn!("could not accept stream: {err:?}"); + continue; + } + }; + match self.handle_stream(stream).await { + Ok(_) => (), + Err(err) => { + match err { + ServerError::Timeout(_) => { + info!("stream {:?} timed out", addr) + } + _ => { + warn!("error while handling stream: {:?}", err) + } + } + continue; + } + }; } } - async fn handle_stream(&self, stream: (TcpStream, SocketAddr)) -> Result<()> { + async fn handle_stream(&self, stream: TcpStream) -> Result<()> { const BUF_SIZE: usize = 1024; - info!("start handling stream {:?}", stream.1); + let addr = match stream.peer_addr() { + Ok(a) => a, + Err(err) => { + debug!("could not get peer address: {:?}", err); + return Err(err.into()); + } + }; + info!("new peer: {:?}", addr); let mut buf = [0u8; BUF_SIZE]; - let mut reader = BufReader::new(stream.0); + let mut reader = BufReader::new(stream); let mut len; loop { trace!("reading anew"); len = self.read(&mut reader, &mut buf).await?; trace!("did read"); if len == 0 { + trace!("len is apperently 0: {len:?}"); break; } else { info!( - "< {}\t({})\n{}", - stream.1, + "< {:?}\t({})\n{}", + addr, humanbytes(len), self.decode(&buf)? ); buf = [0; BUF_SIZE]; } } - info!("stop handling stream {:?}", stream.1); + info!("disconnected peer: {:?}", addr); Ok(()) } @@ -82,10 +110,15 @@ impl Server { len = match timeout(self.cfg.timeout, reader.read(buf)).await { Ok(inner) => { trace!("Read for len: {inner:?}"); - 0 + match inner { + Ok(len) => len, + Err(err) => { + error!("read failed: {err:?}"); + return Err(ServerError::RdFail); + } + } } Err(err) => { - error!("timeout while reading: {err}"); return Err(err.into()); } }; @@ -96,7 +129,8 @@ impl Server { failsafe += 1; } } - Err(anyhow!("read too often, so the failsafe activated")) + // TODO: remove the failsafe when this stuff works + Err(anyhow!("read too often, so the failsafe activated").into()) } fn should_end(&self, buf: &[u8]) -> bool { @@ -104,10 +138,10 @@ impl Server { let mut iter = buf.iter().skip(1).peekable(); while let Some(b) = iter.next() { if lb != 0 && *b == 0 && **iter.peek().unwrap() == 0 { - return true + return true; } lb = *b; } - return false + return false; } } -- 2.40.1 From a9bf861a0bef2c98847b59c693893a2e34a4e3be Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Fri, 19 Jan 2024 16:11:02 +0100 Subject: [PATCH 12/13] it somewhat works, now we just need to pong back --- src/server/errors.rs | 10 +++--- src/server/mod.rs | 72 ++++++++++++++------------------------------ 2 files changed, 27 insertions(+), 55 deletions(-) diff --git a/src/server/errors.rs b/src/server/errors.rs index 239eb79..90dd1f5 100644 --- a/src/server/errors.rs +++ b/src/server/errors.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, str::Utf8Error}; +use std::{fmt::Display, string::FromUtf8Error}; use anyhow; use thiserror::Error; @@ -9,11 +9,9 @@ pub type Result = std::result::Result; #[derive(Debug, Error)] pub enum ServerError { Timeout(Elapsed), - CouldNotAccept, - RdFail, Anyhow(anyhow::Error), IO(std::io::Error), - Format(Utf8Error), + Format(FromUtf8Error), } impl From for ServerError { @@ -28,8 +26,8 @@ impl From for ServerError { } } -impl From for ServerError { - fn from(value: Utf8Error) -> Self { +impl From for ServerError { + fn from(value: FromUtf8Error) -> Self { Self::Format(value) } } diff --git a/src/server/mod.rs b/src/server/mod.rs index dec1ccd..9dc3bdb 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -8,7 +8,7 @@ use libpt::{ }; use threadpool::ThreadPool; use tokio::{ - io::{AsyncReadExt, BufReader}, + io::{AsyncReadExt, BufReader, AsyncBufReadExt}, net::{TcpListener, TcpStream}, time::timeout, }; @@ -64,7 +64,6 @@ impl Server { } async fn handle_stream(&self, stream: TcpStream) -> Result<()> { - const BUF_SIZE: usize = 1024; let addr = match stream.peer_addr() { Ok(a) => a, Err(err) => { @@ -73,13 +72,23 @@ impl Server { } }; info!("new peer: {:?}", addr); - let mut buf = [0u8; BUF_SIZE]; + let mut buf = Vec::new(); let mut reader = BufReader::new(stream); let mut len; loop { - trace!("reading anew"); - len = self.read(&mut reader, &mut buf).await?; - trace!("did read"); + len = match self.read(&mut reader, &mut buf).await { + Ok(len) => len, + Err(err) => { + match err { + ServerError::Timeout(_) => { + info!("peer {:?} timed out", addr) + } + _ => return Err(err), + } + break; + } + }; + trace!("received message: {:X?}", buf); if len == 0 { trace!("len is apperently 0: {len:?}"); break; @@ -90,7 +99,7 @@ impl Server { humanbytes(len), self.decode(&buf)? ); - buf = [0; BUF_SIZE]; + buf.clear(); } } info!("disconnected peer: {:?}", addr); @@ -98,50 +107,15 @@ impl Server { } #[inline] - fn decode(&self, buf: &[u8]) -> Result { - Ok(std::str::from_utf8(buf)?.to_string()) + fn decode(&self, buf: &Vec) -> Result { + Ok(String::from_utf8(buf.clone())?) } - async fn read(&self, reader: &mut BufReader, buf: &mut [u8]) -> Result { - let mut len: usize; - let mut failsafe: u8 = 0; - while failsafe < 5 { - trace!("loop"); - len = match timeout(self.cfg.timeout, reader.read(buf)).await { - Ok(inner) => { - trace!("Read for len: {inner:?}"); - match inner { - Ok(len) => len, - Err(err) => { - error!("read failed: {err:?}"); - return Err(ServerError::RdFail); - } - } - } - Err(err) => { - return Err(err.into()); - } - }; - if self.should_end(&buf) { - return Ok(len); - } else { - // ????????? - failsafe += 1; - } + #[inline] + async fn read(&self, reader: &mut BufReader, buf: &mut Vec) -> Result { + match timeout(self.cfg.timeout, reader.read_until(0x00, buf)).await? { + Ok(len) => Ok(len), + Err(err) => Err(err.into()), } - // TODO: remove the failsafe when this stuff works - Err(anyhow!("read too often, so the failsafe activated").into()) - } - - fn should_end(&self, buf: &[u8]) -> bool { - let mut lb: u8 = buf[0]; - let mut iter = buf.iter().skip(1).peekable(); - while let Some(b) = iter.next() { - if lb != 0 && *b == 0 && **iter.peek().unwrap() == 0 { - return true; - } - lb = *b; - } - return false; } } -- 2.40.1 From 6868af20166897147cf58d25bf2b50e60c49ef9e Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Fri, 19 Jan 2024 16:33:11 +0100 Subject: [PATCH 13/13] ping pong works (with a python client) --- scripts/client.py | 21 +++++++++++++++++++++ src/common/conf.rs | 6 ++++++ src/server/mod.rs | 40 ++++++++++++++++++++++------------------ 3 files changed, 49 insertions(+), 18 deletions(-) create mode 100644 scripts/client.py diff --git a/scripts/client.py b/scripts/client.py new file mode 100644 index 0000000..8aa1142 --- /dev/null +++ b/scripts/client.py @@ -0,0 +1,21 @@ +import socket + +HOST = "127.0.0.1" +PORT = 9999 + +payload = b"ping\0" + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((HOST, PORT)) + while True: + try: + s.sendall(payload) + print("> ping") + except Exception as e: + break + reply = s.recv(1024).decode() + if reply == "": + break + print(f"< {reply}") + print("connection shut down") + diff --git a/src/common/conf.rs b/src/common/conf.rs index a06645a..a860098 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -3,6 +3,8 @@ use clap::ValueEnum; use std::{fmt::Display, time::Duration}; const DEFAULT_TIMEOUT_LEN: u64 = 5000; // ms +const DEFAULT_DELAY_LEN: u64 = 500; // ms +const DEFAULT_WIN_AFTER: usize = 20; #[derive(Debug, Clone, Copy)] pub enum Mode { @@ -45,6 +47,8 @@ pub struct Config { pub mode: Mode, pub threads: usize, pub timeout: Duration, + pub delay: Duration, + pub win_after: usize, } impl Config { @@ -54,6 +58,8 @@ impl Config { mode: cli.mode.clone(), threads: cli.threads, timeout: Duration::from_millis(DEFAULT_TIMEOUT_LEN), + delay: Duration::from_millis(DEFAULT_DELAY_LEN), + win_after: DEFAULT_WIN_AFTER, } } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 9dc3bdb..3e3a021 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,14 +1,9 @@ #![cfg(feature = "server")] -use std::{error::Error, net::SocketAddr, time::Duration}; +use std::time::Duration; -use anyhow::anyhow; -use libpt::{ - bintols::display::humanbytes, - log::{debug, error, info, trace, warn}, -}; -use threadpool::ThreadPool; +use libpt::log::{debug, info, trace, warn}; use tokio::{ - io::{AsyncReadExt, BufReader, AsyncBufReadExt}, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::{TcpListener, TcpStream}, time::timeout, }; @@ -20,7 +15,6 @@ use errors::*; pub struct Server { cfg: Config, - pool: ThreadPool, pub timeout: Option, server: TcpListener, } @@ -28,11 +22,9 @@ pub struct Server { impl Server { pub async fn build(cfg: Config) -> anyhow::Result { let server = TcpListener::bind(cfg.addr).await?; - let pool = ThreadPool::new(cfg.threads); let timeout = Some(Duration::from_secs(5)); Ok(Server { cfg, - pool, timeout, server, }) @@ -64,6 +56,7 @@ impl Server { } async fn handle_stream(&self, stream: TcpStream) -> Result<()> { + let mut pings: usize = 0; let addr = match stream.peer_addr() { Ok(a) => a, Err(err) => { @@ -93,13 +86,24 @@ impl Server { trace!("len is apperently 0: {len:?}"); break; } else { - info!( - "< {:?}\t({})\n{}", - addr, - humanbytes(len), - self.decode(&buf)? - ); + let msg = self.decode(&buf)?; + info!("< {:?} : {}", addr, msg); + if msg.contains("ping") { + pings += 1; + } + if pings < self.cfg.win_after { + reader.write_all(b"pong\0").await?; + info!("> {:?} : pong", addr,); + } else { + reader.write_all(b"you win!\0").await?; + info!("> {:?} : you win!", addr,); + reader.shutdown().await?; + break; + } buf.clear(); + + // we should wait, so that we don't spam the client + std::thread::sleep(self.cfg.delay); } } info!("disconnected peer: {:?}", addr); @@ -108,7 +112,7 @@ impl Server { #[inline] fn decode(&self, buf: &Vec) -> Result { - Ok(String::from_utf8(buf.clone())?) + Ok(String::from_utf8(buf.clone())?.replace('\n', "\\n")) } #[inline] -- 2.40.1