Keyboard shortcuts

Press or to navigate between chapters

Press ? to show this help

Press Esc to hide this help

Nachrichtenaustausch zwischen Threads

Ein immer beliebter werdender Ansatz zur Gewährleistung einer sicheren Nebenläufigkeit (safe concurrency) ist der Nachrichtenaustausch (message passing), bei dem Threads oder Akteure kommunizieren, indem sie sich gegenseitig Nachrichten mit Daten senden. Hier ist die Idee in einem Slogan aus der Go-Sprachdokumentation: „Kommuniziere nicht, indem du Arbeitsspeicher teilst; teile stattdessen Arbeitsspeicher durch Kommunikation.“

Um Nebenläufigkeit beim Senden von Nachrichten zu erreichen, bietet die Standardbibliothek von Rust eine Implementierung für Kanäle. Ein Kanal (channel) ist ein allgemeines Programmierkonzept, mit dem Daten von einem Thread zu einem anderen gesendet werden.

Du kannst dir einen Kanal in der Programmierung wie einen gerichteten Wasserkanal vorstellen, z.B. einen Bach oder einen Fluss. Wenn du etwas wie eine Gummiente in einen Fluss setzt, wird sie stromabwärts bis zum Ende des Wasserwegs reisen.

Ein Kanal hat zwei Hälften: einen Sender und einen Empfänger. Die Senderhälfte ist die stromaufwärts gelegene Stelle, an der du die Gummiente in den Fluss setzt, und die Empfängerhälfte ist die Stelle, an der die Gummiente stromabwärts ankommt. Ein Teil deines Codes ruft Methoden auf dem Sender mit den Daten auf, die du senden möchtest, und ein anderer Teil überprüft die Empfangsseite auf ankommende Nachrichten. Ein Kanal gilt als geschlossen (closed), wenn entweder die Sender- oder die Empfängerhälfte aufgeräumt (dropped) wird.

Hier erarbeiten wir uns ein Programm, das einen Thread hat, um Werte zu generieren und sie über einen Kanal zu senden, und einen anderen Thread, der die Werte empfängt und ausgibt. Wir werden einfache Werte zwischen den Threads über einen Kanal senden, um die Funktionalität zu veranschaulichen. Sobald du mit der Technik vertraut bist, kannst du Kanäle für alle Threads verwenden, die miteinander kommunizieren müssen, z.B. für ein Chatsystem oder ein System, in dem viele Threads Teile einer Berechnung durchführen und die Teile an einen Thread senden, der die Ergebnisse zusammenfasst.

Erstens werden wir in Listing 16-6 einen Kanal erstellen, aber nichts damit machen. Beachte, dass sich dieser Code noch nicht kompilieren lässt, weil Rust nicht sagen kann, welchen Typ von Werten wir über den Kanal senden wollen.

Dateiname: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

Listing 16-6: Erstellen eines Kanals und Zuweisen der beiden Hälften zu tx und rx

Wir erstellen einen neuen Kanal mit der Funktion mpsc::channel; mpsc steht für mehrfacher Produzent, einzelner Konsument (multiple producer, single consumer). Kurz gesagt, die Art und Weise, wie die Standardbibliothek von Rust Kanäle implementiert, bedeutet, dass ein Kanal mehrere sendende Enden haben kann, die Werte produzieren, aber nur ein empfangendes Ende, das diese Werte konsumiert. Stell dir vor, mehrere Bäche würden zu einem großen Fluss zusammenfließen: Alles, was in einem der Bäche hinuntergeschickt wird, landet am Ende in einem Fluss. Wir fangen zunächst mit einem einzigen Produzenten an, aber wir fügen mehrere Produzenten hinzu, wenn dieses Beispiel funktioniert.

Die Funktion mpsc::channel gibt ein Tupel zurück, dessen erstes Element die sendende Seite und dessen zweites Element die empfangende Seite ist. Die Abkürzungen tx und rx werden traditionell in vielen Feldern für Sender (transmitter) bzw. Empfänger (receiver) verwendet, daher benennen wir unsere Variablen als solche, um jedes Ende anzugeben. Wir verwenden eine let-Anweisung mit einem Muster, das die Tupel destrukturiert; wir werden die Verwendung von Mustern in let-Anweisungen und die Destrukturierung in Kapitel 19 besprechen. Für den Moment solltest du wissen, dass die Verwendung einer let-Anweisung auf diese Weise ein bequemer Ansatz ist, um die Teile des Tupels zu extrahieren, die von mpsc::channel zurückgegeben werden.

Verschieben wir das sendende Ende in einen erzeugten Thread und lassen ihn einen String senden, sodass der erzeugte Thread mit dem Haupt-Thread kommuniziert, wie in Listing 16-7 gezeigt. Das ist so, als würde man eine Gummiente flussaufwärts in den Fluss setzen oder eine Chat-Nachricht von einem Thread zum anderen senden.

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hallo");
        tx.send(val).unwrap();
    });
}

Listing 16-7: Verschieben von tx in einen erzeugten Thread und Senden von „hallo“

Wieder verwenden wir thread::spawn, um einen neuen Thread zu erstellen, und dann move, um tx in den Closure zu verschieben, sodass der erzeugte Thread tx besitzt. Der erzeugte Thread muss den Sender besitzen, um in der Lage zu sein, Nachrichten durch den Kanal zu senden.

Der Sender hat eine Methode send, die den Wert entgegennimmt, den wir senden wollen. Die Methode send gibt ein Result<T, E> zurück; wenn also die empfangende Seite bereits aufgeräumt wurde und es keinen Ort gibt, an den ein Wert gesendet werden kann, wird die Sendeoperation einen Fehler zurückgeben. In diesem Beispiel rufen wir unwrap auf, um im Falle eines Fehlers abzubrechen. Aber in einer echten Anwendung würden wir es ordentlich handhaben: Schlag in Kapitel 9 nach, um Strategien für eine korrekte Fehlerbehandlung anzusehen.

In Listing 16-8 erhalten wir den Wert vom Empfänger im Haupt-Thread. Das ist so, als würde man die Gummiente am Ende des Flusses aus dem Wasser holen oder eine Chat-Nachricht erhalten.

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hallo");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Erhalten: {received}");
}

Listing 16-8: Empfangen des Wertes „hallo“ im Haupt-Thread und Ausgeben des Wertes

Der Empfänger hat zwei nützliche Methoden: recv und try_recv. Wir benutzen recv, kurz für empfangen (receive), was die Ausführung des Haupt-Threads blockiert und wartet, bis ein Wert in den Kanal geschickt wird. Sobald ein Wert gesendet wurde, wird er von recv in einem Result<T, E> zurückgegeben. Wenn der Sender geschlossen ist, gibt recv einen Fehler zurück, um zu signalisieren, dass keine weiteren Werte mehr kommen werden.

Die Methode try_recv blockiert nicht, sondern gibt stattdessen sofort ein Result<T, E> zurück: Einen Ok-Wert, der eine Nachricht enthält, wenn eine verfügbar ist, und einen Err-Wert, wenn gerade keine Nachricht vorhanden ist. Die Verwendung von try_recv ist nützlich, wenn dieser Thread während des Wartens auf Nachrichten andere Arbeiten zu erledigen hat: Wir könnten eine Schleife schreiben, die try_recv ab und zu aufruft, eine Nachricht verarbeitet, wenn eine verfügbar ist, und ansonsten für eine Weile andere Arbeiten erledigt, bis sie erneut überprüft wird.

Wir haben in diesem Beispiel der Einfachheit halber recv verwendet; wir haben keine andere Arbeit für den Haupt-Thread zu erledigen, außer auf Nachrichten zu warten, daher ist es angebracht, den Haupt-Thread zu blockieren.

Wenn wir den Code in Listing 16-8 ausführen, sehen wir den durch den Haupt-Thread ausgegebenen Wert:

Erhalten: hallo

Perfekt!

Übertragen des Eigentums durch Kanäle

Die Eigentumsregeln spielen beim Nachrichtenversand eine entscheidende Rolle, da sie dir helfen, sicheren, nebenläufigen Code zu schreiben. Die Vermeidung von Fehlern bei der nebenläufigen Programmierung ist der Vorteil, den du durch Berücksichtigen der Eigentümerschaft in deinen Rust-Programmen erhältst. Lass uns ein Experiment machen, um zu zeigen, wie Kanäle und Eigentümerschaft zusammenwirken, um Probleme zu vermeiden: Wir versuchen, einen Wert val im erzeugten Thread zu verwenden, nachdem wir ihn in den Kanal geschickt haben. Versuche, den Code in Listing 16-9 zu kompilieren, um zu sehen, warum dieser Code nicht erlaubt ist.

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hallo");
        tx.send(val).unwrap();
        println!("val ist {val}");
    });

    let received = rx.recv().unwrap();
    println!("Erhalten: {received}");
}

Listing 16-9: Versuch, val zu benutzen, nachdem wir es in den Kanal geschickt haben

Hier versuchen wir, val auszugeben, nachdem wir es per tx.send in den Kanal geschickt haben. Dies zuzulassen wäre eine schlechte Idee: Sobald der Wert an einen anderen Thread gesendet wurde, könnte dieser Thread ihn ändern oder aufräumen, bevor wir versuchen, den Wert erneut zu verwenden. Möglicherweise können die Änderungen des anderen Threads aufgrund inkonsistenter oder nicht vorhandener Daten zu Fehlern oder unerwarteten Ergebnissen führen. Rust gibt uns jedoch einen Fehler, wenn wir versuchen, den Code in Listing 16-9 zu kompilieren:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:27
   |
 8 |         let val = String::from("hallo");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
 9 |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

Unser Nebenläufigkeitsfehler hat einen Kompilierzeitfehler verursacht. Die Funktion send übernimmt das Eigentum an ihrem Parameter und wenn der Wert verschoben wird, übernimmt der Empfänger das Eigentum an ihm. Dadurch wird verhindert, dass wir den Wert nach dem Senden versehentlich wieder verwenden; das Eigentumssystem prüft, ob alles in Ordnung ist.

Senden mehrerer Werte

Der Code in Listing 16-8 wurde kompiliert und ausgeführt, aber er zeigte uns nicht eindeutig, dass zwei getrennte Threads über den Kanal miteinander sprachen.

In Listing 16-10 haben wir einige Änderungen vorgenommen, die beweisen, dass der Code in Listing 16-8 nebenläufig ausgeführt wird: Der erzeugte Thread sendet nun mehrere Nachrichten und macht dazwischen eine Pause von einer Sekunde.

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hallo"),
            String::from("aus"),
            String::from("dem"),
            String::from("Thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Erhalten: {received}");
    }
}

Listing 16-10: Senden mehrerer Nachrichten mit Pausen dazwischen

Diesmal verwendet der erzeugte Thread einen Vektor von Strings, die wir an den Haupt-Thread senden wollen. Wir iterieren über diese Strings, senden jeden einzeln und pausieren dazwischen, indem wir die Funktion thread::sleep mit einem Duration-Wert von einer Sekunde aufrufen.

Im Haupt-Thread rufen wir die Funktion recv nicht mehr explizit auf: Stattdessen behandeln wir rx als Iterator. Jeden empfangenen Wert geben wir aus. Wenn der Kanal geschlossen wird, wird die Iteration beendet.

Wenn du den Code in Listing 16-10 ausführst, solltest du die folgende Ausgabe mit einer Ein-Sekunden-Pause zwischen jeder Zeile sehen:

Erhalten: hallo
Erhalten: aus
Erhalten: dem
Erhalten: Thread

Da wir keinen Code haben, der die for-Schleife im Haupt-Thread pausiert oder verzögert, können wir sagen, dass der Haupt-Thread darauf wartet, Werte vom erzeugten Thread zu erhalten.

Erstellen mehrerer Produzenten

Vorhin haben wir erwähnt, dass mpsc ein Akronym für mehrfacher Produzent, einzelner Konsument ist. Lass uns mpsc verwenden und den Code in Listing 16-10 erweitern, um mehrere Threads zu erzeugen, die alle Werte an den gleichen Empfänger senden. Wir können dies tun, indem wir den Sender klonen, wie in Listing 16-11 gezeigt:

Dateiname: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --abschneiden--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hallo"),
            String::from("aus"),
            String::from("dem"),
            String::from("Thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("mehr"),
            String::from("Nachrichten"),
            String::from("für"),
            String::from("dich"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Erhalten: {received}");
    }

    // --abschneiden--
}

Listing 16-11: Senden mehrerer Nachrichten von mehreren Produzenten

Bevor wir den ersten Thread erzeugen, rufen wir dieses Mal clone auf dem Sender auf. Dadurch erhalten wir einen weiteren Sender, den wir an den ersten erzeugten Thread weitergeben können. Wir übergeben den ursprünglichen Sender an einen zweiten erzeugten Thread. Dadurch erhalten wir zwei Threads, die jeweils unterschiedliche Nachrichten an den Empfänger senden.

Wenn du den Code ausführst, sollte deine Ausgabe in etwa so aussehen:

Erhalten: hallo
Erhalten: mehr
Erhalten: aus
Erhalten: Nachrichten
Erhalten: für
Erhalten: dem
Erhalten: Thread
Erhalten: dich

Möglicherweise siehst du die Werte in einer anderen Reihenfolge, dies hängt von deinem System ab. Das macht die Nebenläufigkeit sowohl interessant als auch schwierig. Wenn du mit thread::sleep experimentierst und ihm verschiedene Werte in den verschiedenen Threads gibst, wird jeder Durchlauf nicht-deterministischer sein und jedes Mal eine andere Ausgabe erzeugen.

Nachdem wir uns nun angesehen haben, wie Kanäle funktionieren, wollen wir uns eine andere Methode der Nebenläufigkeit ansehen.