From d05d1c307cd0d3571a28375ebdb5f41f7fbb8c95 Mon Sep 17 00:00:00 2001 From: gbrochar Date: Thu, 10 Dec 2020 19:29:26 +0100 Subject: [PATCH] finished The Book --- webserver/src/{ => bin}/main.rs | 18 +++++- webserver/src/lib.rs | 99 +++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 3 deletions(-) rename webserver/src/{ => bin}/main.rs (58%) create mode 100644 webserver/src/lib.rs diff --git a/webserver/src/main.rs b/webserver/src/bin/main.rs similarity index 58% rename from webserver/src/main.rs rename to webserver/src/bin/main.rs index 1da6e9c..f4fc0ba 100644 --- a/webserver/src/main.rs +++ b/webserver/src/bin/main.rs @@ -2,15 +2,23 @@ use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; +use std::thread; +use std::time::Duration; +use webserver::ThreadPool; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); + let pool = ThreadPool::new(4); - for stream in listener.incoming() { + for stream in listener.incoming().take(2) { let stream = stream.unwrap(); - handle_connection(stream); + pool.execute(|| { + handle_connection(stream); + }); } + + println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { @@ -19,9 +27,13 @@ fn handle_connection(mut stream: TcpStream) { stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; + let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { - ("HTTP/1.1 200 OK\r\n\r\n", "hello.html") + ("HTTP/1.1 200 OK\r\n\r\n", "index.html") + } else if buffer.starts_with(sleep) { + thread::sleep(Duration::from_secs(5)); + ("HTTP/1.1 200 OK\r\n\r\n", "index.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html") }; diff --git a/webserver/src/lib.rs b/webserver/src/lib.rs new file mode 100644 index 0000000..bfee759 --- /dev/null +++ b/webserver/src/lib.rs @@ -0,0 +1,99 @@ +use std::sync::mpsc; +use std::sync::Arc; +use std::sync::Mutex; +use std::thread; + +type Job = Box; + +enum Message { + NewJob(Job), + Terminate, +} + +pub struct ThreadPool { + workers: Vec, + sender: mpsc::Sender, +} + +impl ThreadPool { + /// Create a new ThreadPool. + /// + /// The size is the number of threads in the pool. + /// + /// # Panics + /// + /// The `new` function will panic if the size is zero. + pub fn new(size: usize) -> ThreadPool { + assert!(size > 0); + + let (sender, receiver) = mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); + let mut workers = Vec::with_capacity(size); + + for id in 0..size { + workers.push(Worker::new(id, Arc::clone(&receiver))); + } + + ThreadPool { workers, sender } + } + + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + + self.sender.send(Message::NewJob(job)).unwrap(); + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + println!("Sending terminate message to all workers."); + + for _ in &self.workers { + self.sender.send(Message::Terminate).unwrap(); + } + + println!("Shutting down all workers."); + + for worker in &mut self.workers { + println!("Shutting down worker {}", worker.id); + + if let Some(thread) = worker.thread.take() { + thread.join().unwrap(); + } + } + } +} + +struct Worker { + id: usize, + thread: Option>, +} + +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || loop { + let message = receiver.lock().unwrap().recv().unwrap(); + + match message { + Message::NewJob(job) => { + println!("Worker {} got a job; executing.", id); + + job(); + } + Message::Terminate => { + println!("Worker {} was told to terminate.", id); + + break; + } + } + }); + + Worker { + id, + thread: Some(thread), + } + } +}