Ströme (streams): Sequenz von Futures
Erinnere dich daran, wie wir den Empfänger unseres asynchronen Kanals
weiter in Abschnitt „Datenaustausch zwischen zwei Aufgaben mit
Nachrichtenübermittlung“ oben in diesem Kapitel verwendet
haben. Die asynchrone Methode recv erzeugt eine Sequenz von Elementen. Dies
ist ein Beispiel eines viel allgemeineren Musters, bekannt als Strom
(stream). Viele Konzepte lassen sich ganz natürlich als Ströme darstellen:
Elemente, die in einer Warteschlange verfügbar werden, Datenblöcke, die
schrittweise aus dem Dateisystem eingelesen werden, wenn der gesamte Datensatz
zu große für den Arbeitsspeicher des Computers ist, oder Daten, die nach und
nach über das Netzwerk eintreffen. Da Ströme Futures sind, können wir sie mit
jeder anderen Art von Future verwenden und auf interessante Weise kombinieren.
Beispielsweise können wir Ereignisse bündeln, um zu viele Netzwerkaufrufe zu
vermeiden, Zeitlimits für lang andauernde Vorgänge festlegen oder Ereignisse
der Benutzeroberfläche drosseln, um unnötige Arbeit zu vermeiden.
Wir haben eine Sequenz von Elementen in Kapitel 13 gesehen, als das Merkmal
Iterator im Abschnitt „Das Merkmal (trait) Iterator und die Methode
next“ betrachtet haben. Es gibt jedoch zwei Unterschiede zwischen
Iteratoren und dem asynchronen Kanalempfänger. Der erste ist die Zeit:
Iteratoren sind synchron, während der Kanalempfänger asynchron ist. Der zweite
ist die API. Wenn wir direkt mit einem Iterator arbeiten, rufen wir seine
synchrone Methode next auf. Mit dem Strom trpl::Receiver rufen wir
stattdessen die asynchrone Methode recv auf. Ansonsten sind sich diese APIs
sehr ähnlich, und diese Ähnlichkeit ist kein Zufall. Ein Strom ist wie eine
asynchrone Form der Iteration. Während trpl::Receiver jedoch speziell auf den
Empfang von Nachrichten wartet, ist die allgemeine Strom-API viel breiter
angelegt: Sie liefert das nächste Element auf die gleiche Weise wie Iterator,
aber asynchron.
Die Ähnlichkeit zwischen Iteratoren und Strömen in Rust bedeutet, dass wir aus
jedem Iterator einen Strom erzeugen können. Wie bei einem Iterator können wir
mit einem Strom arbeiten, indem wir seine Methode next aufrufen und dann auf
die Ausgabe warten, wie in Codeblock 17-21.
Dateiname: src/main.rs
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("Der Wert war: {value}");
}
});
}
Codeblock 17-21: Erstellen eines Stroms aus einem Iterator und Ausgeben seiner Werte
Wir beginnen mit einem Array von Zahlen, das wir in einen Iterator umwandeln
und dann map aufrufen, um alle Werte zu verdoppeln. Dann wandeln wir den
Iterator mit der Funktion trpl::stream_from_iter in einen Strom um.
Schließlich durchlaufen wir mit der while let-Schleife die Elemente im
Strom.
Leider lässt sich der Code nicht kompilieren, sondern wir bekommen die
Fehlermeldung, dass keine Methode next verfügbar ist:
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
Wie diese Ausgabe erklärt, liegt der Grund für den Compilerfehler darin, dass
wir das richtige Merkmal im Gültigkeitsbereich benötigen, um die Methode next
verwenden zu können. In Anbetracht der bisherigen Diskussion könnte man
erwarten, dass es sich um das Merkmal Stream handelt, aber ist das Merkmal
StreamExt. Ext steht hier für „extension“ (engl. Erweiterung): Dies ist
eine gängige Vorgehensweise in der Rust-Gemeinschaft, um ein Merkmal mit einem
anderen zu erweitern.
Das Merkmal Stream definiert eine Low-Level-Schnittstelle, die die Merkmale
Iterator und Future effektiv kombiniert. StreamExt bietet eine Reihe von
APIs auf höherer Ebene ab, die auf Stream basieren, darunter die Methode
next sowie andere Hilfsmethoden, die denen des Merkmals Iterator ähneln.
Stream und StreamExt sind noch nicht Teil der Standardbibliothek von Rust,
aber die meisten Kisten des Ökosystems verwenden ähnliche Definitionen.
Um den Compilerfehler zu beheben, fügen wir eine use-Anweisung für
trpl::StreamExt hinzu, wie in Codeblock 17-22.
Dateiname: src/main.rs
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// --abschneiden--
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("Der Wert war: {value}");
}
});
}
Codeblock 17-22: Erfolgreiche Verwendung eines Iterators als Grundlage für einen Strom
Mit all diesen Teilen zusammen funktioniert der Code so, wie wir es wollen!
Außerdem können wir jetzt, da wir StreamExt im Gültigkeitsbereich haben, alle
seine Hilfsmethoden verwenden, genau wie bei Iteratoren.