Nachrichtenaustausch zwischen Strängen (threads)
Ein immer beliebter werdender Ansatz zur Gewährleistung einer sicheren Nebenläufigkeit (safe concurrency) ist der Nachrichtenaustausch (message passing), bei dem Stränge oder Akteure kommunizieren, indem sie sich gegenseitig Nachrichten mit Daten senden. Hier ist die Idee in einem Slogan aus der Go-Sprachdokumentation: „Kommuniziere nicht, indem du Arbeitsspeicher teilst; teile stattdessen Arbeitsspeicher durch Kommunikation.“
Um die Gleichzeitigkeit beim Senden von Nachrichten zu erreichen, bietet die Standardbibliothek von Rust eine Implementierung von Kanälen (channels). Ein Kanal ist ein allgemeines Programmierkonzept, mit dem Daten von einem Strang zu einem anderen gesendet werden.
Du kannst dir einen Kanal in der Programmierung wie einen gerichteten Wasserkanal vorstellen, z.B. einen Bach oder einen Fluss. Wenn du etwas wie eine Gummiente in einen Fluss setzt, wird sie stromabwärts bis zum Ende des Wasserwegs reisen.
Ein Kanal hat zwei Hälften: Einen Sender und einen Empfänger. Die Senderhälfte ist die stromaufwärts gelegene Stelle, an der du Gummienten in den Fluss setzt, und die Empfängerhälfte ist die Stelle, an der die Gummiente stromabwärts ankommt. Ein Teil deines Codes ruft Methoden auf dem Sender mit den Daten auf, die du senden möchtest, und ein anderer Teil überprüft die Empfangsseite auf ankommende Nachrichten. Ein Kanal gilt als geschlossen (closed), wenn entweder die Sender- oder die Empfängerhälfte aufgeräumt (dropped) wird.
Hier arbeiten wir uns zu einem Programm hoch, das einen Strang hat, um Werte zu generieren und sie über einen Kanal zu senden, und einen anderen Strang, der die Werte empfängt und ausgibt. Wir werden einfache Werte zwischen den Strängen über einen Kanal senden, um die Funktionalität zu veranschaulichen. Sobald du mit der Technik vertraut bist, kannst du Kanäle für alle Stränge verwenden, die miteinander kommunizieren müssen, z.B. für ein Chatsystem oder ein System, in dem viele Stränge Teile einer Berechnung durchführen und die Teile an einen Strang senden, der die Ergebnisse zusammenfasst.
Erstens werden wir in Codeblock 16-6 einen Kanal erstellen, aber nichts damit machen. Beachte, dass sich dieser noch nicht kompilieren lässt, weil Rust nicht sagen kann, welchen Typ von Werten wir über den Kanal senden wollen.
Dateiname: src/main.rs
use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); }
Wir erstellen einen neuen Kanal mit der Funktion mpsc::channel
; mpsc
steht
für mehrfacher Produzent, einzelner Konsument (multiple producer, single
consumer). Kurz gesagt, die Art und Weise, wie die Standardbibliothek von Rust
Kanäle implementiert, bedeutet, dass ein Kanal mehrere sendende Enden haben
kann, die Werte produzieren, aber nur ein empfangendes Ende, das diese Werte
konsumiert. Stell dir vor, mehrere Bäche würden zu einem großen Fluss
zusammenfließen: Alles, was in einem der Bäche hinuntergeschickt wird, landet
am Ende in einem Fluss. Wir fangen zunächst mit einem einzigen Produzenten an,
aber wir fügen mehrere Produzenten hinzu, wenn dieses Beispiel funktioniert.
Die Funktion mpsc::channel
gibt ein Tupel zurück, dessen erstes Element die
sendende Seite und dessen zweites Element die empfangende Seite ist. Die
Abkürzungen tx
und rx
werden traditionell in vielen Feldern für Sender
(transmitter) bzw. Empfänger (receiver) verwendet, daher benennen wir unsere
Variablen als solche, um jedes Ende anzugeben. Wir verwenden eine
let
-Anweisung mit einem Muster, das die Tupel destrukturiert; wir werden die
Verwendung von Mustern in let
-Anweisungen und die Destrukturierung in Kapitel
18 besprechen. Für den Moment solltest du wissen, dass die Verwendung einer
let
-Anweisung auf diese Weise ein bequemer Ansatz ist, um die Teile des
Tupels zu extrahieren, die von mpsc::channel
zurückgegeben werden.
Verschieben wir das sendende Ende in einen erzeugten Strang und lassen es eine Zeichenkette senden, sodass der erzeugte Strang mit dem Hauptstrang kommuniziert, wie in Codeblock 16-7 gezeigt. Das ist so, als würde man eine Gummiente flussaufwärts in den Fluss setzen oder eine Chat-Nachricht von einem Strang zum anderen senden.
Dateiname: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hallo"); tx.send(val).unwrap(); }); }
Wieder verwenden wir thread::spawn
, um einen neuen Strang zu erstellen, und
dann move
, um tx
in den Funktionsabschluss zu verschieben, sodass der
erzeugte Strang tx
besitzt. Der erzeugte Strang muss den Sender besitzen, um
in der Lage zu sein, Nachrichten durch den Kanal zu senden. Der Sender hat eine
Methode send
, die den Wert nimmt, den wir senden wollen. Die Methode send
gibt ein Result<T, E>
zurück; wenn also die empfangende Seite bereits
aufgeräumt wurde und es keinen Ort gibt, an den ein Wert gesendet werden kann,
wird die Sendeoperation einen Fehler zurückgeben. In diesem Beispiel rufen wir
unwrap
auf, um im Falle eines Fehlers abzustürzen. Aber in einer echten
Anwendung würden wir es richtig handhaben: Kehre zu Kapitel 9 zurück, um
Strategien für eine korrekte Fehlerbehandlung anzusehen.
In Codeblock 16-8 erhalten wir den Wert vom Empfänger im Hauptstrang. Das ist so, als würde man die Gummiente am Ende des Flusses aus dem Wasser holen oder eine Chat-Nachricht erhalten.
Dateiname: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hallo"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Erhalten: {received}"); }
Das Empfänger hat zwei nützliche Methoden: recv
und try_recv
. Wir benutzen
recv
, kurz für empfangen (receive), was die Ausführung des Hauptstrangs
blockiert und wartet, bis ein Wert in den Kanal geschickt wird. Sobald ein Wert
gesendet wurde, wird er von recv
in einem Result<T, E>
zurückgegeben. Wenn
der Sender geschlossen wird, gibt recv
einen Fehler zurück, um zu
signalisieren, dass keine weiteren Werte mehr kommen werden.
Die Methode try_recv
blockiert nicht, sondern gibt stattdessen sofort ein
Result<T, E>
zurück: Einen Ok
-Wert, der eine Nachricht enthält, wenn eine
verfügbar ist, und einen Err
-Wert, wenn diesmal keine Nachricht vorhanden
ist. Die Verwendung von try_recv
ist nützlich, wenn dieser Strang während des
Wartens auf Nachrichten andere Arbeiten zu erledigen hat: Wir könnten eine
Schleife schreiben, die try_recv
ab und zu aufruft, eine Nachricht
verarbeitet, wenn eine verfügbar ist, und ansonsten für eine Weile andere
Arbeiten erledigt, bis sie erneut überprüft wird.
Wir haben in diesem Beispiel der Einfachheit halber recv
verwendet; wir haben
keine andere Arbeit für den Hauptstrang zu erledigen, außer auf Nachrichten zu
warten, daher ist es angebracht, den Hauptstrang zu blockieren.
Wenn wir den Code in Codeblock 16-8 ausführen, sehen wir den durch den Hauptstrang ausgegebenen Wert:
Erhalten: hallo
Perfekt!
Kanäle und Eigentümerschaftsübertragung
Die Eigentumsregeln spielen beim Nachrichtenversand eine entscheidende Rolle,
da sie dir helfen, sicheren, nebenläufigen Code zu schreiben. Die Vermeidung
von Fehlern bei der nebenläufigen Programmierung ist der Vorteil, wenn du bei
deinen Rust-Programmen an die Eigentümerschaft denkst. Lass uns ein Experiment
machen, um zu zeigen, wie Kanäle und Eigentümerschaft zusammenwirken, um
Probleme zu vermeiden: Wir versuchen, einen val
-Wert im erzeugten Strang zu
verwenden, nachdem wir ihn in den Kanal geschickt haben. Versuche, den Code
in Codeblock 16-9 zu kompilieren, um zu sehen, warum dieser Code nicht erlaubt
ist:
Dateiname: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hallo"); tx.send(val).unwrap(); println!("val ist {val}"); }); let received = rx.recv().unwrap(); println!("Erhalten: {received}"); }
Hier versuchen wir, val
auszugeben, nachdem wir es per tx.send
in den Kanal
geschickt haben. Dies zuzulassen wäre eine schlechte Idee: Sobald der Wert an
einen anderen Strang gesendet wurde, könnte dieser Strang ihn ändern oder
aufräumen, bevor wir versuchen, den Wert erneut zu verwenden. Möglicherweise
können die Änderungen des anderen Strangs aufgrund inkonsistenter oder nicht
vorhandener Daten zu Fehlern oder unerwarteten Ergebnissen führen. Rust gibt
uns jedoch einen Fehler, wenn wir versuchen, den Code in Codeblock 16-9 zu kompilieren:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
Unser Nebenläufigkeitsfehler hat einen Kompilierzeitfehler verursacht. Die
Funktion send
übernimmt die Eigentümerschaft an ihrem Parameter und wenn der
Wert verschoben wird, übernimmt der Empfänger die Eigentümerschaft an ihm.
Dadurch wird verhindert, dass wir den Wert nach dem Senden versehentlich wieder
verwenden; das Eigentumssystem prüft, ob alles in Ordnung ist.
Mehrere Werte senden und den Empfänger warten sehen
Der Code in Codeblock 16-8 wurde kompiliert und ausgeführt, aber er zeigte uns nicht eindeutig, dass zwei getrennte Stränge über den Kanal miteinander sprachen. In Codeblock 16-10 haben wir einige Änderungen vorgenommen, die beweisen, dass der Code in Codeblock 16-8 nebenläufig ausgeführt wird: Der erzeugte Strang sendet nun mehrere Nachrichten und macht dazwischen eine Pause von einer Sekunde.
Dateiname: src/main.rs
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let vals = vec![ String::from("hallo"), String::from("aus"), String::from("dem"), String::from("Strang"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Erhalten: {received}"); } }
Diesmal hat der erzeugte Strang einen Vektor von Zeichenketten, die wir an den
Hauptstrang senden wollen. Wir iterieren über diese Zeichenketten, senden jede
einzeln und pausieren dazwischen, indem wir die Funktion thread::sleep
mit
einem Duration
-Wert von 1 Sekunde aufrufen.
Im Hauptstrang rufen wir die Funktion recv
nicht mehr explizit auf:
Stattdessen behandeln wir rx
als Iterator. Jeden empfangenen Wert geben wir
aus. Wenn der Kanal geschlossen wird, wird die Iteration beendet.
Wenn du den Code in Codeblock 16-10 ausführst, solltest du die folgende Ausgabe mit einer 1-Sekunden-Pause zwischen jeder Zeile sehen:
Erhalten: hallo
Erhalten: aus
Erhalten: dem
Erhalten: Strang
Da wir keinen Code haben, der die for
-Schleife im Hauptstrang pausiert oder
verzögert, können wir sagen, dass der Hauptstrang darauf wartet, Werte vom
erzeugten Strang zu erhalten.
Erstellen mehrerer Produzenten durch Klonen des Senders
Vorhin haben wir erwähnt, dass mpsc
ein Akronym für mehrfacher Produzent,
einzelner Konsument ist. Lass uns mpsc
verwenden und den Code in Codeblock
16-10 erweitern, um mehrere Stränge zu erzeugen, die alle Werte an den gleichen
Empfänger senden. Wir können dies tun, indem wir den Sender klonen, wie in
Codeblock 16-11 gezeigt:
Dateiname: src/main.rs
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { // --abschneiden-- let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![ String::from("hallo"), String::from("aus"), String::from("dem"), String::from("Strang"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("mehr"), String::from("Nachrichten"), String::from("für"), String::from("dich"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Erhalten: {received}"); } // --abschneiden-- }
Bevor wir den ersten Strang erzeugen, rufen wir dieses Mal clone
auf dem
Sender auf. Dadurch erhalten wir einen weiteren Sender, das wir an den ersten
erzeugten Strang weitergeben können. Wir übergeben den ursprüngliche Sender an
einen zweiten erzeugten Strang. Dadurch erhalten wir zwei Stränge, die jeweils
unterschiedliche Nachrichten an den Empfänger senden.
Wenn du den Code ausführst, sollte deine Ausgabe in etwa so aussehen:
Erhalten: hallo
Erhalten: mehr
Erhalten: aus
Erhalten: Nachrichten
Erhalten: für
Erhalten: dem
Erhalten: Strang
Erhalten: dich
Möglicherweise siehst du die Werte in einer anderen Reihenfolge, dies hängt von
deinem System ab. Das macht die Nebenläufigkeit sowohl interessant als auch
schwierig. Wenn du mit thread::sleep
experimentierst und ihm verschiedene
Werte in den verschiedenen Strängen gibst, wird jeder Durchlauf
nicht-deterministischer sein und jedes Mal eine andere Ausgabe erzeugen.
Nachdem wir uns nun angesehen haben, wie Kanäle funktionieren, wollen wir uns eine andere Methode der Nebenläufigkeit ansehen.