Keyboard shortcuts

Press or to navigate between chapters

Press ? to show this help

Press Esc to hide this help

Streams: Sequenz von Futures

Erinnere dich daran, wie wir den Empfänger unseres asynchronen Kanals weiter in Abschnitt „Datenaustausch zwischen zwei Aufgaben mit Nachrichtenübermittlung“ oben in diesem Kapitel verwendet haben. Die asynchrone Methode recv erzeugt eine Sequenz von Elementen. Dies ist ein Beispiel eines viel allgemeineren Musters, bekannt als Stream (Strom, Fluss). Viele Konzepte lassen sich ganz natürlich als Streams darstellen: Elemente, die in einer Warteschlange verfügbar werden, Datenblöcke, die schrittweise aus dem Dateisystem eingelesen werden, wenn der gesamte Datensatz zu groß für den Arbeitsspeicher des Computers ist, oder Daten, die nach und nach über das Netzwerk eintreffen. Da Streams Futures sind, können wir sie mit jeder anderen Art von Future verwenden und auf interessante Weise kombinieren. Beispielsweise können wir Ereignisse bündeln, um zu viele Netzwerkaufrufe zu vermeiden, Zeitlimits für lang andauernde Vorgänge festlegen oder Ereignisse der Benutzeroberfläche drosseln, um unnötige Arbeit zu vermeiden.

Wir haben eine Sequenz von Elementen in Kapitel 13 gesehen, als wir das Trait Iterator im Abschnitt „Das Trait Iterator und die Methode next betrachtet haben. Es gibt jedoch zwei Unterschiede zwischen Iteratoren und dem asynchronen Kanalempfänger. Der erste ist die Zeit: Iteratoren sind synchron, während der Kanalempfänger asynchron ist. Der zweite ist die API. Wenn wir direkt mit einem Iterator arbeiten, rufen wir seine synchrone Methode next auf. Mit dem Stream trpl::Receiver rufen wir stattdessen die asynchrone Methode recv auf. Ansonsten sind sich diese APIs sehr ähnlich, und diese Ähnlichkeit ist kein Zufall. Ein Stream ist wie eine asynchrone Form der Iteration. Während trpl::Receiver jedoch speziell auf den Empfang von Nachrichten wartet, ist die allgemeine Stream-API viel breiter angelegt: Sie liefert das nächste Element auf die gleiche Weise wie Iterator, aber asynchron.

Die Ähnlichkeit zwischen Iteratoren und Streams in Rust bedeutet, dass wir aus jedem Iterator einen Stream erzeugen können. Wie bei einem Iterator können wir mit einem Stream arbeiten, indem wir seine Methode next aufrufen und dann auf die Ausgabe warten, wie in Listing 17-21.

Dateiname: src/main.rs

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("Der Wert war: {value}");
        }
    });
}

Listing 17-21: Erstellen eines Streams aus einem Iterator und Ausgeben seiner Werte

Wir beginnen mit einem Array von Zahlen, das wir in einen Iterator umwandeln und dann map aufrufen, um alle Werte zu verdoppeln. Dann wandeln wir den Iterator mit der Funktion trpl::stream_from_iter in einen Stream um. Schließlich durchlaufen wir mit der while let-Schleife die Elemente im Stream.

Leider lässt sich der Code nicht kompilieren, sondern wir bekommen die Fehlermeldung, dass keine Methode next verfügbar ist:

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

Wie diese Ausgabe erklärt, liegt der Grund für den Compilerfehler darin, dass wir das richtige Trait im Gültigkeitsbereich benötigen, um die Methode next verwenden zu können. In Anbetracht der bisherigen Diskussion könnte man erwarten, dass es sich um das Trait Stream handelt, aber ist das Trait StreamExt. Ext steht hier für „extension“ (Erweiterung): Dies ist eine gängige Vorgehensweise in der Rust-Gemeinschaft, um ein Trait mit einem anderen zu erweitern.

Das Trait Stream definiert eine Low-Level-Schnittstelle, die die Traits Iterator und Future effektiv kombiniert. StreamExt bietet eine Reihe von APIs auf höherer Ebene, die auf Stream basieren, darunter die Methode next sowie andere Hilfsmethoden, die denen des Traits Iterator ähneln. Stream und StreamExt sind noch nicht Teil der Standardbibliothek von Rust, aber die meisten Crates des Ökosystems verwenden ähnliche Definitionen.

Um den Compilerfehler zu beheben, fügen wir eine use-Anweisung für trpl::StreamExt hinzu, wie in Listing 17-22.

Dateiname: src/main.rs

use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        // --abschneiden--
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("Der Wert war: {value}");
        }
    });
}

Listing 17-22: Erfolgreiche Verwendung eines Iterators als Grundlage für einen Stream

Mit all diesen Teilen zusammen funktioniert der Code so, wie wir es wollen! Außerdem können wir jetzt, da wir StreamExt im Gültigkeitsbereich haben, alle seine Hilfsmethoden verwenden, genau wie bei Iteratoren.