Now we’ll implement the Drop trait to call join on each of the threads in the pool so they can finish the requests they’re working on before closing. Then we’ll implement a way to tell the threads they should stop accepting new requests and shut down. To see this code in action, we’ll modify our server to accept only two requests before gracefully shutting down its thread pool.

    Let’s start with implementing Drop on our thread pool. When the pool is dropped, our threads should all join to make sure they finish their work. Listing 20-22 shows a first attempt at a Drop implementation; this code won’t quite work yet.

    Filename: src/lib.rs

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);

                worker.thread.join().unwrap();
            }
        }
    }

    Listing 20-22: Joining each thread when the thread pool goes out of scope

    First, we loop through each of the thread pool workers. We use &mut for this because self is a mutable reference, and we also need to be able to mutate worker. For each worker, we print a message saying that this particular worker is shutting down, and then we call join on that worker’s thread. If the call to join fails, we use unwrap to make Rust panic and go into an ungraceful shutdown.

    Here is the error we get when we compile this code:

    $ cargo check
        Checking hello v0.1.0 (file:///projects/hello)
    error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
      --> src/lib.rs:52:13
       |
    52 |             worker.thread.join().unwrap();
       |             ^^^^^^^^^^^^^ move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait

    error: aborting due to previous error

    For more information about this error, try `rustc --explain E0507`.
    error: could not compile `hello`

    To learn more, run the command again with --verbose.

    The error tells us we can’t call join because we only have a mutable borrow of each worker and join takes ownership of its argument. To solve this issue, we need to move the thread out of the Worker instance that owns thread so join can consume the thread. We did this in Listing 17-15: if Worker holds an Option<thread::JoinHandle<()>> instead, we can call the take method on the Option to move the value out of the Some variant and leave a None variant in its place. In other words, a Worker that is running will have a Some variant in thread, and when we want to clean up a Worker, we’ll replace Some with None so the Worker doesn’t have a thread to run.
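    This take-and-join pattern can be sketched on its own. The following is a minimal, self-contained illustration (the names mirror the server code but this is not the server itself):

    ```rust
    use std::thread;

    // A struct that owns an optional thread handle, like the updated Worker.
    struct Worker {
        id: usize,
        thread: Option<thread::JoinHandle<()>>,
    }

    fn main() {
        let mut worker = Worker {
            id: 0,
            thread: Some(thread::spawn(|| ())),
        };

        // take() moves the JoinHandle out through a mutable reference,
        // leaving None behind, which gives join() the ownership it needs.
        if let Some(handle) = worker.thread.take() {
            handle.join().unwrap();
        }

        assert!(worker.thread.is_none());
        println!("worker {} cleaned up", worker.id);
    }
    ```

    Because `take` only needs `&mut self`, this works even though we never own the `Worker` outright inside `drop`.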

    So we know we want to update the definition of Worker like this:

    Filename: src/lib.rs

    use std::sync::mpsc;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;

    pub struct ThreadPool {
        workers: Vec<Worker>,
        sender: mpsc::Sender<Job>,
    }

    type Job = Box<dyn FnOnce() + Send + 'static>;

    impl ThreadPool {
        /// Create a new ThreadPool.
        ///
        /// The size is the number of threads in the pool.
        ///
        /// # Panics
        ///
        /// The `new` function will panic if the size is zero.
        pub fn new(size: usize) -> ThreadPool {
            assert!(size > 0);

            let (sender, receiver) = mpsc::channel();
            let receiver = Arc::new(Mutex::new(receiver));

            let mut workers = Vec::with_capacity(size);

            for id in 0..size {
                workers.push(Worker::new(id, Arc::clone(&receiver)));
            }

            ThreadPool { workers, sender }
        }

        pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            let job = Box::new(f);

            self.sender.send(job).unwrap();
        }
    }

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);

                worker.thread.join().unwrap();
            }
        }
    }

    struct Worker {
        id: usize,
        thread: Option<thread::JoinHandle<()>>,
    }

    impl Worker {
        fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
            let thread = thread::spawn(move || loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job();
            });

            Worker { id, thread }
        }
    }

    Now let’s lean on the compiler to find the other places that need to change. Checking this code, we get two errors:

    $ cargo check
        Checking hello v0.1.0 (file:///projects/hello)
    error[E0599]: no method named `join` found for enum `Option<JoinHandle<()>>` in the current scope
      --> src/lib.rs:52:27
       |
    52 |             worker.thread.join().unwrap();
       |                           ^^^^ method not found in `Option<JoinHandle<()>>`

    error[E0308]: mismatched types
      --> src/lib.rs:72:22
       |
    72 |         Worker { id, thread }
       |                      ^^^^^^
       |                      |
       |                      expected enum `Option`, found struct `JoinHandle`
       |                      help: try using a variant of the expected enum: `Some(thread)`
       |
       = note: expected enum `Option<JoinHandle<()>>`
                found struct `JoinHandle<_>`

    error: aborting due to 2 previous errors

    Some errors have detailed explanations: E0308, E0599.
    For more information about an error, try `rustc --explain E0308`.
    error: could not compile `hello`

    To learn more, run the command again with --verbose.

    Let’s address the second error first: it points to the code at the end of Worker::new, where we need to wrap the thread value in Some when we create a new Worker. The first error is in our Drop implementation; we mentioned earlier that we intended to call take on the Option value to move thread out of worker. The following changes fix both errors:

    Filename: src/lib.rs

    use std::sync::mpsc;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;

    pub struct ThreadPool {
        workers: Vec<Worker>,
        sender: mpsc::Sender<Job>,
    }

    type Job = Box<dyn FnOnce() + Send + 'static>;

    impl ThreadPool {
        /// Create a new ThreadPool.
        ///
        /// The size is the number of threads in the pool.
        ///
        /// # Panics
        ///
        /// The `new` function will panic if the size is zero.
        pub fn new(size: usize) -> ThreadPool {
            assert!(size > 0);

            let (sender, receiver) = mpsc::channel();
            let receiver = Arc::new(Mutex::new(receiver));

            let mut workers = Vec::with_capacity(size);

            for id in 0..size {
                workers.push(Worker::new(id, Arc::clone(&receiver)));
            }

            ThreadPool { workers, sender }
        }

        pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            let job = Box::new(f);

            self.sender.send(job).unwrap();
        }
    }

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);

                if let Some(thread) = worker.thread.take() {
                    thread.join().unwrap();
                }
            }
        }
    }

    struct Worker {
        id: usize,
        thread: Option<thread::JoinHandle<()>>,
    }

    impl Worker {
        fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
            let thread = thread::spawn(move || loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job();
            });

            Worker {
                id,
                thread: Some(thread),
            }
        }
    }

    As discussed in Chapter 17, the take method on Option takes the Some variant out and leaves None in its place. We’re using if let to destructure the Some and get the thread; then we call join on the thread. If a worker’s thread is already None, we know that worker has already had its thread cleaned up, so nothing happens in that case.

    With all the changes we’ve made, our code compiles without any warnings. But the bad news is this code doesn’t function the way we want it to yet. The key is the logic in the closures run by the threads of the Worker instances: at the moment, we call join, but that won’t shut down the threads because they loop forever looking for jobs. If we try to drop our ThreadPool with our current implementation of drop, the main thread will block forever waiting for the first thread to finish.

    To fix this problem, we’ll modify the threads so they listen for either a Job to run or a signal that they should stop listening and exit the infinite loop. We’ll add a Message enum with two variants for this purpose; instead of Job instances, our channel will send Message values.

    Filename: src/lib.rs

    use std::sync::mpsc;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;

    pub struct ThreadPool {
        workers: Vec<Worker>,
        sender: mpsc::Sender<Job>,
    }

    type Job = Box<dyn FnOnce() + Send + 'static>;

    enum Message {
        NewJob(Job),
        Terminate,
    }

    impl ThreadPool {
        /// Create a new ThreadPool.
        ///
        /// The size is the number of threads in the pool.
        ///
        /// # Panics
        ///
        /// The `new` function will panic if the size is zero.
        pub fn new(size: usize) -> ThreadPool {
            assert!(size > 0);

            let (sender, receiver) = mpsc::channel();
            let receiver = Arc::new(Mutex::new(receiver));

            let mut workers = Vec::with_capacity(size);

            for id in 0..size {
                workers.push(Worker::new(id, Arc::clone(&receiver)));
            }

            ThreadPool { workers, sender }
        }

        pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            let job = Box::new(f);

            self.sender.send(job).unwrap();
        }
    }

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);

                if let Some(thread) = worker.thread.take() {
                    thread.join().unwrap();
                }
            }
        }
    }

    struct Worker {
        id: usize,
        thread: Option<thread::JoinHandle<()>>,
    }

    impl Worker {
        fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
            let thread = thread::spawn(move || loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {} got a job; executing.", id);

                job();
            });

            Worker {
                id,
                thread: Some(thread),
            }
        }
    }

    This Message enum will either be a NewJob variant that holds the Job the thread should run, or it will be a Terminate variant that will cause the thread to exit its loop and stop.

    We need to adjust the channel to use values of type Message rather than type Job, as shown in Listing 20-23.

    Filename: src/lib.rs

    use std::sync::mpsc;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;

    pub struct ThreadPool {
        workers: Vec<Worker>,
        sender: mpsc::Sender<Message>,
    }

    // --snip--

    type Job = Box<dyn FnOnce() + Send + 'static>;

    enum Message {
        NewJob(Job),
        Terminate,
    }

    impl ThreadPool {
        // --snip--
        /// Create a new ThreadPool.
        ///
        /// The size is the number of threads in the pool.
        ///
        /// # Panics
        ///
        /// The `new` function will panic if the size is zero.
        pub fn new(size: usize) -> ThreadPool {
            assert!(size > 0);

            let (sender, receiver) = mpsc::channel();
            let receiver = Arc::new(Mutex::new(receiver));

            let mut workers = Vec::with_capacity(size);

            for id in 0..size {
                workers.push(Worker::new(id, Arc::clone(&receiver)));
            }

            ThreadPool { workers, sender }
        }

        pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            let job = Box::new(f);

            self.sender.send(Message::NewJob(job)).unwrap();
        }
    }

    // --snip--

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);

                if let Some(thread) = worker.thread.take() {
                    thread.join().unwrap();
                }
            }
        }
    }

    struct Worker {
        id: usize,
        thread: Option<thread::JoinHandle<()>>,
    }

    impl Worker {
        fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
            let thread = thread::spawn(move || loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job();
                    }
                    Message::Terminate => {
                        println!("Worker {} was told to terminate.", id);

                        break;
                    }
                }
            });

            Worker {
                id,
                thread: Some(thread),
            }
        }
    }

    Listing 20-23: Sending and receiving Message values and exiting the loop if a Worker receives Message::Terminate

    To incorporate the Message enum, we need to change Job to Message in two places: the definition of ThreadPool and the signature of Worker::new. The execute method of ThreadPool needs to send jobs wrapped in the Message::NewJob variant. Then, in Worker::new where a Message is received from the channel, the job will be processed if the NewJob variant is received, and the thread will break out of the loop if the Terminate variant is received.

    With these changes, the code will compile and continue to function in the same way as it did after Listing 20-20. But we’ll get a warning because we aren’t creating any messages of the Terminate variety. Let’s fix this warning by changing our Drop implementation to look like Listing 20-24.

    Filename: src/lib.rs

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            println!("Sending terminate message to all workers.");

            for _ in &self.workers {
                self.sender.send(Message::Terminate).unwrap();
            }

            println!("Shutting down all workers.");

            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);

                if let Some(thread) = worker.thread.take() {
                    thread.join().unwrap();
                }
            }
        }
    }

    Listing 20-24: Sending Message::Terminate to the workers before calling join on each worker thread

    We’re now iterating over the workers twice: once to send one Terminate message for each worker and once to call join on each worker’s thread. If we tried to send a message and join immediately in the same loop, we couldn’t guarantee that the worker in the current iteration would be the one to get the message from the channel.

    To prevent this scenario, we first put all of our Terminate messages on the channel in one loop; then we join on all the threads in another loop. Each worker will stop receiving requests on the channel once it gets a terminate message. So, we can be sure that if we send the same number of terminate messages as there are workers, each worker will receive a terminate message before join is called on its thread.
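    The guarantee described above rests on mpsc channel semantics: each message sent is consumed by exactly one recv call, even when many threads share the receiver. The following self-contained sketch (illustrative names, separate from the server code) shows four threads sharing one receiver, each consuming exactly one Terminate token before the joins complete:

    ```rust
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{mpsc, Arc, Mutex};
    use std::thread;

    enum Message {
        Terminate,
    }

    fn main() {
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let terminated = Arc::new(AtomicUsize::new(0));

        // Four worker-like threads, each blocking on the shared receiver.
        let handles: Vec<_> = (0..4)
            .map(|id| {
                let receiver = Arc::clone(&receiver);
                let terminated = Arc::clone(&terminated);
                thread::spawn(move || loop {
                    let message = receiver.lock().unwrap().recv().unwrap();
                    match message {
                        Message::Terminate => {
                            println!("worker {} terminating", id);
                            terminated.fetch_add(1, Ordering::SeqCst);
                            break;
                        }
                    }
                })
            })
            .collect();

        // First loop: one Terminate per thread. Each recv consumes exactly
        // one message, so every thread is guaranteed to get one.
        for _ in &handles {
            sender.send(Message::Terminate).unwrap();
        }

        // Second loop: now every join is guaranteed to return.
        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(terminated.load(Ordering::SeqCst), 4);
        println!("all 4 workers shut down");
    }
    ```

    If we instead sent one message and immediately joined inside a single loop, a different thread could consume that message, leaving the joined thread blocked forever.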

    To see this code in action, let’s modify main to accept only two requests before gracefully shutting down the server, as shown in Listing 20-25.

    Filename: src/bin/main.rs

    use hello::ThreadPool;
    use std::fs;
    use std::io::prelude::*;
    use std::net::TcpListener;
    use std::net::TcpStream;
    use std::thread;
    use std::time::Duration;

    fn main() {
        let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
        let pool = ThreadPool::new(4);

        for stream in listener.incoming().take(2) {
            let stream = stream.unwrap();

            pool.execute(|| {
                handle_connection(stream);
            });
        }

        println!("Shutting down.");
    }

    fn handle_connection(mut stream: TcpStream) {
        let mut buffer = [0; 1024];
        stream.read(&mut buffer).unwrap();

        let get = b"GET / HTTP/1.1\r\n";
        let sleep = b"GET /sleep HTTP/1.1\r\n";

        let (status_line, filename) = if buffer.starts_with(get) {
            ("HTTP/1.1 200 OK", "hello.html")
        } else if buffer.starts_with(sleep) {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        } else {
            ("HTTP/1.1 404 NOT FOUND", "404.html")
        };

        let contents = fs::read_to_string(filename).unwrap();

        let response = format!(
            "{}\r\nContent-Length: {}\r\n\r\n{}",
            status_line,
            contents.len(),
            contents
        );

        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    }

    Listing 20-25: Shut down the server after serving two requests by exiting the loop

    You wouldn’t want a real-world web server to shut down after serving only two requests. This code just demonstrates that the graceful shutdown and cleanup is in working order.

    The take method is defined in the Iterator trait and limits the iteration to the first two items at most. The ThreadPool will go out of scope at the end of main, and the drop implementation will run.
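    The behavior of take can be seen in isolation with any iterator; this small standalone example mirrors how listener.incoming().take(2) cuts off the stream of connections:

    ```rust
    fn main() {
        // take(2) yields at most two items, then iteration ends,
        // just as the server stops accepting a third connection.
        let served: Vec<u32> = (1..=10).take(2).collect();

        assert_eq!(served, vec![1, 2]);
        println!("served {} requests", served.len());
    }
    ```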

    Start the server with cargo run, and make three requests. The third request should error, and in your terminal you should see output similar to this:

    $ cargo run
       Compiling hello v0.1.0 (file:///projects/hello)
        Finished dev [unoptimized + debuginfo] target(s) in 1.0s
         Running `target/debug/main`
    Worker 0 got a job; executing.
    Worker 3 got a job; executing.
    Shutting down.
    Sending terminate message to all workers.
    Shutting down all workers.
    Shutting down worker 0
    Worker 1 was told to terminate.
    Worker 2 was told to terminate.
    Worker 0 was told to terminate.
    Worker 3 was told to terminate.
    Shutting down worker 1
    Shutting down worker 2
    Shutting down worker 3

    You might see a different ordering of workers and messages printed. We can see how this code works from the messages: workers 0 and 3 got the first two requests, and then on the third request, the server stopped accepting connections. When the ThreadPool goes out of scope at the end of main, its Drop implementation kicks in, and the pool tells all workers to terminate. The workers each print a message when they see the terminate message, and then the thread pool calls join to shut down each worker thread.

    Notice one interesting aspect of this particular execution: the ThreadPool sent the terminate messages down the channel, and before any worker received the messages, we tried to join worker 0. Worker 0 had not yet received the terminate message, so the main thread blocked waiting for worker 0 to finish. In the meantime, each of the workers received the termination messages. When worker 0 finished, the main thread waited for the rest of the workers to finish. At that point, they had all received the termination message and were able to shut down.

    Congrats! We’ve now completed our project; we have a basic web server that uses a thread pool to respond asynchronously. We’re able to perform a graceful shutdown of the server, which cleans up all the threads in the pool.

    Here’s the full code for reference:

    Filename: src/bin/main.rs

    use hello::ThreadPool;
    use std::fs;
    use std::io::prelude::*;
    use std::net::TcpListener;
    use std::net::TcpStream;
    use std::thread;
    use std::time::Duration;

    fn main() {
        let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
        let pool = ThreadPool::new(4);

        for stream in listener.incoming().take(2) {
            let stream = stream.unwrap();

            pool.execute(|| {
                handle_connection(stream);
            });
        }

        println!("Shutting down.");
    }

    fn handle_connection(mut stream: TcpStream) {
        let mut buffer = [0; 1024];
        stream.read(&mut buffer).unwrap();

        let get = b"GET / HTTP/1.1\r\n";
        let sleep = b"GET /sleep HTTP/1.1\r\n";

        let (status_line, filename) = if buffer.starts_with(get) {
            ("HTTP/1.1 200 OK", "hello.html")
        } else if buffer.starts_with(sleep) {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        } else {
            ("HTTP/1.1 404 NOT FOUND", "404.html")
        };

        let contents = fs::read_to_string(filename).unwrap();

        let response = format!(
            "{}\r\nContent-Length: {}\r\n\r\n{}",
            status_line,
            contents.len(),
            contents
        );

        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    }

    Filename: src/lib.rs

    use std::sync::mpsc;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::thread;

    pub struct ThreadPool {
        workers: Vec<Worker>,
        sender: mpsc::Sender<Message>,
    }

    type Job = Box<dyn FnOnce() + Send + 'static>;

    enum Message {
        NewJob(Job),
        Terminate,
    }

    impl ThreadPool {
        /// Create a new ThreadPool.
        ///
        /// The size is the number of threads in the pool.
        ///
        /// # Panics
        ///
        /// The `new` function will panic if the size is zero.
        pub fn new(size: usize) -> ThreadPool {
            assert!(size > 0);

            let (sender, receiver) = mpsc::channel();
            let receiver = Arc::new(Mutex::new(receiver));

            let mut workers = Vec::with_capacity(size);

            for id in 0..size {
                workers.push(Worker::new(id, Arc::clone(&receiver)));
            }

            ThreadPool { workers, sender }
        }

        pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            let job = Box::new(f);

            self.sender.send(Message::NewJob(job)).unwrap();
        }
    }

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            println!("Sending terminate message to all workers.");

            for _ in &self.workers {
                self.sender.send(Message::Terminate).unwrap();
            }

            println!("Shutting down all workers.");

            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);

                if let Some(thread) = worker.thread.take() {
                    thread.join().unwrap();
                }
            }
        }
    }

    struct Worker {
        id: usize,
        thread: Option<thread::JoinHandle<()>>,
    }

    impl Worker {
        fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
            let thread = thread::spawn(move || loop {
                let message = receiver.lock().unwrap().recv().unwrap();

                match message {
                    Message::NewJob(job) => {
                        println!("Worker {} got a job; executing.", id);

                        job();
                    }
                    Message::Terminate => {
                        println!("Worker {} was told to terminate.", id);

                        break;
                    }
                }
            });

            Worker {
                id,
                thread: Some(thread),
            }
        }
    }

    We could do more here! If you want to continue enhancing this project, here are some ideas:

    • Add more documentation to ThreadPool and its public methods.
    • Add tests of the library’s functionality.
    • Change calls to unwrap to more robust error handling.
    • Use ThreadPool to perform some task other than serving web requests.

    Well done! You’ve made it to the end of the book! We want to thank you for joining us on this tour of Rust. You’re now ready to implement your own Rust projects and help with other people’s projects. Keep in mind that there is a welcoming community of other Rustaceans who would love to help you with any challenges you encounter on your Rust journey.