From dc407c8ce85ec91ed32e8a34e6102693657dee42 Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Tue, 23 Jan 2024 14:58:59 +0100 Subject: [PATCH] status and performance improvements --- spammer/src/main.rs | 10 ++--- src/server/mod.rs | 89 ++++++++++++++++++++++++++++++++------------- 2 files changed, 66 insertions(+), 33 deletions(-) diff --git a/spammer/src/main.rs b/spammer/src/main.rs index 7ae53cf..fb742e5 100644 --- a/spammer/src/main.rs +++ b/spammer/src/main.rs @@ -1,5 +1,5 @@ use threadpool::ThreadPool; -const MAX: usize = 500; +const MAX: usize = 20; use std::process::{exit, Command}; fn main() { @@ -10,13 +10,9 @@ fn main() { pool.execute(|| { let mut cmd = Command::new("/usr/bin/python3"); cmd.args(["../scripts/client.py"]); - let o = cmd.output().unwrap(); - let s = cmd.status().unwrap(); + let _ = cmd.output().unwrap(); }); } - else { - std::thread::sleep(std::time::Duration::from_millis(400)); - println!("pool: {pool:?}") - } + std::thread::sleep(std::time::Duration::from_millis(100)); } } diff --git a/src/server/mod.rs b/src/server/mod.rs index e1d4416..9fd79ba 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,12 +1,15 @@ #![cfg(feature = "server")] -use std::{time::Duration, sync::Arc}; +use std::{ + ops::Add, + sync::{atomic::AtomicUsize, Arc}, + time::Duration, +}; use libpt::log::{debug, info, trace, warn}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::{TcpListener, TcpStream}, - runtime::{Builder, Runtime}, - time::timeout, + time::{self, timeout}, }; use crate::common::conf::Config; @@ -18,46 +21,80 @@ pub struct Server { cfg: Config, pub timeout: Option, server: TcpListener, + num_peers: AtomicUsize, } impl Server { pub async fn build(cfg: Config) -> anyhow::Result { let server = TcpListener::bind(cfg.addr).await?; let timeout = Some(Duration::from_secs(5)); + let num_peers = AtomicUsize::new(0); Ok(Server { cfg, timeout, server, + num_peers, }) } pub async fn run(self) -> anyhow::Result<()> { - let rc_self = Arc::new(self); + let rc_self = Arc::new(self); + let ref_self = rc_self.clone(); + tokio::spawn(async move { + let mut interval = time::interval(Duration::from_millis(5000)); loop { - let rc_self = rc_self.clone(); - let (stream, addr) = match rc_self.server.accept().await { + interval.tick().await; + info!( + "status: {} peers", + ref_self + .num_peers + .load(std::sync::atomic::Ordering::Relaxed) + ); + } + }); + loop { + let ref_self = rc_self.clone(); + let (stream, addr) = match ref_self.server.accept().await { Ok(s) => s, Err(err) => { warn!("could not accept stream: {err:?}"); continue; } }; - tokio::spawn(async move { - - match rc_self.handle_stream(stream).await { + // NOTE: we can only start the task now. If we start it before accepting connections + // (so that the task theoretically accepts the connection), we would create endless + // tasks in a loop + tokio::spawn(async move { + ref_self.peer_add(1); + match ref_self.handle_stream(stream).await { Ok(_) => (), - Err(err) => { - match err { - ServerError::Timeout(_) => { - info!("stream {:?} timed out", addr) - } - _ => { - warn!("error while handling stream: {:?}", err) - } + Err(err) => match err { + ServerError::Timeout(_) => { + debug!("stream {:?} timed out", addr) } - } + _ => { + warn!("error while handling stream: {:?}", err) + } + }, }; - }); - } + ref_self.peer_sub(1); + }); + } + } + + #[inline] + fn peer_add(&self, v: usize) { + self.num_peers.store( + (self.num_peers.load(std::sync::atomic::Ordering::Relaxed) + v), + std::sync::atomic::Ordering::Relaxed, + ) + } + + #[inline] + fn peer_sub(&self, v: usize) { + self.num_peers.store( + (self.num_peers.load(std::sync::atomic::Ordering::Relaxed) - v), + std::sync::atomic::Ordering::Relaxed, + ) } async fn handle_stream(&self, stream: TcpStream) -> Result<()> { @@ -69,7 +106,7 @@ impl Server { return Err(err.into()); } }; - info!("new peer: {:?}", addr); + debug!("new peer: {:?}", addr); let mut buf = Vec::new(); let mut reader = BufReader::new(stream); loop { @@ -82,7 +119,7 @@ impl Server { Err(err) => { match err { ServerError::Timeout(_) => { - info!("peer {:?} timed out", addr) + debug!("peer {:?} timed out", addr) } _ => return Err(err), } @@ -91,16 +128,16 @@ impl Server { }; trace!("received message: {:X?}", buf); let msg = self.decode(&buf)?; - info!("< {:?} : {}", addr, msg); + debug!("< {:?} : {}", addr, msg); if msg.contains("ping") { pings += 1; } if pings < self.cfg.win_after { reader.write_all(b"pong\0").await?; - info!("> {:?} : pong", addr,); + debug!("> {:?} : pong", addr,); } else { reader.write_all(b"you win!\0").await?; - info!("> {:?} : you win!", addr,); + debug!("> {:?} : you win!", addr,); reader.shutdown().await?; break; } @@ -109,7 +146,7 @@ impl Server { // we should wait, so that we don't spam the client std::thread::sleep(self.cfg.delay); } - info!("disconnected peer: {:?}", addr); + debug!("disconnected peer: {:?}", addr); Ok(()) }