Kontrolliertes Beenden und Aufräumen
Der Code in Codeblock 20-20 antwortet auf Anfragen asynchron durch die
Verwendung eines Strang-Vorrats (thread pool), wie von uns beabsichtigt. Wir
erhalten einige Warnungen über die Felder workers
, id
und thread
, die wir
nicht direkt benutzen, was uns daran erinnert, dass wir nichts aufräumen. Wenn
wir die weniger elegante Methode Strg+c verwenden, um den
Hauptstrang (main thread) anzuhalten, werden auch alle anderen Stränge sofort
gestoppt, selbst wenn sie gerade dabei sind, eine Anfrage zu bedienen.
Als Nächstes werden wir das Merkmal (trait) Drop
implementieren, um join
für jeden der Stränge im Vorrat aufzurufen, damit sie die Anfragen, an denen
sie arbeiten, vor dem Schließen beenden können. Dann werden wir einen Weg
implementieren, um den Strängen mitzuteilen, dass sie keine neuen Anfragen mehr
annehmen und herunterfahren sollen. Um diesen Code in Aktion zu sehen, werden
wir unseren Server so modifizieren, dass er nur zwei Anfragen annimmt, bevor er
seinen Strang-Vorrat kontrolliert herunterfährt.
Implementieren des Merkmals Drop
auf ThreadPool
Lass uns damit beginnen, Drop
auf unseren Strang-Vorrat zu implementieren.
Wenn der Vorrat aufgeräumt wird, sollten wir auf das Ende unsere Stränge
warten, um sicherzustellen, dass sie ihre Arbeit beenden. Codeblock 20-22 zeigt
einen ersten Versuch einer Drop
-Implementierung; dieser Code wird noch nicht
ganz funktionieren.
Dateiname: src/lib.rs
#![allow(unused)] fn main() { use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Erzeuge einen neuen ThreadPool. /// /// Die Größe ist die Anzahl der Stränge im Vorrat. /// /// # Panics /// /// Die Funktion `new` stürzt ab, wenn die Größe Null ist. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Worker {} herunterfahren", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus."); job(); }); Worker { id, thread } } } }
Zuerst iterieren wir über alle workers
im Strang-Vorrat. Wir verwenden dafür
&mut
, weil self
eine veränderbare Referenz ist und wir auch in der Lage
sein müssen, worker
zu verändern. Für jeden worker
geben wir eine Nachricht
aus, die besagt, dass dieser bestimmte worker
heruntergefahren wird, und dann
rufen wir auf dem Strang join
auf. Wenn der Aufruf von join
fehlschlägt,
benutzen wir unwrap
, um das Programm abstürzen zu lassen.
Hier ist der Fehler, den wir erhalten, wenn wir diesen Code kompilieren:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
--> src/lib.rs:52:13
|
52 | worker.thread.join().unwrap();
| ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
| |
| move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
|
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
--> /rustc/07dca489ac2d933c78d3c5158e3f43be/library/std/src/thread/mod.rs:1649:17
For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error
Der Fehler sagt uns, dass wir join
nicht aufrufen können, weil wir nur eine
veränderbare Ausleihe von jedem worker
haben und join
die Eigentümerschaft
für sein Argument übernimmt. Um dieses Problem zu lösen, müssen wir den Strang
thread
aus der Worker
-Instanz herausnehmen, damit join
den Strang
konsumieren kann. Wir haben dies in Codeblock 17-15 getan: Wenn Worker
stattdessen ein Option<Thread::JoinHandle<()>>
hält, können wir die Methode
take
auf Option
aufrufen, um den Wert aus der Variante Some
herauszuverschieben und eine Variante None
an ihrer Stelle zu belassen. Mit
anderen Worten, ein Worker
, der läuft, wird eine Variante Some
in thread
haben, und wenn wir einen Worker
aufräumen wollen, ersetzen wir Some
durch
None
, sodass der Worker
keinen Strang zum Laufen hat.
Wir wissen also, dass wir die Definition von Worker
so aktualisieren wollen:
Dateiname: src/lib.rs
#![allow(unused)] fn main() { use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Erzeuge einen neuen ThreadPool. /// /// Die Größe ist die Anzahl der Stränge im Vorrat. /// /// # Panics /// /// Die Funktion `new` stürzt ab, wenn die Größe Null ist. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Worker {} herunterfahren", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus."); job(); }); Worker { id, thread } } } }
Nun wollen wir uns auf den Compiler stützen, um die anderen Stellen zu finden, die geändert werden müssen. Wenn wir diesen Code überprüfen, erhalten wir zwei Fehler:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
--> src/lib.rs:52:27
|
52 | worker.thread.join().unwrap();
| ^^^^ method not found in `Option<JoinHandle<()>>`
|
note: the method `join` exists on the type `JoinHandle<()>`
--> /rustc/07dca489ac2d933c78d3c5158e3f43be/library/std/src/thread/mod.rs:1649:5
help: consider using `Option::expect` to unwrap the `JoinHandle<()>` value, panicking if the value is an `Option::None`
|
52 | worker.thread.expect("REASON").join().unwrap();
| +++++++++++++++++
error[E0308]: mismatched types
--> src/lib.rs:72:22
|
72 | Worker { id, thread }
| ^^^^^^ expected `Option<JoinHandle<()>>`, found `JoinHandle<_>`
|
= note: expected enum `Option<JoinHandle<()>>`
found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
|
72 | Worker { id, thread: Some(thread) }
| +++++++++++++ +
Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` (lib) due to 2 previous errors
Lass uns den zweiten Fehler beheben, der auf den Code am Ende von Worker::new
verweist; wir müssen den Wert thread
in Some
einpacken, wenn wir einen
neuen Worker
erstellen. Nimm die folgenden Änderungen vor, um diesen Fehler
zu beheben:
Dateiname: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Stränge im Vorrat.
///
/// # Panics
///
/// Die Funktion `new` stürzt ab, wenn die Größe Null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Worker {} herunterfahren", worker.id);
worker.thread.join().unwrap();
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --abschneiden--
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
});
Worker {
id,
thread: Some(thread),
}
}
}
Der erste Fehler liegt in unserer Drop
-Implementierung. Wir haben bereits
erwähnt, dass wir beabsichtigten, take
auf dem Option
-Wert aufzurufen, um
thread
aus worker
heraus zu verschieben. Die folgenden Änderungen werden
dies tun:
Dateiname: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Stränge im Vorrat.
///
/// # Panics
///
/// Die Funktion `new` stürzt ab, wenn die Größe Null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Worker {} herunterfahren", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
});
Worker {
id,
thread: Some(thread),
}
}
}
Wie in Kapitel 17 besprochen, nimmt die Methode take
auf Option
die
Variante Some
heraus und lässt an ihrer Stelle None
stehen. Wir benutzen
if let
, um die Some
zu destrukturieren und den Strang zu erhalten; dann
rufen wir join
auf dem Strang auf. Wenn der Strang eines Worker
bereits
None
ist, wissen wir, dass der Strang bereits aufgeräumt wurde, also passiert
in diesem Fall nichts.
Den Strängen signalisieren, nicht mehr nach Aufträgen zu lauschen
Mit all den Änderungen, die wir vorgenommen haben, lässt sich unser Code ohne
jede Warnung kompilieren. Aber die schlechte Nachricht ist, dass dieser Code
noch nicht so funktioniert, wie wir es uns wünschen. Der Schlüssel ist die
Logik in den Funktionsabschlüssen, die von den Strängen der Worker
-Instanzen
ausgeführt werden: Im Moment rufen wir join
auf, aber das wird die Stränge
nicht herunterfahren, weil sie sich in einer Endlosschleife auf der Suche nach
Aufträgen befinden. Wenn wir versuchen, unseren ThreadPool
mit unserer
aktuellen Implementierung von Drop
aufräumen zu lassen, wird der Hauptstrang
für immer blockieren und auf das Beenden des ersten Strangs warten.
Um dieses Problem zu beheben, brauchen wir eine Änderung in der Implementierung
von drop
in ThreadPool
und dann eine Änderung in der Worker
-Schleife.
Zuerst ändern wir die Implementierung von drop
in ThreadPool
, um den
Sender
explizit zu aufzuräumen, bevor wir auf das Ende der Stränge warten.
Codeblock 20-23 zeigt die Änderungen an ThreadPool
, um den Absender
explizit aufzuräumen. Wir verwenden die gleiche Option
und take
-Technik wie
beim Strang, um sender
aus dem ThreadPool
herauszuverschieben:
Dateiname: src/lib.rs
#![allow(unused)] fn main() { use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Message>, } // --abschneiden-- type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Erzeuge einen neuen ThreadPool. /// /// Die Größe ist die Anzahl der Stränge im Vorrat. /// /// # Panics /// /// Die Funktion `new` stürzt ab, wenn die Größe Null ist. pub fn new(size: usize) -> ThreadPool { // --abschneiden-- assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(Message::NewJob(job)).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in &mut self.workers { println!("Worker {} herunterfahren", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); }); Worker { id, thread: Some(thread), } } } }
Das Aufräumen von sender
schließt den Kanal, was bedeutet, dass keine
weiteren Nachrichten gesendet werden. Wenn das passiert, geben alle Aufrufe
von recv
, die die Worker
in der Endlosschleife machen, einen Fehler zurück.
In Codeblock 20-24 ändern wir die Worker
-Schleife so, dass die Schleife in
diesem Fall ordnungsgemäß beendet wird, was bedeutet, dass die Stränge beendet
werden, wenn die Implementierung von drop
in ThreadPool
join
für sie
aufruft.
Dateiname: src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Stränge im Vorrat.
///
/// # Panics
///
/// Die Funktion `new` stürzt ab, wenn die Größe Null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(Message::NewJob(job)).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Worker {} herunterfahren", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
match receiver.lock().unwrap().recv() {
Ok(job) => {
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
}
Err(_) => {
println!("Worker {id} nicht mehr verbunden, wird beendet.");
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
Um diesen Code in Aktion zu sehen, modifizieren wir main
so, dass nur zwei
Anfragen akzeptiert werden, bevor der Server kontrolliert heruntergefahren
wird, wie in Codeblock 20-25 gezeigt.
Dateiname: src/main.rs
use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Fahre herunter.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Du würdest nicht wollen, dass ein Webserver aus der realen Welt heruntergefahren wird, nachdem er nur zwei Anfragen bearbeitet hat. Dieser Code zeigt nur, dass das kontrollierte Herunterfahren und Aufräumen funktioniert.
Die Methode take
ist im Merkmal Iterator
definiert und beschränkt die
Iteration auf die ersten beiden Elemente. Der ThreadPool
wird am Ende von
main
den Gültigkeitsbereich verlassen und die drop
-Implementierung
ausgeführt werden.
Starte den Server mit cargo run
und stelle drei Anfragen. Die dritte Anfrage
sollte fehlerhaft sein und in deinem Terminal solltest du eine ähnliche Ausgabe
wie diese sehen:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.0s
Running `target/debug/hello`
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Fahre herunter.
Worker 0 herunterfahren
Worker 3 hat einen Auftrag erhalten; führe ihn aus.
Worker 1 nicht mehr verbunden, wird beendet.
Worker 2 nicht mehr verbunden, wird beendet.
Worker 3 nicht mehr verbunden, wird beendet.
Worker 0 nicht mehr verbunden, wird beendet.
Worker 1 herunterfahren
Worker 2 herunterfahren
Worker 3 herunterfahren
Möglicherweise siehst du eine andere Reihenfolge der Worker
und der
ausgegebenen Nachrichten. Wir können anhand der Nachrichten sehen, wie dieser
Code funktioniert: Die Worker
0 und 3 haben die ersten beiden Anfragen
erhalten. Der Server hat nach der zweiten Verbindung aufgehört, Verbindungen
anzunehmen, und die Drop
-Implementierung auf ThreadPool
beginnt mit der
Ausführung, bevor Worker
3 überhaupt seinen Job beginnt. Wenn man den
sender
aufräumt, werden alle Worker
getrennt und angewiesen, sich zu
beenden. Die Worker
geben jeweils eine Nachricht aus, wenn sie die Verbindung
trennen, und dann ruft der Strang-Vorrat join
auf, um das Ende jedes
Worker
-Strangs zu warten.
Beachte einen interessanten Aspekt diesem speziellen Programmlauf: Der
ThreadPool
hat den sender
aufgeräumt, und bevor ein Worker
einen Fehler
erhalten hat, haben wir versucht, auf Worker
0 zu warten. Worker
0 hatte
noch keinen Fehler von recv
erhalten, also blockierte der Hauptstrang und
wartete darauf, dass Worker
0 fertig wird. In der Zwischenzeit erhielt
Worker
3 einen Auftrag, und dann erhielten alle Stränge einen Fehler. Als
Worker
0 fertig war, wartete der Hauptstrang darauf, dass die restlichen
Worker
fertig wurden. Zu diesem Zeitpunkt hatten sie alle ihre Schleifen
verlassen und konnten sich beenden.
Herzlichen Glückwunsch! Wir haben jetzt unser Projekt abgeschlossen; wir haben einen einfachen Webserver, der einen Strang-Vorrat verwendet, um asynchron zu antworten. Wir sind in der Lage, den Server kontrolliert herunterzufahren, wodurch alle Stränge im Vorrat aufgeräumt werden.
Hier ist der vollständige Code als Referenz:
Dateiname: src/main.rs
use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Fahre herunter.");
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Dateiname: src/lib.rs
#![allow(unused)] fn main() { use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Message>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Erzeuge einen neuen ThreadPool. /// /// Die Größe ist die Anzahl der Stränge im Vorrat. /// /// # Panics /// /// Die Funktion `new` stürzt ab, wenn die Größe Null ist. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(Message::NewJob(job)).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in &mut self.workers { println!("Worker {} herunterfahren", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv().unwrap(); match message { Ok(job) => { println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus."); job(); } Err(_) => { println!("Worker {id} nicht mehr verbunden, wird beendet."); break; } } }); Worker { id, thread: Some(thread), } } } }
Wir könnten hier mehr tun! Wenn du dieses Projekt weiter verbessern willst, findest du hier einige Ideen:
- Füge weitere Dokumentation zu
ThreadPool
und seinen öffentlichen Methoden hinzu. - Füge Tests der Funktionalität der Bibliothek hinzu.
- Ändere Aufrufe von
unwrap
in eine robustere Fehlerbehandlung. - Verwende
ThreadPool
, um eine andere Aufgabe als das Beantworten von Web-Anfragen durchzuführen. - Suche eine Strang-Vorrats-Kiste auf crates.io und implementiere damit einen ähnlichen Webserver unter Verwendung der Kiste. Vergleiche dann dessen API und Robustheit mit dem von uns implementierten Strang-Vorrat.
Zusammenfassung
Gut gemacht! Du hast es bis ans Ende des Buchs geschafft! Wir möchten dir danken, dass du uns auf dieser Tour durch Rust begleitet hast. Du bist nun bereit, deine eigenen Rust-Projekte umzusetzen und bei den Projekten anderer zu helfen. Denke daran, dass es eine gastfreundliche Gemeinschaft von anderen Rust-Entwicklern gibt, die dir bei allen Herausforderungen, denen du auf deiner Rust-Reise begegnest, gerne helfen würden.