From b0edd17825a535f37dbedbd52fa01c23ff35b122 Mon Sep 17 00:00:00 2001 From: PlexSheep Date: Tue, 23 Jan 2024 09:13:40 +0100 Subject: [PATCH] runtime --- Cargo.toml | 5 +- src/common/conf.rs | 3 ++ src/main.rs | 5 +- src/server/mod.rs | 115 ++++++++++++++++++++++++++------------------- 4 files changed, 75 insertions(+), 53 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 557b49a..7ed3e3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "netpong" -version = "0.1.0" +version = "0.1.1" edition = "2021" publish = true authors = ["Christoph J. Scherr "] @@ -19,9 +19,8 @@ clap-num = "1.0.2" clap-verbosity-flag = "2.1.2" libpt = { version = "0.3.10", features = ["net"] } thiserror = "1.0.56" -threadpool = { version = "1.8.1", optional = true } tokio = { version = "1.35.1", features = ["net", "rt", "macros"] } [features] default = ["server"] -server = ["dep:threadpool"] +server = [] diff --git a/src/common/conf.rs b/src/common/conf.rs index a860098..3af4bae 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -9,12 +9,14 @@ const DEFAULT_WIN_AFTER: usize = 20; #[derive(Debug, Clone, Copy)] pub enum Mode { Tcp, + Tls } impl ValueEnum for Mode { fn to_possible_value(&self) -> Option { Some(match self { Self::Tcp => clap::builder::PossibleValue::new("tcp"), + Self::Tls => clap::builder::PossibleValue::new("tls"), }) } fn value_variants<'a>() -> &'a [Self] { @@ -37,6 +39,7 @@ impl Display for Mode { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let repr: String = match self { Self::Tcp => format!("tcp"), + Self::Tls => format!("tls"), }; write!(f, "{}", repr) } diff --git a/src/main.rs b/src/main.rs index a46b7bb..4173e1e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,7 +25,10 @@ async fn main() -> Result<()> { #[cfg(feature = "server")] if cli.server { info!("starting server"); - return Server::build(cfg).await?.run().await; + return Server::build(cfg).await?.start().await; + loop { + // now we wait + } } // implicit else, so we can work without the server feature info!("starting client"); diff --git a/src/server/mod.rs b/src/server/mod.rs index 4cb6edc..69dff39 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -5,6 +5,7 @@ use libpt::log::{debug, info, trace, warn}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::{TcpListener, TcpStream}, + runtime::{Builder, Runtime}, time::timeout, }; @@ -17,44 +18,62 @@ pub struct Server { cfg: Config, pub timeout: Option, server: TcpListener, + accept_runtime: Runtime, + request_runtime: Runtime, } impl Server { pub async fn build(cfg: Config) -> anyhow::Result { let server = TcpListener::bind(cfg.addr).await?; let timeout = Some(Duration::from_secs(5)); + let accept_runtime = Builder::new_multi_thread() + .worker_threads(1) + .thread_stack_size(3 * 1024 * 1024) + .enable_io() + .enable_time() + .build()?; + let request_runtime = Builder::new_multi_thread() + .worker_threads(1) + .thread_stack_size(3 * 1024 * 1024) + .enable_io() + .enable_time() + .build()?; Ok(Server { cfg, timeout, server, + accept_runtime, + request_runtime, }) } - pub async fn run(self) -> anyhow::Result<()> { - loop { - let (stream, addr) = match self.server.accept().await { - Ok(s) => s, - Err(err) => { - warn!("could not accept stream: {err:?}"); - continue; - } - }; - // TODO: handle the stream with multi threading, or async or something - // We need concurrency here! - match self.handle_stream(stream).await { - Ok(_) => (), - Err(err) => { - match err { - ServerError::Timeout(_) => { - info!("stream {:?} timed out", addr) - } - _ => { - warn!("error while handling stream: {:?}", err) - } + pub async fn start(self) -> anyhow::Result<()> { + self.accept_runtime.block_on(async { + loop { + let (stream, addr) = match self.server.accept().await { + Ok(s) => s, + Err(err) => { + warn!("could not accept stream: {err:?}"); + continue; } - continue; - } - }; - } + }; + let _guard = self.request_runtime.enter(); + match self.handle_stream(stream).await { + Ok(_) => (), + Err(err) => { + match err { + ServerError::Timeout(_) => { + info!("stream {:?} timed out", addr) + } + _ => { + warn!("error while handling stream: {:?}", err) + } + } + continue; + } + }; + } + }); + Ok(()) } async fn handle_stream(&self, stream: TcpStream) -> Result<()> { @@ -69,9 +88,12 @@ impl Server { info!("new peer: {:?}", addr); let mut buf = Vec::new(); let mut reader = BufReader::new(stream); - let mut len; loop { - len = match self.read(&mut reader, &mut buf).await { + match self.read(&mut reader, &mut buf).await { + Ok(len) if len == 0 => { + trace!("len is 0, so the stream has ended: {len:?}"); + break; + } Ok(len) => len, Err(err) => { match err { @@ -84,29 +106,24 @@ impl Server { } }; trace!("received message: {:X?}", buf); - if len == 0 { - trace!("len is apperently 0: {len:?}"); - break; - } else { - let msg = self.decode(&buf)?; - info!("< {:?} : {}", addr, msg); - if msg.contains("ping") { - pings += 1; - } - if pings < self.cfg.win_after { - reader.write_all(b"pong\0").await?; - info!("> {:?} : pong", addr,); - } else { - reader.write_all(b"you win!\0").await?; - info!("> {:?} : you win!", addr,); - reader.shutdown().await?; - break; - } - buf.clear(); - - // we should wait, so that we don't spam the client - std::thread::sleep(self.cfg.delay); + let msg = self.decode(&buf)?; + info!("< {:?} : {}", addr, msg); + if msg.contains("ping") { + pings += 1; } + if pings < self.cfg.win_after { + reader.write_all(b"pong\0").await?; + info!("> {:?} : pong", addr,); + } else { + reader.write_all(b"you win!\0").await?; + info!("> {:?} : you win!", addr,); + reader.shutdown().await?; + break; + } + buf.clear(); + + // we should wait, so that we don't spam the client + std::thread::sleep(self.cfg.delay); } info!("disconnected peer: {:?}", addr); Ok(())