Daten modellieren mit Apache Avro

Motivation

Der klassische Datensee mit seinem Mix aus allen möglichen Datenformaten, sei es XML, semistrukturiertem JSON, oder völlig unstrukturierten Textdateien, entwickelt sich schnell zu einer Datenmüllhalde. Wer kann nach einiger Zeit und dem Anschließen unzähliger Systeme noch den Überblick behalten, welche Daten welches Format in welcher Version haben? Jeder, der Daten aus dem See lesen möchte, muss immer wieder die gleichen Probleme lösen - welche Felder gibt es, welche davon sind optional, welche Datentypen könnten sie haben, und was ändert sich daran im Laufe der Zeit.

Abhilfe schaffen Modellierungsformate. Recht bekannt sind dabei Google Protocol Buffers und Apache Thrift. Eine dritte, in Hadoop-Umgebungen verbreitete Variante, mit der ich bisher gute Erfahrungen gemacht habe, möchte ich hier vorstellen - Apache Avro.

Apache Avro

Avro ist ein System zur Datenserialisierung. Zunächst werden Schemata entsprechend der Avro-Spezifikation modelliert. Ein Schema besteht aus Datensätzen (Records), die wiederum Felder enhalten. Jedes Feld hat einen festen Datentypen, und es wird definiert, ob das Feld optional ist und ob es einen Defaultwert hat.

Daten, die konform mit dem Avro-Schema sind, können dann mit Hilfe des Schemas serialisiert und deserialisert werden. Dazu bietet Avro eine Bibliothek für verschiedene Programmiersprachen an. Durch das Schema ist das Serialisieren sehr effizient. Daten können entweder als JSON oder binär serialisiert werden.

Mit Hilfe von diversen Plugins können aus dem Schema Klassen für die jeweilige Programmiersprache generiert werden, mit denen in der Applikation gearbeitet werden kann. Die Modellklassen können ebenfalls direkt de-/serialisiert werden. Zur dynamischen Verarbeitung kann das Schema auch mit-serialisiert und von jedem Avro-kompatiblen System gelesen werden.

Beispielsweise würden zwei Anwendungen ein User-Objekt über einen Message Broker austauschen. Die Klasse User ist per Avro-Record modelliert worden. Anwendung A nutzt Scala als Programmiersprache und generiert eine case class User. Instanzen dieser Klasse werden innerhalb der Anwendung erzeugt, mit Avro binär serialisiert und auf dem Message Broker abgelegt. Anwendung B ist in Java geschrieben und generiert aus dem gleichen Schema nun eine class User. Wenn B die binäre Nachricht vom Broker abholt, kann sie per Avro in eine Instanz von User deserialisiert und weiterverarbeitet werden.

Avro Use Case

Schema-Definition

Ein Schema kann entweder als JSON oder in Avros Interface Definition Language (IDL) definiert werden.

JSON-Schemadateien haben das Suffix .avsc und enthalten alle in dem Schema definierten Typen und in Felder in Form eines JSON-Objektes, das mindestens den Typen spezifiziert, z.B. {"type":"string", "name":"email"}.

Die IDL kann genau die gleichen Schemas erzeugen wie die JSON-basierte Syntax. Gerade bei komplexeren Schemas ist die C-ähnliche Syntax jedoch leichter zu lesen und ähnelt auch eher Proto Buffs und Thrift. Zudem kann nur die IDL andere Schema-Dateien importieren. Letztlich wird auch das IDL-Schema in ein JSON-Schema kompiliert, d.h. wenn man zur Laufzeit das Schema ausliest, liegt es immer im JSON-Format vor.

Ein IDL-Schema wird in einer Datei mit der Endung .avdl geschrieben. Eine IDL definiert als erstes immer ein Protokoll1. Im Protokoll werden dann die Records, also die Typen, definiert.

Das JSON-Schema eignet sich vor allem für einfache Records, die unabhängig von einem Protokoll definiert werden sollen. Die IDL eignet sich für komplexe Schemas, die auf verschiedene Dateien verteilt werden sollen.

Datentypen

Avro unterstützt die in der JVM-gängigen primitiven Datentypen.

  • null: kein Wert, besonders interessant um ein Feld optional zu machen
  • boolean: true / false
  • int: 32-bit Integer mit Vorzeichen
  • long: 64-bit Integer mit Vorzeichen
  • float: single precision (32-bit) IEEE 754 floating-point number
  • double: double precision (64-bit) IEEE 754 floating-point number
  • bytes: sequence of 8-bit unsigned bytes
  • string: Unicode String

Zudem gibt es noch einige Primitive, die nicht auf die JVM mappen, und folgende komplexe Typen.

Records entsprechen wie erwähnt in etwa Klassen, kennen jedoch keine Vererbung. Beim Modellieren mit Avro herrscht also das Motto Composition over Inheritance. Records enthalten vor allem Felder (Fields), die wiederum von jedem anderen Typen sein können (primitiv oder komplex). Arrays und Maps gehören ebenfalls zu den komplexen Typen, können aber nur innerhalb von Records definiert werden. Beide werden ebenfalls typisiert, wobei der Key der Map immer ein String sein muss und deshalb nicht angegeben wird.

record User {
  string email;
  long createdTs;
  array<string> loginTokens = []; // Defaultwert ist leerer Array
  map<string> customInfo;
}

Unions sind Vereinigungstypen, die aber nur von vereinzelten Programmiersprachen überhaupt unterstützt werden. In Avro werden sie daher vor allem verwendet, um per Vereinigung mit null optionale Felder zu definieren. Im Umkehrschluss sind alle anderen Felder Pflichtfelder.

record User {
  string email;
  union {null, string} name; // Optionales Feld, kann also null oder string als Wert haben
  long createdTs;
  array<string> loginTokens = [];
  map<string> customInfo;
}

Enums sind Aufzählungen von festen Werten.

enum UserType {
  Prospect, Customer, Premium, Admin
}

record User {
  string email;
  union {null, string} name;
  long createdTs;
  array<string> loginTokens = [];
  map<string> customInfo;
  UserType userType;
}

Eine vollständige Auflistung findet sich in der Spezifikation.

Serialisierung

Avro Records können als JSON serialisiert werden. Da optionale Felder als Union kodiert werden und somit der Typ immer mit angegeben wird, weicht das von Avro produzierte JSON von dem sonst üblichen Format ab.

{"email":"john@example.org", "name":{"string":"John Doe"}, ...}

Da die binäre Serialisierung zudem effizienter ist, wird JSON oft nur zum Entwickeln oder Debuggen verwendet.

Besonders einfach ist das Serialisieren von generierten Klassen, den SpecificRecords, die von Avros Basisklasse SpecificRecordBase ableiten. Diese Klassen müssen einigen Regeln folgen, unter anderem benötigen sie ein statisches Feld SCHEMA$, das das Avro-Schema als JSON enthält. Unter Java können diese mit einem von Avro mitgelieferten SpecificDatumWriter verarbeitet werden, der dann mit einem Binary Encoder in einen ByteOutputStream schreibt.

import org.apache.avro.specific._
val writer = new SpecificDatumWriter[User](User.SCHEMA$)
val stream = new ByteArrayOutputStream()
val binaryEncoder = EncoderFactory.get().binaryEncoder(stream, null)
writer.write(user, binaryEncoder)
binaryEncoder.flush()
stream.toByteArray

Unter Scala kann man bequem die Bibliothek Bijection von Twitter verwenden.

import com.twitter.bijection.avro.SpecificAvroCodecs
val codec = SpecificAvroCodecs.toBinary[User]
val binary = codec(user)
val userInverted: Try[User] = codec.invert(binary)

Die Records können zudem auch direkt von Avro mit beispielsweise Snappy komprimiert werden.

Zu guter Letzt gibt es auch noch GenericRecords, die dann verwendet werden, wenn keine generierten Klassen vorliegen und dynamisch zur Laufzeit mit den Avro-Daten umgegangen werden soll, beispielsweise wenn eingehende Daten automatisch in eine Tabellenstruktur übernommen werden.

Schema-Evolution

Was passiert, wenn das Schema erweitert wird und wir beispielsweise neue Felder hinzufügen? Je nach Anwendungsfall ist es nicht unbedingt notwendig, Schemas zu versionieren. Für diesen Fall bietet Avro eine automatische Schema-Evolution an. Wenn Schemaänderungen bestimmten Regeln folgen, können sie sowohl auf- als auch abwärtskomptaibel sein. Bei neuen Feldern muss beispielsweise ein Defaultwert gesetzt werden. Wenn es keinen sinnvollen Default gibt, muss das Feld optional mit null als Default sein.

record Address {
  string address1;
  string address2;
  string city;
  string country;
}

record User {
  string email;
  union {null, string} name;
  long createdTs;
  array<string> loginTokens = [];
  map<string> customInfo;
  UserType userType;
  
  // neue Felder, abwärtskompatibel
  long lastLoginTs = 0;
  union {null, Address} address = null;
}

Damit Avro die Schema-Evolution durchführen kann, muss das Schema, mit dem der Record geschrieben wurde, bekannt sein. Das kann man erreichen indem man entweder mit jedem Record das volle Schema mitschreibt (so generieren auch die Avro-Tools binäre Dateien), oder man sich nur die Version merkt und das Schema in einem zentralen Service, der sogenannten Schema-Registry, versioniert.

Beispiel

In der Datei UserProtocol.avdl definieren wir unsere Enums und Records.

@namespace("de.soutier.model")

protocol UserProtocol {

  enum UserType {
    Prospect, Customer, Premium, Admin
  }
  
  record Address {
    string address1;
    string {null, string} address2;
    string city;
    string country;
  }
  
  record User {
    string email;
    union {null, string} name;
    long createdTs;
    long lastLoginTs;
    array<string> loginTokens = [];
    map<string> customInfo;
    UserType userType;
    
    union {null, Address} address = null;
  }
}

Aus dieser IDL-Datei werden dann folgende Klassen erzeugt:

src
|- main
   |- avro
      |- UserProtocol.avdl
target
|- generated-classes
   |- de
      |- soutier
         |- model
            |- Address.scala
            |- User.scala
            |- UserType.java

Wir befinden uns in einem Scala-Projekt, daher werden Scala-Klassen erzeugt. Nur die Enumeration wird eine Java-Enumeration.

Fazit - Anwendung im Unternehmen

An dieser Stelle mag man sich fragen, wozu den ganzen Aufwand betreiben, nur um seine Modellklassen zu erzeugen?

Neben einer effizienten Serialisierung bietet Avro die Möglichkeit, Daten unternehmensweit zu definieren. Man kann festlegen, welches System genau welche Daten produziert. Werden diese Daten zentral gesammelt, wie beispielsweise in der Kappa-Architektur üblich, kann sich jeder, der die Daten weiterverarbeitet, darauf verlassen, dass alles im erwarteten Format ankommt und muss keine Datenbereinigung mehr betreiben. Einen Schritt weiter gedacht werden auch Daten im Frontend direkt mit Avro modelliert, so dass die komplette Strecke (End-to-End) in einem fest definierten Format verarbeitet wird.

  1. Das Scala-Plugin (sbt / Maven) generiert aus dem Protokoll einen sealed trait. Alle Records innerhalb des Protokolls leiten von dem trait ab.

Diesen Post teilen

RSS-Feed

Neue Posts direkt im Newsreader.

RSS-Feed abonnieren

Newsletter

Neue Posts sowie Neuigkeiten rund um Big Data, Spark, und Scala. Maximal eine E-Mail im Monat.

Kommentare