runtime
cargo devel CI / cargo CI (push) Successful in 2m17s Details

This commit is contained in:
Christoph J. Scherr 2024-01-23 09:13:40 +01:00
parent 6078ee58a9
commit b0edd17825
No known key found for this signature in database
GPG Key ID: 7CDD0B14851A08EF
4 changed files with 75 additions and 53 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "netpong"
version = "0.1.0"
version = "0.1.1"
edition = "2021"
publish = true
authors = ["Christoph J. Scherr <software@cscherr.de>"]
@ -19,9 +19,8 @@ clap-num = "1.0.2"
clap-verbosity-flag = "2.1.2"
libpt = { version = "0.3.10", features = ["net"] }
thiserror = "1.0.56"
threadpool = { version = "1.8.1", optional = true }
tokio = { version = "1.35.1", features = ["net", "rt", "macros"] }
[features]
default = ["server"]
server = ["dep:threadpool"]
server = []

View File

@ -9,12 +9,14 @@ const DEFAULT_WIN_AFTER: usize = 20;
#[derive(Debug, Clone, Copy)]
pub enum Mode {
Tcp,
Tls
}
impl ValueEnum for Mode {
fn to_possible_value(&self) -> Option<clap::builder::PossibleValue> {
Some(match self {
Self::Tcp => clap::builder::PossibleValue::new("tcp"),
Self::Tls => clap::builder::PossibleValue::new("tls"),
})
}
fn value_variants<'a>() -> &'a [Self] {
@ -37,6 +39,7 @@ impl Display for Mode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr: String = match self {
Self::Tcp => format!("tcp"),
Self::Tls => format!("tls"),
};
write!(f, "{}", repr)
}

View File

@ -25,7 +25,10 @@ async fn main() -> Result<()> {
#[cfg(feature = "server")]
if cli.server {
info!("starting server");
return Server::build(cfg).await?.run().await;
return Server::build(cfg).await?.start().await;
loop {
// now we wait
}
}
// implicit else, so we can work without the server feature
info!("starting client");

View File

@ -5,6 +5,7 @@ use libpt::log::{debug, info, trace, warn};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{TcpListener, TcpStream},
runtime::{Builder, Runtime},
time::timeout,
};
@ -17,44 +18,62 @@ pub struct Server {
cfg: Config,
pub timeout: Option<Duration>,
server: TcpListener,
accept_runtime: Runtime,
request_runtime: Runtime,
}
impl Server {
pub async fn build(cfg: Config) -> anyhow::Result<Self> {
let server = TcpListener::bind(cfg.addr).await?;
let timeout = Some(Duration::from_secs(5));
let accept_runtime = Builder::new_multi_thread()
.worker_threads(1)
.thread_stack_size(3 * 1024 * 1024)
.enable_io()
.enable_time()
.build()?;
let request_runtime = Builder::new_multi_thread()
.worker_threads(1)
.thread_stack_size(3 * 1024 * 1024)
.enable_io()
.enable_time()
.build()?;
Ok(Server {
cfg,
timeout,
server,
accept_runtime,
request_runtime,
})
}
pub async fn run(self) -> anyhow::Result<()> {
loop {
let (stream, addr) = match self.server.accept().await {
Ok(s) => s,
Err(err) => {
warn!("could not accept stream: {err:?}");
continue;
}
};
// TODO: handle the stream with multi threading, or async or something
// We need concurrency here!
match self.handle_stream(stream).await {
Ok(_) => (),
Err(err) => {
match err {
ServerError::Timeout(_) => {
info!("stream {:?} timed out", addr)
}
_ => {
warn!("error while handling stream: {:?}", err)
}
pub async fn start(self) -> anyhow::Result<()> {
self.accept_runtime.block_on(async {
loop {
let (stream, addr) = match self.server.accept().await {
Ok(s) => s,
Err(err) => {
warn!("could not accept stream: {err:?}");
continue;
}
continue;
}
};
}
};
let _guard = self.request_runtime.enter();
match self.handle_stream(stream).await {
Ok(_) => (),
Err(err) => {
match err {
ServerError::Timeout(_) => {
info!("stream {:?} timed out", addr)
}
_ => {
warn!("error while handling stream: {:?}", err)
}
}
continue;
}
};
}
});
Ok(())
}
async fn handle_stream(&self, stream: TcpStream) -> Result<()> {
@ -69,9 +88,12 @@ impl Server {
info!("new peer: {:?}", addr);
let mut buf = Vec::new();
let mut reader = BufReader::new(stream);
let mut len;
loop {
len = match self.read(&mut reader, &mut buf).await {
match self.read(&mut reader, &mut buf).await {
Ok(len) if len == 0 => {
trace!("len is 0, so the stream has ended: {len:?}");
break;
}
Ok(len) => len,
Err(err) => {
match err {
@ -84,29 +106,24 @@ impl Server {
}
};
trace!("received message: {:X?}", buf);
if len == 0 {
trace!("len is apperently 0: {len:?}");
break;
} else {
let msg = self.decode(&buf)?;
info!("< {:?} : {}", addr, msg);
if msg.contains("ping") {
pings += 1;
}
if pings < self.cfg.win_after {
reader.write_all(b"pong\0").await?;
info!("> {:?} : pong", addr,);
} else {
reader.write_all(b"you win!\0").await?;
info!("> {:?} : you win!", addr,);
reader.shutdown().await?;
break;
}
buf.clear();
// we should wait, so that we don't spam the client
std::thread::sleep(self.cfg.delay);
let msg = self.decode(&buf)?;
info!("< {:?} : {}", addr, msg);
if msg.contains("ping") {
pings += 1;
}
if pings < self.cfg.win_after {
reader.write_all(b"pong\0").await?;
info!("> {:?} : pong", addr,);
} else {
reader.write_all(b"you win!\0").await?;
info!("> {:?} : you win!", addr,);
reader.shutdown().await?;
break;
}
buf.clear();
// we should wait, so that we don't spam the client
std::thread::sleep(self.cfg.delay);
}
info!("disconnected peer: {:?}", addr);
Ok(())