look at the amazing rest api with a queue and authentication!!!!!

This commit is contained in:
Christoph J. Scherr 2024-03-08 17:03:49 +01:00
parent a7ce0cc375
commit 1646371378
Signed by: cscherrNT
GPG Key ID: 8E2B45BC51A27EA7
7 changed files with 402 additions and 0 deletions

14
Cargo.lock generated
View File

@ -1480,6 +1480,7 @@ dependencies = [
"libc", "libc",
"rand_chacha", "rand_chacha",
"rand_core", "rand_core",
"serde",
] ]
[[package]] [[package]]
@ -1576,6 +1577,19 @@ dependencies = [
"warp", "warp",
] ]
[[package]]
name = "rest-queued"
version = "0.1.0"
dependencies = [
"anyhow",
"libpt",
"rand",
"serde",
"serde_json",
"tokio",
"warp",
]
[[package]] [[package]]
name = "revsqrt" name = "revsqrt"
version = "0.1.0" version = "0.1.0"

View File

@ -19,6 +19,7 @@ members = [
"members/echargs", "members/echargs",
"members/claptest", "members/claptest",
"members/rest", "members/rest",
"members/rest-queued",
] ]
default-members = [ default-members = [
".", ".",
@ -40,6 +41,7 @@ default-members = [
"members/matchmatchmatch", "members/matchmatchmatch",
"members/future_stream", "members/future_stream",
"members/rest", "members/rest",
"members/rest-queued",
] ]
publish = false publish = false
@ -58,6 +60,7 @@ tokio = { version = "1.35.1", features = [
futures = { version = "0.3.30", features = ["executor"] } futures = { version = "0.3.30", features = ["executor"] }
serde = { version = "1.0.171", features = ["derive"] } serde = { version = "1.0.171", features = ["derive"] }
serde_json = "1.0.102" serde_json = "1.0.102"
libpt = { version = "0.4.2", features = ["log"] }
[package] [package]
name = "rs-basic" name = "rs-basic"

View File

@ -0,0 +1,15 @@
[package]
name = "rest-queued"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow.workspace = true
libpt.workspace = true
rand = { version = "0.8.5", features = ["serde"] }
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
warp = "0.3.6"

View File

@ -0,0 +1,112 @@
use std::{convert::Infallible, fmt::Display, str::FromStr};
use crate::{store::Sequence, Item, Store};
use rand::{prelude::*, seq::SliceRandom};
use serde::{Deserialize, Serialize};
const ALPHABET: &str = "qwertzuiopasdfghjklyxcvbnmQWERTZUIOPASDFGHJKLYXCVBNM";
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Client {
#[serde(flatten)]
id: Id,
#[serde(flatten)]
token: Token,
last: Sequence,
}
impl Client {
pub fn new() -> Self {
Self {
id: Id::new(),
token: Token::new(),
last: 0,
}
}
#[cfg(debug_assertions)]
#[allow(unused)]
pub(crate) fn new_debug() -> Self {
Self {
id: Id::from_str("myid").unwrap(),
token: Token::from_str("mytok").unwrap(),
last: 0,
}
}
pub async fn get_items(&self, store: Store) -> Vec<Item> {
let items = store.get_items().await;
if let Some(item) = items.last() {
store.adjust_lseq(item.seq).await;
}
items
}
pub fn validate_token(&self, token: Token) -> bool {
token == self.token
}
pub fn id(&self) -> &Id {
&self.id
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub struct Id {
#[serde(rename = "id")]
inner: String,
}
impl Id {
pub fn new() -> Self {
let mut rng = rand::thread_rng();
let mut data = ALPHABET.to_string().into_bytes();
data.shuffle(&mut rng);
Self {
inner: String::from_utf8(data[..20].into()).unwrap(),
}
}
}
impl FromStr for Id {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self {
inner: s.to_string(),
})
}
}
impl Display for Id {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.inner)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Token {
#[serde(rename = "token")]
inner: String,
}
impl Token {
pub fn new() -> Self {
let mut rng = rand::thread_rng();
let mut data = ALPHABET.to_string().into_bytes();
data.shuffle(&mut rng);
Self {
inner: String::from_utf8(data).unwrap(),
}
}
}
impl FromStr for Token {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self {
inner: s.to_string(),
})
}
}

View File

@ -0,0 +1,18 @@
use libpt::log::{debug, info};
mod routes;
use routes::*;
mod store;
use store::*;
mod client;
use client::*;
#[tokio::main]
async fn main() {
libpt::log::Logger::build_mini(Some(libpt::log::Level::DEBUG)).expect("could not init logger");
let store = Store::new();
debug!("spawning data_processing task: {store:#?}");
tokio::spawn(data_processing(store.clone()));
info!("starting webserver");
warp::serve(routes(store)).run(([127, 0, 0, 1], 3030)).await;
}

View File

@ -0,0 +1,90 @@
use std::{collections::HashMap, convert::Infallible, str::FromStr};
use libpt::log::{debug, error, info};
use warp::{
filters::{path::param, BoxedFilter},
http::StatusCode,
reject::{MissingHeader, Rejection},
reply, Filter, Reply,
};
use crate::{Client, Id, Item, Store, Token, StoreErr};
pub fn with_store(store: Store) -> impl Filter<Extract = (Store,), Error = Infallible> + Clone {
warp::any().map(move || store.clone())
}
pub fn routes(store: Store) -> BoxedFilter<(impl Reply,)> {
get_items(store.clone())
.or(get_register(store.clone()))
.recover(handle_rejection)
.boxed()
}
async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
if err.is_not_found() {
debug!("page not found");
Ok(reply::with_status("NOT_FOUND", StatusCode::NOT_FOUND))
} else if let Some(e) = err.find::<MissingHeader>() {
debug!("{e}");
Ok(reply::with_status(
"MISSING_HEADER: TOKEN",
StatusCode::FORBIDDEN,
))
} else if let Some(e) = err.find::<StoreErr>() {
debug!("{e}");
Ok(reply::with_status("UNAUTHENTICATED", StatusCode::FORBIDDEN))
} else {
error!("unhandled rejection: {:?}", err);
Ok(reply::with_status(
"INTERNAL_SERVER_ERROR",
StatusCode::INTERNAL_SERVER_ERROR,
))
}
}
pub async fn item_getter(
param: HashMap<String, String>,
token: Token,
store: Store,
) -> Result<warp::reply::Json, warp::Rejection> {
info!("GET /api/v1/items");
if let Some(id) = param.get("id") {
let id = Id::from_str(id).unwrap();
let client = match store.login(id, token).await {
Ok(client) => client,
Err(unauth) => return Err(warp::reject::custom(unauth)),
};
Ok(warp::reply::json(&client.get_items(store.clone()).await))
} else {
Err(warp::reject())
}
}
// GET /api/v1/items
pub fn get_items(store: Store) -> BoxedFilter<(impl Reply,)> {
warp::path!("api" / "v1" / "items")
.and(warp::get())
.and(warp::query::<HashMap<String, String>>())
.and(warp::header::<Token>("Token"))
.and(with_store(store))
// .and(warp::body::content_length_limit(2 << 13))
.and_then(item_getter)
.boxed()
}
// GET /api/v1/register
pub fn get_register(store: Store) -> BoxedFilter<(impl Reply,)> {
warp::path!("api" / "v1" / "register")
.and(warp::get())
.and(with_store(store))
// .and(warp::body::content_length_limit(2 << 13))
.then(|store: Store| async move {
info!("GET /api/v1/register");
let client = Client::new();
let response = warp::reply::json(&client);
store.register_client(client).await;
response
})
.boxed()
}

View File

@ -0,0 +1,150 @@
use libpt::log::debug;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, VecDeque},
fmt::Display,
sync::{atomic::AtomicUsize, Arc},
};
use tokio::sync::Mutex;
use crate::{Client, Id, Token};
pub static SEQUENCE: AtomicUsize = AtomicUsize::new(0);
pub static LAST_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
pub type Sequence = usize;
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum StoreErr {
Unauthenticated(Id),
NotRegistered(Id),
}
impl warp::reject::Reject for StoreErr {}
impl Display for StoreErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Unauthenticated(id) => {
write!(f, "unauthenticated get_items request for: {id}",)
}
Self::NotRegistered(id) => {
write!(f, "request with unregistered id: {id}",)
}
}
}
}
#[derive(Debug, Clone)]
pub struct Store {
all: Arc<Mutex<VecDeque<Item>>>,
clients: Arc<Mutex<HashMap<Id, Client>>>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Item {
pub body: serde_json::Value,
pub seq: Sequence,
}
impl Item {
fn new(msg: serde_json::Value) -> Self {
let seq = SEQUENCE.load(std::sync::atomic::Ordering::Relaxed);
SEQUENCE.store(seq + 1, std::sync::atomic::Ordering::Relaxed);
Self { body: msg, seq }
}
}
#[cfg(debug_assertions)]
impl Drop for Item {
fn drop(&mut self) {
debug!("dropping {:?}", self)
}
}
impl From<serde_json::Value> for Item {
fn from(value: serde_json::Value) -> Self {
Item::new(value)
}
}
impl Store {
pub fn new() -> Self {
Self {
all: Arc::new(Mutex::new(
vec![
Item::new(serde_json::json!({"foo": "bar", "value": 0})),
Item::new(serde_json::json!({"foo": "bar", "value": 1})),
Item::new(serde_json::json!({"foo": "bar", "value": 2})),
]
.into(),
)),
clients: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn add_item(&self, item: Item) {
let mut store = self.all.lock().await;
store.push_back(item);
}
pub async fn get_items(&self) -> Vec<Item> {
let store = self.all.lock().await;
store.clone().into_iter().collect()
}
pub async fn register_client(&self, client: Client) {
let mut store = self.clients.lock().await;
store.insert(client.id().clone(), client);
}
pub async fn garbage_collect(&self) {
let mut store = self.all.lock().await;
let seq = SEQUENCE.load(std::sync::atomic::Ordering::Relaxed);
let lseq = LAST_SEQUENCE.load(std::sync::atomic::Ordering::Relaxed);
if seq <= lseq {
return;
}
while let Some(item) = store.front() {
if item.seq > lseq {
break;
}
store.pop_front().unwrap();
}
}
pub(crate) async fn adjust_lseq(&self, newer: Sequence) -> Sequence {
let lseq = LAST_SEQUENCE.load(std::sync::atomic::Ordering::Relaxed);
if newer > lseq {
LAST_SEQUENCE.store(newer, std::sync::atomic::Ordering::Relaxed);
self.garbage_collect().await;
newer
} else {
lseq
}
}
pub async fn login(&self, id: Id, token: Token) -> Result<Client, StoreErr> {
let clients = self.clients.lock().await;
let potential_client = match clients.get(&id) {
Some(c) => c,
None => return Err(StoreErr::NotRegistered(id)),
};
if potential_client.validate_token(token) {
// HACK: cloning here is bad
Ok(potential_client.clone())
} else {
Err(StoreErr::Unauthenticated(id))
}
}
}
pub async fn data_processing(store: Store) {
let mut iter = 3;
loop {
let item = serde_json::json!({"foo": "bar", "value": iter}).into();
store.add_item(item).await;
iter += 1;
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
}
}