diff --git a/Cargo.toml b/Cargo.toml index 0543e1f..557b49a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,27 @@ [package] -name = "template" +name = "netpong" version = "0.1.0" edition = "2021" -publish = false +publish = true authors = ["Christoph J. Scherr "] license = "MIT" -description = "No description yet" +description = "let your servers play ping pong" readme = "README.md" -homepage = "https://git.cscherr.de/PlexSheep/rs-base" -repository = "https://git.cscherr.de/PlexSheep/rs-base" -keywords = ["template"] +homepage = "https://git.cscherr.de/PlexSheep/netpong" +repository = "https://git.cscherr.de/PlexSheep/netpong" +keywords = ["networking"] [dependencies] +anyhow = "1.0.79" +clap = "4.4.18" +clap-num = "1.0.2" +clap-verbosity-flag = "2.1.2" +libpt = { version = "0.3.10", features = ["net"] } +thiserror = "1.0.56" +threadpool = { version = "1.8.1", optional = true } +tokio = { version = "1.35.1", features = ["net", "rt", "macros"] } +[features] +default = ["server"] +server = ["dep:threadpool"] diff --git a/scripts/client.py b/scripts/client.py new file mode 100644 index 0000000..8aa1142 --- /dev/null +++ b/scripts/client.py @@ -0,0 +1,21 @@ +import socket + +HOST = "127.0.0.1" +PORT = 9999 + +payload = b"ping\0" + +with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect((HOST, PORT)) + while True: + try: + s.sendall(payload) + print("> ping") + except Exception as e: + break + reply = s.recv(1024).decode() + if reply == "": + break + print(f"< {reply}") + print("connection shut down") + diff --git a/src/client/mod.rs b/src/client/mod.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/client/mod.rs @@ -0,0 +1 @@ + diff --git a/src/common/args.rs b/src/common/args.rs new file mode 100644 index 0000000..9a53635 --- /dev/null +++ b/src/common/args.rs @@ -0,0 +1,82 @@ +use libpt::log::{Level, Logger}; + +use clap::Parser; +use clap_verbosity_flag::{InfoLevel, Verbosity}; + +use crate::common::conf::Mode; + +/// short about section displayed in help +const ABOUT_ROOT: &'static str = r##" +Let your hosts play ping pong over the network +"##; +/// longer about section displayed in help, is combined with [the short help](ABOUT_ROOT) +static LONG_ABOUT_ROOT: &'static str = r##" + + Connect to a ping pong server and periodically send a ping. The server will reply with a pong. + That's it really. You can also host your own netpong server (server feature). +"##; + +#[derive(Debug, Clone, Parser)] +#[command( + author, + version, + about = ABOUT_ROOT, + long_about = format!("{}{}", ABOUT_ROOT ,LONG_ABOUT_ROOT), + help_template = +r#"{about-section} +{usage-heading} {usage} +{all-args}{tab} + +libpt: {version} +Author: {author-with-newline} +"# + )] +pub(crate) struct Cli { + // clap_verbosity_flag seems to make this a global option implicitly + /// set a verbosity, multiple allowed (f.e. -vvv) + #[command(flatten)] + pub(crate) verbose: Verbosity, + + /// show additional logging meta data + #[arg(long)] + pub(crate) meta: bool, + + // host a server instead of connecting to one + #[cfg(feature = "server")] + #[arg(short, long, default_value_t = false)] + pub(crate) server: bool, + + // how much threads the server should use + #[cfg(feature = "server")] + #[arg(short, long, default_value_t = 4)] + pub(crate) threads: usize, + + #[arg(short, long, default_value_t = Mode::Tcp, ignore_case = true)] + pub(crate) mode: Mode, + + /// Address of the server + pub(crate) addr: std::net::SocketAddr, +} + +impl Cli { + pub fn cli_parse() -> Self { + let cli = Self::parse(); + let ll: Level = match cli.verbose.log_level().unwrap().as_str() { + "TRACE" => Level::TRACE, + "DEBUG" => Level::DEBUG, + "INFO" => Level::INFO, + "WARN" => Level::WARN, + "ERROR" => Level::ERROR, + _ => { + unreachable!(); + } + }; + if cli.meta { + Logger::init(None, Some(ll)).expect("could not initialize Logger"); + } else { + // less verbose version + Logger::init_mini(Some(ll)).expect("could not initialize Logger"); + } + return cli; + } +} diff --git a/src/common/conf.rs b/src/common/conf.rs new file mode 100644 index 0000000..a860098 --- /dev/null +++ b/src/common/conf.rs @@ -0,0 +1,65 @@ +use crate::common::args::Cli; +use clap::ValueEnum; +use std::{fmt::Display, time::Duration}; + +const DEFAULT_TIMEOUT_LEN: u64 = 5000; // ms +const DEFAULT_DELAY_LEN: u64 = 500; // ms +const DEFAULT_WIN_AFTER: usize = 20; + +#[derive(Debug, Clone, Copy)] +pub enum Mode { + Tcp, +} + +impl ValueEnum for Mode { + fn to_possible_value(&self) -> Option { + Some(match self { + Self::Tcp => clap::builder::PossibleValue::new("tcp"), + }) + } + fn value_variants<'a>() -> &'a [Self] { + &[Self::Tcp] + } + fn from_str(input: &str, ignore_case: bool) -> Result { + let comp: String = if ignore_case { + input.to_lowercase() + } else { + input.to_string() + }; + match comp.as_str() { + "tcp" => return Ok(Self::Tcp), + _ => return Err(format!("\"{input}\" is not a valid mode")), + } + } +} + +impl Display for Mode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let repr: String = match self { + Self::Tcp => format!("tcp"), + }; + write!(f, "{}", repr) + } +} + +pub struct Config { + pub addr: std::net::SocketAddr, + pub mode: Mode, + pub threads: usize, + pub timeout: Duration, + pub delay: Duration, + pub win_after: usize, +} + +impl Config { + pub fn new(cli: &Cli) -> Self { + Config { + addr: cli.addr.clone(), + mode: cli.mode.clone(), + threads: cli.threads, + timeout: Duration::from_millis(DEFAULT_TIMEOUT_LEN), + delay: Duration::from_millis(DEFAULT_DELAY_LEN), + win_after: DEFAULT_WIN_AFTER, + } + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs new file mode 100644 index 0000000..f4200ec --- /dev/null +++ b/src/common/mod.rs @@ -0,0 +1,2 @@ +pub mod args; +pub mod conf; diff --git a/src/main.rs b/src/main.rs index e7a11a9..f0f5acc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,33 @@ -fn main() { - println!("Hello, world!"); +//! # netpong +//! This is pretty silly and only let's your client send "ping" +//! over a connection, while letting a server reply pong to +//! every connection. But I had to make it for some reason. + +use anyhow::Result; +use libpt::log::*; +use tokio; + +mod client; +mod common; +mod server; + +use common::{args::Cli, conf::*}; + +use crate::server::Server; + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::cli_parse(); + debug!("dumping cli args:\n{:#?}", cli); + + let cfg = Config::new(&cli); + + #[cfg(feature = "server")] + if cli.server { + info!("starting server"); + return Server::build(cfg).await?.run().await; + } + // implicit else, so we can work without the server feature + info!("starting client"); + Ok(()) } diff --git a/src/server/errors.rs b/src/server/errors.rs new file mode 100644 index 0000000..90dd1f5 --- /dev/null +++ b/src/server/errors.rs @@ -0,0 +1,45 @@ +use std::{fmt::Display, string::FromUtf8Error}; + +use anyhow; +use thiserror::Error; +use tokio::time::error::Elapsed; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum ServerError { + Timeout(Elapsed), + Anyhow(anyhow::Error), + IO(std::io::Error), + Format(FromUtf8Error), +} + +impl From for ServerError { + fn from(value: anyhow::Error) -> Self { + Self::Anyhow(value) + } +} + +impl From for ServerError { + fn from(value: std::io::Error) -> Self { + Self::IO(value) + } +} + +impl From for ServerError { + fn from(value: FromUtf8Error) -> Self { + Self::Format(value) + } +} + +impl From for ServerError { + fn from(value: Elapsed) -> Self { + Self::Timeout(value) + } +} + +impl Display for ServerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..3e3a021 --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,125 @@ +#![cfg(feature = "server")] +use std::time::Duration; + +use libpt::log::{debug, info, trace, warn}; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::{TcpListener, TcpStream}, + time::timeout, +}; + +use crate::common::conf::Config; + +pub mod errors; +use errors::*; + +pub struct Server { + cfg: Config, + pub timeout: Option, + server: TcpListener, +} + +impl Server { + pub async fn build(cfg: Config) -> anyhow::Result { + let server = TcpListener::bind(cfg.addr).await?; + let timeout = Some(Duration::from_secs(5)); + Ok(Server { + cfg, + timeout, + server, + }) + } + pub async fn run(self) -> anyhow::Result<()> { + loop { + let (stream, addr) = match self.server.accept().await { + Ok(s) => s, + Err(err) => { + warn!("could not accept stream: {err:?}"); + continue; + } + }; + match self.handle_stream(stream).await { + Ok(_) => (), + Err(err) => { + match err { + ServerError::Timeout(_) => { + info!("stream {:?} timed out", addr) + } + _ => { + warn!("error while handling stream: {:?}", err) + } + } + continue; + } + }; + } + } + + async fn handle_stream(&self, stream: TcpStream) -> Result<()> { + let mut pings: usize = 0; + let addr = match stream.peer_addr() { + Ok(a) => a, + Err(err) => { + debug!("could not get peer address: {:?}", err); + return Err(err.into()); + } + }; + info!("new peer: {:?}", addr); + let mut buf = Vec::new(); + let mut reader = BufReader::new(stream); + let mut len; + loop { + len = match self.read(&mut reader, &mut buf).await { + Ok(len) => len, + Err(err) => { + match err { + ServerError::Timeout(_) => { + info!("peer {:?} timed out", addr) + } + _ => return Err(err), + } + break; + } + }; + trace!("received message: {:X?}", buf); + if len == 0 { + trace!("len is apperently 0: {len:?}"); + break; + } else { + let msg = self.decode(&buf)?; + info!("< {:?} : {}", addr, msg); + if msg.contains("ping") { + pings += 1; + } + if pings < self.cfg.win_after { + reader.write_all(b"pong\0").await?; + info!("> {:?} : pong", addr,); + } else { + reader.write_all(b"you win!\0").await?; + info!("> {:?} : you win!", addr,); + reader.shutdown().await?; + break; + } + buf.clear(); + + // we should wait, so that we don't spam the client + std::thread::sleep(self.cfg.delay); + } + } + info!("disconnected peer: {:?}", addr); + Ok(()) + } + + #[inline] + fn decode(&self, buf: &Vec) -> Result { + Ok(String::from_utf8(buf.clone())?.replace('\n', "\\n")) + } + + #[inline] + async fn read(&self, reader: &mut BufReader, buf: &mut Vec) -> Result { + match timeout(self.cfg.timeout, reader.read_until(0x00, buf)).await? { + Ok(len) => Ok(len), + Err(err) => Err(err.into()), + } + } +}