We have a server at least #3

Merged
PlexSheep merged 14 commits from devel into master 2024-01-19 16:40:16 +01:00
2 changed files with 27 additions and 55 deletions
Showing only changes of commit a9bf861a0b - Show all commits

View File

@ -1,4 +1,4 @@
use std::{fmt::Display, str::Utf8Error}; use std::{fmt::Display, string::FromUtf8Error};
use anyhow; use anyhow;
use thiserror::Error; use thiserror::Error;
@ -9,11 +9,9 @@ pub type Result<T> = std::result::Result<T, ServerError>;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum ServerError { pub enum ServerError {
Timeout(Elapsed), Timeout(Elapsed),
CouldNotAccept,
RdFail,
Anyhow(anyhow::Error), Anyhow(anyhow::Error),
IO(std::io::Error), IO(std::io::Error),
Format(Utf8Error), Format(FromUtf8Error),
} }
impl From<anyhow::Error> for ServerError { impl From<anyhow::Error> for ServerError {
@ -28,8 +26,8 @@ impl From<std::io::Error> for ServerError {
} }
} }
impl From<Utf8Error> for ServerError { impl From<FromUtf8Error> for ServerError {
fn from(value: Utf8Error) -> Self { fn from(value: FromUtf8Error) -> Self {
Self::Format(value) Self::Format(value)
} }
} }

View File

@ -8,7 +8,7 @@ use libpt::{
}; };
use threadpool::ThreadPool; use threadpool::ThreadPool;
use tokio::{ use tokio::{
io::{AsyncReadExt, BufReader}, io::{AsyncReadExt, BufReader, AsyncBufReadExt},
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
time::timeout, time::timeout,
}; };
@ -64,7 +64,6 @@ impl Server {
} }
async fn handle_stream(&self, stream: TcpStream) -> Result<()> { async fn handle_stream(&self, stream: TcpStream) -> Result<()> {
const BUF_SIZE: usize = 1024;
let addr = match stream.peer_addr() { let addr = match stream.peer_addr() {
Ok(a) => a, Ok(a) => a,
Err(err) => { Err(err) => {
@ -73,13 +72,23 @@ impl Server {
} }
}; };
info!("new peer: {:?}", addr); info!("new peer: {:?}", addr);
let mut buf = [0u8; BUF_SIZE]; let mut buf = Vec::new();
let mut reader = BufReader::new(stream); let mut reader = BufReader::new(stream);
let mut len; let mut len;
loop { loop {
trace!("reading anew"); len = match self.read(&mut reader, &mut buf).await {
len = self.read(&mut reader, &mut buf).await?; Ok(len) => len,
trace!("did read"); Err(err) => {
match err {
ServerError::Timeout(_) => {
info!("peer {:?} timed out", addr)
}
_ => return Err(err),
}
break;
}
};
trace!("received message: {:X?}", buf);
if len == 0 { if len == 0 {
trace!("len is apperently 0: {len:?}"); trace!("len is apperently 0: {len:?}");
break; break;
@ -90,7 +99,7 @@ impl Server {
humanbytes(len), humanbytes(len),
self.decode(&buf)? self.decode(&buf)?
); );
buf = [0; BUF_SIZE]; buf.clear();
} }
} }
info!("disconnected peer: {:?}", addr); info!("disconnected peer: {:?}", addr);
@ -98,50 +107,15 @@ impl Server {
} }
#[inline] #[inline]
fn decode(&self, buf: &[u8]) -> Result<String> { fn decode(&self, buf: &Vec<u8>) -> Result<String> {
Ok(std::str::from_utf8(buf)?.to_string()) Ok(String::from_utf8(buf.clone())?)
} }
async fn read(&self, reader: &mut BufReader<TcpStream>, buf: &mut [u8]) -> Result<usize> { #[inline]
let mut len: usize; async fn read(&self, reader: &mut BufReader<TcpStream>, buf: &mut Vec<u8>) -> Result<usize> {
let mut failsafe: u8 = 0; match timeout(self.cfg.timeout, reader.read_until(0x00, buf)).await? {
while failsafe < 5 { Ok(len) => Ok(len),
trace!("loop"); Err(err) => Err(err.into()),
len = match timeout(self.cfg.timeout, reader.read(buf)).await {
Ok(inner) => {
trace!("Read for len: {inner:?}");
match inner {
Ok(len) => len,
Err(err) => {
error!("read failed: {err:?}");
return Err(ServerError::RdFail);
}
}
}
Err(err) => {
return Err(err.into());
}
};
if self.should_end(&buf) {
return Ok(len);
} else {
// ?????????
failsafe += 1;
}
} }
// TODO: remove the failsafe when this stuff works
Err(anyhow!("read too often, so the failsafe activated").into())
}
fn should_end(&self, buf: &[u8]) -> bool {
let mut lb: u8 = buf[0];
let mut iter = buf.iter().skip(1).peekable();
while let Some(b) = iter.next() {
if lb != 0 && *b == 0 && **iter.peek().unwrap() == 0 {
return true;
}
lb = *b;
}
return false;
} }
} }