31. Aug. 2021

Stream Processing und Analytics

Für die Kommunikation von Systemen im IoT-Umfeld in Echtzeit spielt die echzeitfähige Verarbeitung und Analyse der daten eine elementare Rolle. Ein Einblick.
Software Engineer

Autor:in

Henri Johannes Rößler

Ein Bildschirm auf dem ein Graf abgebildet ist.

Erfassen, Analysieren und Reagieren – in Echtzeit

Erfassen, Analysieren und Reagieren – in Echtzeit

Jedes Ereignis, jede Interaktion, jede Messung kann heutzutage im IoT-Umfeld in Echtzeit an andere Systeme kommuniziert werden. Das Potenzial dieser Datenströme ist groß: Systeme und ihre Umgebung können in nahezu Echtzeit analysiert werden, womit schnellstmöglich und automatisiert in Prozesse eingegriffen werden kann. Die Anwendungsfelder sind grenzenlos: Autonomes Fahren, umweltsensitive Verkehrssteuerung, autonome Industrieanlagen und viele mehr. Eine wichtige Komponente spielt hierbei eine echtzeitfähige Verarbeitung und Analyse der Daten, um (im Sinne der Geschäftslogik) die richtigen Prozessentscheidungen zu treffen.

Stream vs. Batch vs. Mini-Batch Processing

Man unterscheidet im Wesentlichen zwischen der Stream-, Batch- und Mini-Batch-Verarbeitung von Daten. Alle drei Ansätze sind konzeptuell sehr verschieden und haben jeweils ihre Vor- und Nachteile.

Beim „klassischen“ Batch Processing sind die Datenquellen begrenzt („bounded“), das heißt sie haben einen definierten Anfang und ein definiertes Ende. Ein Datenstrom ist hingegen unbegrenzt („unbounded“) und besitzt somit kein oder kein bekanntes Ende. Während bei einer begrenzten Datenquelle somit alle Daten auf einmal bekannt sind, muss ein Datenstrom kontinuierlich weiterverarbeitet werden, ohne vorher wissen zu können, welche Daten in Zukunft empfangen werden.

Einen Datenstrom nach jedem Eintreffen eines neuen Datums wie eine begrenzte Datenquelle zu verarbeiten wäre höchst ineffizient, zumal der Aufwand immer weiter ansteigen würde. Daher muss die Analyse kontinuierlich erfolgen, sodass die relevanten Kennzahlen mit jedem neuen Datum aktualisiert werden. Andersherum lässt sich Batch Processing aber auch über eine Stream Processing Pipeline realisieren, weswegen einige Tools wie Apache Beam beide Verarbeitungsformen gleichermaßen unterstützen.

Eine Zwischenform von Stream und Batch Processing ist das sogenannte Mini-Batch Processing, wie zum Beispiel bei Apache Spark Streaming. Dabei werden die Daten eines Streams in kleine Mini-Batches gepuffert und dann zusammen verarbeitet. Dies kann zwar effizienter erfolgen, als jedes Datum einzeln zu verarbeiten (insbesondere bei sehr großen Datenraten), zieht aber nach sich, dass mit dem Puffern eine Latenz entsteht. Der Tradeoff liegt hier also zwischen der eingesparten Last bei großen Mini-Batches (die Größe eines Mini-Batches kann sich auch auf eine Dauer beziehen) und der dabei entstehenden Verzögerung.

20210821_Stream Processing_Stream Batch

Hürden bei der Streamverarbeitung

Ebenfalls ein wichtiger Unterschied zwischen begrenzten und unbegrenzten Datenquellen ist, dass der Datenstrom per se nicht persistiert ist. Daten, die einmal abgeschickt wurden, sind weg; ein nachträgliches Abfragen der Daten ist nicht möglich. Manche Technologien, die als unbegrenzte Datenquellen eingesetzt werden, ermöglichen jedoch eine (zeitweilige) Speicherung der Datenströme, z. B. MQTT oder Kafka. Dementsprechend muss man sich beim Stream Processing also damit auseinandersetzen, wie man mit möglicherweise entstehenden Datenlücken umgeht, falls die Verbindung zum Datenstrom abbricht, oder wie der letzte Zustand eines persistierten Datenstroms wiederhergestellt werden kann, um eben keine Datenlücken zu verursachen. Diese Fragen werden umso komplexer in verteilten und hochparallelisierten Systemen. Glücklicherweise gibt es viele Tools, die diese Aufgaben übernehmen können.

Eine weitere Hürde: Datenströme sind in der Regel Zeitreihen, auf denen man entsprechend ihres Zeitpunktes Analysen durchführen möchte. Es existiert aber keine Garantie dafür, dass die Daten zeitlich chronologisch ankommen. Zwei unterschiedlich schnelle Datenströme können dafür sorgen, dass Messungen zu einem älteren Zeitpunkt über den langsamen Strom erst dann ankommen, wenn schon jüngere Messungen aus dem schnellen Strom verarbeitet worden sind. Das ist ein in der Regel nicht vermeidbares Verhalten von Datenströmen, das insbesondere bei Zeitintervall-basierten Aggregationen bedacht werden muss.

Stateful Streaming

Um mit solchen Hürden umgehen zu können, muss die Verarbeitung der Datenströme zustandsbehaftet („stateful“) erfolgen. Insofern nicht jedes Datum alleinstehend und unabhängig von allen anderen Daten verarbeitet werden kann, ist es erforderlich, dass Informationen über mehrere Ereignisse hinweg aufrechterhalten werden. Die kontinuierliche Datenstromverarbeitung wird dadurch garantiert, dass durch ein neues Ereignis die Zustände des Streams entsprechend aktualisiert werden.

Operatoren sind beispielsweise zustandsbehaftet, wenn

  • Ereignisse über bestimmte Zeiträume aggregiert werden sollen; der Zustand beinhaltet dann das akkumulierte Aggregat
  • Muster in den Ereignissen erkannt werden sollen; der Zustand speichert dann möglicherweise eine Sequenz von früheren Ereignissen

Stateful Streaming ist ein Konzept, welches für unterschiedliche Anwendungsfälle wie Ereignis-gesteuerte Programme, ETL-Pipelines oder Datenstromanalyse eingesetzt werden kann.

Tools, die Stateful Stream Processing unterstützen, sind unter anderem Apache Flink, Apache Beam, Apache Storm und Apache Spark Streaming.

Checkpointing & Savepointing

Werden die Zustände von allen Operatoren innerhalb der Datenstromverarbeitung synchronisiert und gespeichert, spricht man von „Checkpointing“ bzw. „Savepointing“. Ein Checkpoint repräsentiert alle Zustände der Stream Processing Pipeline zu einem bestimmten Zeitpunkt und wird in der Regel periodisch gespeichert. Checkpoints existieren nur während der Laufzeit des Programms.

Im Falle des Ausfalls einer Programmkomponente kann die Datenstromverarbeitung dann mithilfe des letzten Checkpoints vollständig fortgesetzt werden. Die gesamte Pipeline wird dann neu gestartet, wobei jedem Operator sein Zustand aus dem letzten Checkpoint mitgegeben wird. Das Auslesen der Datenquelle wird dann an der Stelle fortgesetzt, an dem sich der Datenstrom zum letzten Checkpoint befand.

Das ist nur möglich, wenn die Datenquelle (z. B. ein Message Broker wie Kafka) den Datenstrom zurückspulen kann, und bei jedem Checkpoint die zuletzt gelesene Position (Offset) im jeweiligen Datenstrom gespeichert wird.

Savepoints hingegen sind dauerhaft persistierte Checkpoints und werden nicht automatisch, sondern manuell getriggert. Sie werden benötigt, wenn die Stream Processing Pipeline angehalten werden muss, z. B. um

  • ein Update einzuspielen
  • Fehler zu beheben
  • die Pipeline auf einem anderen Cluster bereitzustellen oder
  • A/B Tests mit unterschiedlichen Programmversionen durchzuführen

ohne dabei die Zustände zu verlieren.

20210821_Stream Processing_02

Timeseries Stream Processing

Die zeitliche Komponente spielt eine entscheidende Rolle bei Datenströmen, z. B. wenn man Zeitreihenanalysen durchführen möchte. Man unterscheidet dabei zwischen der „Prozesszeit“ („Processing time“) und der „Ereigniszeit“ („Event time“).

Bei der Prozesszeit entspricht der Zeitstempel aller Ereignisse der Systemzeit, also der Zeit, zu der das Ereignis verarbeitet wird. Das ist die simpelste Form, Daten mit Zeitstempeln zu versehen, da die Ereignisse nach Prozesszeit strikt nacheinander verarbeitet werden und somit keine Synchronisierung zwischen unterschiedlichen Streams und verteilten Maschinen notwendig ist. Analysen auf Zeitintervallen können sofort nach Ende des Zeitintervalls abgeschlossen werden, da alle zukünftigen Ereignisse eine spätere Prozesszeit besitzen und damit nicht mehr in frühere Intervalle fallen. Prozesszeit ermöglicht also eine maximale Performanz, respektive eine minimale Latenz.

In der Realität möchte man jedoch eher, dass Analysen auf der Ereigniszeit basieren, also dem Zeitpunkt, an dem die Ereignisse aufgetreten sind. Abhängig von Faktoren wie der Geschwindigkeit der Datenverbindung kann die Differenz zwischen Ereignis- und Prozesszeitpunkten unterschiedlich groß ausfallen. Das heißt insbesondere, dass die Daten der Streams nicht zwangsläufig nach Ereigniszeit sortiert ankommen; man spricht auch von „out-of-order events“. Möchte man z. B. Messungen über ein Zeitintervall aggregieren und dann bereitstellen, ist es praktisch unmöglich zu wissen, wann die letzte Messung angekommen sein wird, die in diesen Zeitbereich fällt. Damit lässt sich auch nicht mit Sicherheit sagen, wann das aggregierte Ergebnis korrekt ist und bereitgestellt werden kann.

20210821_Stream Processing_03

Watermarks

Mit sogenannten „Watermarks“, die nach einer zu definierenden Logik erzeugt werden müssen, kann der Fortschritt eines Datenstroms in der Ereigniszeit gemessen werden. Watermarks W(t) sind mit einer Ereigniszeit t assoziiert und unterteilen den Datenstrom unter der Annahme, dass keine weiteren Ereignisse mehr auftreten, deren Ereigniszeit t’ vor t liegt. Die Verarbeitung eines Zeitintervalls, das bis t geht, kann also dann abgeschlossen werden, wenn die Watermark W(t) erreicht wurde.

Kommen die Ereignisse in-order, also in chronologischer Ereigniszeit oder Prozesszeit an, so lassen sich die Watermarks sehr einfach setzen. Für die Zeitpunkte t, für die man eine Watermark W(t) erzeugen will (z.B. alle 5 Sekunden), wartet man auf das erste Event des Streams, dessen Ereignis- bzw. Prozesszeit t‘ >= t ist.

20210821_Stream Processing_04

Bei out-of-order Ereignissen gibt es mehrere Möglichkeiten, Watermarks zu definieren. Um eine geeignete zu wählen, sollte man in etwa abschätzen können, welche zeitlichen Latenzen im Stream zu erwarten sind. Beispielsweise könnte man Watermarks so wie bei einem in-order Stream definieren, nur dass zusätzlich ein konstantes Zeitintervall gewartet wird (hier angedeutet durch die dünnen gestrichelten Linien). Dann würde man lediglich Verzögerungen zwischen Ereigniszeit und Prozesszeit tolerieren, die dieses Zeitintervall nicht überschreiten.

20210821_Stream Processing_05

Out-of-order Ereignisse mit t’ <= t, die erst nach der Watermark W(t) ankommen, werden dann jedoch nicht mehr berücksichtigt, da das Zeitintervall bereits abgeschlossen wurde. Diese Ereignisse werden auch „late elements“ genannt. In dieser Darstellung wäre das Ereignis 3, welches erst nach W(5) eingetroffen ist, ein solches late element ( siehe rechts)

Je nachdem, wie man Watermarks definiert, entsteht ein Tradeoff zwischen der Genauigkeit von Analysen und ihrer Latenz. Auf der einen Seite möchte man möglichst wenige late elements haben, die nicht mehr in die Analysen einfließen, andererseits sollte die tolerierte Verzögerung möglichst klein gehalten werden, damit Zeitintervalle schnell abgeschlossen werden können. Um hier eine gute Entscheidung zu treffen, ist es wichtig, das zeitliche Verhalten der Datenströme gut abzuschätzen.

20210821_Stream Processing_06

Windowing

Da Streams unbegrenzt sind, müssen Aggregationen immer auf Segmenten des Datenstroms, sogenannten „Windows“, durchgeführt werden. Windows können zeitgetrieben sein, z. B. 30-Minuten-Zeitintervalle, oder datengetrieben, z. B. gleich große Segmente mit 3 Ereignissen. Man unterteilt auch in Windows, die überlappend („sliding“) oder überlappungsfrei („tumbling“) sind.

20210821_Stream Processing_07
20210821_Stream Processing_08

Partitioniert man den Datenstrom anhand von Pulks, also einer Menge von Ereignissen, die schnell hintereinander ankommen, spricht man hingegen von „session windows“. Dabei wird eine bestimmte Zeit nach Eintreffen eines Events gewartet (hier angedeutet durch die dünnen hellblauen Linien). Kommt kein neues Ereignis innerhalb dieser Zeitspanne an, so fängt ein neues session window an:

Ausblick

Daten in Echtzeit auswerten zu können, eröffnet ganz neue Anwendungsfälle, die mit klassischem Batch Processing undenkbar sind. Für operative System, die immer dynamischer werden, ist es entscheidend, so schnell wie möglich auf Änderungen in der Umgebung reagieren zu können und sich entsprechend anzupassen. Das ist nur möglich, wenn alle Echtzeit-Informationen auch sofort in Handlungen übersetzt werden. Die Echtzeitfähigkeit eines datengetriebenen Systems zu gewährleisten wird also immer wichtiger, und damit wird in Zukunft auch der Bedarf für Stream Processing steigen. Umso wichtiger ist es, die spezifischen Herausforderungen, die es bei normalem Batch Processing so nicht gibt, zu kennen und zu wissen, wie man mit ihnen umgeht.