From a9bf861a0bef2c98847b59c693893a2e34a4e3be Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Fri, 19 Jan 2024 16:11:02 +0100 Subject: [PATCH] it somewhat works, now we just need to pong back --- src/server/errors.rs | 10 +++--- src/server/mod.rs | 72 ++++++++++++++------------------------------ 2 files changed, 27 insertions(+), 55 deletions(-) diff --git a/src/server/errors.rs b/src/server/errors.rs index 239eb79..90dd1f5 100644 --- a/src/server/errors.rs +++ b/src/server/errors.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, str::Utf8Error}; +use std::{fmt::Display, string::FromUtf8Error}; use anyhow; use thiserror::Error; @@ -9,11 +9,9 @@ pub type Result = std::result::Result; #[derive(Debug, Error)] pub enum ServerError { Timeout(Elapsed), - CouldNotAccept, - RdFail, Anyhow(anyhow::Error), IO(std::io::Error), - Format(Utf8Error), + Format(FromUtf8Error), } impl From for ServerError { @@ -28,8 +26,8 @@ impl From for ServerError { } } -impl From for ServerError { - fn from(value: Utf8Error) -> Self { +impl From for ServerError { + fn from(value: FromUtf8Error) -> Self { Self::Format(value) } } diff --git a/src/server/mod.rs b/src/server/mod.rs index dec1ccd..9dc3bdb 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -8,7 +8,7 @@ use libpt::{ }; use threadpool::ThreadPool; use tokio::{ - io::{AsyncReadExt, BufReader}, + io::{AsyncReadExt, BufReader, AsyncBufReadExt}, net::{TcpListener, TcpStream}, time::timeout, }; @@ -64,7 +64,6 @@ impl Server { } async fn handle_stream(&self, stream: TcpStream) -> Result<()> { - const BUF_SIZE: usize = 1024; let addr = match stream.peer_addr() { Ok(a) => a, Err(err) => { @@ -73,13 +72,23 @@ impl Server { } }; info!("new peer: {:?}", addr); - let mut buf = [0u8; BUF_SIZE]; + let mut buf = Vec::new(); let mut reader = BufReader::new(stream); let mut len; loop { - trace!("reading anew"); - len = self.read(&mut reader, &mut buf).await?; - trace!("did read"); + len = match self.read(&mut reader, &mut buf).await { + Ok(len) => len, + Err(err) => { + match err { + ServerError::Timeout(_) => { + info!("peer {:?} timed out", addr) + } + _ => return Err(err), + } + break; + } + }; + trace!("received message: {:X?}", buf); if len == 0 { trace!("len is apperently 0: {len:?}"); break; @@ -90,7 +99,7 @@ impl Server { humanbytes(len), self.decode(&buf)? ); - buf = [0; BUF_SIZE]; + buf.clear(); } } info!("disconnected peer: {:?}", addr); @@ -98,50 +107,15 @@ impl Server { } #[inline] - fn decode(&self, buf: &[u8]) -> Result { - Ok(std::str::from_utf8(buf)?.to_string()) + fn decode(&self, buf: &Vec) -> Result { + Ok(String::from_utf8(buf.clone())?) } - async fn read(&self, reader: &mut BufReader, buf: &mut [u8]) -> Result { - let mut len: usize; - let mut failsafe: u8 = 0; - while failsafe < 5 { - trace!("loop"); - len = match timeout(self.cfg.timeout, reader.read(buf)).await { - Ok(inner) => { - trace!("Read for len: {inner:?}"); - match inner { - Ok(len) => len, - Err(err) => { - error!("read failed: {err:?}"); - return Err(ServerError::RdFail); - } - } - } - Err(err) => { - return Err(err.into()); - } - }; - if self.should_end(&buf) { - return Ok(len); - } else { - // ????????? - failsafe += 1; - } + #[inline] + async fn read(&self, reader: &mut BufReader, buf: &mut Vec) -> Result { + match timeout(self.cfg.timeout, reader.read_until(0x00, buf)).await? { + Ok(len) => Ok(len), + Err(err) => Err(err.into()), } - // TODO: remove the failsafe when this stuff works - Err(anyhow!("read too often, so the failsafe activated").into()) - } - - fn should_end(&self, buf: &[u8]) -> bool { - let mut lb: u8 = buf[0]; - let mut iter = buf.iter().skip(1).peekable(); - while let Some(b) = iter.next() { - if lb != 0 && *b == 0 && **iter.peek().unwrap() == 0 { - return true; - } - lb = *b; - } - return false; } }