tokio is hard work
cargo devel CI / cargo CI (push) Successful in 2m18s Details

This commit is contained in:
Christoph J. Scherr 2024-01-19 12:56:25 +01:00
parent a11618add4
commit 13745e96b8
Signed by: cscherrNT
GPG Key ID: 8E2B45BC51A27EA7
4 changed files with 85 additions and 42 deletions

View File

@ -18,8 +18,8 @@ clap = "4.4.18"
clap-num = "1.0.2" clap-num = "1.0.2"
clap-verbosity-flag = "2.1.2" clap-verbosity-flag = "2.1.2"
libpt = { version = "0.3.10", features = ["net"] } libpt = { version = "0.3.10", features = ["net"] }
mio = { version = "0.8.10", features = ["net", "os-poll"] }
threadpool = { version = "1.8.1", optional = true } threadpool = { version = "1.8.1", optional = true }
tokio = { version = "1.35.1", features = ["net", "rt", "macros"] }
[features] [features]
default = ["server"] default = ["server"]

View File

@ -1,6 +1,9 @@
use crate::common::args::Cli; use crate::common::args::Cli;
use clap::ValueEnum; use clap::ValueEnum;
use std::fmt::Display; use std::{fmt::Display, time::Duration};
const DEFAULT_TIMEOUT_LEN: u64 = 2000; // ms
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub enum Mode { pub enum Mode {
Tcp, Tcp,
@ -41,6 +44,7 @@ pub struct Config {
pub addr: std::net::SocketAddr, pub addr: std::net::SocketAddr,
pub mode: Mode, pub mode: Mode,
pub threads: usize, pub threads: usize,
pub timeout: Duration,
} }
impl Config { impl Config {
@ -49,6 +53,7 @@ impl Config {
addr: cli.addr.clone(), addr: cli.addr.clone(),
mode: cli.mode.clone(), mode: cli.mode.clone(),
threads: cli.threads, threads: cli.threads,
timeout: Duration::from_millis(DEFAULT_TIMEOUT_LEN)
} }
} }
} }

View File

@ -5,6 +5,7 @@
use anyhow::Result; use anyhow::Result;
use libpt::log::*; use libpt::log::*;
use tokio;
mod client; mod client;
mod common; mod common;
@ -14,7 +15,8 @@ use common::{args::Cli, conf::*};
use crate::server::Server; use crate::server::Server;
fn main() -> Result<()> { #[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::cli_parse(); let cli = Cli::cli_parse();
debug!("dumping cli args:\n{:#?}", cli); debug!("dumping cli args:\n{:#?}", cli);
@ -23,8 +25,7 @@ fn main() -> Result<()> {
#[cfg(feature = "server")] #[cfg(feature = "server")]
if cli.server { if cli.server {
info!("starting server"); info!("starting server");
Server::build(cfg)?.run()?; return Server::build(cfg).await?.run().await
return Ok(());
} }
// implicit else, so we can work without the server feature // implicit else, so we can work without the server feature
info!("starting client"); info!("starting client");

View File

@ -1,69 +1,106 @@
#![cfg(feature = "server")] #![cfg(feature = "server")]
use anyhow::Result; use std::{net::SocketAddr, time::Duration};
use libpt::log::{error, info};
use mio::{self, event::Event, net::TcpListener, Events, Interest, Poll, Token}; use anyhow::{Result, anyhow};
use std::time::Duration; use libpt::{
bintols::display::humanbytes,
log::{error, info, trace, warn, debug},
};
use threadpool::ThreadPool; use threadpool::ThreadPool;
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, Interest},
net::{self, TcpListener, TcpSocket, TcpStream},
select,
time::timeout,
};
use crate::common::conf::Config; use crate::common::conf::Config;
pub mod listener; pub mod listener;
const EVENT_CAPACITY: usize = 128;
const SERVER: Token = Token(0);
pub struct Server { pub struct Server {
cfg: Config, cfg: Config,
work: Poll,
events: Events,
pool: ThreadPool, pool: ThreadPool,
pub timeout: Option<Duration>, pub timeout: Option<Duration>,
server: TcpListener, server: TcpListener,
} }
impl Server { impl Server {
pub fn build(cfg: Config) -> Result<Self> { pub async fn build(cfg: Config) -> Result<Self> {
let mut server = TcpListener::bind(cfg.addr)?; let server = TcpListener::bind(cfg.addr).await?;
let poll: Poll = Poll::new()?;
poll.registry()
.register(&mut server, SERVER, Interest::READABLE | Interest::WRITABLE);
let events: Events = Events::with_capacity(EVENT_CAPACITY);
let pool = ThreadPool::new(cfg.threads); let pool = ThreadPool::new(cfg.threads);
let timeout = Some(Duration::from_secs(5)); let timeout = Some(Duration::from_secs(5));
Ok(Server { Ok(Server {
cfg, cfg,
work: poll,
events,
pool, pool,
timeout, timeout,
server, server,
}) })
} }
pub fn run(&mut self) -> Result<()> { pub async fn run(self) -> Result<()> {
loop { loop {
self.work.poll(&mut self.events, self.timeout)?; let stream = self.server.accept().await?;
for event in self.events.iter() { self.handle_stream(stream).await?;
let result = self.handle_event(event);
if let Err(err) = result {
error!("Error while handling server event {:?}: {:?}", event, err);
}
}
} }
} }
fn handle_event(&self, event: &Event) -> Result<()> { async fn handle_stream(&self, mut stream: (TcpStream, SocketAddr)) -> Result<()> {
dbg!(event); info!("start handling stream {:?}", stream.1);
let connection = match self.server.accept() { let mut buf = Vec::new();
Ok(c) => c, let mut reader = BufReader::new(stream.0);
let mut len;
loop {
trace!("reading anew");
len = self.read(&mut reader, &mut buf).await?;
trace!("did read");
if len == 0 {
break;
} else {
info!(
"< {}\t({})\n{}",
stream.1,
humanbytes(len),
self.decode(&buf)?
);
buf.clear();
}
}
info!("stop handling stream {:?}", stream.1);
Ok(())
}
#[inline]
fn decode(&self, buf: &Vec<u8>) -> Result<String> {
Ok(String::from_utf8(buf.clone())?)
}
async fn read(&self, reader: &mut BufReader<TcpStream>, buf: &mut Vec<u8>) -> Result<usize> {
let mut len: usize;
let mut failsafe: u8 = 0;
while failsafe < u8::MAX {
trace!("loop");
len = match timeout(self.cfg.timeout, reader.read(buf)).await {
Ok(inner) => {
trace!("Read for len: {inner:?}"); 0
},
Err(err) => { Err(err) => {
error!("error while handling connection: {:?}", err); error!("timeout while reading: {err}");
return Err(err.into()); return Err(err.into());
} }
}; };
self.pool.execute(move|| { if self.should_end(&buf) {
dbg!(connection); return Ok(len);
info!("received a connection!"); }
}); else {
Ok(()) // ?????????
failsafe += 1;
}
}
Err(anyhow!("read too often, so the failsafe activated"))
}
fn should_end(&self, buf: &Vec<u8>) -> bool {
debug!("eval should end: {:?}", buf);
buf.contains(&0x00)
} }
} }