more logs and little docu

This commit is contained in:
Christoph J. Scherr 2024-03-08 18:07:49 +01:00
parent c6dfd44eef
commit 31f251a9aa
Signed by: cscherrNT
GPG Key ID: 8E2B45BC51A27EA7
2 changed files with 43 additions and 4 deletions

View File

@ -1,3 +1,12 @@
//! some task produces data [Items](store::Item), and we want clients to be able to get those.
//!
//! We don't want to keep storing all items, so we keep track of what the lowest item is that a
//! client hasnt received yet. The architecture requires clients to register, so while we're at it
//! we hand them a token too, which they will authenticate to us.
//!
//! This way, we have a distribution api, that clients can use to get any messages they have not
//! yet received.
use libpt::log::{debug, info}; use libpt::log::{debug, info};
mod routes; mod routes;
@ -11,7 +20,6 @@ use client::*;
async fn main() { async fn main() {
libpt::log::Logger::build_mini(Some(libpt::log::Level::DEBUG)).expect("could not init logger"); libpt::log::Logger::build_mini(Some(libpt::log::Level::DEBUG)).expect("could not init logger");
let store = Store::new(); let store = Store::new();
debug!("spawning data_processing task: {store:#?}");
tokio::spawn(data_processing(store.clone())); tokio::spawn(data_processing(store.clone()));
info!("starting webserver"); info!("starting webserver");
warp::serve(routes(store)).run(([127, 0, 0, 1], 3030)).await; warp::serve(routes(store)).run(([127, 0, 0, 1], 3030)).await;

View File

@ -1,4 +1,4 @@
use libpt::log::debug; use libpt::log::{debug, info, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
collections::{HashMap, VecDeque}, collections::{HashMap, VecDeque},
@ -9,6 +9,8 @@ use tokio::sync::Mutex;
use crate::{Client, Id, Token}; use crate::{Client, Id, Token};
pub const TOO_MANY_ITEMS: usize = 2048;
pub static SEQUENCE: AtomicUsize = AtomicUsize::new(0); pub static SEQUENCE: AtomicUsize = AtomicUsize::new(0);
pub static LAST_SEQUENCE: AtomicUsize = AtomicUsize::new(0); pub static LAST_SEQUENCE: AtomicUsize = AtomicUsize::new(0);
pub type Sequence = usize; pub type Sequence = usize;
@ -49,14 +51,17 @@ impl Item {
let seq = SEQUENCE.load(std::sync::atomic::Ordering::Relaxed); let seq = SEQUENCE.load(std::sync::atomic::Ordering::Relaxed);
SEQUENCE.store(seq + 1, std::sync::atomic::Ordering::Relaxed); SEQUENCE.store(seq + 1, std::sync::atomic::Ordering::Relaxed);
Self { body: msg, seq } Self {
body: msg,
seq,
}
} }
} }
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
impl Drop for Item { impl Drop for Item {
fn drop(&mut self) { fn drop(&mut self) {
debug!("dropping {:?}", self) debug!("dropping {:?}", self.seq)
} }
} }
@ -83,9 +88,28 @@ impl Store {
pub async fn add_item(&self, item: Item) { pub async fn add_item(&self, item: Item) {
let mut store = self.all.lock().await; let mut store = self.all.lock().await;
if store.len() > TOO_MANY_ITEMS {
warn!(
"Too many items ({}), removing old ones until okay",
store.len()
);
while let Some(_item) = store.front() {
if store.len() < TOO_MANY_ITEMS {
break;
}
store.pop_front();
}
}
store.push_back(item); store.push_back(item);
} }
pub async fn status(&self) {
let clients_len = self.clients.lock().await.len();
let item_len = self.all.lock().await.len();
info!("status: {clients_len} clients; {item_len} items");
}
pub async fn get_items(&self) -> Vec<Item> { pub async fn get_items(&self) -> Vec<Item> {
let store = self.all.lock().await; let store = self.all.lock().await;
store.clone().into_iter().collect() store.clone().into_iter().collect()
@ -109,6 +133,8 @@ impl Store {
} }
store.pop_front().unwrap(); store.pop_front().unwrap();
} }
drop(store); // free the lock
self.status().await;
} }
pub(crate) async fn adjust_lseq(&self, newer: Sequence) -> Sequence { pub(crate) async fn adjust_lseq(&self, newer: Sequence) -> Sequence {
@ -144,6 +170,11 @@ pub async fn data_processing(store: Store) {
loop { loop {
let item = serde_json::json!({"foo": "bar", "value": iter}).into(); let item = serde_json::json!({"foo": "bar", "value": iter}).into();
store.add_item(item).await; store.add_item(item).await;
if iter % 5 == 0 {
store.status().await;
}
iter += 1; iter += 1;
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
} }