Einführung in Apache Kafka

Motivation

Apache Kafka ist ein persistenter, verteilter Message-Broker, Streaming- und Datenintegrationsplattform. Er wird oft als zentraler Datenhub und Streaming-Komponente in einer Fast Data Plattform eingesetzt.

Überblick und Terminologie

Kafka sammelt Nachrichten und persistiert sie auf Festplatte. Nachrichten sind Daten, die veröffentlicht und empfangen werden (publish-subscribe).

Kafka wird über mehrere Nodes, genannt Broker (engl. für Makler, Vermittler), verteilt um zu skalieren und für Ausfallsicherheit zu sorgen. Koordiniert wird ein Kafka-Cluster über Apache ZooKeeper.

Nachrichten werden in benannten Containern namens Topics zusammengefasst. Neue Nachrichten werden an das Ende eines Topics angehängt und ihnen wird eine hochgezählte Nummer zugewiesen. Diese Nummer nennt sich Offset. Kafka wird daher auch als Log im Sinne einer fortlaufenden Sequenz von Nachrichten bezeichnet. Ein Topic wird auf mehrere Broker repliziert.

Ein Topic wird in Partitionen unterteilt. Die Anzahl der Partitionen wird bei Topicerstellung festgelegt, und sie bestimmt, wie das Topic skaliert. Die Nachrichten eines Topics werden auf die Partitionen verteilt. Der Offset gilt pro Partition.

Producer veröffentlichen Nachrichten in Topics. Consumer abonnieren Topics und lesen die eintreffenden Nachrichten im Stream.

Persistenz

Kafka speichert jede empfangene Nachricht auf Festplatte in Segmenten. Ein Segment ist eine Datei, die eine bestimmte, konfigurierbare Menge an Nachrichten enthält, bevor ein neues Segment aufgemacht wird.

Kafka verfügt trotz Persistenz über eine sehr hohe Performance. Dies wird unter anderem dadurch erreicht, dass Nachrichten ans Ende der immer offenen aktiven Segmentdatei geschrieben werden und dass über Partitionen skaliert wird. Zudem gehen Writes in den Linux Page Cache, Reads kommen über ein Netzwerk-Socket aus dem Page Cache. Beim Schreiben werden anders als bei einer Datenbank keine benutzerdefinierten Indizes aktualisiert, lediglich der Offset wird hochgezählt und das Mapping von Offset zu Nachricht aktualisiert.

Grundsätzlich sind SSDs empfohlen, aber auch drehende Festplatten sind durch diese Art des Zugriffs sehr performant.

Topics

Übersicht

Ein Topic ist eine benannte Sammlung von Nachrichten. Kafka gibt dabei nicht vor, welche Daten in einem Topic landen, für Kafka selbst handelt es sich es nur um einen Container, der einen Namen und bestimmte Konfigurationsparameter besitzt. In ein Topic wird immer seriell ans Ende geschrieben (append-only).

Ein Topic wird explizit mit einem Replikationsfaktor und einer Anzahl an Partitionen erstellt. Viele Konfigurationsparameter in Kafka sind Topic-spezifisch oder können auf Topicebene überschrieben werden.

Partitionen

Jedes Topic wird in Untereinheiten aufgeteilt, also partitioniert. Partitionen sind der elementare Mechanismus, über den sowohl Skalierung als auch Replikation funktionieren. Wenn in ein Topic geschrieben oder aus einem Topic gelesen wird, so bezieht sich das immer auf eine Partition. Auch der Offset gilt pro Partition.

Jede Partition ist über ihren Offset sortiert. Wenn man eine Nachricht in ein Topic schreibt, hat man die Möglichkeit, einen Key anzugeben. Über den Hash dieses Keys wird sichergestellt, dass alle Nachrichten mit dem gleichen Key in der gleichen Partition landen. Innerhalb einer Partition wird das Einhalten der Reihenfolge der eingehenden Nachrichten garantiert.

Segmente

Letztlich entspricht eine Partition mehreren Segmentdateien auf der Festplatte. Neue Nachrichten werden in das aktive Segment gespeichert. Wenn das Segment das eingestellte Größen- oder Zeitlimit erreicht, wird es geschlossen. Dann wird ein neues Segment erzeugt, das nun das aktive Segment ist.

Jede Segmentdatei ist nach dem Offset der ersten enthaltenen Nachricht benannt und endet auf .log. Dazu gibt es eine passende Indexdatei, die die Offsets auf die Position der Nachrichten in der Segmentdatei mappt.

Auf einem Broker, der vom Topic “Test” die Partitionen 0 und 2 mit 100 Nachrichten pro Segment beherbergt, würde folgende Ordnerstruktur vorliegen.

|- /tmp/kafka-logs/
|-- Test-0/
|---- 00000000000000000000.index
|---- 00000000000000000000.log
|---- 00000000000000000101.index
|---- 00000000000000000101.log
|-- Test-2/
|---- 00000000000000000000.index
|---- 00000000000000000000.log

Eine hohe Zahl an Partitionen bedeutet also zum einen, dass mehr Parallelität beim Schreiben und Lesen möglich ist, aber auch, dass mehr offene File-Handles aktiv und mehr Threads belegt sind. Bei wenigen Partitionen kann die Parallelität zu gering sein.

Sollen nur wenige Nachrichten verarbeitet werden, reicht auch eine niedrige Zahl an Partitionen. Man startet üblicherweise mit einem kleinen Wert und passt dann nach und nach an. Partitionen zu erhöhen ist unkompliziert, Partitionen zu verringern ist nicht möglich, da das einen Datenverlust bedeuten würde.

Log-Erhalt

Für jedes Topics kann eine Beschränkung in Form eines Time-to-Live (TTL), oder umgekehrt Erhalt (engl. Retention) in Kafkas Sprachgebrauch, nach Zeit (Parameter log.retention.hours, log.retention.minutes, log.retention.ms), nach Größe (Parameter log.retention.bytes), oder beidem eingestellt werden. Dabei ist vor allem der Speicherplatz die limitierende Größe, d.h. Topics können durchaus eine TTL von einem Jahr haben. Ist eine Retention gesetzt, so entfernt Kafka die geschlossenen Segmente der Partitionen, in denen alle Nachrichten das Höchstalter überschritten haben. Bei einer Größenbeschränkung werden geschlossene Segmente entfernt, sobald die Beschänkung in Bytes pro Partition in Summe überschritten wird. Das aktive Segment bleibt in beiden Fällen unberührt.

Daneben gibt es noch Compacted Topics, die pro Key nur die neueste Nachricht vorhalten. Alte Nachrichten mit dem gleichen Key werden in regelmäßigen Abständen entfernt. Eine Nachricht ohne Inhalt (=null) löscht alle Nachrichten mit dem Key.

Replikation, Leader und Replikas

Zwei Broker mit einem Topic von vier Partitionen

Wie erwähnt ist Kafka auf mehrere Broker verteilt und repliziert Topics zur Ausfallsicherheit. Auch hier sind die Partitionen das Mittel um die Replikation unkompliziert zu ermöglichen. Jede Node ist dabei für eine Partition entweder Leader oder Replika.

Der Replikationsfaktor eines Topics besagt, auf wieviele Broker jede Partition eines Topics repliziert wird. Ein Faktor von 2 würde beispielsweise bedeuten, dass das Topic neben dem Leader auf einer weiteren Replika gespeichert würde. Eine Partition mit dem Faktor r existiert daher r-mal im Cluster.

Nicht das Topic sondern jede einzelne Partition hat eine Leaderpartition und r-1 Followers. Die Leaderpartition ist schlicht die Partition auf dem zugewiesenen Leader. Welcher Broker Leader für welche Partition ist, wird beim Erstellen des Topics automatisch per Round Robin gewählt, kann über Partitions-Maps aber auch manuell geschehen. Das ist sogar nötig, wenn die Anzahl der Partitionen in einem Topic nachträglich geändert werden soll.

Ein heiteres Durcheinander:
Geschrieben wird nur auf die Leader, dann auf die Replika verteilt

Eine besondere Rolle kommt den In-Sync Replica (ISR) zu. Sie beschreiben die Replika, die auf dem Stand des Leaders sind. Stürzt ein Leader ab, wird ein neuer aus dem Pool der ISRs gewählt.

Damit können r-1 ISR ausfallen, ohne dass Datenverlust stattfindet.

Nachrichten

Eine Nachricht besteht aus einer Sequenz von Bytes. Einer Nachricht kann auch ein Schlüssel zugeordnet sein, ebenfalls aus einer Bytesequenz bestehend. Daneben liegen zur Nachricht Metadaten wie der Offset, eine Prüfsumme, Nachrichtengröße, Kompression, und Timestamp vor. Zudem kann man auch eigene Header hinzufügen.

Der Inhalt der Nachricht und des Keys, also auch deren Format, liegen damit komplett in der Verantwortung des Nutzers. Die Kafka Client API bietet nur De-/Serializer für Strings und Byte Arrays an. Ein eigener De-/Serializer muss letztlich immer einen Byte-Array lesen bzw. erzeugen, damit Kafka die Daten verarbeiten kann.

Nachrichten bzw. Batches von Nachrichten können komprimiert sein. Dabei ist Kafka in der Lage, die komprimierten Daten direkt zu speichern ohne sie vorher zu entpacken. Das Kompressionsformat wird in den Metadaten der Nachrichten hinterlegt. Kompression stellt einen der größten Performance-Zugewinne im Einsatz von Kafka dar.

Producer

Übersicht

Ein Producer schreibt Nachrichten in Topics.

Beim Verschicken der Nachricht kann der Key gesetzt werden, über den Kafka per Hash die Partition ermittelt, in die der Producer die Nachricht schreibt. Ist kein Key vorhanden, wird die Partition zufällig ausgewählt. Zu Beginn verbindet sich der Producer mit irgend einem Broker aus dem Cluster, der dann den Leader der Partition vermittelt, an den der Producer die Nachricht überträgt.

Ein Kafka-Producer arbeitet ansynchron und schreibt seine Nachrichten in Batches. Die Batchgröße oder auch eine maximale Wartezeit lassen sich konfigurieren um so für Latenz oder Durchsatz zu optimieren.

Durability

Ein wichtiger Konfigurationsparameter ist acks. Er steuert die Zuverlässigkeit und den Durchsatz. Steht er auf 0, wird im Fire-and-Forget-Modus gesendet. Es gibt keine Garantien, dass eine Nachricht geschrieben wurde, aber der Durchsatz ist maximal. Dieser Modus ist z.B. für Unit Tests relevant, die möglichst schnell ausgeführt werden sollen. Steht acks auf 1, reicht die Bestätigung des Leaders. Jede andere Zahl n benötigt dann n Bestätigungen von Leader + n-1 Replikas. all schließlich bedeutet, dass die konfigurierte Mindestanzahl von ISRs (Broker-Konfigurationsparameter min.insync.replicas) ihre Bestätigung für den Write gegeben haben müssen.

Insgesamt gilt, je mehr Bestätigungen, desto sicherer ist der Write, aber auch umso geringer die Performance. Können die Anforderungen von acks nicht erfüllt werden, wird eine Exception geworfen.

Kompression

Kompression (compression.type) sollte immer aktiviert sein. Per Default wird meistens Snappy benutzt, je nach Umgebung können andere Algorithmen wie LZ4 eine bessere Performance bieten. Topics selbst können auch komprimiert sein. Wählt man für die Kompression des Topics, also in der Broker-Konfiguration, den Wert producer, so wird nicht nur der gleiche Algorithmus wie auf dem Producer genutzt, sondern Kafka spart CPU-Zeit, indem die komprimierten Nachrichten nicht erst entkomprimiert werden müssen sondern per zero-copy direkt gespeichert werden.

Consumer

Übersicht

Ein Consumer liest Nachrichten aus einem oder mehreren Topics.

Ein Kafka-Consumer wird nicht per Push benachrichtigt, sondern pollt nach neuen Nachrichten und bestimmt damit selbst, mit welcher Geschwindigkeit neue Daten an ihn ausgeliefert werden.

Der Key einer Nachricht ist auch bei der Verarbeitung von Daten relevant. Wenn die Daten z.B. nach User ID partitioniert sind, so weiß ein Consumer, dass er alle Daten eines Nutzers erhält und sie nicht noch in anderen Partitionen liegen, die vielleicht einem anderen Consumer zugewiesen sind.

Offsets

Die aktuelle Position eines Consumers im Log wird über den Nachrichten-Offset verwaltet. Über den Offset wird also gesteuert, welche Nachrichten bereits verarbeitet wurden. Damit Nachrichten beim Absturz oder Neustart eines Consumers nicht doppelt verarbeitet werden, muss der zuletzt verarbeitete Offset gespeichert werden um dann an die richtige Stelle im Log zu springen.

Hinkt der Consumer mit dem Einlesen der Nachrichten hinterher, ist also der aktuelle Offset des Consumers kleiner als der höchste Offset in in der Partition, spricht man von Consumer Lag. Nachrichten werden also schneller produziert als dass sie konsumiert werden.

Der Offset kann von Kafka automatisch im Topic _offsets gespeichert werden. Das ist insbesondere praktisch, da Kafka sich dann auch selbst darum kümmert, bei einem Neustart des Consumers zum letzten Offset zu springen. Zudem erlaubt diese Methode transaktionales Schreiben von einem Topic in ein anderes.

Alternativ kann man das Offset-Management auch komplett selbst übernehmen. Es kann beispielsweise praktisch sein, den Offset in eine Datenbank zu speichern, die von Monitoring-Tools abgefragt werden kann. Außerdem kann man den Offset hier direkt zurücksetzen, falls eine Verarbeitung fehlgeschlagen und erneut durchgeführt werden soll (Replay). Zu guter letzt kann es dabei helfen, die wiederholte Verarbeitung von Nachrichten zu vermeiden, wenn man den Offset zusammen mit den verarbeiteten Daten speichert.

Gruppen

Ein Topic wird von einer Consumergruppe ausgelesen;
dem zweiten Consumer sind zwei Partitionen zugewiesen und er hat noch nicht alle Nachrichten gelesen (Consumer Lag)

Consumer werden in Gruppen organisiert. Die Zugehörigkeit zu einer Gruppe wird über den Konfigurationsparameter group.id gesteuert. Wird ein Consumer gestartet, der Teil einer Gruppe ist, werden die bereits aktiven Consumer in dieser neu ausbalanciert (engl. rebalance). Das Gleiche passiert, wenn ein Consumer abstürzt. Bei Kafka bekommen nicht alle Consumer einer Gruppe alle Daten, sondern die Daten werden auf die Gruppenmitglieder aufgeteilt. Das wird dadurch erreicht, dass jede Partition genau einem Consumer in der Gruppe zugewiesen wird. Ein Consumer kann aus einer oder mehreren Partitionen lesen.

Somit ist die Anzahl der Partitionen auch das obere Parallelitätslimit. Die Anzahl der Partitionen ist also auch für die Verarbeitung essenziell. Soll mit hoher Parallelität gelesen werden, müssen auch entsprechend viele Partitionen vorhanden sein. Wenn jede Partition einem Consumer zugewiesen ist, würde ein weiterer Consumer schlicht keine Daten erhalten. Eine sehr hohe Anzahl an Partitionen bei wenigen Consumern kann insbesondere auf der JVM den Speicherdruck erhöhen, zum einen weil ein Kafka Consumer pro Partition einen Thread startet, zum anderen, weil ein Consumer, der Daten nicht Real-time verarbeitet, Batches an Nachrichten aus mehreren Partitionen parallel lädt.

Möchte man die Daten mehrfach in unterschiedlichen Szenarien verarbeiten, trennt man die Verarbeitung über eigene Gruppen. So kann z.B. die eine Consumergruppe den Stream sofort in Real-time verarbeiten, während eine andere nur einmal nachts gestartet wird und im Batch-Betrieb arbeitet.

Ökosystem

Kafka selbst besteht nur aus dem Kern von persistenten Nachrichten und APIs für Producer und Consumer. Um Kafka herum gibt es noch einige Tools, mit denen sich insbesondere Monitoring, Cross-Data-Center-Replikation, und das Befüllen und Verarbeiten von Streams erledigen lassen.

Mitgelieferte Tools

Mirror Maker ist Producer und Consumer und spiegelt Topics von einem Cluster in einen anderen.

Kafka Connect ist ein ETL-Tool, das Daten in und aus Kafka lädt. Ein häufiger Anwendungsfall von Connect ist das Anbinden von relationalen Datenquellen, deren Inhalt komplett oder teilweise in Kafka Topics geladen wird. Dafür werden Konnektoren für JDBC oder den Change Data Capture (CDC)-Mechanismus vieler Datenbanken bereitgestellt.

Kafka Streams ist ein Stream Processing Framework, das nur mit Kafka funktioniert. Es wird als Bibliothek in JVM-Anwendungen eingebunden und ist insbesondere für event-basierte Microservices geeignet. Ein besonderes Feature sind das Vorhalten von lokalem State, z.B. eine Tabelle, die aus einem Compacted Topic aufgebaut wird.

KSQL erlaubt es per SQL Daten aus Topics abzufragen und zu transformieren. Da in die Topics permanent neue Nachrichten laufen, sind auch die Abfragen immer gestreamt. KSQL läuft in einem eigenen Service auf dem Kafka Cluster und eignet sich besonders für Verarbeitungen, die gut mit SQL formuliert werden können.

Separate Tools

Kafka Manager ist ein externes Web-Tool, mit dem man einen Kafka Cluster verwalten kann.

Trifecta ist ein externes Web-Tool, mit dem man die Daten in Topics anschauen kann.

Use Cases

Mit den beschriebenen Eigenschaften, Komponenten, und dem umgebenden Ökosystem können folgende Anwendungsfälle mit Kafka besonders gut implementiert werden.

Messaging (Publish-Subscribe) - Eine Messaging-Plattform entkoppelt zwei Systeme voneinander und erlaubt es, Nachrichten asynchron zuzustellen, d.h. der Empfänger kann sie später abrufen und muss nicht sofort reagieren. Das ist eine der Kernfunktionalitäten von Kafka und wird über Topics und die Trennung in Producer und Consumer erreicht.

Stream Processing, Complex Event Processing (CEP) - Kafka ist eine Streaming Plattform und bietet niedrige Latenzen um kontinuierlich Nachrichten ohne Versatz zu verarbeiten. So kann auf auftretende Ereignisse sofort reagiert werden.

Integration - Kafka kann genutzt werden um die Daten verschiedenster Quellsysteme des Unternehmens zentral zu sammeln und weiterzuverarbeiten. Ein anderer Begriff dafür wäre ETL (Extract-Transform-Load) aus dem Business Intelligence-Umfeld. Kafka ermöglicht ETL im Stream, indem Kafka Connect den E&L-Teil, und Kafka Streams sowie KSQL Transform-Teil übernehmen.

Tracking, Metriken erfassen, Log-Aggregierung - Hier geht es um das Erfassen vieler Daten in unterschiedlichen Formaten und Ausprägungen. Kafka erlaubt einen hohen Durchsatz bei vielen Nachrichten, nimmt jede Art von Daten an und ist Format-agnostisch.

Event Sourcing - Event Sourcing bezeichnet das Abbilden des aktuellen Application State aus Einzel-Events. Die Events können in Kafka als Nachrichten gespeichert werden. Beim Anwendungsneustart können die Nachrichten neu eingelesen werden (Replay). Aus dem Topic mit Einzel-Event kann ein weiteres Topic mit Snapshots abgeleitet werden.

Alternativen

Neben vielen anderen Message Bussen gibt es inbesondere zwei Systeme, die ähnlich zu Kafka sind.

Kinesis ist als Teil von Amazon Web Services (AWS) eine Cloud-basierte Message Queue und Streaming Plattform. Kinesis bietet sich an, wenn man sich nicht um die Infrastruktur kümmern möchte und andere, komplementäre AWS-Dienste wie Lambda oder Spark on EMR benutzt.

Apache Pulsar ist etwas jünger als Kafka und noch Teil des Apache Incubators. Pulsar bietet dafür ein paar Features, die Kafka (noch) fehlen, unter anderem die Aufteilung von Reads und Writes, Geo-Replication, und allgemein mehr Features direkt im Broker.

Kafka in der Fast Data Plattform

Wenn Ihnen diese Einführung in Apache Kafka gefallen hat, könnte Sie auch mein Buch Fast Data in der Praxis interessieren. Es enthält diesen Post in einer ausführlicheren Fassung, und baut darauf im Kapitel Kafka in der Fast Data Plattform auf, das konkret auf den Einsatz von Kafka in der Fast Data-Praxis eingeht. Es wird zudem auch Kapitel zum Thema Performance, Topic-Versionierung, Migrationen, Replay, und mehr geben.

Diesen Post teilen

RSS-Feed

Neue Posts direkt im Newsreader.

RSS-Feed abonnieren

Newsletter

Neue Posts sowie Neuigkeiten rund um Big Data, Spark, und Scala. Maximal eine E-Mail im Monat.