status and performance improvements

Commit dc407c8ce8 (parent b0fb7de6e9)
Author: Christoph J. Scherr, 2024-01-23 14:58:59 +01:00
Signed by: cscherrNT (GPG Key ID: 8E2B45BC51A27EA7)
2 changed files with 66 additions and 33 deletions

Changed file 1 of 2:

@@ -1,5 +1,5 @@
 use threadpool::ThreadPool;
-const MAX: usize = 500;
+const MAX: usize = 20;
 use std::process::{exit, Command};

 fn main() {
@@ -10,13 +10,9 @@ fn main() {
             pool.execute(|| {
                 let mut cmd = Command::new("/usr/bin/python3");
                 cmd.args(["../scripts/client.py"]);
-                let o = cmd.output().unwrap();
-                let s = cmd.status().unwrap();
+                let _ = cmd.output().unwrap();
             });
         }
-        else {
-            std::thread::sleep(std::time::Duration::from_millis(400));
-            println!("pool: {pool:?}")
-        }
+        std::thread::sleep(std::time::Duration::from_millis(100));
     }
 }
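For context, a minimal, self-contained sketch of how the visible client hunks could fit together after this change. The pool construction and the spawn condition (queued_count() < MAX) are assumptions not shown in the diff; only the worker closure, MAX = 20, and the 100 ms sleep come from the commit.

use std::process::Command;
use std::time::Duration;

use threadpool::ThreadPool;

const MAX: usize = 20;

fn main() {
    // Assumption: the pool is sized to MAX and topped up whenever its queue has room.
    let pool = ThreadPool::new(MAX);
    loop {
        if pool.queued_count() < MAX {
            pool.execute(|| {
                // Each worker runs one Python client process to completion.
                let mut cmd = Command::new("/usr/bin/python3");
                cmd.args(["../scripts/client.py"]);
                let _ = cmd.output().unwrap();
            });
        }
        // Back off briefly so the spawning loop does not busy-spin.
        std::thread::sleep(Duration::from_millis(100));
    }
}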

Changed file 2 of 2:

@@ -1,12 +1,15 @@
 #![cfg(feature = "server")]
-use std::{time::Duration, sync::Arc};
+use std::{
+    ops::Add,
+    sync::{atomic::AtomicUsize, Arc},
+    time::Duration,
+};

 use libpt::log::{debug, info, trace, warn};
 use tokio::{
     io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
     net::{TcpListener, TcpStream},
-    runtime::{Builder, Runtime},
-    time::timeout,
+    time::{self, timeout},
 };

 use crate::common::conf::Config;
@@ -18,48 +21,82 @@ pub struct Server {
     cfg: Config,
     pub timeout: Option<Duration>,
     server: TcpListener,
+    num_peers: AtomicUsize,
 }

 impl Server {
     pub async fn build(cfg: Config) -> anyhow::Result<Self> {
         let server = TcpListener::bind(cfg.addr).await?;
         let timeout = Some(Duration::from_secs(5));
+        let num_peers = AtomicUsize::new(0);
         Ok(Server {
             cfg,
             timeout,
             server,
+            num_peers,
         })
     }

     pub async fn run(self) -> anyhow::Result<()> {
         let rc_self = Arc::new(self);
+        let ref_self = rc_self.clone();
+        tokio::spawn(async move {
+            let mut interval = time::interval(Duration::from_millis(5000));
+            loop {
+                interval.tick().await;
+                info!(
+                    "status: {} peers",
+                    ref_self
+                        .num_peers
+                        .load(std::sync::atomic::Ordering::Relaxed)
+                );
+            }
+        });
         loop {
-            let rc_self = rc_self.clone();
-            let (stream, addr) = match rc_self.server.accept().await {
+            let ref_self = rc_self.clone();
+            let (stream, addr) = match ref_self.server.accept().await {
                 Ok(s) => s,
                 Err(err) => {
                     warn!("could not accept stream: {err:?}");
                     continue;
                 }
             };
+            // NOTE: we can only start the task now. If we start it before accepting connections
+            // (so that the task theoretically accepts the connection), we would create endless
+            // tasks in a loop
             tokio::spawn(async move {
-                match rc_self.handle_stream(stream).await {
+                ref_self.peer_add(1);
+                match ref_self.handle_stream(stream).await {
                     Ok(_) => (),
-                    Err(err) => {
-                        match err {
+                    Err(err) => match err {
                         ServerError::Timeout(_) => {
-                            info!("stream {:?} timed out", addr)
+                            debug!("stream {:?} timed out", addr)
                         }
                         _ => {
                             warn!("error while handling stream: {:?}", err)
                         }
-                        }
-                    }
+                    },
                 };
+                ref_self.peer_sub(1);
             });
         }
     }

+    #[inline]
+    fn peer_add(&self, v: usize) {
+        self.num_peers.store(
+            (self.num_peers.load(std::sync::atomic::Ordering::Relaxed) + v),
+            std::sync::atomic::Ordering::Relaxed,
+        )
+    }
+
+    #[inline]
+    fn peer_sub(&self, v: usize) {
+        self.num_peers.store(
+            (self.num_peers.load(std::sync::atomic::Ordering::Relaxed) - v),
+            std::sync::atomic::Ordering::Relaxed,
+        )
+    }
+
     async fn handle_stream(&self, stream: TcpStream) -> Result<()> {
         let mut pings: usize = 0;
         let addr = match stream.peer_addr() {
@@ -69,7 +106,7 @@ impl Server {
                 return Err(err.into());
             }
         };
-        info!("new peer: {:?}", addr);
+        debug!("new peer: {:?}", addr);
         let mut buf = Vec::new();
         let mut reader = BufReader::new(stream);
         loop {
@@ -82,7 +119,7 @@ impl Server {
                 Err(err) => {
                     match err {
                         ServerError::Timeout(_) => {
-                            info!("peer {:?} timed out", addr)
+                            debug!("peer {:?} timed out", addr)
                         }
                         _ => return Err(err),
                     }
@@ -91,16 +128,16 @@ impl Server {
             };
             trace!("received message: {:X?}", buf);
             let msg = self.decode(&buf)?;
-            info!("< {:?} : {}", addr, msg);
+            debug!("< {:?} : {}", addr, msg);
             if msg.contains("ping") {
                 pings += 1;
             }
             if pings < self.cfg.win_after {
                 reader.write_all(b"pong\0").await?;
-                info!("> {:?} : pong", addr,);
+                debug!("> {:?} : pong", addr,);
             } else {
                 reader.write_all(b"you win!\0").await?;
-                info!("> {:?} : you win!", addr,);
+                debug!("> {:?} : you win!", addr,);
                 reader.shutdown().await?;
                 break;
             }
@@ -109,7 +146,7 @@ impl Server {
             // we should wait, so that we don't spam the client
             std::thread::sleep(self.cfg.delay);
         }
-        info!("disconnected peer: {:?}", addr);
+        debug!("disconnected peer: {:?}", addr);
         Ok(())
     }