Alles zusammenfügen: Futures, Aufgaben und Stränge

Wie wir in Kapitel 16 gesehen haben, bieten Stränge (threads) einen Ansatz für Nebenläufigkeit (concurrency). In diesem Kapitel haben wir einen anderen Ansatz für Nebenläufigkeit gesehen: Verwenden von asynchronem Code mit Futures und Strömen (streams). Wenn du dich fragst, wann du eine Methode der anderen vorziehen solltest, lautet die Antwort: Es kommt darauf an! Und in vielen Fällen ist die Wahl nicht Stränge oder asynchroner Code, sondern eher Stränge und asynchroner Code.

Viele Betriebssysteme bieten schon seit Jahrzehnten Strang-basierte Nebenläufigkeitsmodelle an, und viele Programmiersprachen unterstützen diese Modelle daher. Diese Modelle sind jedoch nicht frei von Kompromissen. Auf vielen Betriebssystemen wird für jeden Strang ein beträchtlicher Teil an Arbeitsspeicher verbraucht und es entsteht ein gewisser Overhead beim Starten und Beenden. Stränge sind auch nur dann eine Option, wenn dein Betriebssystem und deine Hardware sie unterstützen. Im Gegensatz zu herkömmlichen Desktop- und Mobilcomputern haben einige eingebettete Systeme überhaupt kein Betriebssystem, sodass sie auch keine Stränge haben.

Das asynchrone Modell bietet eine andere – und letztlich ergänzende – Reihe von Kompromissen. Im asynchronen Modell benötigen nebenläufige Vorgänge keine eigenen Stränge. Stattdessen können sie in Aufgaben laufen, so wie wir trpl::spawn_task verwendet haben, um die Arbeit von einer synchronen Funktion im Abschnitt „Ströme“ zu starten. Eine Aufgabe ähnelt einem Strang, wird aber nicht vom Betriebssystem, sondern von einem Code auf Bibliotheksebene verwaltet: der Laufzeitumgebung.

Im vorigen Abschnitt haben wir gesehen, dass wir einen Strom erstellen können, indem wir einen asynchronen Kanal verwenden und eine asynchrone Aufgabe erzeugen, die wir von synchronem Code aus aufrufen können. Wir können genau das Gleiche mit einem Strang machen! In Codeblock 17-40 haben wir trpl::spawn_task und trpl::sleep verwendet. In Codeblock 17-41 ersetzen wir diese durch die APIs thread::spawn und thread::sleep aus der Standardbibliothek.

Dateiname: src/main.rs

extern crate trpl;

use std::{pin::pin, thread, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Nachricht: '{message}'")) {
                eprintln!("Kann die Nachricht '{message}' nicht senden: {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    // Dies ist *nicht* `trpl::spawn` sondern `std::thread::spawn`!
    thread::spawn(move || {
        let mut count = 0;
        loop {
            // Ebenso ist dies *nicht* `trpl::sleep` sondern `std::thread::sleep`!
            thread::sleep(Duration::from_millis(1));
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Konnte das Intervall {count} nicht senden: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}

Codeblock 17-41: Verwenden der std::thread-APIs anstelle der asynchronen trpl-APIs für die Funktion get_intervals

Wenn du diesen Code ausführst, ist die Ausgabe identisch zu der von Codeblock 17-40. Und beachte, wie wenig sich hier aus der Sicht des aufrufenden Codes ändert! Und obwohl eine unserer Funktionen eine asynchrone Aufgabe in der Laufzeitumgebung und die andere einen Betriebssystem-Strang startet, hat es keinen Effekt auf die resultierenden Ströme.

Trotz deren Ähnlichkeiten verhalten sich diese beiden Ansätze sehr unterschiedlich, obwohl wir es in diesem sehr einfachen Beispiel schwer haben, dies zu messen. Wir könnten Millionen von asynchronen Aufgaben auf jedem modernen Computer ausführen. Wenn wir das mit Strängen versuchen würden, würde uns buchstäblich der Speicher ausgehen!

Es gibt jedoch einen Grund, warum sich diese APIs so ähnlich sind. Stränge dienen als Grenze für Gruppen von synchronen Operationen; Nebenläufigkeit ist zwischen Strängen möglich. Aufgaben fungieren als Grenze für Gruppen von asynchronen Vorgängen; Nebenläufigkeit ist sowohl zwischen als auch innerhalb von Aufgaben möglich, da eine Aufgabe in ihrem Rumpf zwischen Futures wechseln kann. Schließlich sind Futures die granularste Einheit der Nebenläufigkeit in Rust, und jedes Future kann einen Baum von anderen Futures darstellen. Die Laufzeitumgebung – genauer gesagt ihr Executor – verwaltet Aufgaben, und Aufgaben verwalten Futures. In dieser Hinsicht ähneln Aufgaben leichtgewichtigen, von der Laufzeitumgebung verwalteten Strängen mit zusätzlichen Fähigkeiten, die sich daraus ergeben, dass sie von der Laufzeitumgebung und nicht vom Betriebssystem verwaltet werden.

Das bedeutet nicht, dass asynchrone Aufgaben immer besser sind als Stränge (oder umgekehrt). Nebenläufigkeit mit Strängen ist in gewisser Weise ein einfacheres Programmiermodell als Nebenläufigkeit mit async. Das kann eine Stärke und eine Schwäche sein. Stränge sind eine Art „Feuern und Vergessen“; sie haben kein natives Äquivalent zu einem Future, also laufen sie einfach bis zum Ende, ohne Unterbrechung, außer durch das Betriebssystem selbst. Das heißt, sie haben keine eingebaute Unterstützung für Nebenläufigkeit innerhalb der Aufgabe, wie es Futures haben. Stränge in Rust haben auch keine Mechanismen um die abzubrechen – ein Thema, das wir in diesem Kapitel nicht eingehend behandelt haben, das aber implizit in der Tatsache enthalten ist, dass immer dann, wenn wir ein Future beenden, sein Zustand korrekt aufgeräumt wird.

Diese Einschränkungen machen es auch schwieriger, Stränge zu kombinieren, als Futures. Es ist zum Beispiel viel schwieriger, Stränge zu verwenden, um Helfer wie timeout und throttle zu erstellen, die wir weiter oben in diesem Kapitel erstellt haben. Die Tatsache, dass Futures reichhaltigere Datenstrukturen sind, bedeutet, dass sie natürlicher zusammengesetzt werden können, wie wir gesehen haben.

Aufgaben geben uns dann zusätzliche Kontrolle über Futures, da man wählen kann, wo und wie man die Futures gruppiert. Und es stellt sich heraus, dass Stränge und Aufgaben oft sehr gut zusammenarbeiten, weil Aufgaben (zumindest in einigen Laufzeitumgebungen) zwischen Strängen verschoben werden können. Unter der Haube ist die Laufzeitumgebung, die wir verwenden – einschließlich der Funktionen spawn_blocking und spawn_task – standardmäßig mehrstängig (multithreaded)! Viele Laufzeitumgebungen verwenden einen Ansatz namens Work Stealing, um Aufgaben transparent zwischen Strängen zu verschieben, je nachdem, wie die Stränge gerade ausgelastet sind, um die Gesamtleistung des Systems zu verbessern. Dieser Ansatz erfordert eigentlich Stränge und Aufgaben, und damit Futures.

Wenn du überlegst, welche Methode du wann anwenden solltest, beachte diese Faustregeln:

  • Wenn die Arbeit sehr parallelisierbar ist, z.B. bei der Verarbeitung einer Menge von Daten, bei der jeder Teil separat verarbeitet werden kann, sind Stränge die bessere Wahl.
  • Wenn es sich um sehr nebenläufige Arbeit handelt, wie die Bearbeitung von Nachrichten aus einer Reihe von verschiedenen Quellen, die in unterschiedlichen Intervallen oder mit unterschiedlicher Geschwindigkeit eintreffen können, ist asynchroner Code die bessere Wahl.

Und wenn du Parallelität und Nebenläufigkeit benötigst, musst du dich nicht zwischen Strängen und asynchronem Code entscheiden. Du kannst beide zusammen verwenden, wobei jede der beiden die Aufgabe übernimmt, für die sie am besten geeignet ist. Codeblock 17-42 zeigt zum Beispiel ein gängiges Beispiel für dieses Zusammenspiel in echtem Rust-Code.

Dateiname: src/main.rs

extern crate trpl;

use std::{thread, time::Duration};

fn main() {
    let (tx, mut rx) = trpl::channel();

    thread::spawn(move || {
        for i in 1..11 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    trpl::run(async {
        while let Some(message) = rx.recv().await {
            println!("{message}");
        }
    });
}

Codeblock 17-42: Senden von Nachrichten mit blockierendem Code in einem Strang und Warten auf die Nachrichten in einem asynchronen Block

Wir beginnen mit der Erstellung eines asynchronen Kanals. Dann legen wir einen Strang an, der für die Absenderseite des Kanals zuständig ist. Innerhalb des Strangs senden wir die Zahlen 1 bis 10 und schlafen dazwischen jeweils eine Sekunde lang. Schließlich führen wir ein Future aus, das mit einem asynchronen Block erstellt wurde, der an trpl::run übergeben wurde, so wie wir es im ganzen Kapitel getan haben. In diesem Future warten wir auf diese Nachrichten, genau wie in den anderen Beispielen mit Nachrichten-Weitergabe, die wir gesehen haben.

Um zu den Szenarien zurückzukehren, mit denen wir das Kapitel eröffnet haben, könnte man sich vorstellen, dass eine Reihe von Videokodierungsaufgaben über einen dedizierten Strang ausgeführt wird, da die Videokodierung rechenintensiv ist. Die Benutzeroberfläche kann aber über einen asynchronen Kanal informiert werden, wenn diese Vorgänge ausgeführt werden. Es gibt unzählige Beispiele für diese Art von Kombinationen in der Praxis.

Zusammenfassung

Dies ist nicht das letzte Mal, dass du in diesem Buch etwas über Nebenläufigkeit lesen wirst. Das Projekt in Kapitel 21 wird die Konzepte dieses Kapitels in einer realistischeren Situation anwenden als die hier besprochenen einfacheren Beispiele und einen direkteren Vergleich anstellen, wie es aussieht, wenn man diese Art von Problemen mit Strängen und mit Aufgaben und Futures löst.

Welchen Ansatz du auch immer wählst, Rust gibt dir die Werkzeuge an die Hand, die du benötigst, um sicheren, schnellen und nebenläufigen Code zu schreiben – sei es für einen durchsatzstarken Webserver oder ein eingebettetes Betriebssystem.

Als nächstes werden wir über idiomatische Wege sprechen, Probleme zu modellieren und Lösungen zu strukturieren, wenn deine Rust-Programme größer werden. Außerdem werden wir erörtern, wie die Idiome von Rust mit denen verwandt sind, die du vielleicht aus der objektorientierten Programmierung kennst.