Kontrolliertes Beenden und Aufräumen
Der Code in Listing 21-20 antwortet auf Anfragen asynchron durch die
Verwendung eines Thread-Pools, wie von uns beabsichtigt. Wir erhalten einige
Warnungen über die Felder workers, id und thread, die wir nicht direkt
benutzen, was uns daran erinnert, dass wir nichts aufräumen. Wenn wir die
weniger elegante Methode Strg+c verwenden, um den
Haupt-Thread (main thread) anzuhalten, werden auch alle anderen Threads sofort
gestoppt, selbst wenn sie gerade dabei sind, eine Anfrage zu bedienen.
Als Nächstes werden wir das Trait Drop implementieren, um join für jeden der
Threads im Pool aufzurufen, damit sie die Anfragen, an denen sie arbeiten, vor
dem Schließen beenden können. Dann werden wir einen Weg implementieren, um den
Threads mitzuteilen, dass sie keine neuen Anfragen mehr annehmen und
herunterfahren sollen. Um diesen Code in Aktion zu sehen, werden wir unseren
Server so modifizieren, dass er nur zwei Anfragen annimmt, bevor er seinen
Thread-Pool kontrolliert herunterfährt.
Implementieren des Traits Drop auf ThreadPool
Lass uns damit beginnen, Drop auf unseren Thread-Pool zu implementieren. Wenn
der Pool aufgeräumt wird, sollten wir auf das Ende unseres Threads warten, um
sicherzustellen, dass sie ihre Arbeit beenden. Listing 21-22 zeigt einen ersten
Versuch einer Drop-Implementierung; dieser Code wird noch nicht ganz
funktionieren.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Worker {} herunterfahren", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
}
});
Worker { id, thread }
}
}
}
Listing 21-22: Warten auf das Ende der einzelnen Threads, wenn der Thread-Pool den Gültigkeitsbereich verlässt
Zuerst iterieren wir über alle workers im Thread-Pool. Wir verwenden dafür
&mut, weil self eine veränderbare Referenz ist und wir auch in der Lage sein
müssen, worker zu verändern. Für jeden worker geben wir eine Nachricht aus,
die besagt, dass diese bestimmte worker-Instanz heruntergefahren wird, und
dann rufen wir auf dem Thread dieser worker-Instanz join auf. Wenn der
Aufruf von join fehlschlägt, benutzen wir unwrap, um das Programm
abzubrechen.
Hier ist der Fehler, den wir erhalten, wenn wir diesen Code kompilieren:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/07dca489ac2d933c78d3c5158e3f43be/library/std/src/thread/mod.rs:1649:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
Der Fehler sagt uns, dass wir join nicht aufrufen können, weil wir nur eine
veränderbare Borrow von jedem worker haben und join das Eigentum an seinem
Argument übernimmt. Um dieses Problem zu lösen, müssen wir den Thread thread
aus der Worker-Instanz herausnehmen, damit join den Thread konsumieren kann.
Eine Möglichkeit, dies zu tun, besteht darin, den gleichen Ansatz wie in Listing
18-15 zu verfolgen. Wenn Worker ein Option<thread::JoinHandle<()>> hielte,
könnten wir die Methode take auf Option aufrufen, um den Wert aus der
Variante Some herauszuverschieben und eine Variante None an ihrer Stelle zu
belassen. Mit anderen Worten, ein Worker, der läuft, würde eine Variante
Some in thread haben, und wenn wir einen Worker aufräumen wollten, würden
wir Some durch None ersetzen, sodass der Worker keinen Thread zum Laufen
haben würde.
Das einzige Mal, dass dies der Fall wäre, wäre, wenn man den Worker
aufräumt. Im Gegenzug müssten wir überall, wo wir auf Worker.thread zugreifen,
mit einer Option<thread::JoinHandle<()>> umgehen. Idiomatisch verwendet Rust
Option ziemlich oft, aber wenn du etwas in Option einpackst, von dem du
weißt, dass es immer vorhanden sein wird, ist es eine gute Idee, nach
alternativen Ansätzen zu suchen, die deinen Code sauberer und weniger
fehleranfällig machen.
In diesem Fall gibt es eine bessere Alternative: Die Methode Vec::drain. Sie
akzeptiert einen Bereichsparameter, um anzugeben, welche Elemente aus dem Vec
entfernt werden sollen, und gibt einen Iterator dieser Elemente zurück. Die
Angabe der Bereichssyntax .. entfernt alle Werte aus dem Vec.
Wir müssen also die drop-Implementierung von ThreadPool wie folgt
aktualisieren:
Dateiname: src/lib.rs
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in self.workers.drain(..) {
println!("Worker {} herunterfahren", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
}
});
Worker { id, thread }
}
}
Dadurch wird der Compilerfehler behoben, und es sind keine weiteren Änderungen
an unserem Code erforderlich. Beachte, dass drop bei einem Programmabbruch
aufgerufen werden kann; und wenn dann auch unwrap abbricht, eine doppelte
Fehlersituation verursacht werden könnte, was sofort zum Programmende und zum
Abbruch aller laufenden Bereinigungsvorgänge führen würde. Für ein
Beispielprogramm ist dies in Ordnung, für Produktionscode jedoch nicht zu
empfehlen.
Den Threads signalisieren, nicht mehr nach Aufträgen zu lauschen
Mit all den Änderungen, die wir vorgenommen haben, lässt sich unser Code ohne
jede Warnung kompilieren. Aber die schlechte Nachricht ist, dass dieser Code
noch nicht so funktioniert, wie wir es uns wünschen. Der Schlüssel ist die Logik
in den Closures, die von den Threads der Worker-Instanzen ausgeführt werden:
Im Moment rufen wir join auf, aber das wird die Threads nicht herunterfahren,
weil sie sich in einer Endlosschleife auf der Suche nach Aufträgen befinden.
Wenn wir versuchen, unseren ThreadPool mit unserer aktuellen Implementierung
von Drop aufräumen zu lassen, wird der Haupt-Thread für immer blockieren und
auf das Beenden des ersten Threads warten.
Um dieses Problem zu beheben, brauchen wir eine Änderung in der Implementierung
von drop in ThreadPool und dann eine Änderung in der Worker-Schleife.
Zuerst ändern wir die Implementierung von drop in ThreadPool, um den
sender explizit aufzuräumen, bevor wir auf das Ende der Threads warten.
Listing 21-23 zeigt die Änderungen an ThreadPool, um den sender explizit
aufzuräumen. Anders als beim Thread, müssen wir hier eine Option verwenden,
um den sender mit Option::take aus dem ThreadPool herausnehmen zu können.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --abschneiden--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
// --abschneiden--
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Worker {} herunterfahren", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
});
Worker { id, thread }
}
}
}
Listing 21-23: sender vor dem Warten auf die
Worker-Threads explizit aufräumen
Das Aufräumen von sender schließt den Kanal, was bedeutet, dass keine weiteren
Nachrichten gesendet werden. Wenn das passiert, geben alle Aufrufe von recv,
die die Worker-Instanzen in der Endlosschleife machen, einen Fehler zurück. In
Listing 21-24 ändern wir die Worker-Schleife so, dass die Schleife in diesem
Fall ordnungsgemäß beendet wird, was bedeutet, dass die Threads beendet werden,
wenn die Implementierung von drop in ThreadPool join für sie aufruft.
Dateiname: src/lib.rs
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Worker {} herunterfahren", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
}
Err(_) => {
println!("Worker {id} nicht mehr verbunden, wird beendet.");
break;
}
}
}
});
Worker { id, thread }
}
}
Listing 21-24: Explizites Verlassen der Schleife, wenn
recv einen Fehler zurückgibt
Um diesen Code in Aktion zu sehen, modifizieren wir main so, dass nur zwei
Anfragen akzeptiert werden, bevor der Server kontrolliert heruntergefahren
wird, wie in Listing 21-25 gezeigt.
Dateiname: src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Fahre herunter.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-25: Herunterfahren des Servers, nachdem er zwei Anfragen bearbeitet hat, durch Verlassen der Schleife
Du würdest nicht wollen, dass ein Webserver aus der realen Welt heruntergefahren wird, nachdem er nur zwei Anfragen bearbeitet hat. Dieser Code zeigt nur, dass das kontrollierte Herunterfahren und Aufräumen funktioniert.
Die Methode take ist im Trait Iterator definiert und beschränkt die
Iteration auf die ersten beiden Elemente. Der ThreadPool wird am Ende von
main den Gültigkeitsbereich verlassen und die drop-Implementierung
ausgeführt werden.
Starte den Server mit cargo run und stelle drei Anfragen. Die dritte Anfrage
sollte fehlerhaft sein und in deinem Terminal solltest du eine ähnliche Ausgabe
wie diese sehen:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/hello`
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Fahre herunter.
Worker 0 herunterfahren
Worker 3 hat einen Auftrag erhalten; führe ihn aus.
Worker 1 nicht mehr verbunden, wird beendet.
Worker 2 nicht mehr verbunden, wird beendet.
Worker 3 nicht mehr verbunden, wird beendet.
Worker 0 nicht mehr verbunden, wird beendet.
Worker 1 herunterfahren
Worker 2 herunterfahren
Worker 3 herunterfahren
Möglicherweise siehst du eine andere Reihenfolge der Worker-IDs und der
ausgegebenen Nachrichten. Wir können anhand der Nachrichten sehen, wie dieser
Code funktioniert: Die Worker 0 und 3 haben die ersten beiden Anfragen
erhalten. Der Server hat nach der zweiten Verbindung aufgehört, Verbindungen
anzunehmen, und die Drop-Implementierung auf ThreadPool beginnt mit der
Ausführung, bevor Worker 3 überhaupt seine Arbeit beginnt. Wenn man den
sender aufräumt, werden alle Worker-Instanzen getrennt und angewiesen, sich
zu beenden. Die Worker-Instanzen geben jeweils eine Nachricht aus, wenn sie
die Verbindung trennen, und dann ruft der Thread-Pool join auf, um das Ende
jedes Worker-Threads zu warten.
Beachte einen interessanten Aspekt in diesem speziellen Programmlauf: Der
ThreadPool hat den sender aufgeräumt, und bevor ein Worker einen Fehler
erhalten hat, haben wir versucht, auf Worker 0 zu warten. Worker 0 hatte
noch keinen Fehler von recv erhalten, also blockierte der Haupt-Thread und
wartete darauf, dass Worker 0 fertig wird. In der Zwischenzeit erhielt Worker 3 einen Auftrag, und dann erhielten alle Threads einen Fehler. Als Worker 0
fertig war, wartete der Haupt-Thread darauf, dass die restlichen
Worker-Instanzen fertig wurden. Zu diesem Zeitpunkt hatten sie alle ihre
Schleifen verlassen und konnten sich beenden.
Herzlichen Glückwunsch! Wir haben jetzt unser Projekt abgeschlossen; wir haben einen einfachen Webserver, der einen Thread-Pool verwendet, um asynchron zu antworten. Wir sind in der Lage, den Server kontrolliert herunterzufahren, wodurch alle Threads im Pool aufgeräumt werden.
Hier ist der vollständige Code als Referenz:
Dateiname: src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Fahre herunter.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Worker {} herunterfahren", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
}
Err(_) => {
println!("Worker {id} nicht mehr verbunden, wird beendet.");
break;
}
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
}
Wir könnten hier mehr tun! Wenn du dieses Projekt weiter verbessern willst, findest du hier einige Ideen:
- Füge weitere Dokumentation zu
ThreadPoolund seinen öffentlichen Methoden hinzu. - Füge Tests der Funktionalität der Bibliothek hinzu.
- Ändere Aufrufe von
unwrapin eine robustere Fehlerbehandlung. - Verwende
ThreadPool, um eine andere Aufgabe als das Beantworten von Web-Anfragen durchzuführen. - Suche eine Thread-Pool-Crate auf crates.io und implementiere damit einen ähnlichen Webserver unter Verwendung der Crate. Vergleiche dann dessen API und Robustheit mit dem von uns implementierten Thread-Pool.
Zusammenfassung
Gut gemacht! Du hast es bis ans Ende des Buches geschafft! Wir möchten dir danken, dass du uns auf dieser Tour durch Rust begleitet hast. Du bist nun bereit, deine eigenen Rust-Projekte umzusetzen und bei den Projekten anderer zu helfen. Denke daran, dass es eine gastfreundliche Gemeinschaft von Rust-Entwicklern gibt, die dir bei jeder Herausforderung, denen du auf deiner Rust-Reise begegnest, gerne helfen.