Keyboard shortcuts

Press or to navigate between chapters

Press ? to show this help

Press Esc to hide this help

Kontrolliertes Beenden und Aufräumen

Der Code in Listing 21-20 antwortet auf Anfragen asynchron durch die Verwendung eines Thread-Pools, wie von uns beabsichtigt. Wir erhalten einige Warnungen über die Felder workers, id und thread, die wir nicht direkt benutzen, was uns daran erinnert, dass wir nichts aufräumen. Wenn wir die weniger elegante Methode Strg+c verwenden, um den Haupt-Thread (main thread) anzuhalten, werden auch alle anderen Threads sofort gestoppt, selbst wenn sie gerade dabei sind, eine Anfrage zu bedienen.

Als Nächstes werden wir das Trait Drop implementieren, um join für jeden der Threads im Pool aufzurufen, damit sie die Anfragen, an denen sie arbeiten, vor dem Schließen beenden können. Dann werden wir einen Weg implementieren, um den Threads mitzuteilen, dass sie keine neuen Anfragen mehr annehmen und herunterfahren sollen. Um diesen Code in Aktion zu sehen, werden wir unseren Server so modifizieren, dass er nur zwei Anfragen annimmt, bevor er seinen Thread-Pool kontrolliert herunterfährt.

Implementieren des Traits Drop auf ThreadPool

Lass uns damit beginnen, Drop auf unseren Thread-Pool zu implementieren. Wenn der Pool aufgeräumt wird, sollten wir auf das Ende unseres Threads warten, um sicherzustellen, dass sie ihre Arbeit beenden. Listing 21-22 zeigt einen ersten Versuch einer Drop-Implementierung; dieser Code wird noch nicht ganz funktionieren.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Worker {} herunterfahren", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");

                job();
            }
        });

        Worker { id, thread }
    }
}
}

Listing 21-22: Warten auf das Ende der einzelnen Threads, wenn der Thread-Pool den Gültigkeitsbereich verlässt

Zuerst iterieren wir über alle workers im Thread-Pool. Wir verwenden dafür &mut, weil self eine veränderbare Referenz ist und wir auch in der Lage sein müssen, worker zu verändern. Für jeden worker geben wir eine Nachricht aus, die besagt, dass diese bestimmte worker-Instanz heruntergefahren wird, und dann rufen wir auf dem Thread dieser worker-Instanz join auf. Wenn der Aufruf von join fehlschlägt, benutzen wir unwrap, um das Programm abzubrechen.

Hier ist der Fehler, den wir erhalten, wenn wir diesen Code kompilieren:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/07dca489ac2d933c78d3c5158e3f43be/library/std/src/thread/mod.rs:1649:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` (lib) due to 1 previous error

Der Fehler sagt uns, dass wir join nicht aufrufen können, weil wir nur eine veränderbare Borrow von jedem worker haben und join das Eigentum an seinem Argument übernimmt. Um dieses Problem zu lösen, müssen wir den Thread thread aus der Worker-Instanz herausnehmen, damit join den Thread konsumieren kann. Eine Möglichkeit, dies zu tun, besteht darin, den gleichen Ansatz wie in Listing 18-15 zu verfolgen. Wenn Worker ein Option<thread::JoinHandle<()>> hielte, könnten wir die Methode take auf Option aufrufen, um den Wert aus der Variante Some herauszuverschieben und eine Variante None an ihrer Stelle zu belassen. Mit anderen Worten, ein Worker, der läuft, würde eine Variante Some in thread haben, und wenn wir einen Worker aufräumen wollten, würden wir Some durch None ersetzen, sodass der Worker keinen Thread zum Laufen haben würde.

Das einzige Mal, dass dies der Fall wäre, wäre, wenn man den Worker aufräumt. Im Gegenzug müssten wir überall, wo wir auf Worker.thread zugreifen, mit einer Option<thread::JoinHandle<()>> umgehen. Idiomatisch verwendet Rust Option ziemlich oft, aber wenn du etwas in Option einpackst, von dem du weißt, dass es immer vorhanden sein wird, ist es eine gute Idee, nach alternativen Ansätzen zu suchen, die deinen Code sauberer und weniger fehleranfällig machen.

In diesem Fall gibt es eine bessere Alternative: Die Methode Vec::drain. Sie akzeptiert einen Bereichsparameter, um anzugeben, welche Elemente aus dem Vec entfernt werden sollen, und gibt einen Iterator dieser Elemente zurück. Die Angabe der Bereichssyntax .. entfernt alle Werte aus dem Vec.

Wir müssen also die drop-Implementierung von ThreadPool wie folgt aktualisieren:

Dateiname: src/lib.rs

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in self.workers.drain(..) {
            println!("Worker {} herunterfahren", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");

                job();
            }
        });

        Worker { id, thread }
    }
}

Dadurch wird der Compilerfehler behoben, und es sind keine weiteren Änderungen an unserem Code erforderlich. Beachte, dass drop bei einem Programmabbruch aufgerufen werden kann; und wenn dann auch unwrap abbricht, eine doppelte Fehlersituation verursacht werden könnte, was sofort zum Programmende und zum Abbruch aller laufenden Bereinigungsvorgänge führen würde. Für ein Beispielprogramm ist dies in Ordnung, für Produktionscode jedoch nicht zu empfehlen.

Den Threads signalisieren, nicht mehr nach Aufträgen zu lauschen

Mit all den Änderungen, die wir vorgenommen haben, lässt sich unser Code ohne jede Warnung kompilieren. Aber die schlechte Nachricht ist, dass dieser Code noch nicht so funktioniert, wie wir es uns wünschen. Der Schlüssel ist die Logik in den Closures, die von den Threads der Worker-Instanzen ausgeführt werden: Im Moment rufen wir join auf, aber das wird die Threads nicht herunterfahren, weil sie sich in einer Endlosschleife auf der Suche nach Aufträgen befinden. Wenn wir versuchen, unseren ThreadPool mit unserer aktuellen Implementierung von Drop aufräumen zu lassen, wird der Haupt-Thread für immer blockieren und auf das Beenden des ersten Threads warten.

Um dieses Problem zu beheben, brauchen wir eine Änderung in der Implementierung von drop in ThreadPool und dann eine Änderung in der Worker-Schleife.

Zuerst ändern wir die Implementierung von drop in ThreadPool, um den sender explizit aufzuräumen, bevor wir auf das Ende der Threads warten. Listing 21-23 zeigt die Änderungen an ThreadPool, um den sender explizit aufzuräumen. Anders als beim Thread, müssen wir hier eine Option verwenden, um den sender mit Option::take aus dem ThreadPool herausnehmen zu können.

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --abschneiden--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
          // --abschneiden--

        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Worker {} herunterfahren", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();
         
            println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");
         
            job();
        });

        Worker { id, thread }
    }
}
}

Listing 21-23: sender vor dem Warten auf die Worker-Threads explizit aufräumen

Das Aufräumen von sender schließt den Kanal, was bedeutet, dass keine weiteren Nachrichten gesendet werden. Wenn das passiert, geben alle Aufrufe von recv, die die Worker-Instanzen in der Endlosschleife machen, einen Fehler zurück. In Listing 21-24 ändern wir die Worker-Schleife so, dass die Schleife in diesem Fall ordnungsgemäß beendet wird, was bedeutet, dass die Threads beendet werden, wenn die Implementierung von drop in ThreadPool join für sie aufruft.

Dateiname: src/lib.rs

use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Worker {} herunterfahren", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                        println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");

                        job();
                    }
                    Err(_) => {
                        println!("Worker {id} nicht mehr verbunden, wird beendet.");
                        break;
                    }
                }
            }
        });

        Worker { id, thread }
    }
}

Listing 21-24: Explizites Verlassen der Schleife, wenn recv einen Fehler zurückgibt

Um diesen Code in Aktion zu sehen, modifizieren wir main so, dass nur zwei Anfragen akzeptiert werden, bevor der Server kontrolliert heruntergefahren wird, wie in Listing 21-25 gezeigt.

Dateiname: src/main.rs

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Fahre herunter.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Listing 21-25: Herunterfahren des Servers, nachdem er zwei Anfragen bearbeitet hat, durch Verlassen der Schleife

Du würdest nicht wollen, dass ein Webserver aus der realen Welt heruntergefahren wird, nachdem er nur zwei Anfragen bearbeitet hat. Dieser Code zeigt nur, dass das kontrollierte Herunterfahren und Aufräumen funktioniert.

Die Methode take ist im Trait Iterator definiert und beschränkt die Iteration auf die ersten beiden Elemente. Der ThreadPool wird am Ende von main den Gültigkeitsbereich verlassen und die drop-Implementierung ausgeführt werden.

Starte den Server mit cargo run und stelle drei Anfragen. Die dritte Anfrage sollte fehlerhaft sein und in deinem Terminal solltest du eine ähnliche Ausgabe wie diese sehen:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 hat einen Auftrag erhalten; führe ihn aus.
Fahre herunter.
Worker 0 herunterfahren
Worker 3 hat einen Auftrag erhalten; führe ihn aus.
Worker 1 nicht mehr verbunden, wird beendet.
Worker 2 nicht mehr verbunden, wird beendet.
Worker 3 nicht mehr verbunden, wird beendet.
Worker 0 nicht mehr verbunden, wird beendet.
Worker 1 herunterfahren
Worker 2 herunterfahren
Worker 3 herunterfahren

Möglicherweise siehst du eine andere Reihenfolge der Worker-IDs und der ausgegebenen Nachrichten. Wir können anhand der Nachrichten sehen, wie dieser Code funktioniert: Die Worker 0 und 3 haben die ersten beiden Anfragen erhalten. Der Server hat nach der zweiten Verbindung aufgehört, Verbindungen anzunehmen, und die Drop-Implementierung auf ThreadPool beginnt mit der Ausführung, bevor Worker 3 überhaupt seine Arbeit beginnt. Wenn man den sender aufräumt, werden alle Worker-Instanzen getrennt und angewiesen, sich zu beenden. Die Worker-Instanzen geben jeweils eine Nachricht aus, wenn sie die Verbindung trennen, und dann ruft der Thread-Pool join auf, um das Ende jedes Worker-Threads zu warten.

Beachte einen interessanten Aspekt in diesem speziellen Programmlauf: Der ThreadPool hat den sender aufgeräumt, und bevor ein Worker einen Fehler erhalten hat, haben wir versucht, auf Worker 0 zu warten. Worker 0 hatte noch keinen Fehler von recv erhalten, also blockierte der Haupt-Thread und wartete darauf, dass Worker 0 fertig wird. In der Zwischenzeit erhielt Worker 3 einen Auftrag, und dann erhielten alle Threads einen Fehler. Als Worker 0 fertig war, wartete der Haupt-Thread darauf, dass die restlichen Worker-Instanzen fertig wurden. Zu diesem Zeitpunkt hatten sie alle ihre Schleifen verlassen und konnten sich beenden.

Herzlichen Glückwunsch! Wir haben jetzt unser Projekt abgeschlossen; wir haben einen einfachen Webserver, der einen Thread-Pool verwendet, um asynchron zu antworten. Wir sind in der Lage, den Server kontrolliert herunterzufahren, wodurch alle Threads im Pool aufgeräumt werden.

Hier ist der vollständige Code als Referenz:

Dateiname: src/main.rs

use hello::ThreadPool;
use std::{
    fs,
    io::{BufReader, prelude::*},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Fahre herunter.");
}

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&stream);
    let request_line = buf_reader.lines().next().unwrap().unwrap();

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    let contents = fs::read_to_string(filename).unwrap();
    let length = contents.len();

    let response =
        format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");

    stream.write_all(response.as_bytes()).unwrap();
}

Dateiname: src/lib.rs

#![allow(unused)]
fn main() {
use std::{
    sync::{Arc, Mutex, mpsc},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Erzeuge einen neuen ThreadPool.
    ///
    /// Die Größe ist die Anzahl der Threads im Pool.
    ///
    /// # Panics
    ///
    /// Die Funktion `new` bricht ab, wenn die Größe null ist.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Worker {} herunterfahren", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            loop {
                let message = receiver.lock().unwrap().recv();

                match message {
                    Ok(job) => {
                    println!("Worker {id} hat einen Auftrag erhalten; führe ihn aus.");

                        job();
                    }
                    Err(_) => {
                    println!("Worker {id} nicht mehr verbunden, wird beendet.");
                        break;
                    }
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}
}

Wir könnten hier mehr tun! Wenn du dieses Projekt weiter verbessern willst, findest du hier einige Ideen:

  • Füge weitere Dokumentation zu ThreadPool und seinen öffentlichen Methoden hinzu.
  • Füge Tests der Funktionalität der Bibliothek hinzu.
  • Ändere Aufrufe von unwrap in eine robustere Fehlerbehandlung.
  • Verwende ThreadPool, um eine andere Aufgabe als das Beantworten von Web-Anfragen durchzuführen.
  • Suche eine Thread-Pool-Crate auf crates.io und implementiere damit einen ähnlichen Webserver unter Verwendung der Crate. Vergleiche dann dessen API und Robustheit mit dem von uns implementierten Thread-Pool.

Zusammenfassung

Gut gemacht! Du hast es bis ans Ende des Buches geschafft! Wir möchten dir danken, dass du uns auf dieser Tour durch Rust begleitet hast. Du bist nun bereit, deine eigenen Rust-Projekte umzusetzen und bei den Projekten anderer zu helfen. Denke daran, dass es eine gastfreundliche Gemeinschaft von Rust-Entwicklern gibt, die dir bei jeder Herausforderung, denen du auf deiner Rust-Reise begegnest, gerne helfen.