diff --git a/Cargo.lock b/Cargo.lock index 640304c..b91fe49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1480,6 +1480,7 @@ dependencies = [ "libc", "rand_chacha", "rand_core", + "serde", ] [[package]] @@ -1576,6 +1577,19 @@ dependencies = [ "warp", ] +[[package]] +name = "rest-queued" +version = "0.1.0" +dependencies = [ + "anyhow", + "libpt", + "rand", + "serde", + "serde_json", + "tokio", + "warp", +] + [[package]] name = "revsqrt" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a172959..8565fed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "members/echargs", "members/claptest", "members/rest", + "members/rest-queued", ] default-members = [ ".", @@ -40,6 +41,7 @@ default-members = [ "members/matchmatchmatch", "members/future_stream", "members/rest", + "members/rest-queued", ] publish = false @@ -58,6 +60,7 @@ tokio = { version = "1.35.1", features = [ futures = { version = "0.3.30", features = ["executor"] } serde = { version = "1.0.171", features = ["derive"] } serde_json = "1.0.102" +libpt = { version = "0.4.2", features = ["log"] } [package] name = "rs-basic" diff --git a/members/rest-queued/Cargo.toml b/members/rest-queued/Cargo.toml new file mode 100644 index 0000000..1f9e5f5 --- /dev/null +++ b/members/rest-queued/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "rest-queued" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow.workspace = true +libpt.workspace = true +rand = { version = "0.8.5", features = ["serde"] } +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +warp = "0.3.6" diff --git a/members/rest-queued/src/client.rs b/members/rest-queued/src/client.rs new file mode 100644 index 0000000..ca634b6 --- /dev/null +++ b/members/rest-queued/src/client.rs @@ -0,0 +1,112 @@ +use std::{convert::Infallible, fmt::Display, str::FromStr}; + +use crate::{store::Sequence, Item, Store}; +use rand::{prelude::*, seq::SliceRandom}; +use serde::{Deserialize, Serialize}; + +const ALPHABET: &str = "qwertzuiopasdfghjklyxcvbnmQWERTZUIOPASDFGHJKLYXCVBNM"; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct Client { + #[serde(flatten)] + id: Id, + #[serde(flatten)] + token: Token, + last: Sequence, +} + +impl Client { + pub fn new() -> Self { + Self { + id: Id::new(), + token: Token::new(), + last: 0, + } + } + + #[cfg(debug_assertions)] + #[allow(unused)] + pub(crate) fn new_debug() -> Self { + Self { + id: Id::from_str("myid").unwrap(), + token: Token::from_str("mytok").unwrap(), + last: 0, + } + } + + pub async fn get_items(&self, store: Store) -> Vec { + let items = store.get_items().await; + if let Some(item) = items.last() { + store.adjust_lseq(item.seq).await; + } + items + } + + pub fn validate_token(&self, token: Token) -> bool { + token == self.token + } + + pub fn id(&self) -> &Id { + &self.id + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] +pub struct Id { + #[serde(rename = "id")] + inner: String, +} + +impl Id { + pub fn new() -> Self { + let mut rng = rand::thread_rng(); + let mut data = ALPHABET.to_string().into_bytes(); + data.shuffle(&mut rng); + Self { + inner: String::from_utf8(data[..20].into()).unwrap(), + } + } +} + +impl FromStr for Id { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(Self { + inner: s.to_string(), + }) + } +} + +impl Display for Id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.inner) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct Token { + #[serde(rename = "token")] + inner: String, +} + +impl Token { + pub fn new() -> Self { + let mut rng = rand::thread_rng(); + let mut data = ALPHABET.to_string().into_bytes(); + data.shuffle(&mut rng); + Self { + inner: String::from_utf8(data).unwrap(), + } + } +} + +impl FromStr for Token { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(Self { + inner: s.to_string(), + }) + } +} diff --git a/members/rest-queued/src/main.rs b/members/rest-queued/src/main.rs new file mode 100644 index 0000000..7f77a65 --- /dev/null +++ b/members/rest-queued/src/main.rs @@ -0,0 +1,18 @@ +use libpt::log::{debug, info}; + +mod routes; +use routes::*; +mod store; +use store::*; +mod client; +use client::*; + +#[tokio::main] +async fn main() { + libpt::log::Logger::build_mini(Some(libpt::log::Level::DEBUG)).expect("could not init logger"); + let store = Store::new(); + debug!("spawning data_processing task: {store:#?}"); + tokio::spawn(data_processing(store.clone())); + info!("starting webserver"); + warp::serve(routes(store)).run(([127, 0, 0, 1], 3030)).await; +} diff --git a/members/rest-queued/src/routes.rs b/members/rest-queued/src/routes.rs new file mode 100644 index 0000000..f979fe5 --- /dev/null +++ b/members/rest-queued/src/routes.rs @@ -0,0 +1,90 @@ +use std::{collections::HashMap, convert::Infallible, str::FromStr}; + +use libpt::log::{debug, error, info}; +use warp::{ + filters::{path::param, BoxedFilter}, + http::StatusCode, + reject::{MissingHeader, Rejection}, + reply, Filter, Reply, +}; + +use crate::{Client, Id, Item, Store, Token, StoreErr}; + +pub fn with_store(store: Store) -> impl Filter + Clone { + warp::any().map(move || store.clone()) +} + +pub fn routes(store: Store) -> BoxedFilter<(impl Reply,)> { + get_items(store.clone()) + .or(get_register(store.clone())) + .recover(handle_rejection) + .boxed() +} + +async fn handle_rejection(err: Rejection) -> Result { + if err.is_not_found() { + debug!("page not found"); + Ok(reply::with_status("NOT_FOUND", StatusCode::NOT_FOUND)) + } else if let Some(e) = err.find::() { + debug!("{e}"); + Ok(reply::with_status( + "MISSING_HEADER: TOKEN", + StatusCode::FORBIDDEN, + )) + } else if let Some(e) = err.find::() { + debug!("{e}"); + Ok(reply::with_status("UNAUTHENTICATED", StatusCode::FORBIDDEN)) + } else { + error!("unhandled rejection: {:?}", err); + Ok(reply::with_status( + "INTERNAL_SERVER_ERROR", + StatusCode::INTERNAL_SERVER_ERROR, + )) + } +} + +pub async fn item_getter( + param: HashMap, + token: Token, + store: Store, +) -> Result { + info!("GET /api/v1/items"); + if let Some(id) = param.get("id") { + let id = Id::from_str(id).unwrap(); + let client = match store.login(id, token).await { + Ok(client) => client, + Err(unauth) => return Err(warp::reject::custom(unauth)), + }; + Ok(warp::reply::json(&client.get_items(store.clone()).await)) + } else { + Err(warp::reject()) + } +} + +// GET /api/v1/items +pub fn get_items(store: Store) -> BoxedFilter<(impl Reply,)> { + warp::path!("api" / "v1" / "items") + .and(warp::get()) + .and(warp::query::>()) + .and(warp::header::("Token")) + .and(with_store(store)) + // .and(warp::body::content_length_limit(2 << 13)) + .and_then(item_getter) + .boxed() +} + +// GET /api/v1/register +pub fn get_register(store: Store) -> BoxedFilter<(impl Reply,)> { + warp::path!("api" / "v1" / "register") + .and(warp::get()) + .and(with_store(store)) + // .and(warp::body::content_length_limit(2 << 13)) + .then(|store: Store| async move { + info!("GET /api/v1/register"); + let client = Client::new(); + let response = warp::reply::json(&client); + store.register_client(client).await; + response + }) + .boxed() +} diff --git a/members/rest-queued/src/store.rs b/members/rest-queued/src/store.rs new file mode 100644 index 0000000..ab39b3b --- /dev/null +++ b/members/rest-queued/src/store.rs @@ -0,0 +1,150 @@ +use libpt::log::debug; +use serde::{Deserialize, Serialize}; +use std::{ + collections::{HashMap, VecDeque}, + fmt::Display, + sync::{atomic::AtomicUsize, Arc}, +}; +use tokio::sync::Mutex; + +use crate::{Client, Id, Token}; + +pub static SEQUENCE: AtomicUsize = AtomicUsize::new(0); +pub static LAST_SEQUENCE: AtomicUsize = AtomicUsize::new(0); +pub type Sequence = usize; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub enum StoreErr { + Unauthenticated(Id), + NotRegistered(Id), +} +impl warp::reject::Reject for StoreErr {} +impl Display for StoreErr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Unauthenticated(id) => { + write!(f, "unauthenticated get_items request for: {id}",) + } + Self::NotRegistered(id) => { + write!(f, "request with unregistered id: {id}",) + } + } + } +} + +#[derive(Debug, Clone)] +pub struct Store { + all: Arc>>, + clients: Arc>>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Item { + pub body: serde_json::Value, + pub seq: Sequence, +} + +impl Item { + fn new(msg: serde_json::Value) -> Self { + let seq = SEQUENCE.load(std::sync::atomic::Ordering::Relaxed); + SEQUENCE.store(seq + 1, std::sync::atomic::Ordering::Relaxed); + + Self { body: msg, seq } + } +} + +#[cfg(debug_assertions)] +impl Drop for Item { + fn drop(&mut self) { + debug!("dropping {:?}", self) + } +} + +impl From for Item { + fn from(value: serde_json::Value) -> Self { + Item::new(value) + } +} + +impl Store { + pub fn new() -> Self { + Self { + all: Arc::new(Mutex::new( + vec![ + Item::new(serde_json::json!({"foo": "bar", "value": 0})), + Item::new(serde_json::json!({"foo": "bar", "value": 1})), + Item::new(serde_json::json!({"foo": "bar", "value": 2})), + ] + .into(), + )), + clients: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn add_item(&self, item: Item) { + let mut store = self.all.lock().await; + store.push_back(item); + } + + pub async fn get_items(&self) -> Vec { + let store = self.all.lock().await; + store.clone().into_iter().collect() + } + + pub async fn register_client(&self, client: Client) { + let mut store = self.clients.lock().await; + store.insert(client.id().clone(), client); + } + + pub async fn garbage_collect(&self) { + let mut store = self.all.lock().await; + let seq = SEQUENCE.load(std::sync::atomic::Ordering::Relaxed); + let lseq = LAST_SEQUENCE.load(std::sync::atomic::Ordering::Relaxed); + if seq <= lseq { + return; + } + while let Some(item) = store.front() { + if item.seq > lseq { + break; + } + store.pop_front().unwrap(); + } + } + + pub(crate) async fn adjust_lseq(&self, newer: Sequence) -> Sequence { + let lseq = LAST_SEQUENCE.load(std::sync::atomic::Ordering::Relaxed); + + if newer > lseq { + LAST_SEQUENCE.store(newer, std::sync::atomic::Ordering::Relaxed); + self.garbage_collect().await; + newer + } else { + lseq + } + } + + pub async fn login(&self, id: Id, token: Token) -> Result { + let clients = self.clients.lock().await; + let potential_client = match clients.get(&id) { + Some(c) => c, + None => return Err(StoreErr::NotRegistered(id)), + }; + + if potential_client.validate_token(token) { + // HACK: cloning here is bad + Ok(potential_client.clone()) + } else { + Err(StoreErr::Unauthenticated(id)) + } + } +} + +pub async fn data_processing(store: Store) { + let mut iter = 3; + loop { + let item = serde_json::json!({"foo": "bar", "value": iter}).into(); + store.add_item(item).await; + iter += 1; + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + } +}