From b0fb7de6e985bcc9347f2a48e901fd3c814937ff Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Tue, 23 Jan 2024 13:50:40 +0100 Subject: [PATCH] concurrency --- Cargo.toml | 1 + scripts/client.py | 37 +++++++++++++++++++++---------------- scripts/spam.rs | 12 ++++++++++++ spammer/Cargo.toml | 9 +++++++++ spammer/src/main.rs | 22 ++++++++++++++++++++++ src/main.rs | 7 ++----- src/server/mod.rs | 34 +++++++++------------------------- 7 files changed, 76 insertions(+), 46 deletions(-) create mode 100644 scripts/spam.rs create mode 100644 spammer/Cargo.toml create mode 100644 spammer/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 7ed3e3e..3ea72ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,4 @@ +workspace = { members = ["spammer"] } [package] name = "netpong" version = "0.1.1" diff --git a/scripts/client.py b/scripts/client.py index 8aa1142..6041246 100644 --- a/scripts/client.py +++ b/scripts/client.py @@ -1,21 +1,26 @@ import socket -HOST = "127.0.0.1" -PORT = 9999 -payload = b"ping\0" +def ping(): -with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect((HOST, PORT)) - while True: - try: - s.sendall(payload) - print("> ping") - except Exception as e: - break - reply = s.recv(1024).decode() - if reply == "": - break - print(f"< {reply}") - print("connection shut down") + HOST = "127.0.0.1" + PORT = 9999 + payload = b"ping\0" + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((HOST, PORT)) + while True: + try: + s.sendall(payload) + print("> ping") + except Exception as e: + break + reply = s.recv(1024).decode() + if reply == "": + break + print(f"< {reply}") + print("connection shut down") + +if __name__ == "__main__": + ping() diff --git a/scripts/spam.rs b/scripts/spam.rs new file mode 100644 index 0000000..a5f8355 --- /dev/null +++ b/scripts/spam.rs @@ -0,0 +1,12 @@ +const MAX: usize = 50; +use std::process::Command; + +fn main() { + let mut pool = ThreadPool::new(MAX); + + loop { + pool.execute(||{ + Command::new("python3").args(["scripts/client.py"]).output().unwrap(); + }); + } +} diff --git a/spammer/Cargo.toml b/spammer/Cargo.toml new file mode 100644 index 0000000..ed89719 --- /dev/null +++ b/spammer/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "spammer" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +threadpool = "1.8.1" diff --git a/spammer/src/main.rs b/spammer/src/main.rs new file mode 100644 index 0000000..7ae53cf --- /dev/null +++ b/spammer/src/main.rs @@ -0,0 +1,22 @@ +use threadpool::ThreadPool; +const MAX: usize = 500; +use std::process::{exit, Command}; + +fn main() { + let pool = ThreadPool::new(MAX); + + loop { + if pool.queued_count() < MAX { + pool.execute(|| { + let mut cmd = Command::new("/usr/bin/python3"); + cmd.args(["../scripts/client.py"]); + let o = cmd.output().unwrap(); + let s = cmd.status().unwrap(); + }); + } + else { + std::thread::sleep(std::time::Duration::from_millis(400)); + println!("pool: {pool:?}") + } + } +} diff --git a/src/main.rs b/src/main.rs index 4173e1e..952dfa1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,13 +25,10 @@ async fn main() -> Result<()> { #[cfg(feature = "server")] if cli.server { info!("starting server"); - return Server::build(cfg).await?.start().await; - loop { - // now we wait - } + return Server::build(cfg).await?.run().await; } // implicit else, so we can work without the server feature info!("starting client"); todo!(); - Ok(()) + loop {} } diff --git a/src/server/mod.rs b/src/server/mod.rs index 69dff39..e1d4416 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,5 +1,5 @@ #![cfg(feature = "server")] -use std::time::Duration; +use std::{time::Duration, sync::Arc}; use libpt::log::{debug, info, trace, warn}; use tokio::{ @@ -18,46 +18,32 @@ pub struct Server { cfg: Config, pub timeout: Option, server: TcpListener, - accept_runtime: Runtime, - request_runtime: Runtime, } impl Server { pub async fn build(cfg: Config) -> anyhow::Result { let server = TcpListener::bind(cfg.addr).await?; let timeout = Some(Duration::from_secs(5)); - let accept_runtime = Builder::new_multi_thread() - .worker_threads(1) - .thread_stack_size(3 * 1024 * 1024) - .enable_io() - .enable_time() - .build()?; - let request_runtime = Builder::new_multi_thread() - .worker_threads(1) - .thread_stack_size(3 * 1024 * 1024) - .enable_io() - .enable_time() - .build()?; Ok(Server { cfg, timeout, server, - accept_runtime, - request_runtime, }) } - pub async fn start(self) -> anyhow::Result<()> { - self.accept_runtime.block_on(async { + pub async fn run(self) -> anyhow::Result<()> { + let rc_self = Arc::new(self); loop { - let (stream, addr) = match self.server.accept().await { + let rc_self = rc_self.clone(); + let (stream, addr) = match rc_self.server.accept().await { Ok(s) => s, Err(err) => { warn!("could not accept stream: {err:?}"); continue; } }; - let _guard = self.request_runtime.enter(); - match self.handle_stream(stream).await { + tokio::spawn(async move { + + match rc_self.handle_stream(stream).await { Ok(_) => (), Err(err) => { match err { @@ -68,12 +54,10 @@ impl Server { warn!("error while handling stream: {:?}", err) } } - continue; } }; + }); } - }); - Ok(()) } async fn handle_stream(&self, stream: TcpStream) -> Result<()> {