Streaming - Kontinuierliche Datenströme

Motivation

Man kann nicht zweimal in denselben Fluss steigen.

Heraklit1

Der aktuelle Stand in Big Data und Analytics ist es, Daten per ETL von einem System ins Nächste zu schieben und dann Batch-Verarbeitungen auf großen Datenmengen in definierten Zeitintervallen auszuführen. Dafür wurden Systeme wie MapReduce, HDFS, HBase, Spark, usw. konzipiert und lösen dieses Problem inzwischen sehr gut.

Was jedoch, wenn wir diese Verarbeitungen zeitnah ausführen möchten, idealerweise sobald die Daten eintreffen?

Die Verkaufszahlen der letzten Woche sind interessant und wichtig, jedoch handelt es sich immer nur um nachträgliche Betrachtungen. Gab es Probleme mit dem Onlineshop oder wurde ein Werbebanner nicht korrekt geschaltet, kann ich die Auswirkungen nur analysieren, aber nicht mehr beeinflussen.

Noch wichtiger ist es, einen Betrugsversuch als solchen sofort, also während er statt findet, zu erkennen und die Ware gar nicht erst zu liefern statt im Nachhinein das Inkasso einzuschalten.

Es gibt also durchaus ein wirtschaftliches Interesse den Fokus von Big Data auf Fast Data zu erweitern.

Modellierung mit Ereignissen

Dazu ist es hilfreich, die Aktivitäten in einem Unternehmen und dessen Produkte als Ereignisse (engl. Events) zu sehen.

Ein Ereignis wird dabei definiert als etwas, das zu einem bestimmten Zeitpunkt passiert ist oder ausgelöst wurde und sich nicht nachträglich ändern lässt.

Eine Bestellung als einzelnes Ereignis würde direkt erzeugt werden, wenn der Kunde die Bestellung aufgibt. Wenn die Bestellung später ihren Zustand ändert (z.B. Storno oder die Zahlung wird validiert), so wird dazu ein neues Ereignis ausgegeben. Das ursprüngliche Ereignis bleibt davon unberührt2.

Nun möchten wir den aktuellen Umsatzstand zählen. Der Tagesumsatz ist dabei kein eigenes Feld in einer Tabelle, sondern berechnet sich aus der Summe der Auftragsvolumina der aufgegebenen Bestellungen abzüglich der widerrufenen Bestellungen. Eine widerrufene Bestellung wird dabei als ein eigenständiges Ereignis modelliert.

Wenn wir alle Ereignisse vorhalten (z.B. in einer Datenbank oder Message Queue), können wir bei einer Abfrage den Umsatz aller Bestellungen addieren und dann den Umsatz der widerrufenen Bestellungen subtrahieren. Um das Ganze zu beschleunigen, legen wir Zwischenstände (Snapshots) an, und zählen erst ab dem letzten Zwischenstand.

Dieses Vorgehen nennt man Event Sourcing, d.h. die einzelnen Ereignisse bilden die Quelle für unsere Sicht auf die Daten.

Warenkorb mit Events

Events werden nach folgenden Prinzipien modelliert:

  • Jedwede Aktivität im Unternehmen wird als einzelnes, unabhängiges Ereignis abgebildet
  • Ein Ereignis ist passiert, wird also im Präteritum benannt
  • Ein Ereignis ist ein unverändliches Faktum (engl. immutable); das bedeutet Änderungen werden nur durch neue Events modelliert
  • Ein Ereignis enthält nur die Daten, die zum Zeitpunkt des Ereignisses bekannt sind
  • Ein Ereignis ist domänenbezogen; Daten aus anderen Domänen sollten nicht enhalten sein, nur referenziert werden

Zum Modellieren hilfreich ist das Event Storming-Format - ein Workshop, indem eine Domäne anhand der von ihr ausgehenden Ereignisse erforscht wird. Dabei wird alles als Ereignis modelliert, was in der Domäne Relevantes passiert (eine wurde Bestellung aufgegeben, geändert, storniert, usw.). Ein Ereignis kann dabei auch weitere Ereignisse (auch in anderen Domänen) auslösen. Davon abzugrenzen sind Befehle, dies sind meistens Aktionen, die vom Benutzer ausgeführt werden, aber wiederum Ereignisse auslösen. Eine ausführliche Beschreibung bietet dieser Blogpost.

Eine Gefahr ist, in ein Ereignis zu viele Daten zu packen, die domänenfremd sind. Beispiel: Eine Bestellung wird von einem Kunden aufgegeben. Für die Bestellung relevant sind aber neben der Kunden-ID nur seine Rechnungs- und Lieferadresse. In anderen Domänen sind andere Daten relevant, diese sollten jedoch nicht im Bestellungsereignis enthalten sein. Man grenzt also die Zuständigkeiten klar voneinander ab; im Domain Driven Design wird das Bounded Context genannt.

Ströme (Streams) von Ereignissen

Die modellierten Ereignisse werden erzeugt und emitiert sobald sie auftreten und bilden ein kontinuierlichen Strom. Ein Strom (engl. Stream) ist dabei eine zeitabhängige Sequenz von unbekannter und möglicherweise endloser Größe.

Verarbeiten von Strömen

Das Arbeiten mit Strömen steht in einem gewissen Kontrast zu den meisten bekannten Datenstrukturen, die eine feste Größe haben. Die Menge aller Mitarbeiter in einem Unternehmen ist zu einem bestimmten Zeitpunkt fixiert. Wenn wir wissen wollen, wieviele Angestellte das Unternehmen hat, bestimmen wir einfach die Anzahl und haben unsere Antwort.

Interessanter wird es jedoch, wenn wir den Strom nutzen wollen, um aktuelle Metriken zu ermitteln oder zeitnah auf Ereignisse zu reagieren.

Beispielsweise möchten wir die aktuelle Zahl von Benutzern auf einer Webseite und deren durchschnittliche Warenkorbgröße kennen. Bei einer Liste von Zahlen ist es einfach einen Durchschnitt zu ermitteln, aber wie berechnet man den Durchschnitt eines Streams? Eine übliche Technik sind bewegende Fenster (Moving oder Sliding Windows), oder Moving Averages. Wir legen dabei ein Fenster über den Strom, sammeln also eine gewisse Zeit Daten, um auf ihnen Berechnungen ausführen. Die Größe eines Fensters wird über sein Intervall bestimmt, mit dem es sich weiterbewegt. Beispielsweise könnte ein Fenster 60 Sekunden groß sein, wir sammeln also die Daten der letzten 60 Sekunden. Wir könnten dann beispielsweise alle 30 Sekunden den Durchschnittswert bilden und würden dabei nicht nur das gerade eingetroffene Ereignis betrachten, sondern alle Ereignisse der letzten 60 Sekunden.

Durchschnittliche Warenkorbgröße per Intervall

Das bedeutet, dass alle unsere Berechnungen auf Streams ebenfalls zu Streams werden. Es gibt nicht ein festes Ergebnis, sondern immer wieder ein neues Ergebnis pro Fenster.

Eine gute ausführliche Erklärung zu Moving Averages gibt dieser Blogpost von Michael Noll.

Die im Strom berechneten Aggregate (typischerweise Summen und die statistischen Kennzahlen) können nun beispielsweise als Zeitreihe in einer Datenbank gespeichert und auf einem Dashboard angezeigt werden. So bekommen wir einen aktuellen Einblick in die Geschehnisse im Unternehmen und dessen Systeme.

Frameworks zum Verarbeiten von Streams

Es gibt einige Frameworks, die sich auf das Verarbeiten von Streams spezialisiert haben. Code, der mit einem solchen Framework geschrieben wird, wird permanent ausgeführt. Je nach Modell wird dabei auf jedes einzelne Event reagiert oder mehre Events zusammengefasst. In Abhängigkeit der Anforderungen an Latenz, Speicherverbrauch und Fehlertoleranz ist dabei das eine oder andere Modell zu empfehlen.

Verarbeitung von einzelnen Ereignissen

Wenn eine niedrige Latenz benötigt wird und man auf einzelne Ereignisse sofort reagieren möchte (z.B. Fraud Detection oder Complex Event Processing), muss ein Framework gewählt werden, das jedes Event einzeln verarbeitet.

Akka basiert auf dem Versenden von Nachrichten zwischen Aktoren. Aktoren sind deutlich leichtgewichtiger als Threads und können entweder in der gleichen JVM oder remote laufen. Akka selbst ist oft zu Low-Level für Streamverarbeitung, aber Akka Streams baut auf Akka auf, implementiert den Reactive Streams-Standard und erlaubt es den Datenfluss einfacher und typsicher zu modellieren.

In Storm geschieht die Verarbeitung einzelner Events (genannt Tuple) in so genannten Bolts. Jeder Bolt kann die Daten anpassen, filtern, und an andere Bolts weitergeben. Am Ende landen sie in einem Datenstore. Storm war eines der ersten Frameworks in diesem Bereich, hat inzwischen aber etwas an Fahrt verloren, da es anspruchsvoll zu betreiben ist.

Flink ist ein relativ junges, in Deutschland entwickeltes Framework, das ähnlich wie Spark eine einheitliche API für Batch, Streaming, CEP, und Machine Learning anbietet.

Micro-Batching

Sind die Anforderungen an Latenz nicht so hoch (z.B. die Verkaufszahlen der letzten fünf Sekunden in auf einem Dashboard), kann man ein Microbatching-Framework verwenden. Dabei werden mehrere Events in kleine Batches zusammengefasst und zusammen verarbeitet. Das hat den Vorteil, dass man auf einer höheren Abstraktionsebene arbeitet, den Strom mit einer Collection-ähnlichen API manipulieren kann und Fehlertoleranz einfacher umzusetzen ist.

Spark Streaming setzt auf der bekannten Apache Spark-Engine auf. Wie bereits beschrieben ist Sparks Basisabstraktion das RDD, eine verteilte Datenstruktur. Mit Spark Streaming programmiert es sich sehr ähnlich wie mit Spark Batch. Möchte man beispielsweise die Lambda-Architektur implementieren, die u.a. dafür kritisiert wird, dass Algorithmen doppelt implementiert werden müssen, kann man Spark und Spark Streaming nutzen um einen großen Teil des Codes zu teilen. Spark Streaming setzt zur Laufzeit dann die Operationen auf RDDs um und schickt sie ganz normal an die Spark-Engine.

Spark Streaming werde ich in einem folgenden Blogpost im Detail vorstellen.

Storm Trident setzt auf Storm auf. Man definiert ebenfalls Operationen auf dem Strom und hat eingebaute Unterstützung von Sliding Windows.

Fazit

Das kontinuierliche Streamen und Verarbeiten von Events erlaubt es uns, Aktivitäten im Unternehmen besser zu verstehen, zu korrelieren und auf wichtige Geschehnisse direkt zu reagieren. Auf das Modellieren mit diesen Strömen werde ich in einem folgenden Blogpost weiter eingehen.

  1. https://de.wikipedia.org/wiki/Panta_rhei#cite_ref-5

  2. Das hilft uns nicht nur bei der zeitnahen Verarbeitung, wir bekommen auch gleich einen Audit Trail mitgeliefert.

Diesen Post teilen

RSS-Feed

Neue Posts direkt im Newsreader.

RSS-Feed abonnieren

Newsletter

Neue Posts sowie Neuigkeiten rund um Big Data, Spark, und Scala. Maximal eine E-Mail im Monat.

Kommentare