#![cfg(feature = "server")]
//! TLS "pong" server: accepts TLS connections and answers every message it
//! receives from a peer with `pong\0`.

use std::{
    fs::File,
    net::SocketAddr,
    sync::{atomic::AtomicUsize, Arc},
    time::Duration,
};

use libpt::log::{debug, error, info, warn};
use rustls::pki_types::{CertificateDer, PrivateKeyDer};
use rustls_pemfile::{certs, private_key};
use tokio::{
    io::{split, AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
    time::{self},
};
use tokio_rustls::{rustls, TlsAcceptor};

use crate::common::conf::Config;

pub mod errors;
use errors::*;

/// Size in bytes of the per-connection receive buffer.
const BUF_SIZE: usize = 64;

/// A TLS server that replies `pong\0` to everything a peer sends.
pub struct Server {
    /// Configuration the server was built from.
    cfg: Config,
    /// Set to five seconds by [`Server::build`].
    ///
    /// NOTE(review): nothing in this file reads this field, so no timeout is
    /// actually enforced on peers here — confirm whether that is intended.
    pub timeout: Option<Duration>,
    /// Listening socket bound to `cfg.addr`.
    server: TcpListener,
    /// Number of peers that currently have an established TLS session.
    num_peers: AtomicUsize,
    /// Performs the TLS handshake on freshly accepted TCP streams.
    acceptor: TlsAcceptor,
}

impl Server {
    /// Builds a server from `cfg`: loads the certificate chain and private
    /// key, creates the TLS acceptor and binds the listener to `cfg.addr`.
    ///
    /// # Errors
    ///
    /// Returns an error if the key or certificates cannot be read, if the key
    /// file contains no private key, if rustls rejects the certificate/key
    /// pair, or if the listener cannot be bound.
    pub async fn build(cfg: Config) -> anyhow::Result<Self> {
        let certs = Self::load_certs(&cfg)?;
        // A key file without a key is a configuration problem, not a bug in
        // this program, so report it as an error instead of panicking.
        let key = Self::load_key(&cfg)?
            .ok_or_else(|| anyhow::anyhow!("no private key found in the configured key file"))?;
        let tls_config = rustls::ServerConfig::builder()
            .with_no_client_auth()
            .with_single_cert(certs, key)?;
        let acceptor = TlsAcceptor::from(Arc::new(tls_config));
        let server = TcpListener::bind(cfg.addr).await?;
        let timeout = Some(Duration::from_secs(5));
        Ok(Server {
            cfg,
            timeout,
            server,
            num_peers: AtomicUsize::new(0),
            acceptor,
        })
    }

    /// Runs the accept loop.
    ///
    /// Spawns one background task that logs the current peer count every
    /// 5000 ms, then accepts TCP connections in a loop and spawns one task per
    /// connection that performs the TLS handshake and serves the peer. The
    /// loop has no exit, so this only returns if the future is dropped.
    pub async fn run(self) -> anyhow::Result<()> {
        let rc_self = Arc::new(self);

        let ref_self = Arc::clone(&rc_self);
        tokio::spawn(async move {
            let mut interval = time::interval(Duration::from_millis(5000));
            loop {
                interval.tick().await;
                info!(
                    "status: {} peers",
                    ref_self
                        .num_peers
                        .load(std::sync::atomic::Ordering::Relaxed)
                );
            }
        });

        loop {
            let (stream, addr) = match rc_self.server.accept().await {
                Ok(s) => s,
                Err(err) => {
                    warn!("could not accept stream: {err:?}");
                    continue;
                }
            };
            let ref_self = Arc::clone(&rc_self);
            // NOTE: we can only start the task now. If we start it before accepting connections
            // (so that the task theoretically accepts the connection), we would create endless
            // tasks in a loop.
            tokio::spawn(async move {
                let stream: tokio_rustls::server::TlsStream<_> =
                    match ref_self.acceptor.accept(stream).await {
                        Ok(s) => s,
                        Err(err) => {
                            // This is the TLS handshake failing; the TCP
                            // connection itself was already accepted above.
                            error!("could not accept tls stream: {err}");
                            return;
                        }
                    };
                ref_self.peer_add(1);
                if let Err(err) = ref_self.handle_stream(stream, addr).await {
                    match err {
                        ServerError::Timeout(_) => {
                            debug!("stream {:?} timed out", addr)
                        }
                        _ => {
                            warn!("error while handling stream: {:?}", err)
                        }
                    }
                }
                ref_self.peer_sub(1);
            });
        }
    }

    /// Reads the first private key from the PEM file at `cfg.key`.
    ///
    /// Returns `Ok(None)` if the file contains no private key.
    fn load_key(cfg: &Config) -> std::io::Result<Option<PrivateKeyDer<'static>>> {
        private_key(&mut std::io::BufReader::new(File::open(&cfg.key)?))
    }

    /// Reads all certificates from the PEM file at `cfg.key`.
    ///
    /// NOTE(review): this opens the same path as [`Server::load_key`]. That
    /// works if the file bundles key and certificates, but if `Config` has a
    /// dedicated certificate path it should be used here — confirm.
    fn load_certs(cfg: &Config) -> std::io::Result<Vec<CertificateDer<'static>>> {
        certs(&mut std::io::BufReader::new(File::open(&cfg.key)?)).collect()
    }

    /// Atomically adds `v` to the peer counter.
    #[inline]
    fn peer_add(&self, v: usize) {
        // A single read-modify-write: separate load + store would lose updates
        // when several connection tasks change the counter concurrently.
        self.num_peers
            .fetch_add(v, std::sync::atomic::Ordering::Relaxed);
    }

    /// Atomically subtracts `v` from the peer counter.
    #[inline]
    fn peer_sub(&self, v: usize) {
        self.num_peers
            .fetch_sub(v, std::sync::atomic::Ordering::Relaxed);
    }

    /// Serves one peer until it closes the connection.
    ///
    /// Every chunk read from the peer is decoded as UTF-8 and logged, answered
    /// with `pong\0`, and followed by a pause of `cfg.delay`.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from or writing to the stream fails, or if
    /// the received bytes are not valid UTF-8.
    async fn handle_stream(
        &self,
        stream: tokio_rustls::server::TlsStream<TcpStream>,
        addr: SocketAddr,
    ) -> Result<()> {
        debug!("new peer: {:?}", addr);
        let mut buf = [0; BUF_SIZE];
        let (mut reader, mut writer) = split(stream);
        loop {
            // Propagate read errors: carrying on after a failed read would
            // reply to data that was never received and may spin forever.
            let len = reader.read(&mut buf).await?;
            if len == 0 {
                // A zero-length read means the peer closed the connection.
                break;
            }
            // Only the first `len` bytes belong to this message; the rest of
            // the buffer is zeroes or left over from an earlier read.
            debug!("< {addr:?} : \"{}\"", self.decode(&buf[..len])?);
            // `write` may write only part of the reply; `write_all` + `flush`
            // make sure the whole reply is handed to the TLS stream.
            writer.write_all(b"pong\0").await?;
            writer.flush().await?;
            // we should wait, so that we don't spam the client
            // (async sleep: a thread sleep would block the runtime worker and
            // stall every other task scheduled on it)
            time::sleep(self.cfg.delay).await;
        }
        debug!("disconnected peer: {:?}", addr);
        Ok(())
    }

    /// Decodes `buf` as UTF-8 into an owned `String`.
    ///
    /// # Errors
    ///
    /// Returns an error if `buf` is not valid UTF-8.
    #[inline]
    fn decode(&self, buf: &[u8]) -> Result<String> {
        Ok(std::str::from_utf8(buf)?.to_string())
    }
}