From a10bdc90159e351f0855c70e3236b7b785894d40 Mon Sep 17 00:00:00 2001 From: "Christoph J. Scherr" Date: Mon, 15 Jan 2024 18:06:32 +0100 Subject: [PATCH] holy fuck i implemented a threadpool --- members/socker/src/main.rs | 11 ++++-- members/socker/src/pool.rs | 68 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 members/socker/src/pool.rs diff --git a/members/socker/src/main.rs b/members/socker/src/main.rs index 7bad230..bcb3415 100644 --- a/members/socker/src/main.rs +++ b/members/socker/src/main.rs @@ -1,12 +1,17 @@ use std::{ - io::{BufRead, BufReader, Write}, + io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, process::exit, }; +mod pool; +use pool::ThreadPool; + + fn main() { println!("Starting the server"); let target_address = "127.0.0.1:7878"; + let pool = ThreadPool::build(4).unwrap(); let listener = match TcpListener::bind(target_address) { Ok(listener) => listener, Err(err) => { @@ -17,7 +22,9 @@ fn main() { for stream in listener.incoming() { let stream = stream.unwrap(); - handle_connection(stream); + pool.execute(|| { + handle_connection(stream); + }); } } diff --git a/members/socker/src/pool.rs b/members/socker/src/pool.rs new file mode 100644 index 0000000..b56f534 --- /dev/null +++ b/members/socker/src/pool.rs @@ -0,0 +1,68 @@ +use std::{ + sync::{mpsc, Arc, Mutex}, + thread, +}; + +struct WorkerThread { + handle: thread::JoinHandle<()>, + id: usize, +} + +impl WorkerThread { + fn new(id: usize, receiver: Arc>>) -> WorkerThread { + let handle = thread::spawn(move || loop { + let job = receiver.lock().unwrap().recv().unwrap(); + + println!("Worker {id} got a job; executing."); + + job(); + }); + WorkerThread { handle, id } + } +} + +/// Shares tasks over multiple worker threads +pub struct ThreadPool { + /// executes tasks assigned by the instructor + workers: Vec, + /// sends instructions to the workers + sender: mpsc::Sender, +} + +/// Something for the workers to do +type Job = Box; + +impl ThreadPool { + /// Create a new ThreadPool. + /// + /// The size is the number of threads in the pool. + /// + /// # Panics + /// + /// The `new` function will panic if the size is zero. + pub fn build(size: usize) -> Result { + if !(size > 0) { + return Err(format!("cannot build a thread pool with size 0!")); + } + let (sender, receiver) = mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + // create some threads and store them in the vector + workers.push(WorkerThread::new(id, Arc::clone(&receiver))); + } + + Ok(ThreadPool { workers, sender }) + } + + /// schedules the given job for execution + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + // now it's getting crazy + let job = Box::new(f); + self.sender.send(job).unwrap(); + } +}