Von einem single-threaded zu einem multi-threaded Webserver
Im Moment verarbeitet der Server jede Anfrage der Reihe nach, d.h. er wird erst dann eine zweite Verbindung verarbeiten, wenn die erste Verbindung abgeschlossen ist. Würde der Server mehr und mehr Anfragen erhalten, wäre diese serielle Ausführung immer weniger optimal. Wenn der Server eine Anfrage erhält, deren Bearbeitung sehr lange dauert, müssen nachfolgende Anfragen warten, bis die lange dauernde Anfrage beendet ist, auch wenn die neuen Anfragen schnell bearbeitet werden können. Das müssen wir beheben, aber zuerst werden wir uns das Problem in Aktion ansehen.
Simulieren einer langsamen Anfrage
Wir werden untersuchen, wie sich eine Anfrage mit langsamer Verarbeitung auf andere Anfragen an unsere aktuelle Server-Implementierung auswirken kann. Listing 21-10 implementiert die Behandlung einer Anfrage an /sleep mit einer simulierten langsamen Antwort, die den Server veranlasst, fünf Sekunden lang zu schlafen, bevor er antwortet.
Dateiname: src/main.rs
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
// --abschneiden--
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// --abschneiden--
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
// --abschneiden--
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-10: Simulieren einer langsamen Anfrage durch Schlafen von fünf Sekunden
Wir haben von if zu match gewechselt, da wir nun drei Fälle haben. Wir
müssen explizit auf ein Stück von request_line abgleichen, um Pattern Matching
mit den String-Literalwerten durchzuführen; match führt keine automatische
Referenzierung und Dereferenzierung durch, wie es die Gleichheitsmethode tut.
Der erste Zweig ist der gleiche wie der if-Block aus Listing 21-9. Der
zweite Zweig entspricht einer Anfrage an /sleep. Wenn diese Anfrage empfangen
wird, schläft der Server für fünf Sekunden, bevor er die erfolgreiche
HTML-Seite rendert. Der dritte Zweig entspricht dem else-Block aus Listing
21-9.
Du kannst sehen, wie primitiv unser Server ist: Echte Bibliotheken würden das Erkennen mehrerer Anfragen viel weniger wortreich handhaben!
Starte den Server mit cargo run. Öffne dann zwei Browser-Fenster: Eines für
http://127.0.0.1:7878 und das andere für http://127.0.0.1:7878/sleep. Wenn
du die URI / wie bisher ein paar Mal eingibst, wirst du sehen, dass er
schnell reagiert. Aber wenn du /sleep eingibst und dann / lädst, wirst du
sehen, dass / wartet, bis sleep für volle 5 Sekunden geschlafen hat, bevor
es geladen wird.
Es gibt mehrere Techniken, um zu vermeiden, dass sich Anfragen hinter einer langsamen Anfrage stauen; diejenige, die wir implementieren werden, ist ein Thread-Pool.
Verbessern des Durchsatzes mit einem Thread-Pool
Ein Thread-Pool ist eine Gruppe von erzeugten Threads, die bereit sind und warten, eine Aufgabe zu bearbeiten. Wenn das Programm eine neue Aufgabe erhält, ordnet es einen der Threads im Pool der Aufgabe zu, und dieser Thread wird die Aufgabe bearbeiten. Die verbleibenden Threads im Pool stehen für alle anderen Aufgaben zur Verfügung, die während der Verarbeitung des ersten Threads hereinkommen. Wenn der erste Thread mit der Verarbeitung seiner Aufgabe fertig ist, kehrt er in den Pool der unbeschäftigten Threads zurück und ist bereit, eine neue Aufgabe zu bearbeiten. Ein Thread-Pool ermöglicht es dir, Verbindungen gleichzeitig zu verarbeiten und so den Durchsatz deines Servers zu erhöhen.
Wir beschränken die Anzahl der Threads im Pool auf eine kleine Anzahl, um uns vor Dienstverweigerungsangriffen (Denial-of-Service, kurz DoS) zu schützen; wenn unser Programm für jede eingehende Anfrage einen neuen Thread erstellen würde, könnte jemand, der 10 Millionen Anfragen an unseren Server stellt, ein Chaos anrichten, indem er alle Ressourcen unseres Servers aufbraucht und die Bearbeitung der Anfragen zum Erliegen bringt.
Anstatt unbegrenzt viele Threads zu erzeugen, werden wir eine feste Anzahl von
Threads im Pool warten lassen. Wenn Anfragen eingehen, werden sie zur
Verarbeitung an den Pool geschickt. Der Pool verwaltet eine Warteschlange für
eingehende Anfragen. Jeder der Threads im Pool wird eine Anfrage aus dieser
Warteschlange holen, die Anfrage bearbeiten und dann die nächste Anfrage aus der
Warteschlange holen. Mit diesem Design können wir bis zu N Anfragen
gleichzeitig bearbeiten, wobei N die Anzahl der Threads ist. Wenn jeder
Thread auf eine lang laufende Anfrage antwortet, können sich nachfolgende
Anfragen immer noch in der Warteschlange rückstauen, aber wir haben die Anzahl
der lang laufenden Anfragen erhöht, die wir bearbeiten können, bevor wir diesen
Punkt erreichen.
Diese Technik ist nur eine von vielen Möglichkeiten, den Durchsatz eines Webservers zu verbessern. Weitere Optionen, die du untersuchen könntest, sind das Fork/Join-Modell, das asynchrone E/A-Modell mit einem Thread und das asynchrone E/A-Modell mit mehreren Threads. Wenn du an diesem Thema interessiert bist, kannst du mehr über andere Lösungen lesen und versuchen, sie in Rust zu implementieren; mit einer systemnahen Sprache wie Rust sind alle diese Optionen möglich.
Bevor wir mit der Implementierung eines Thread-Pools beginnen, lass uns darüber sprechen, wie die Verwendung des Pools aussehen sollte. Wenn du versuchst, Code zu entwerfen, kann das Schreiben der Client-Benutzeroberfläche beim Entwurf helfen. Schreibe die API des Codes so, dass sie so strukturiert ist, wie du sie aufrufen möchtest; implementiere dann die Funktionalität innerhalb dieser Struktur, anstatt zuerst die Funktionalität zu implementieren und danach die öffentliche API zu entwerfen.
Ähnlich wie wir die testgetriebene Entwicklung im Projekt in Kapitel 12 angewendet haben, werden wir hier die compilergetriebene Entwicklung verwenden. Wir werden den Code schreiben, der die von uns gewünschten Funktionen aufruft, und dann schauen wir uns Fehler des Compilers an, um zu bestimmen, was wir als Nächstes ändern sollten, damit der Code funktioniert. Bevor wir das tun, werden wir jedoch die Technik erkunden, die wir nicht als Ausgangspunkt verwenden werden.
Für jede Anfrage einen eigenen Thread erstellen
Lass uns zunächst untersuchen, wie unser Code aussehen könnte, wenn er für jede Verbindung einen neuen Thread erstellen würde. Wie bereits erwähnt, ist dies nicht unser endgültiger Plan, da es Probleme mit dem potenziellen Erzeugen einer unbegrenzten Anzahl von Threads gibt, aber es ist ein Ausgangspunkt, um zunächst einen funktionierenden multi-threaded Server zu erhalten. Dann fügen wir den Thread-Pool als Verbesserung hinzu, und es wird einfacher, die beiden Lösungen zu vergleichen.
Listing 21-11 zeigt die Änderungen, die an main vorgenommen werden müssen, um
einen neuen Thread zu erzeugen, der jeden Stream innerhalb der for-Schleife
verarbeitet.
Dateiname: src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-11: Erstellen eines neuen Threads für jeden Stream
Wie du in Kapitel 16 gelernt hast, wird thread::spawn einen neuen Thread
erstellen und dann den Code im Closure im neuen Thread ausführen. Wenn du diesen
Code ausführst und /sleep in deinem Browser lädst, dann / in zwei weiteren
Browser-Tabs, wirst du in der Tat sehen, dass die Anfragen an / nicht auf die
Beendigung von /sleep warten müssen. Aber wie wir bereits erwähnt haben, wird
dies letztendlich das System überfordern, weil du neue Threads ohne jede
Begrenzung erstellen würdest.
Erstellen einer endlichen Anzahl von Threads
Wir möchten, dass unser Thread-Pool in einer ähnlichen, vertrauten Weise
arbeitet, sodass der Wechsel von Threads zu einem Thread-Pool keine großen
Änderungen am Code erfordert, der unsere API verwendet. Listing 21-12 zeigt
die hypothetische Schnittstelle für eine Struktur (struct) ThreadPool, die wir
anstelle von thread::spawn verwenden wollen.
Dateiname: src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Listing 21-12: Unsere ideale
ThreadPool-Schnittstelle
Wir verwenden ThreadPool::new, um einen neuen Thread-Pool mit einer
konfigurierbaren Anzahl von Threads zu erstellen, in diesem Fall vier. In der
for-Schleife hat pool.execute eine ähnliche Schnittstelle wie
thread::spawn, indem es einen Closure entgegennimmt, den der Pool für jeden
Stream ausführen soll. Wir müssen pool.execute implementieren, sodass es den
Closure entgegennimmt und ihn einem Thread im Pool zur Ausführung übergibt.
Dieser Code lässt sich noch nicht kompilieren, aber wir werden es versuchen,
damit der Compiler uns anleiten kann, wie wir das Problem beheben können.
Aufbau von ThreadPool mit compilergetriebener Entwicklung
Nimm die Änderungen in Listing 21-12 an src/main.rs vor und lass uns dann die
Compilerfehler von cargo check verwenden, um unsere Entwicklung
voranzutreiben. Hier ist der erste Fehler, den wir erhalten:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:10:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Großartig! Dieser Fehler sagt uns, dass wir einen Typ oder ein Modul
ThreadPool benötigen, also werden wir jetzt eines bauen. Unsere
ThreadPool-Implementierung wird unabhängig von der Art der Arbeit unseres
Webservers sein. Lass uns also die Crate hello von einer binären Crate auf
eine Bibliotheks-Crate umstellen, um unsere ThreadPool-Implementierung
aufzunehmen. Nachdem wir zu einer Bibliotheks-Crate umgestellt haben, könnten
wir die separate Thread-Pool-Bibliothek auch für alle Arbeiten verwenden, die
wir mit einem Thread-Pool durchführen wollen, nicht nur für die Bedienung von
Webanfragen.
Erstelle eine Datei src/lib.rs, die das Folgende enthält, was die einfachste
Definition einer ThreadPool-Struktur ist, die wir im Moment haben können:
Dateiname: src/lib.rs
pub struct ThreadPool;
Bearbeite dann die Datei main.rs, um ThreadPool in den Gültigkeitsbereich
der Bibliotheks-Crate zu bringen, indem du den folgenden Code am Anfang von
src/main.rs hinzufügst:
Dateiname: src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
Dieser Code wird immer noch nicht funktionieren, aber lass uns ihn noch einmal überprüfen, um den nächsten Fehler zu erhalten, den wir beheben müssen:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/bin/main.rs:11:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Dieser Fehler deutet darauf hin, dass wir als Nächstes eine zugehörige Funktion
namens new für ThreadPool erstellen müssen. Wir wissen auch, dass new
einen Parameter haben muss, der 4 als Argument akzeptieren kann und eine
ThreadPool-Instanz zurückgeben sollte. Lass uns die einfachste Funktion new
implementieren, die diese Eigenschaften haben wird:
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
}
Wir haben usize als Typ des Parameters size gewählt, weil wir wissen, dass
eine negative Anzahl von Threads keinen Sinn macht. Wir wissen auch, dass wir
diese 4 als die Anzahl der Elemente in einer Kollektion von Threads verwenden
werden, wofür der Typ usize gedacht ist, wie im Abschnitt
„Ganzzahl-Typen“ in Kapitel 3 besprochen.
Lass uns den Code noch einmal überprüfen:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
Der Fehler tritt jetzt auf, weil wir keine Methode execute auf ThreadPool
haben. Erinnere dich an Abschnitt „Erstellen einer endlichen Anzahl von
Threads“, wo wir beschlossen haben, dass unser Thread-Pool
eine ähnliche Schnittstelle wie thread::spawn haben sollte. Zusätzlich werden
wir die Funktion execute implementieren, sodass sie den Closure, der ihr
gegeben wird, nimmt und sie einem unbeschäftigten Thread im Pool zur Ausführung
übergibt.
Wir werden die Methode execute auf ThreadPool definieren, um einen Closure
als Parameter zu nehmen. Aus Abschnitt „Verschieben erfasster Werte aus
Closures“ in Kapitel 13 erinnern wir uns, dass wir
Closures als Parameter mit drei verschiedenen Traits nehmen können: Fn,
FnMut und FnOnce. Wir müssen entscheiden, welche Art von Closure wir hier
verwenden. Wir wissen, dass wir am Ende etwas Ähnliches wie die Implementierung
thread::spawn der Standardbibliothek tun werden, sodass wir uns ansehen
können, welche Abgrenzungen die Signatur von thread::spawn in ihrem Parameter
hat. Die Dokumentation zeigt uns Folgendes:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
Der Parameter vom Typ F ist derjenige, um den es hier geht; der Parameter vom
Typ T bezieht sich auf den Rückgabewert, und darum geht es uns nicht. Wir
können sehen, dass spawn FnOnce als Trait verwendet, das an F gebunden
ist. Das ist wahrscheinlich auch das, was wir wollen, denn wir werden das
Argument, das wir bei execute bekommen, letztendlich an spawn weitergeben.
Wir können weiterhin zuversichtlich sein, dass FnOnce das Trait ist, das wir
verwenden wollen, weil der Thread zum Ausführen einer Anfrage den Closure dieser
Anfrage nur einmal ausführt, was zu Once in FnOnce passt.
Der Parameter vom Typ F hat auch die Trait Bound Send und die Lebensdauer
'static, die in unserer Situation nützlich sind: Wir brauchen Send, um die
Trait Bound von einem Thread zu einem anderen zu übertragen und 'static, weil
wir nicht wissen, wie lange die Ausführung des Threads dauern wird. Lass uns
eine Methode execute auf ThreadPool erstellen, die einen generischen
Parameter vom Typ F mit diesen Abgrenzungen annimmt:
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
pub struct ThreadPool;
impl ThreadPool {
// --abschneiden--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
}
Wir verwenden immer noch () nach FnOnce, weil dieses FnOnce einen Closure
darstellt, der keine Parameter benötigt und den Einheitstyp () zurückgibt.
Genau wie bei Funktionsdefinitionen kann der Rückgabetyp in der Signatur
weggelassen werden, aber selbst wenn wir keine Parameter haben, benötigen wir
immer noch die Klammern.
Auch hier handelt es sich um die einfachste Implementierung der Methode
execute: Sie tut nichts, aber wir versuchen nur, unseren Code kompilieren zu
lassen. Lass es uns noch einmal überprüfen:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
Er kompiliert! Aber beachte, dass du, wenn du cargo run versuchst und eine
Anfrage im Browser stellst, die Fehler im Browser sehen wirst, die wir am Anfang
des Kapitels gesehen haben. Unsere Bibliothek ruft den Closure, den wir an
execute übergeben, noch nicht wirklich auf!
Hinweis: Ein Sprichwort, das man möglicherweise über Sprachen mit strengen Compilern wie Haskell und Rust hört, lautet: „Wenn der Code kompiliert, funktioniert er.“ Aber dieses Sprichwort ist nicht universell wahr. Unser Projekt kompiliert, aber es tut absolut nichts! Wenn wir ein echtes, vollständiges Projekt aufbauen würden, wäre dies ein guter Zeitpunkt, mit dem Schreiben von Modultests zu beginnen, um zu überprüfen, ob der Code kompiliert und das von uns gewünschte Verhalten aufweist.
Bedenke Folgendes: Was wäre hier anders, wenn wir statt eines Closures eine Future ausführen würden?
Validieren der Anzahl der Threads in new
Wir tun nichts mit den Parametern von new und execute. Lass uns die Rümpfe
dieser Funktionen mit dem Verhalten implementieren, das wir wollen. Lass uns
zunächst über new nachdenken. Früher wählten wir einen vorzeichenlosen Typ für
den Parameter size, weil ein Pool mit einer negativen Anzahl von Threads
keinen Sinn ergibt. Ein Pool mit null Threads ergibt jedoch auch keinen Sinn,
dennoch ist null ein vollkommen gültiges usize. Wir fügen Code hinzu, um zu
prüfen, ob size größer als null ist, bevor wir eine ThreadPool-Instanz
zurückgeben; wenn size null ist, brechen wir das Programm ab, indem wir das
Makro assert! verwenden, wie in Listing 21-13 gezeigt.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
pub struct ThreadPool;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --abschneiden--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
}
Listing 21-13: Die Implementierung von ThreadPool::new
bricht ab, wenn size gleich Null ist
Wir haben auch etwas Dokumentation für unseren ThreadPool mit
Dokumentationskommentaren (doc comments) hinzugefügt. Beachte, dass wir uns an
gute Dokumentationspraktiken gehalten haben, indem wir einen Abschnitt
hinzugefügt haben, der die Situationen aufzeigt, in denen unsere Funktion
abbrechen kann, wie in Kapitel 14 besprochen. Versuche, cargo doc --open
auszuführen und die Struktur ThreadPool anzuklicken, um zu sehen, wie die
generierte Dokumentation für new aussieht!
Anstatt das Makro assert! hinzuzufügen, wie wir es hier getan haben, könnten
wir new zu build ändern und ein Result zurückgeben lassen, wie wir es mit
Config::build im E/A-Projekt in Listing 12-9 getan haben. Aber wir haben in
diesem Fall entschieden, dass der Versuch, einen Thread-Pool ohne Threads zu
erstellen, ein nicht behebbarer Fehler sein sollte. Wenn du ehrgeizig bist,
versuche, eine Funktion namens build mit der folgenden Signatur zu schreiben,
um sie mit der Funktion new zu vergleichen:
pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {
Platz zum Speichern der Threads schaffen
Jetzt, da wir eine Möglichkeit haben, zu wissen, dass wir eine gültige Anzahl
von Threads im Pool haben, können wir diese Threads erstellen und sie in der
Struktur ThreadPool speichern, bevor wir die Struktur zurückgeben. Aber wie
„speichern“ wir einen Thread? Werfen wir noch einmal einen Blick auf die
Signatur von Thread::spawn:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
Die Funktion spawn gibt einen JoinHandle<T> zurück, wobei T der Typ ist,
den der Closure zurückgibt. Lass uns versuchen, auch JoinHandle zu benutzen
und sehen, was passiert. In unserem Fall werden die Closures, die wir an den
Thread-Pool übergeben, die Verbindung behandeln und nichts zurückgeben, also
wird T der Unit-Typ () sein.
Der Code in Listing 21-14 lässt sich kompilieren, erzeugt aber noch keine
Threads. Wir haben die Definition von ThreadPool so geändert, dass sie einen
Vektor von thread::JoinHandle<()>-Instanzen enthält, den Vektor mit der
Kapazität size initialisiert, eine for-Schleife eingerichtet, die etwas Code
zum Erzeugen der Threads ausführt, und eine ThreadPool-Instanz zurückgibt, die
diese enthält.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --abschneiden--
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// einige Threads erstellen und im Vektor speichern
}
ThreadPool { threads }
}
// --abschneiden--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
}
Listing 21-14: Erstellen eines Vektors für ThreadPool
zum Aufnehmen der Threads
Wir haben std::thread in der Bibliotheks-Crate in den Gültigkeitsbereich
gebracht, weil wir thread::JoinHandle als den Typ der Elemente im Vektor in
ThreadPool verwenden.
Sobald wir eine gültige Größe erhalten haben, erzeugt unser ThreadPool einen
neuen Vektor, der size Elemente aufnehmen kann. Die Funktion with_capacity
erfüllt die gleiche Aufgabe wie Vec::new, aber mit einem wichtigen
Unterschied: Sie weist dem Vektor Platz im Voraus zu. Da wir wissen, dass wir
size Elemente im Vektor speichern müssen, ist diese Allokation im Voraus
etwas effizienter als die Verwendung von Vec::new, das sich selbst in der
Größe verändert, wenn Elemente eingefügt werden.
Wenn du cargo check erneut ausführst, sollte es erfolgreich sein.
Senden von Code vom ThreadPool an einen Thread
Wir haben einen Kommentar in der for-Schleife in Listing 21-14 bezüglich der
Erstellung von Threads hinterlassen. Hier werden wir uns ansehen, wie wir
tatsächlich Threads erstellen. Die Standardbibliothek bietet thread::spawn als
eine Möglichkeit, Threads zu erstellen, und thread::spawn erwartet, dass es
Code erhält, den der Thread ausführen soll, sobald der Thread erstellt ist. In
unserem Fall wollen wir jedoch die Threads erstellen und sie auf Code warten
lassen, den wir später senden werden. Die Implementierung von Threads in der
Standardbibliothek enthält keine Möglichkeit, dies zu tun; wir müssen sie
manuell implementieren.
Wir werden dieses Verhalten implementieren, indem wir eine neue Datenstruktur
zwischen dem ThreadPool und den Threads, die dieses neue Verhalten verwalten
werden, einführen. Wir nennen diese Datenstruktur Worker, was ein gängiger
Begriff in Pool-Implementierungen ist. Der Worker holt den Code ab, der
ausgeführt werden muss, und führt ihn in seinem Thread aus.
Denke an Menschen, die in der Küche eines Restaurants arbeiten: Die Arbeiter warten, bis Bestellungen von Kunden eingehen, und dann sind sie dafür verantwortlich, diese Bestellungen entgegenzunehmen und auszuführen.
Anstatt einen Vektor von JoinHandle<()>-Instanzen im Thread-Pool zu
speichern, werden wir Instanzen der Worker-Struktur speichern. Jeder Worker
wird eine einzelne JoinHandle<()>-Instanz speichern. Dann werden wir eine
Methode auf Worker implementieren, die einen Closure zur Ausführung benötigt
und ihn zur Ausführung an den bereits laufenden Thread sendet. Wir werden auch
jedem Worker eine id geben, damit wir beim Protokollieren oder Debuggen
zwischen den verschiedenen Worker-Instanzen im Pool unterscheiden können.
Hier ist der neue Prozess, der abläuft, wenn wir einen ThreadPool erstellen.
Wir werden den Code implementieren, der den Closure an den Thread sendet,
nachdem wir Worker auf diese Weise eingerichtet haben:
- Definiere eine Struktur
Worker, die eineidund einenJoinHandle<()>enthält. - Ändere
ThreadPool, um einen Vektor vonWorker-Instanzen zu halten. - Definiere eine Funktion
Worker::new, die eineid-Nummer nimmt und eineWorker-Instanz zurückgibt, die dieidenthält, sowie einen Thread, der mit einem leeren Closure erzeugt wurde. - Verwende in
ThreadPool::newdenfor-Schleifenzähler, um eineidzu erzeugen, erzeuge einen neuenWorkermit dieseridund speichere denWorkerim Vektor.
Wenn du zu einer Herausforderung bereit bist, versuche, diese Änderungen selbst zu implementieren, bevor du dir den Code in Listing 21-15 ansiehst.
Bereit? Hier ist Listing 21-15 mit einer Möglichkeit, die vorhergehenden Änderungen vorzunehmen.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --abschneiden--
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --abschneiden--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
}
Listing 21-15: Modifizieren von ThreadPool, um
Worker-Instanzen zu halten, anstatt Threads direkt zu halten
Wir haben den Namen des Feldes in ThreadPool von threads in workers
geändert, weil es jetzt Worker-Instanzen statt JoinHandle<()>-Instanzen
enthält. Wir benutzen den Zähler in der for-Schleife als Argument für
Worker::new und wir speichern jeden neuen Worker im Vektor mit dem Namen
workers.
Externer Code (wie unser Server in src/main.rs) muss die
Implementierungsdetails bezüglich der Verwendung einer Worker-Struktur
innerhalb von ThreadPool nicht kennen, also machen wir die Worker-Struktur
und ihre Funktion new privat. Die Funktion Worker::new verwendet die id,
die wir ihr geben, und speichert eine JoinHandle<()>-Instanz, die durch das
Erzeugen eines neuen Threads unter Verwendung eines leeren Closures erzeugt
wird.
Hinweis: Wenn das Betriebssystem keinen Thread erstellen kann, weil nicht genügend Systemressourcen vorhanden sind, bricht
thread::spawndas Programm ab. Das führt dazu, dass unser gesamter Server abbricht, auch wenn die Erstellung einiger Threads erfolgreich wäre. Der Einfachheit halber lassen wir es bei diesem Verhalten, aber in einer produktionsreifen Thread-Pool-Implementierung würdest du wahrscheinlichstd::thread::Buildermit der Methodespawnverwenden wollen, die stattdessenResultzurückgibt.
Dieser Code kompiliert und speichert die Anzahl der Worker-Instanzen, die wir
als Argument für ThreadPool::new angegeben haben. Aber wir verarbeiten noch
nicht den Closure, den wir in execute erhalten. Schauen wir uns als Nächstes
an, wie wir das machen.
Senden von Anfragen an Threads über Kanäle
Das nächste Problem, das wir angehen, ist, dass die Closures bei thread::spawn
absolut nichts bewirken. Gegenwärtig erhalten wir den Closure, den wir ausführen
wollen, mit der Methode execute. Aber wir müssen thread::spawn einen Closure
geben, der ausgeführt werden soll, wenn wir jeden Worker während der
Erstellung des ThreadPool erstellen.
Wir möchten, dass die Struktur Worker, die wir gerade erstellt haben, um den
Code aus einer Warteschlange im ThreadPool zu holen, diesen Code zur
Ausführung an seinen Thread sendet.
In Kapitel 16 hast du etwas über Kanäle (channels) gelernt – eine
einfache Art der Kommunikation zwischen zwei Threads –, die für diesen
Anwendungsfall perfekt geeignet ist. Wir verwenden einen Kanal, der als
Warteschlange von Aufträgen fungiert, und execute sendet einen Auftrag aus dem
ThreadPool an die Worker-Instanzen, die den Auftrag an ihren Thread senden.
Hier ist der Plan:
- Der
ThreadPoolerstellt einen Kanal und hält den Sender. - Jeder
Workerhält den Empfänger. - Wir werden eine neue Struktur
Joberstellen, die den Closure aufnimmt, den wir über den Kanal senden wollen. - Die Methode
executesendet den Auftrag, der ausgeführt werden soll, durch den Sender. - In seinem Thread wird der
Workerauf den Empfänger warten und die Closures aller Aufträge, die er erhält, ausführen.
Beginnen wir damit, einen Kanal in ThreadPool::new zu erstellen und den Sender
in der ThreadPool-Instanz zu halten, wie in Listing 21-16 gezeigt. Die
Struktur Job enthält vorerst nichts, sie wird aber der Element-Typ sein, den
wir in den Kanal senden.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --abschneiden--
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --abschneiden--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
}
Listing 21-16: Ändern von ThreadPool, um den Sender
zu speichern, der Job-Instanzen übermittelt
In ThreadPool::new erstellen wir unseren neuen Kanal und lassen den Pool das
sendende Ende halten. Dies kompiliert erfolgreich.
Lass uns versuchen, einen Empfänger an jeden Worker weiterzugeben, während der
Thread-Pool den Kanal erstellt. Wir wissen, dass wir den Empfänger im Thread
verwenden wollen, den die Worker-Instanzen erzeugen, also werden wir den
Parameter receiver im Closure referenzieren. Der Code in Listing 21-17 lässt
sich noch nicht ganz kompilieren.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --abschneiden--
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --abschneiden--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --abschneiden--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
}
Listing 21-17: Übergeben des Empfängers an jeden
Worker
Wir haben einige kleine und unkomplizierte Änderungen vorgenommen: Wir geben den
Empfänger an Worker::new und dann verwenden wir ihn innerhalb des Closures.
Wenn wir versuchen, diesen Code zu überprüfen, erhalten wir diesen Fehler:
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
Der Code versucht, receiver an mehrere Worker-Instanzen weiterzugeben. Das
wird nicht funktionieren, wie du dich aus Kapitel 16 erinnern wirst: Die
Kanalimplementierung, die Rust bietet, erlaubt mehrere Produzenten und einen
einzigen Konsumenten. Das bedeutet, dass wir nicht einfach das konsumierende
Ende des Kanals klonen können, um diesen Code zu reparieren. Selbst wenn wir
das könnten, ist das nicht die Technik, die wir anwenden wollen; stattdessen
wollen wir die Aufträge auf mehrere Threads verteilen, indem wir den einzigen
receiver unter allen Worker-Instanzen aufteilen.
Außerdem erfordert das Entfernen eines Auftrags aus der Warteschlange des Kanals
eine Mutation von receiver, sodass die Threads einen sicheren Weg benötigen,
um receiver gemeinsam zu nutzen und zu modifizieren; andernfalls könnten wir
Race Conditions erhalten (wie in Kapitel 16 behandelt).
Erinnere dich an die Thread-sicheren intelligenten Zeiger, die in Kapitel 16
besprochen wurden: Um das Eigentum über mehrere Threads zu teilen und den
Threads zu erlauben, den Wert zu mutieren, müssen wir Arc<Mutex<T>> verwenden.
Der Typ Arc ermöglicht es mehreren Worker-Instanzen, den Empfänger zu
besitzen, und Mutex stellt sicher, dass immer nur ein Worker zur gleichen
Zeit einen Auftrag vom Empfänger erhält. Listing 21-18 zeigt die Änderungen, die
wir vornehmen müssen.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --abschneiden--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --abschneiden--
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --abschneiden--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --abschneiden--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --abschneiden--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
}
Listing 21-18: Den Empfänger unter den Worker
teilen, die Arc und Mutex benutzen
In ThreadPool::new setzen wir den Empfänger in einen Arc und einen Mutex.
Für jeden neuen Worker klonen wir den Arc, um die Referenzzählung zu
erhöhen, sodass die Worker-Instanzen das Eigentum am Empfänger teilen können.
Mit diesen Änderungen kompiliert der Code! Wir haben es geschafft!
Implementieren der Methode execute
Lass uns endlich die Methode execute auf ThreadPool implementieren. Wir
werden auch Job von einer Struktur in einen Typ-Alias für ein Trait-Objekt
ändern, das den Typ des Closures enthält, den execute erhält. Wie im Abschnitt
„Typ-Synonyme und Typ-Aliase“ in Kapitel 19 besprochen,
ermöglichen uns Typ-Aliase, lange Typen kürzer zu machen, um sie einfacher
nutzen zu können. Siehe Listing 21-19.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --abschneiden--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --abschneiden--
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --abschneiden--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
}
Listing 21-19: Erstellen eines Alias vom Typ Job für
eine Box, die jeden Closure enthält, und danach Senden des Auftrags in den
Kanal
Nachdem wir eine neue Job-Instanz unter Verwendung des Closures, den wir in
execute erhalten, erstellt haben, senden wir diesen Auftrag an das sendende
Ende des Kanals. Wir rufen unwrap auf send auf für den Fall, dass das Senden
fehlschlägt. Das kann zum Beispiel passieren, wenn wir alle unsere Threads von
der Ausführung abhalten, was bedeutet, dass das empfangende Ende keine neuen
Nachrichten mehr empfängt. Im Moment können wir die Ausführung unserer Threads
nicht stoppen: Unsere Threads werden so lange ausgeführt, wie der Pool
existiert. Der Grund, warum wir unwrap verwenden, ist, dass wir wissen, dass
der Fehlerfall nicht passieren wird, aber der Compiler das nicht weiß.
Aber wir sind noch nicht ganz fertig! Im Worker wird unser Closure an
thread::spawn weitergereicht, der immer noch nur auf das empfangende Ende des
Kanals referenziert. Stattdessen müssen wir den Closure für immer in einer
Schleife laufen lassen, indem wir das empfangende Ende des Kanals um einen
Auftrag bitten und den Auftrag ausführen, wenn er einen bekommt. Lass uns die in
Listing 21-20 gezeigte Änderung in Worker::new vornehmen.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --abschneiden--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
}
});
Worker { id, thread }
}
}
}
Listing 21-20: Empfangen und Ausführen der Aufträge im
Thread des Worker
Hier rufen wir zuerst lock auf receiver auf, um den Mutex zu erwerben, und
dann rufen wir unwrap auf, um das Programm bei eventuellen Fehlern
abzubrechen. Das Akquirieren einer Sperre kann fehlschlagen, wenn sich der Mutex
in einem vergifteten Zustand befindet, was passieren kann, wenn ein anderer
Thread abbricht, während er die Sperre hält, anstatt sie freizugeben. In dieser
Situation ist der Aufruf von unwrap sinnvoll, damit dieser Thread abbricht.
Fühle dich frei, dieses unwrap in ein expect mit einer Fehlermeldung zu
ändern, die für dich von Bedeutung ist.
Wenn wir die Sperre auf dem Mutex erhalten, rufen wir recv auf, um einen
Job vom Kanal zu empfangen. Ein abschließendes unwrap geht auch hier an
eventuellen Fehlern vorbei, die auftreten könnten, wenn sich der Thread, der
den Sender hält, beendet hat, ähnlich wie die Methode send den Wert Err
zurückgibt, wenn der Empfänger abschaltet.
Der Aufruf von recv blockiert, wenn also noch kein Auftrag vorhanden ist,
wartet der aktuelle Thread, bis ein Auftrag verfügbar wird. Der Mutex<T>
stellt sicher, dass immer nur ein Worker-Thread zur gleichen Zeit versucht,
einen Auftrag anzufordern.
Unser Thread-Pool ist jetzt in einem funktionierenden Zustand! Führe cargo run aus und stelle einige Anfragen:
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
--> src/lib.rs:7:5
|
6 | pub struct ThreadPool {
| ---------- field in this struct
7 | workers: Vec<Worker>,
| ^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: fields `id` and `thread` are never read
--> src/lib.rs:48:5
|
47 | struct Worker {
| ------ fields in this struct
48 | id: usize,
| ^^
49 | thread: thread::JoinHandle<()>,
| ^^^^^^
warning: `hello` (lib) generated 2 warnings
Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
Running `target/debug/hello`
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Worker 2 hat einen Auftrag erhalten; führe ihn aus.
Worker 1 hat einen Auftrag erhalten; führe ihn aus.
Worker 3 hat einen Auftrag erhalten; führe ihn aus.
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Worker 2 hat einen Auftrag erhalten; führe ihn aus.
Worker 1 hat einen Auftrag erhalten; führe ihn aus.
Worker 3 hat einen Auftrag erhalten; führe ihn aus.
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Worker 2 hat einen Auftrag erhalten; führe ihn aus.
Erfolg! Wir haben jetzt einen Thread-Pool, der Verbindungen asynchron ausführt. Es werden nie mehr als vier Threads erzeugt, sodass unser System nicht überlastet wird, wenn der Server viele Anfragen erhält. Wenn wir eine Anfrage an /sleep stellen, ist der Server immer noch in der Lage, andere Anfragen zu bedienen, indem er sie von einem anderen Thread ausführen lässt.
Hinweis: Wenn du /sleep in mehreren Browser-Fenstern gleichzeitig öffnest, werden diese möglicherweise in 5-Sekunden-Intervallen nacheinander geladen. Einige Web-Browser führen aus Gründen der Zwischenspeicherung mehrere Instanzen der gleichen Anfrage nacheinander aus. Diese Beschränkung wird nicht durch unseren Webserver verursacht.
Nachdem du die while let-Schleife in den Kapiteln 17 und 19 kennengelernt
hast, fragst du dich vielleicht, warum wir den Code für den Worker-Thread
nicht geschrieben haben, wie in Listing 21-21 gezeigt.
Dateiname: src/lib.rs
#![allow(unused)]
fn main() {
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Erzeuge einen neuen ThreadPool.
///
/// Die Größe ist die Anzahl der Threads im Pool.
///
/// # Panics
///
/// Die Funktion `new` bricht ab, wenn die Größe null ist.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --abschneiden--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
job();
}
});
Worker { id, thread }
}
}
}
Listing 21-21: Eine alternative Implementierung von
Worker::new unter Verwendung von while let
Dieser Code wird kompiliert und ausgeführt, führt aber nicht zum gewünschten
Thread-Verhalten: Eine langsame Anfrage führt immer noch dazu, dass andere
Anfragen auf ihre Bearbeitung warten. Der Grund dafür ist etwas subtil: Die
Struktur Mutex hat keine öffentliche Methode unlock, weil das Eigentum an
der Sperre auf der Lebensdauer von MutexGuard<T> innerhalb von
LockResult<MutexGuard<T>> basiert, die die Methode lock zurückgibt. Zur
Kompilierzeit kann der Borrow Checker dann die Regel durchsetzen, dass auf eine
von einem Mutex bewachte Ressource nicht zugegriffen werden kann, wenn wir die
Sperre nicht halten. Diese Implementierung kann aber auch dazu führen, dass die
Sperre länger als beabsichtigt gehalten wird, wenn wir nicht sorgfältig über die
Lebensdauer von MutexGuard<T> nachdenken.
Der Code in Listing 21-20, der let job = receiver.lock().unwrap().recv().unwrap(); verwendet, funktioniert, weil mit
let alle temporären Werte, die in dem Ausdruck auf der rechten Seite des
Gleichheitszeichens verwendet werden, sofort verworfen werden, wenn die
let-Anweisung endet. Allerdings gibt while let (und if let und match)
temporäre Werte erst am Ende des zugehörigen Blocks frei. In Listing 21-21
bleibt die Sperre für die Dauer des Aufrufs von job() erhalten, was bedeutet,
dass andere Worker-Instanzen keine Aufträge erhalten können.