From 31f251a9aae5911bd75e08995f713c0b7b0292de Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Fri, 8 Mar 2024 18:07:49 +0100 Subject: [PATCH] more logs and little docu --- members/rest-queued/src/main.rs | 10 ++++++++- members/rest-queued/src/store.rs | 37 +++++++++++++++++++++++++++++--- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/members/rest-queued/src/main.rs b/members/rest-queued/src/main.rs index 7f77a65..1bdfb01 100644 --- a/members/rest-queued/src/main.rs +++ b/members/rest-queued/src/main.rs @@ -1,3 +1,12 @@ +//! some task produces data [Items](store::Item), and we want clients to be able to get those. +//! +//! We don't want to keep storing all items, so we keep track of what the lowest item is that a +//! client hasnt received yet. The architecture requires clients to register, so while we're at it +//! we hand them a token too, which they will authenticate to us. +//! +//! This way, we have a distribution api, that clients can use to get any messages they have not +//! yet received. + use libpt::log::{debug, info}; mod routes; @@ -11,7 +20,6 @@ use client::*; async fn main() { libpt::log::Logger::build_mini(Some(libpt::log::Level::DEBUG)).expect("could not init logger"); let store = Store::new(); - debug!("spawning data_processing task: {store:#?}"); tokio::spawn(data_processing(store.clone())); info!("starting webserver"); warp::serve(routes(store)).run(([127, 0, 0, 1], 3030)).await; diff --git a/members/rest-queued/src/store.rs b/members/rest-queued/src/store.rs index ab39b3b..b561a25 100644 --- a/members/rest-queued/src/store.rs +++ b/members/rest-queued/src/store.rs @@ -1,4 +1,4 @@ -use libpt::log::debug; +use libpt::log::{debug, info, warn}; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, VecDeque}, @@ -9,6 +9,8 @@ use tokio::sync::Mutex; use crate::{Client, Id, Token}; +pub const TOO_MANY_ITEMS: usize = 2048; + pub static SEQUENCE: AtomicUsize = AtomicUsize::new(0); pub static LAST_SEQUENCE: AtomicUsize = AtomicUsize::new(0); pub type Sequence = usize; @@ -49,14 +51,17 @@ impl Item { let seq = SEQUENCE.load(std::sync::atomic::Ordering::Relaxed); SEQUENCE.store(seq + 1, std::sync::atomic::Ordering::Relaxed); - Self { body: msg, seq } + Self { + body: msg, + seq, + } } } #[cfg(debug_assertions)] impl Drop for Item { fn drop(&mut self) { - debug!("dropping {:?}", self) + debug!("dropping {:?}", self.seq) } } @@ -83,9 +88,28 @@ impl Store { pub async fn add_item(&self, item: Item) { let mut store = self.all.lock().await; + if store.len() > TOO_MANY_ITEMS { + warn!( + "Too many items ({}), removing old ones until okay", + store.len() + ); + while let Some(_item) = store.front() { + if store.len() < TOO_MANY_ITEMS { + break; + } + store.pop_front(); + } + } store.push_back(item); } + pub async fn status(&self) { + let clients_len = self.clients.lock().await.len(); + let item_len = self.all.lock().await.len(); + + info!("status: {clients_len} clients; {item_len} items"); + } + pub async fn get_items(&self) -> Vec { let store = self.all.lock().await; store.clone().into_iter().collect() @@ -109,6 +133,8 @@ impl Store { } store.pop_front().unwrap(); } + drop(store); // free the lock + self.status().await; } pub(crate) async fn adjust_lseq(&self, newer: Sequence) -> Sequence { @@ -144,6 +170,11 @@ pub async fn data_processing(store: Store) { loop { let item = serde_json::json!({"foo": "bar", "value": iter}).into(); store.add_item(item).await; + + if iter % 5 == 0 { + store.status().await; + } + iter += 1; tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; }