concurrency
cargo devel CI / cargo CI (push) Successful in 2m29s Details

This commit is contained in:
Christoph J. Scherr 2024-01-23 13:50:40 +01:00
parent b6c9b660f6
commit b0fb7de6e9
Signed by: cscherrNT
GPG Key ID: 8E2B45BC51A27EA7
7 changed files with 76 additions and 46 deletions

View File

@ -1,3 +1,4 @@
workspace = { members = ["spammer"] }
[package] [package]
name = "netpong" name = "netpong"
version = "0.1.1" version = "0.1.1"

View File

@ -1,21 +1,26 @@
import socket import socket
HOST = "127.0.0.1"
PORT = 9999
payload = b"ping\0" def ping():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: HOST = "127.0.0.1"
s.connect((HOST, PORT)) PORT = 9999
while True:
try:
s.sendall(payload)
print("> ping")
except Exception as e:
break
reply = s.recv(1024).decode()
if reply == "":
break
print(f"< {reply}")
print("connection shut down")
payload = b"ping\0"
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
while True:
try:
s.sendall(payload)
print("> ping")
except Exception as e:
break
reply = s.recv(1024).decode()
if reply == "":
break
print(f"< {reply}")
print("connection shut down")
if __name__ == "__main__":
ping()

12
scripts/spam.rs Normal file
View File

@ -0,0 +1,12 @@
const MAX: usize = 50;
use std::process::Command;
fn main() {
let mut pool = ThreadPool::new(MAX);
loop {
pool.execute(||{
Command::new("python3").args(["scripts/client.py"]).output().unwrap();
});
}
}

9
spammer/Cargo.toml Normal file
View File

@ -0,0 +1,9 @@
[package]
name = "spammer"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
threadpool = "1.8.1"

22
spammer/src/main.rs Normal file
View File

@ -0,0 +1,22 @@
use threadpool::ThreadPool;
const MAX: usize = 500;
use std::process::{exit, Command};
fn main() {
let pool = ThreadPool::new(MAX);
loop {
if pool.queued_count() < MAX {
pool.execute(|| {
let mut cmd = Command::new("/usr/bin/python3");
cmd.args(["../scripts/client.py"]);
let o = cmd.output().unwrap();
let s = cmd.status().unwrap();
});
}
else {
std::thread::sleep(std::time::Duration::from_millis(400));
println!("pool: {pool:?}")
}
}
}

View File

@ -25,13 +25,10 @@ async fn main() -> Result<()> {
#[cfg(feature = "server")] #[cfg(feature = "server")]
if cli.server { if cli.server {
info!("starting server"); info!("starting server");
return Server::build(cfg).await?.start().await; return Server::build(cfg).await?.run().await;
loop {
// now we wait
}
} }
// implicit else, so we can work without the server feature // implicit else, so we can work without the server feature
info!("starting client"); info!("starting client");
todo!(); todo!();
Ok(()) loop {}
} }

View File

@ -1,5 +1,5 @@
#![cfg(feature = "server")] #![cfg(feature = "server")]
use std::time::Duration; use std::{time::Duration, sync::Arc};
use libpt::log::{debug, info, trace, warn}; use libpt::log::{debug, info, trace, warn};
use tokio::{ use tokio::{
@ -18,46 +18,32 @@ pub struct Server {
cfg: Config, cfg: Config,
pub timeout: Option<Duration>, pub timeout: Option<Duration>,
server: TcpListener, server: TcpListener,
accept_runtime: Runtime,
request_runtime: Runtime,
} }
impl Server { impl Server {
pub async fn build(cfg: Config) -> anyhow::Result<Self> { pub async fn build(cfg: Config) -> anyhow::Result<Self> {
let server = TcpListener::bind(cfg.addr).await?; let server = TcpListener::bind(cfg.addr).await?;
let timeout = Some(Duration::from_secs(5)); let timeout = Some(Duration::from_secs(5));
let accept_runtime = Builder::new_multi_thread()
.worker_threads(1)
.thread_stack_size(3 * 1024 * 1024)
.enable_io()
.enable_time()
.build()?;
let request_runtime = Builder::new_multi_thread()
.worker_threads(1)
.thread_stack_size(3 * 1024 * 1024)
.enable_io()
.enable_time()
.build()?;
Ok(Server { Ok(Server {
cfg, cfg,
timeout, timeout,
server, server,
accept_runtime,
request_runtime,
}) })
} }
pub async fn start(self) -> anyhow::Result<()> { pub async fn run(self) -> anyhow::Result<()> {
self.accept_runtime.block_on(async { let rc_self = Arc::new(self);
loop { loop {
let (stream, addr) = match self.server.accept().await { let rc_self = rc_self.clone();
let (stream, addr) = match rc_self.server.accept().await {
Ok(s) => s, Ok(s) => s,
Err(err) => { Err(err) => {
warn!("could not accept stream: {err:?}"); warn!("could not accept stream: {err:?}");
continue; continue;
} }
}; };
let _guard = self.request_runtime.enter(); tokio::spawn(async move {
match self.handle_stream(stream).await {
match rc_self.handle_stream(stream).await {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(err) => {
match err { match err {
@ -68,12 +54,10 @@ impl Server {
warn!("error while handling stream: {:?}", err) warn!("error while handling stream: {:?}", err)
} }
} }
continue;
} }
}; };
});
} }
});
Ok(())
} }
async fn handle_stream(&self, stream: TcpStream) -> Result<()> { async fn handle_stream(&self, stream: TcpStream) -> Result<()> {