Keyboard shortcuts

Press or to navigate between chapters

Press ? to show this help

Press Esc to hide this help

Von einem single-threaded zu einem multi-threaded Webserver

Im Moment verarbeitet der Server jede Anfrage der Reihe nach, d.h. er wird erst dann eine zweite Verbindung verarbeiten, wenn die erste Verbindung abgeschlossen ist. Würde der Server mehr und mehr Anfragen erhalten, wäre diese serielle Ausführung immer weniger optimal. Wenn der Server eine Anfrage erhält, deren Bearbeitung sehr lange dauert, müssen nachfolgende Anfragen warten, bis die lange dauernde Anfrage beendet ist, auch wenn die neuen Anfragen schnell bearbeitet werden können. Das müssen wir beheben, aber zuerst werden wir uns das Problem in Aktion ansehen.

Simulieren einer langsamen Anfrage

Wir werden untersuchen, wie sich eine Anfrage mit langsamer Verarbeitung auf andere Anfragen an unsere aktuelle Server-Implementierung auswirken kann. Listing 21-10 implementiert die Behandlung einer Anfrage an /sleep mit einer simulierten langsamen Antwort, die den Server veranlasst, fünf Sekunden lang zu schlafen, bevor er antwortet.

Dateiname: src/main.rs

use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --abschneiden--

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // --abschneiden--

    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --abschneiden--
    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Listing 21-10: Simulieren einer langsamen Anfrage durch Schlafen von fünf Sekunden

Wir haben von if zu match gewechselt, da wir nun drei Fälle haben. Wir müssen explizit auf ein Stück von request_line abgleichen, um Pattern Matching mit den String-Literalwerten durchzuführen; match führt keine automatische Referenzierung und Dereferenzierung durch, wie es die Gleichheitsmethode tut.

Der erste Zweig ist der gleiche wie der if-Block aus Listing 21-9. Der zweite Zweig entspricht einer Anfrage an /sleep. Wenn diese Anfrage empfangen wird, schläft der Server für fünf Sekunden, bevor er die erfolgreiche HTML-Seite rendert. Der dritte Zweig entspricht dem else-Block aus Listing 21-9.

Du kannst sehen, wie primitiv unser Server ist: Echte Bibliotheken würden das Erkennen mehrerer Anfragen viel weniger wortreich handhaben!

Starte den Server mit cargo run. Öffne dann zwei Browser-Fenster: Eines für http://127.0.0.1:7878 und das andere für http://127.0.0.1:7878/sleep. Wenn du die URI / wie bisher ein paar Mal eingibst, wirst du sehen, dass er schnell reagiert. Aber wenn du /sleep eingibst und dann / lädst, wirst du sehen, dass / wartet, bis sleep für volle 5 Sekunden geschlafen hat, bevor es geladen wird.

Es gibt mehrere Techniken, um zu vermeiden, dass sich Anfragen hinter einer langsamen Anfrage stauen; diejenige, die wir implementieren werden, ist ein Thread-Pool.

Verbessern des Durchsatzes mit einem Thread-Pool

Ein Thread-Pool ist eine Gruppe von erzeugten Threads, die bereit sind und warten, eine Aufgabe zu bearbeiten. Wenn das Programm eine neue Aufgabe erhält, ordnet es einen der Threads im Pool der Aufgabe zu, und dieser Thread wird die Aufgabe bearbeiten. Die verbleibenden Threads im Pool stehen für alle anderen Aufgaben zur Verfügung, die während der Verarbeitung des ersten Threads hereinkommen. Wenn der erste Thread mit der Verarbeitung seiner Aufgabe fertig ist, kehrt er in den Pool der unbeschäftigten Threads zurück und ist bereit, eine neue Aufgabe zu bearbeiten. Ein Thread-Pool ermöglicht es dir, Verbindungen gleichzeitig zu verarbeiten und so den Durchsatz deines Servers zu erhöhen.

Wir beschränken die Anzahl der Threads im Pool auf eine kleine Anzahl, um uns vor Dienstverweigerungsangriffen (Denial-of-Service, kurz DoS) zu schützen; wenn unser Programm für jede eingehende Anfrage einen neuen Thread erstellen würde, könnte jemand, der 10 Millionen Anfragen an unseren Server stellt, ein Chaos anrichten, indem er alle Ressourcen unseres Servers aufbraucht und die Bearbeitung der Anfragen zum Erliegen bringt.

Anstatt unbegrenzt viele Threads zu erzeugen, werden wir eine feste Anzahl von Threads im Pool warten lassen. Wenn Anfragen eingehen, werden sie zur Verarbeitung an den Pool geschickt. Der Pool verwaltet eine Warteschlange für eingehende Anfragen. Jeder der Threads im Pool wird eine Anfrage aus dieser Warteschlange holen, die Anfrage bearbeiten und dann die nächste Anfrage aus der Warteschlange holen. Mit diesem Design können wir bis zu N Anfragen gleichzeitig bearbeiten, wobei N die Anzahl der Threads ist. Wenn jeder Thread auf eine lang laufende Anfrage antwortet, können sich nachfolgende Anfragen immer noch in der Warteschlange rückstauen, aber wir haben die Anzahl der lang laufenden Anfragen erhöht, die wir bearbeiten können, bevor wir diesen Punkt erreichen.

Diese Technik ist nur eine von vielen Möglichkeiten, den Durchsatz eines Webservers zu verbessern. Weitere Optionen, die du untersuchen könntest, sind das Fork/Join-Modell, das asynchrone E/A-Modell mit einem Thread und das asynchrone E/A-Modell mit mehreren Threads. Wenn du an diesem Thema interessiert bist, kannst du mehr über andere Lösungen lesen und versuchen, sie in Rust zu implementieren; mit einer systemnahen Sprache wie Rust sind alle diese Optionen möglich.

Bevor wir mit der Implementierung eines Thread-Pools beginnen, lass uns darüber sprechen, wie die Verwendung des Pools aussehen sollte. Wenn du versuchst, Code zu entwerfen, kann das Schreiben der Client-Benutzeroberfläche beim Entwurf helfen. Schreibe die API des Codes so, dass sie so strukturiert ist, wie du sie aufrufen möchtest; implementiere dann die Funktionalität innerhalb dieser Struktur, anstatt zuerst die Funktionalität zu implementieren und danach die öffentliche API zu entwerfen.

Ähnlich wie wir die testgetriebene Entwicklung im Projekt in Kapitel 12 angewendet haben, werden wir hier die compilergetriebene Entwicklung verwenden. Wir werden den Code schreiben, der die von uns gewünschten Funktionen aufruft, und dann schauen wir uns Fehler des Compilers an, um zu bestimmen, was wir als Nächstes ändern sollten, damit der Code funktioniert. Bevor wir das tun, werden wir jedoch die Technik erkunden, die wir nicht als Ausgangspunkt verwenden werden.

Für jede Anfrage einen eigenen Thread erstellen

Lass uns zunächst untersuchen, wie unser Code aussehen könnte, wenn er für jede Verbindung einen neuen Thread erstellen würde. Wie bereits erwähnt, ist dies nicht unser endgültiger Plan, da es Probleme mit dem potenziellen Erzeugen einer unbegrenzten Anzahl von Threads gibt, aber es ist ein Ausgangspunkt, um zunächst einen funktionierenden multi-threaded Server zu erhalten. Dann fügen wir den Thread-Pool als Verbesserung hinzu, und es wird einfacher, die beiden Lösungen zu vergleichen.

Listing 21-11 zeigt die Änderungen, die an main vorgenommen werden müssen, um einen neuen Thread zu erzeugen, der jeden Stream innerhalb der for-Schleife verarbeitet.

Dateiname: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Listing 21-11: Erstellen eines neuen Threads für jeden Stream

Wie du in Kapitel 16 gelernt hast, wird thread::spawn einen neuen Thread erstellen und dann den Code im Closure im neuen Thread ausführen. Wenn du diesen Code ausführst und /sleep in deinem Browser lädst, dann / in zwei weiteren Browser-Tabs, wirst du in der Tat sehen, dass die Anfragen an / nicht auf die Beendigung von /sleep warten müssen. Aber wie wir bereits erwähnt haben, wird dies letztendlich das System überfordern, weil du neue Threads ohne jede Begrenzung erstellen würdest.

Erstellen einer endlichen Anzahl von Threads

Wir möchten, dass unser Thread-Pool in einer ähnlichen, vertrauten Weise arbeitet, sodass der Wechsel von Threads zu einem Thread-Pool keine großen Änderungen am Code erfordert, der unsere API verwendet. Listing 21-12 zeigt die hypothetische Schnittstelle für eine Struktur (struct) ThreadPool, die wir anstelle von thread::spawn verwenden wollen.

Dateiname: src/main.rs

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Listing 21-12: Unsere ideale ThreadPool-Schnittstelle

Wir verwenden ThreadPool::new, um einen neuen Thread-Pool mit einer konfigurierbaren Anzahl von Threads zu erstellen, in diesem Fall vier. In der for-Schleife hat pool.execute eine ähnliche Schnittstelle wie thread::spawn, indem es einen Closure entgegennimmt, den der Pool für jeden Stream ausführen soll. Wir müssen pool.execute implementieren, sodass es den Closure entgegennimmt und ihn einem Thread im Pool zur Ausführung übergibt. Dieser Code lässt sich noch nicht kompilieren, aber wir werden es versuchen, damit der Compiler uns anleiten kann, wie wir das Problem beheben können.

Aufbau von ThreadPool mit compilergetriebener Entwicklung

Nimm die Änderungen in Listing 21-12 an src/main.rs vor und lass uns dann die Compilerfehler von cargo check verwenden, um unsere Entwicklung voranzutreiben. Hier ist der erste Fehler, den wir erhalten:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:10:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Großartig! Dieser Fehler sagt uns, dass wir einen Typ oder ein Modul ThreadPool benötigen, also werden wir jetzt eines bauen. Unsere ThreadPool-Implementierung wird unabhängig von der Art der Arbeit unseres Webservers sein. Lass uns also die Crate hello von einer binären Crate auf eine Bibliotheks-Crate umstellen, um unsere ThreadPool-Implementierung aufzunehmen. Nachdem wir zu einer Bibliotheks-Crate umgestellt haben, könnten wir die separate Thread-Pool-Bibliothek auch für alle Arbeiten verwenden, die wir mit einem Thread-Pool durchführen wollen, nicht nur für die Bedienung von Webanfragen.

Erstelle eine Datei src/lib.rs, die das Folgende enthält, was die einfachste Definition einer ThreadPool-Struktur ist, die wir im Moment haben können:

Dateiname: src/lib.rs

pub struct ThreadPool;

Bearbeite dann die Datei main.rs, um ThreadPool in den Gültigkeitsbereich der Bibliotheks-Crate zu bringen, indem du den folgenden Code am Anfang von src/main.rs hinzufügst:

Dateiname: src/main.rs

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Dieser Code wird immer noch nicht funktionieren, aber lass uns ihn noch einmal überprüfen, um den nächsten Fehler zu erhalten, den wir beheben müssen:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/bin/main.rs:11:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Dieser Fehler deutet darauf hin, dass wir als Nächstes eine zugehörige Funktion namens new für ThreadPool erstellen müssen. Wir wissen auch, dass new einen Parameter haben muss, der 4 als Argument akzeptieren kann und eine ThreadPool-Instanz zurückgeben sollte. Lass uns die einfachste Funktion new implementieren, die diese Eigenschaften haben wird:

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}
}

Wir haben usize als Typ des Parameters size gewählt, weil wir wissen, dass eine negative Anzahl von Threads keinen Sinn macht. Wir wissen auch, dass wir diese 4 als die Anzahl der Elemente in einer Kollektion von Threads verwenden werden, wofür der Typ usize gedacht ist, wie im Abschnitt „Ganzzahl-Typen“ in Kapitel 3 besprochen.

Lass uns den Code noch einmal überprüfen:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error

Der Fehler tritt jetzt auf, weil wir keine Methode execute auf ThreadPool haben. Erinnere dich an Abschnitt „Erstellen einer endlichen Anzahl von Threads“, wo wir beschlossen haben, dass unser Thread-Pool eine ähnliche Schnittstelle wie thread::spawn haben sollte. Zusätzlich werden wir die Funktion execute implementieren, sodass sie den Closure, der ihr gegeben wird, nimmt und sie einem unbeschäftigten Thread im Pool zur Ausführung übergibt.

Wir werden die Methode execute auf ThreadPool definieren, um einen Closure als Parameter zu nehmen. Aus Abschnitt „Verschieben erfasster Werte aus Closures“ in Kapitel 13 erinnern wir uns, dass wir Closures als Parameter mit drei verschiedenen Traits nehmen können: Fn, FnMut und FnOnce. Wir müssen entscheiden, welche Art von Closure wir hier verwenden. Wir wissen, dass wir am Ende etwas Ähnliches wie die Implementierung thread::spawn der Standardbibliothek tun werden, sodass wir uns ansehen können, welche Abgrenzungen die Signatur von thread::spawn in ihrem Parameter hat. Die Dokumentation zeigt uns Folgendes:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static

Der Parameter vom Typ F ist derjenige, um den es hier geht; der Parameter vom Typ T bezieht sich auf den Rückgabewert, und darum geht es uns nicht. Wir können sehen, dass spawn FnOnce als Trait verwendet, das an F gebunden ist. Das ist wahrscheinlich auch das, was wir wollen, denn wir werden das Argument, das wir bei execute bekommen, letztendlich an spawn weitergeben. Wir können weiterhin zuversichtlich sein, dass FnOnce das Trait ist, das wir verwenden wollen, weil der Thread zum Ausführen einer Anfrage den Closure dieser Anfrage nur einmal ausführt, was zu Once in FnOnce passt.

Der Parameter vom Typ F hat auch die Trait Bound Send und die Lebensdauer 'static, die in unserer Situation nützlich sind: Wir brauchen Send, um die Trait Bound von einem Thread zu einem anderen zu übertragen und 'static, weil wir nicht wissen, wie lange die Ausführung des Threads dauern wird. Lass uns eine Methode execute auf ThreadPool erstellen, die einen generischen Parameter vom Typ F mit diesen Abgrenzungen annimmt:

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
pub struct ThreadPool;

impl ThreadPool {
    // --abschneiden--
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
}

Wir verwenden immer noch () nach FnOnce, weil dieses FnOnce einen Closure darstellt, der keine Parameter benötigt und den Einheitstyp () zurückgibt. Genau wie bei Funktionsdefinitionen kann der Rückgabetyp in der Signatur weggelassen werden, aber selbst wenn wir keine Parameter haben, benötigen wir immer noch die Klammern.

Auch hier handelt es sich um die einfachste Implementierung der Methode execute: Sie tut nichts, aber wir versuchen nur, unseren Code kompilieren zu lassen. Lass es uns noch einmal überprüfen:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

Er kompiliert! Aber beachte, dass du, wenn du cargo run versuchst und eine Anfrage im Browser stellst, die Fehler im Browser sehen wirst, die wir am Anfang des Kapitels gesehen haben. Unsere Bibliothek ruft den Closure, den wir an execute übergeben, noch nicht wirklich auf!

Hinweis: Ein Sprichwort, das man möglicherweise über Sprachen mit strengen Compilern wie Haskell und Rust hört, lautet: „Wenn der Code kompiliert, funktioniert er.“ Aber dieses Sprichwort ist nicht universell wahr. Unser Projekt kompiliert, aber es tut absolut nichts! Wenn wir ein echtes, vollständiges Projekt aufbauen würden, wäre dies ein guter Zeitpunkt, mit dem Schreiben von Modultests zu beginnen, um zu überprüfen, ob der Code kompiliert und das von uns gewünschte Verhalten aufweist.

Bedenke Folgendes: Was wäre hier anders, wenn wir statt eines Closures eine Future ausführen würden?

Validieren der Anzahl der Threads in new

Wir tun nichts mit den Parametern von new und execute. Lass uns die Rümpfe dieser Funktionen mit dem Verhalten implementieren, das wir wollen. Lass uns zunächst über new nachdenken. Früher wählten wir einen vorzeichenlosen Typ für den Parameter size, weil ein Pool mit einer negativen Anzahl von Threads keinen Sinn ergibt. Ein Pool mit null Threads ergibt jedoch auch keinen Sinn, dennoch ist null ein vollkommen gültiges usize. Wir fügen Code hinzu, um zu prüfen, ob size größer als null ist, bevor wir eine ThreadPool-Instanz zurückgeben; wenn size null ist, brechen wir das Programm ab, indem wir das Makro assert! verwenden, wie in Listing 21-13 gezeigt.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
pub struct ThreadPool;

impl ThreadPool {
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --abschneiden--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
}

Listing 21-13: Die Implementierung von ThreadPool::new bricht ab, wenn size gleich Null ist

Wir haben auch etwas Dokumentation für unseren ThreadPool mit Dokumentationskommentaren (doc comments) hinzugefügt. Beachte, dass wir uns an gute Dokumentationspraktiken gehalten haben, indem wir einen Abschnitt hinzugefügt haben, der die Situationen aufzeigt, in denen unsere Funktion abbrechen kann, wie in Kapitel 14 besprochen. Versuche, cargo doc --open auszuführen und die Struktur ThreadPool anzuklicken, um zu sehen, wie die generierte Dokumentation für new aussieht!

Anstatt das Makro assert! hinzuzufügen, wie wir es hier getan haben, könnten wir new zu build ändern und ein Result zurückgeben lassen, wie wir es mit Config::build im E/A-Projekt in Listing 12-9 getan haben. Aber wir haben in diesem Fall entschieden, dass der Versuch, einen Thread-Pool ohne Threads zu erstellen, ein nicht behebbarer Fehler sein sollte. Wenn du ehrgeizig bist, versuche, eine Funktion namens build mit der folgenden Signatur zu schreiben, um sie mit der Funktion new zu vergleichen:

pub fn new(size: usize) -> Result<ThreadPool, PoolCreationError> {

Platz zum Speichern der Threads schaffen

Jetzt, da wir eine Möglichkeit haben, zu wissen, dass wir eine gültige Anzahl von Threads im Pool haben, können wir diese Threads erstellen und sie in der Struktur ThreadPool speichern, bevor wir die Struktur zurückgeben. Aber wie „speichern“ wir einen Thread? Werfen wir noch einmal einen Blick auf die Signatur von Thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Die Funktion spawn gibt einen JoinHandle<T> zurück, wobei T der Typ ist, den der Closure zurückgibt. Lass uns versuchen, auch JoinHandle zu benutzen und sehen, was passiert. In unserem Fall werden die Closures, die wir an den Thread-Pool übergeben, die Verbindung behandeln und nichts zurückgeben, also wird T der Unit-Typ () sein.

Der Code in Listing 21-14 lässt sich kompilieren, erzeugt aber noch keine Threads. Wir haben die Definition von ThreadPool so geändert, dass sie einen Vektor von thread::JoinHandle<()>-Instanzen enthält, den Vektor mit der Kapazität size initialisiert, eine for-Schleife eingerichtet, die etwas Code zum Erzeugen der Threads ausführt, und eine ThreadPool-Instanz zurückgibt, die diese enthält.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --abschneiden--
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // einige Threads erstellen und im Vektor speichern
        }

        ThreadPool { threads }
    }
    // --abschneiden--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}
}

Listing 21-14: Erstellen eines Vektors für ThreadPool zum Aufnehmen der Threads

Wir haben std::thread in der Bibliotheks-Crate in den Gültigkeitsbereich gebracht, weil wir thread::JoinHandle als den Typ der Elemente im Vektor in ThreadPool verwenden.

Sobald wir eine gültige Größe erhalten haben, erzeugt unser ThreadPool einen neuen Vektor, der size Elemente aufnehmen kann. Die Funktion with_capacity erfüllt die gleiche Aufgabe wie Vec::new, aber mit einem wichtigen Unterschied: Sie weist dem Vektor Platz im Voraus zu. Da wir wissen, dass wir size Elemente im Vektor speichern müssen, ist diese Allokation im Voraus etwas effizienter als die Verwendung von Vec::new, das sich selbst in der Größe verändert, wenn Elemente eingefügt werden.

Wenn du cargo check erneut ausführst, sollte es erfolgreich sein.

Senden von Code vom ThreadPool an einen Thread

Wir haben einen Kommentar in der for-Schleife in Listing 21-14 bezüglich der Erstellung von Threads hinterlassen. Hier werden wir uns ansehen, wie wir tatsächlich Threads erstellen. Die Standardbibliothek bietet thread::spawn als eine Möglichkeit, Threads zu erstellen, und thread::spawn erwartet, dass es Code erhält, den der Thread ausführen soll, sobald der Thread erstellt ist. In unserem Fall wollen wir jedoch die Threads erstellen und sie auf Code warten lassen, den wir später senden werden. Die Implementierung von Threads in der Standardbibliothek enthält keine Möglichkeit, dies zu tun; wir müssen sie manuell implementieren.

Wir werden dieses Verhalten implementieren, indem wir eine neue Datenstruktur zwischen dem ThreadPool und den Threads, die dieses neue Verhalten verwalten werden, einführen. Wir nennen diese Datenstruktur Worker, was ein gängiger Begriff in Pool-Implementierungen ist. Der Worker holt den Code ab, der ausgeführt werden muss, und führt ihn in seinem Thread aus.

Denke an Menschen, die in der Küche eines Restaurants arbeiten: Die Arbeiter warten, bis Bestellungen von Kunden eingehen, und dann sind sie dafür verantwortlich, diese Bestellungen entgegenzunehmen und auszuführen.

Anstatt einen Vektor von JoinHandle<()>-Instanzen im Thread-Pool zu speichern, werden wir Instanzen der Worker-Struktur speichern. Jeder Worker wird eine einzelne JoinHandle<()>-Instanz speichern. Dann werden wir eine Methode auf Worker implementieren, die einen Closure zur Ausführung benötigt und ihn zur Ausführung an den bereits laufenden Thread sendet. Wir werden auch jedem Worker eine id geben, damit wir beim Protokollieren oder Debuggen zwischen den verschiedenen Worker-Instanzen im Pool unterscheiden können.

Hier ist der neue Prozess, der abläuft, wenn wir einen ThreadPool erstellen. Wir werden den Code implementieren, der den Closure an den Thread sendet, nachdem wir Worker auf diese Weise eingerichtet haben:

  1. Definiere eine Struktur Worker, die eine id und einen JoinHandle<()> enthält.
  2. Ändere ThreadPool, um einen Vektor von Worker-Instanzen zu halten.
  3. Definiere eine Funktion Worker::new, die eine id-Nummer nimmt und eine Worker-Instanz zurückgibt, die die id enthält, sowie einen Thread, der mit einem leeren Closure erzeugt wurde.
  4. Verwende in ThreadPool::new den for-Schleifenzähler, um eine id zu erzeugen, erzeuge einen neuen Worker mit dieser id und speichere den Worker im Vektor.

Wenn du zu einer Herausforderung bereit bist, versuche, diese Änderungen selbst zu implementieren, bevor du dir den Code in Listing 21-15 ansiehst.

Bereit? Hier ist Listing 21-15 mit einer Möglichkeit, die vorhergehenden Änderungen vorzunehmen.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --abschneiden--
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --abschneiden--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
}

Listing 21-15: Modifizieren von ThreadPool, um Worker-Instanzen zu halten, anstatt Threads direkt zu halten

Wir haben den Namen des Feldes in ThreadPool von threads in workers geändert, weil es jetzt Worker-Instanzen statt JoinHandle<()>-Instanzen enthält. Wir benutzen den Zähler in der for-Schleife als Argument für Worker::new und wir speichern jeden neuen Worker im Vektor mit dem Namen workers.

Externer Code (wie unser Server in src/main.rs) muss die Implementierungsdetails bezüglich der Verwendung einer Worker-Struktur innerhalb von ThreadPool nicht kennen, also machen wir die Worker-Struktur und ihre Funktion new privat. Die Funktion Worker::new verwendet die id, die wir ihr geben, und speichert eine JoinHandle<()>-Instanz, die durch das Erzeugen eines neuen Threads unter Verwendung eines leeren Closures erzeugt wird.

Hinweis: Wenn das Betriebssystem keinen Thread erstellen kann, weil nicht genügend Systemressourcen vorhanden sind, bricht thread::spawn das Programm ab. Das führt dazu, dass unser gesamter Server abbricht, auch wenn die Erstellung einiger Threads erfolgreich wäre. Der Einfachheit halber lassen wir es bei diesem Verhalten, aber in einer produktionsreifen Thread-Pool-Implementierung würdest du wahrscheinlich std::thread::Builder mit der Methode spawn verwenden wollen, die stattdessen Result zurückgibt.

Dieser Code kompiliert und speichert die Anzahl der Worker-Instanzen, die wir als Argument für ThreadPool::new angegeben haben. Aber wir verarbeiten noch nicht den Closure, den wir in execute erhalten. Schauen wir uns als Nächstes an, wie wir das machen.

Senden von Anfragen an Threads über Kanäle

Das nächste Problem, das wir angehen, ist, dass die Closures bei thread::spawn absolut nichts bewirken. Gegenwärtig erhalten wir den Closure, den wir ausführen wollen, mit der Methode execute. Aber wir müssen thread::spawn einen Closure geben, der ausgeführt werden soll, wenn wir jeden Worker während der Erstellung des ThreadPool erstellen.

Wir möchten, dass die Struktur Worker, die wir gerade erstellt haben, um den Code aus einer Warteschlange im ThreadPool zu holen, diesen Code zur Ausführung an seinen Thread sendet.

In Kapitel 16 hast du etwas über Kanäle (channels) gelernt – eine einfache Art der Kommunikation zwischen zwei Threads –, die für diesen Anwendungsfall perfekt geeignet ist. Wir verwenden einen Kanal, der als Warteschlange von Aufträgen fungiert, und execute sendet einen Auftrag aus dem ThreadPool an die Worker-Instanzen, die den Auftrag an ihren Thread senden. Hier ist der Plan:

  1. Der ThreadPool erstellt einen Kanal und hält den Sender.
  2. Jeder Worker hält den Empfänger.
  3. Wir werden eine neue Struktur Job erstellen, die den Closure aufnimmt, den wir über den Kanal senden wollen.
  4. Die Methode execute sendet den Auftrag, der ausgeführt werden soll, durch den Sender.
  5. In seinem Thread wird der Worker auf den Empfänger warten und die Closures aller Aufträge, die er erhält, ausführen.

Beginnen wir damit, einen Kanal in ThreadPool::new zu erstellen und den Sender in der ThreadPool-Instanz zu halten, wie in Listing 21-16 gezeigt. Die Struktur Job enthält vorerst nichts, sie wird aber der Element-Typ sein, den wir in den Kanal senden.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --abschneiden--
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --abschneiden--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}
}

Listing 21-16: Ändern von ThreadPool, um den Sender zu speichern, der Job-Instanzen übermittelt

In ThreadPool::new erstellen wir unseren neuen Kanal und lassen den Pool das sendende Ende halten. Dies kompiliert erfolgreich.

Lass uns versuchen, einen Empfänger an jeden Worker weiterzugeben, während der Thread-Pool den Kanal erstellt. Wir wissen, dass wir den Empfänger im Thread verwenden wollen, den die Worker-Instanzen erzeugen, also werden wir den Parameter receiver im Closure referenzieren. Der Code in Listing 21-17 lässt sich noch nicht ganz kompilieren.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --abschneiden--
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --abschneiden--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --abschneiden--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
}

Listing 21-17: Übergeben des Empfängers an jeden Worker

Wir haben einige kleine und unkomplizierte Änderungen vorgenommen: Wir geben den Empfänger an Worker::new und dann verwenden wir ihn innerhalb des Closures.

Wenn wir versuchen, diesen Code zu überprüfen, erhalten wir diesen Fehler:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error

Der Code versucht, receiver an mehrere Worker-Instanzen weiterzugeben. Das wird nicht funktionieren, wie du dich aus Kapitel 16 erinnern wirst: Die Kanalimplementierung, die Rust bietet, erlaubt mehrere Produzenten und einen einzigen Konsumenten. Das bedeutet, dass wir nicht einfach das konsumierende Ende des Kanals klonen können, um diesen Code zu reparieren. Selbst wenn wir das könnten, ist das nicht die Technik, die wir anwenden wollen; stattdessen wollen wir die Aufträge auf mehrere Threads verteilen, indem wir den einzigen receiver unter allen Worker-Instanzen aufteilen.

Außerdem erfordert das Entfernen eines Auftrags aus der Warteschlange des Kanals eine Mutation von receiver, sodass die Threads einen sicheren Weg benötigen, um receiver gemeinsam zu nutzen und zu modifizieren; andernfalls könnten wir Race Conditions erhalten (wie in Kapitel 16 behandelt).

Erinnere dich an die Thread-sicheren intelligenten Zeiger, die in Kapitel 16 besprochen wurden: Um das Eigentum über mehrere Threads zu teilen und den Threads zu erlauben, den Wert zu mutieren, müssen wir Arc<Mutex<T>> verwenden. Der Typ Arc ermöglicht es mehreren Worker-Instanzen, den Empfänger zu besitzen, und Mutex stellt sicher, dass immer nur ein Worker zur gleichen Zeit einen Auftrag vom Empfänger erhält. Listing 21-18 zeigt die Änderungen, die wir vornehmen müssen.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};
// --abschneiden--

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --abschneiden--
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --abschneiden--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

// --abschneiden--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --abschneiden--
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
}

Listing 21-18: Den Empfänger unter den Worker teilen, die Arc und Mutex benutzen

In ThreadPool::new setzen wir den Empfänger in einen Arc und einen Mutex. Für jeden neuen Worker klonen wir den Arc, um die Referenzzählung zu erhöhen, sodass die Worker-Instanzen das Eigentum am Empfänger teilen können.

Mit diesen Änderungen kompiliert der Code! Wir haben es geschafft!

Implementieren der Methode execute

Lass uns endlich die Methode execute auf ThreadPool implementieren. Wir werden auch Job von einer Struktur in einen Typ-Alias für ein Trait-Objekt ändern, das den Typ des Closures enthält, den execute erhält. Wie im Abschnitt „Typ-Synonyme und Typ-Aliase“ in Kapitel 19 besprochen, ermöglichen uns Typ-Aliase, lange Typen kürzer zu machen, um sie einfacher nutzen zu können. Siehe Listing 21-19.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

// --abschneiden--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --abschneiden--
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --abschneiden--

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}
}

Listing 21-19: Erstellen eines Alias vom Typ Job für eine Box, die jeden Closure enthält, und danach Senden des Auftrags in den Kanal

Nachdem wir eine neue Job-Instanz unter Verwendung des Closures, den wir in execute erhalten, erstellt haben, senden wir diesen Auftrag an das sendende Ende des Kanals. Wir rufen unwrap auf send auf für den Fall, dass das Senden fehlschlägt. Das kann zum Beispiel passieren, wenn wir alle unsere Threads von der Ausführung abhalten, was bedeutet, dass das empfangende Ende keine neuen Nachrichten mehr empfängt. Im Moment können wir die Ausführung unserer Threads nicht stoppen: Unsere Threads werden so lange ausgeführt, wie der Pool existiert. Der Grund, warum wir unwrap verwenden, ist, dass wir wissen, dass der Fehlerfall nicht passieren wird, aber der Compiler das nicht weiß.

Aber wir sind noch nicht ganz fertig! Im Worker wird unser Closure an thread::spawn weitergereicht, der immer noch nur auf das empfangende Ende des Kanals referenziert. Stattdessen müssen wir den Closure für immer in einer Schleife laufen lassen, indem wir das empfangende Ende des Kanals um einen Auftrag bitten und den Auftrag ausführen, wenn er einen bekommt. Lass uns die in Listing 21-20 gezeigte Änderung in Worker::new vornehmen.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

// --abschneiden--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

Listing 21-20: Empfangen und Ausführen der Aufträge im Thread des Worker

Hier rufen wir zuerst lock auf receiver auf, um den Mutex zu erwerben, und dann rufen wir unwrap auf, um das Programm bei eventuellen Fehlern abzubrechen. Das Akquirieren einer Sperre kann fehlschlagen, wenn sich der Mutex in einem vergifteten Zustand befindet, was passieren kann, wenn ein anderer Thread abbricht, während er die Sperre hält, anstatt sie freizugeben. In dieser Situation ist der Aufruf von unwrap sinnvoll, damit dieser Thread abbricht. Fühle dich frei, dieses unwrap in ein expect mit einer Fehlermeldung zu ändern, die für dich von Bedeutung ist.

Wenn wir die Sperre auf dem Mutex erhalten, rufen wir recv auf, um einen Job vom Kanal zu empfangen. Ein abschließendes unwrap geht auch hier an eventuellen Fehlern vorbei, die auftreten könnten, wenn sich der Thread, der den Sender hält, beendet hat, ähnlich wie die Methode send den Wert Err zurückgibt, wenn der Empfänger abschaltet.

Der Aufruf von recv blockiert, wenn also noch kein Auftrag vorhanden ist, wartet der aktuelle Thread, bis ein Auftrag verfügbar wird. Der Mutex<T> stellt sicher, dass immer nur ein Worker-Thread zur gleichen Zeit versucht, einen Auftrag anzufordern.

Unser Thread-Pool ist jetzt in einem funktionierenden Zustand! Führe cargo run aus und stelle einige Anfragen:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field `workers` is never read
 --> src/lib.rs:7:5
  |
6 | pub struct ThreadPool {
  |            ---------- field in this struct
7 |     workers: Vec<Worker>,
  |     ^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: fields `id` and `thread` are never read
  --> src/lib.rs:48:5
   |
47 | struct Worker {
   |        ------ fields in this struct
48 |     id: usize,
   |     ^^
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^

warning: `hello` (lib) generated 2 warnings
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 4.91s
     Running `target/debug/hello`
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Worker 2 hat einen Auftrag erhalten; führe ihn aus.
Worker 1 hat einen Auftrag erhalten; führe ihn aus.
Worker 3 hat einen Auftrag erhalten; führe ihn aus.
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Worker 2 hat einen Auftrag erhalten; führe ihn aus.
Worker 1 hat einen Auftrag erhalten; führe ihn aus.
Worker 3 hat einen Auftrag erhalten; führe ihn aus.
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Worker 2 hat einen Auftrag erhalten; führe ihn aus.

Erfolg! Wir haben jetzt einen Thread-Pool, der Verbindungen asynchron ausführt. Es werden nie mehr als vier Threads erzeugt, sodass unser System nicht überlastet wird, wenn der Server viele Anfragen erhält. Wenn wir eine Anfrage an /sleep stellen, ist der Server immer noch in der Lage, andere Anfragen zu bedienen, indem er sie von einem anderen Thread ausführen lässt.

Hinweis: Wenn du /sleep in mehreren Browser-Fenstern gleichzeitig öffnest, werden diese möglicherweise in 5-Sekunden-Intervallen nacheinander geladen. Einige Web-Browser führen aus Gründen der Zwischenspeicherung mehrere Instanzen der gleichen Anfrage nacheinander aus. Diese Beschränkung wird nicht durch unseren Webserver verursacht.

Nachdem du die while let-Schleife in den Kapiteln 17 und 19 kennengelernt hast, fragst du dich vielleicht, warum wir den Code für den Worker-Thread nicht geschrieben haben, wie in Listing 21-21 gezeigt.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}
// --abschneiden--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

Listing 21-21: Eine alternative Implementierung von Worker::new unter Verwendung von while let

Dieser Code wird kompiliert und ausgeführt, führt aber nicht zum gewünschten Thread-Verhalten: Eine langsame Anfrage führt immer noch dazu, dass andere Anfragen auf ihre Bearbeitung warten. Der Grund dafür ist etwas subtil: Die Struktur Mutex hat keine öffentliche Methode unlock, weil das Eigentum an der Sperre auf der Lebensdauer von MutexGuard<T> innerhalb von LockResult<MutexGuard<T>> basiert, die die Methode lock zurückgibt. Zur Kompilierzeit kann der Borrow Checker dann die Regel durchsetzen, dass auf eine von einem Mutex bewachte Ressource nicht zugegriffen werden kann, wenn wir die Sperre nicht halten. Diese Implementierung kann aber auch dazu führen, dass die Sperre länger als beabsichtigt gehalten wird, wenn wir nicht sorgfältig über die Lebensdauer von MutexGuard<T> nachdenken.

Der Code in Listing 21-20, der let job = receiver.lock().unwrap().recv().unwrap(); verwendet, funktioniert, weil mit let alle temporären Werte, die in dem Ausdruck auf der rechten Seite des Gleichheitszeichens verwendet werden, sofort verworfen werden, wenn die let-Anweisung endet. Allerdings gibt while let (und if let und match) temporäre Werte erst am Ende des zugehörigen Blocks frei. In Listing 21-21 bleibt die Sperre für die Dauer des Aufrufs von job() erhalten, was bedeutet, dass andere Worker-Instanzen keine Aufträge erhalten können.