We have a server at least #3

Merged
PlexSheep merged 14 commits from devel into master 2024-01-19 16:40:16 +01:00
9 changed files with 390 additions and 8 deletions

View File

@ -1,16 +1,27 @@
[package] [package]
name = "template" name = "netpong"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
publish = false publish = true
authors = ["Christoph J. Scherr <software@cscherr.de>"] authors = ["Christoph J. Scherr <software@cscherr.de>"]
license = "MIT" license = "MIT"
description = "No description yet" description = "let your servers play ping pong"
readme = "README.md" readme = "README.md"
homepage = "https://git.cscherr.de/PlexSheep/rs-base" homepage = "https://git.cscherr.de/PlexSheep/netpong"
repository = "https://git.cscherr.de/PlexSheep/rs-base" repository = "https://git.cscherr.de/PlexSheep/netpong"
keywords = ["template"] keywords = ["networking"]
[dependencies] [dependencies]
anyhow = "1.0.79"
clap = "4.4.18"
clap-num = "1.0.2"
clap-verbosity-flag = "2.1.2"
libpt = { version = "0.3.10", features = ["net"] }
thiserror = "1.0.56"
threadpool = { version = "1.8.1", optional = true }
tokio = { version = "1.35.1", features = ["net", "rt", "macros"] }
[features]
default = ["server"]
server = ["dep:threadpool"]

21
scripts/client.py Normal file
View File

@ -0,0 +1,21 @@
import socket
HOST = "127.0.0.1"
PORT = 9999
payload = b"ping\0"
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
while True:
try:
s.sendall(payload)
print("> ping")
except Exception as e:
break
reply = s.recv(1024).decode()
if reply == "":
break
print(f"< {reply}")
print("connection shut down")

1
src/client/mod.rs Normal file
View File

@ -0,0 +1 @@

82
src/common/args.rs Normal file
View File

@ -0,0 +1,82 @@
use libpt::log::{Level, Logger};
use clap::Parser;
use clap_verbosity_flag::{InfoLevel, Verbosity};
use crate::common::conf::Mode;
/// short about section displayed in help
const ABOUT_ROOT: &'static str = r##"
Let your hosts play ping pong over the network
"##;
/// longer about section displayed in help, is combined with [the short help](ABOUT_ROOT)
static LONG_ABOUT_ROOT: &'static str = r##"
Connect to a ping pong server and periodically send a ping. The server will reply with a pong.
That's it really. You can also host your own netpong server (server feature).
"##;
#[derive(Debug, Clone, Parser)]
#[command(
author,
version,
about = ABOUT_ROOT,
long_about = format!("{}{}", ABOUT_ROOT ,LONG_ABOUT_ROOT),
help_template =
r#"{about-section}
{usage-heading} {usage}
{all-args}{tab}
libpt: {version}
Author: {author-with-newline}
"#
)]
pub(crate) struct Cli {
// clap_verbosity_flag seems to make this a global option implicitly
/// set a verbosity, multiple allowed (f.e. -vvv)
#[command(flatten)]
pub(crate) verbose: Verbosity<InfoLevel>,
/// show additional logging meta data
#[arg(long)]
pub(crate) meta: bool,
// host a server instead of connecting to one
#[cfg(feature = "server")]
#[arg(short, long, default_value_t = false)]
pub(crate) server: bool,
// how much threads the server should use
#[cfg(feature = "server")]
#[arg(short, long, default_value_t = 4)]
pub(crate) threads: usize,
#[arg(short, long, default_value_t = Mode::Tcp, ignore_case = true)]
pub(crate) mode: Mode,
/// Address of the server
pub(crate) addr: std::net::SocketAddr,
}
impl Cli {
pub fn cli_parse() -> Self {
let cli = Self::parse();
let ll: Level = match cli.verbose.log_level().unwrap().as_str() {
"TRACE" => Level::TRACE,
"DEBUG" => Level::DEBUG,
"INFO" => Level::INFO,
"WARN" => Level::WARN,
"ERROR" => Level::ERROR,
_ => {
unreachable!();
}
};
if cli.meta {
Logger::init(None, Some(ll)).expect("could not initialize Logger");
} else {
// less verbose version
Logger::init_mini(Some(ll)).expect("could not initialize Logger");
}
return cli;
}
}

65
src/common/conf.rs Normal file
View File

@ -0,0 +1,65 @@
use crate::common::args::Cli;
use clap::ValueEnum;
use std::{fmt::Display, time::Duration};
const DEFAULT_TIMEOUT_LEN: u64 = 5000; // ms
const DEFAULT_DELAY_LEN: u64 = 500; // ms
const DEFAULT_WIN_AFTER: usize = 20;
#[derive(Debug, Clone, Copy)]
pub enum Mode {
Tcp,
}
impl ValueEnum for Mode {
fn to_possible_value(&self) -> Option<clap::builder::PossibleValue> {
Some(match self {
Self::Tcp => clap::builder::PossibleValue::new("tcp"),
})
}
fn value_variants<'a>() -> &'a [Self] {
&[Self::Tcp]
}
fn from_str(input: &str, ignore_case: bool) -> Result<Self, String> {
let comp: String = if ignore_case {
input.to_lowercase()
} else {
input.to_string()
};
match comp.as_str() {
"tcp" => return Ok(Self::Tcp),
_ => return Err(format!("\"{input}\" is not a valid mode")),
}
}
}
impl Display for Mode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let repr: String = match self {
Self::Tcp => format!("tcp"),
};
write!(f, "{}", repr)
}
}
pub struct Config {
pub addr: std::net::SocketAddr,
pub mode: Mode,
pub threads: usize,
pub timeout: Duration,
pub delay: Duration,
pub win_after: usize,
}
impl Config {
pub fn new(cli: &Cli) -> Self {
Config {
addr: cli.addr.clone(),
mode: cli.mode.clone(),
threads: cli.threads,
timeout: Duration::from_millis(DEFAULT_TIMEOUT_LEN),
delay: Duration::from_millis(DEFAULT_DELAY_LEN),
win_after: DEFAULT_WIN_AFTER,
}
}
}

2
src/common/mod.rs Normal file
View File

@ -0,0 +1,2 @@
pub mod args;
pub mod conf;

View File

@ -1,3 +1,33 @@
fn main() { //! # netpong
println!("Hello, world!"); //! This is pretty silly and only let's your client send "ping"
//! over a connection, while letting a server reply pong to
//! every connection. But I had to make it for some reason.
use anyhow::Result;
use libpt::log::*;
use tokio;
mod client;
mod common;
mod server;
use common::{args::Cli, conf::*};
use crate::server::Server;
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::cli_parse();
debug!("dumping cli args:\n{:#?}", cli);
let cfg = Config::new(&cli);
#[cfg(feature = "server")]
if cli.server {
info!("starting server");
return Server::build(cfg).await?.run().await;
}
// implicit else, so we can work without the server feature
info!("starting client");
Ok(())
} }

45
src/server/errors.rs Normal file
View File

@ -0,0 +1,45 @@
use std::{fmt::Display, string::FromUtf8Error};
use anyhow;
use thiserror::Error;
use tokio::time::error::Elapsed;
pub type Result<T> = std::result::Result<T, ServerError>;
#[derive(Debug, Error)]
pub enum ServerError {
Timeout(Elapsed),
Anyhow(anyhow::Error),
IO(std::io::Error),
Format(FromUtf8Error),
}
impl From<anyhow::Error> for ServerError {
fn from(value: anyhow::Error) -> Self {
Self::Anyhow(value)
}
}
impl From<std::io::Error> for ServerError {
fn from(value: std::io::Error) -> Self {
Self::IO(value)
}
}
impl From<FromUtf8Error> for ServerError {
fn from(value: FromUtf8Error) -> Self {
Self::Format(value)
}
}
impl From<Elapsed> for ServerError {
fn from(value: Elapsed) -> Self {
Self::Timeout(value)
}
}
impl Display for ServerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

125
src/server/mod.rs Normal file
View File

@ -0,0 +1,125 @@
#![cfg(feature = "server")]
use std::time::Duration;
use libpt::log::{debug, info, trace, warn};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{TcpListener, TcpStream},
time::timeout,
};
use crate::common::conf::Config;
pub mod errors;
use errors::*;
pub struct Server {
cfg: Config,
pub timeout: Option<Duration>,
server: TcpListener,
}
impl Server {
pub async fn build(cfg: Config) -> anyhow::Result<Self> {
let server = TcpListener::bind(cfg.addr).await?;
let timeout = Some(Duration::from_secs(5));
Ok(Server {
cfg,
timeout,
server,
})
}
pub async fn run(self) -> anyhow::Result<()> {
loop {
let (stream, addr) = match self.server.accept().await {
Ok(s) => s,
Err(err) => {
warn!("could not accept stream: {err:?}");
continue;
}
};
match self.handle_stream(stream).await {
Ok(_) => (),
Err(err) => {
match err {
ServerError::Timeout(_) => {
info!("stream {:?} timed out", addr)
}
_ => {
warn!("error while handling stream: {:?}", err)
}
}
continue;
}
};
}
}
async fn handle_stream(&self, stream: TcpStream) -> Result<()> {
let mut pings: usize = 0;
let addr = match stream.peer_addr() {
Ok(a) => a,
Err(err) => {
debug!("could not get peer address: {:?}", err);
return Err(err.into());
}
};
info!("new peer: {:?}", addr);
let mut buf = Vec::new();
let mut reader = BufReader::new(stream);
let mut len;
loop {
len = match self.read(&mut reader, &mut buf).await {
Ok(len) => len,
Err(err) => {
match err {
ServerError::Timeout(_) => {
info!("peer {:?} timed out", addr)
}
_ => return Err(err),
}
break;
}
};
trace!("received message: {:X?}", buf);
if len == 0 {
trace!("len is apperently 0: {len:?}");
break;
} else {
let msg = self.decode(&buf)?;
info!("< {:?} : {}", addr, msg);
if msg.contains("ping") {
pings += 1;
}
if pings < self.cfg.win_after {
reader.write_all(b"pong\0").await?;
info!("> {:?} : pong", addr,);
} else {
reader.write_all(b"you win!\0").await?;
info!("> {:?} : you win!", addr,);
reader.shutdown().await?;
break;
}
buf.clear();
// we should wait, so that we don't spam the client
std::thread::sleep(self.cfg.delay);
}
}
info!("disconnected peer: {:?}", addr);
Ok(())
}
#[inline]
fn decode(&self, buf: &Vec<u8>) -> Result<String> {
Ok(String::from_utf8(buf.clone())?.replace('\n', "\\n"))
}
#[inline]
async fn read(&self, reader: &mut BufReader<TcpStream>, buf: &mut Vec<u8>) -> Result<usize> {
match timeout(self.cfg.timeout, reader.read_until(0x00, buf)).await? {
Ok(len) => Ok(len),
Err(err) => Err(err.into()),
}
}
}