diff --git a/scripts/client.py b/scripts/client.py new file mode 100644 index 0000000..8aa1142 --- /dev/null +++ b/scripts/client.py @@ -0,0 +1,21 @@ +import socket + +HOST = "127.0.0.1" +PORT = 9999 + +payload = b"ping\0" + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((HOST, PORT)) + while True: + try: + s.sendall(payload) + print("> ping") + except Exception as e: + break + reply = s.recv(1024).decode() + if reply == "": + break + print(f"< {reply}") + print("connection shut down") + diff --git a/src/common/conf.rs b/src/common/conf.rs index a06645a..a860098 100644 --- a/src/common/conf.rs +++ b/src/common/conf.rs @@ -3,6 +3,8 @@ use clap::ValueEnum; use std::{fmt::Display, time::Duration}; const DEFAULT_TIMEOUT_LEN: u64 = 5000; // ms +const DEFAULT_DELAY_LEN: u64 = 500; // ms +const DEFAULT_WIN_AFTER: usize = 20; #[derive(Debug, Clone, Copy)] pub enum Mode { @@ -45,6 +47,8 @@ pub struct Config { pub mode: Mode, pub threads: usize, pub timeout: Duration, + pub delay: Duration, + pub win_after: usize, } impl Config { @@ -54,6 +58,8 @@ impl Config { mode: cli.mode.clone(), threads: cli.threads, timeout: Duration::from_millis(DEFAULT_TIMEOUT_LEN), + delay: Duration::from_millis(DEFAULT_DELAY_LEN), + win_after: DEFAULT_WIN_AFTER, } } } diff --git a/src/server/mod.rs b/src/server/mod.rs index 9dc3bdb..3e3a021 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,14 +1,9 @@ #![cfg(feature = "server")] -use std::{error::Error, net::SocketAddr, time::Duration}; +use std::time::Duration; -use anyhow::anyhow; -use libpt::{ - bintols::display::humanbytes, - log::{debug, error, info, trace, warn}, -}; -use threadpool::ThreadPool; +use libpt::log::{debug, info, trace, warn}; use tokio::{ - io::{AsyncReadExt, BufReader, AsyncBufReadExt}, + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::{TcpListener, TcpStream}, time::timeout, }; @@ -20,7 +15,6 @@ use errors::*; pub struct Server { cfg: Config, - pool: ThreadPool, pub timeout: Option, server: TcpListener, } @@ -28,11 +22,9 @@ pub struct Server { impl Server { pub async fn build(cfg: Config) -> anyhow::Result { let server = TcpListener::bind(cfg.addr).await?; - let pool = ThreadPool::new(cfg.threads); let timeout = Some(Duration::from_secs(5)); Ok(Server { cfg, - pool, timeout, server, }) @@ -64,6 +56,7 @@ impl Server { } async fn handle_stream(&self, stream: TcpStream) -> Result<()> { + let mut pings: usize = 0; let addr = match stream.peer_addr() { Ok(a) => a, Err(err) => { @@ -93,13 +86,24 @@ impl Server { trace!("len is apperently 0: {len:?}"); break; } else { - info!( - "< {:?}\t({})\n{}", - addr, - humanbytes(len), - self.decode(&buf)? - ); + let msg = self.decode(&buf)?; + info!("< {:?} : {}", addr, msg); + if msg.contains("ping") { + pings += 1; + } + if pings < self.cfg.win_after { + reader.write_all(b"pong\0").await?; + info!("> {:?} : pong", addr,); + } else { + reader.write_all(b"you win!\0").await?; + info!("> {:?} : you win!", addr,); + reader.shutdown().await?; + break; + } buf.clear(); + + // we should wait, so that we don't spam the client + std::thread::sleep(self.cfg.delay); } } info!("disconnected peer: {:?}", addr); @@ -108,7 +112,7 @@ impl Server { #[inline] fn decode(&self, buf: &Vec) -> Result { - Ok(String::from_utf8(buf.clone())?) + Ok(String::from_utf8(buf.clone())?.replace('\n', "\\n")) } #[inline]