Einführung in Apache Spark

Was ist Apache Spark?

Apache Spark ist ein Framework für clustergestützte Berechnungen, das dem klassischen Hadoop Map/Reduce Konkurrenz bereitet, indem das im Cluster vorhandene RAM für schnellere Ausführung von Jobs genutzt wird. Daneben bietet Spark auch die Möglichkeit, die Daten per SQL anzusteuern, mittels Streaming in (Nah-)Echtzeit zu verarbeiten, und liefert eine eigene Graphen-Datenbank und eine Machine Learning-Bibliothek mit. Spark hat in der letzten Zeit viel Aufmerksamkeit bekommen und wird von immer mehr Unternehmen genutzt und unterstützt.

Spark API

Sparks Programmiermodell basiert auf Resilient Distributed Datasets (RDD), einer Collection-Klasse, die im Cluster verteilt (distributed) operiert. Sie ist resilient, d.h. fehlertolerant, indem fehlgeschlagene Berechnungen auf einem anderen Knoten nachgeholt werden können. Bereits berechnete Zwischenergebnisse können gespeichert werden und gehen damit nicht verloren. Ein laufender Job kann damit teilweise fehlschlagen und ohne kompletten Neustart doch noch zu Ende gebracht werden.

Die API orierentiert sich dabei lose an den Scala Collections, was nicht weiter verwundert, da Spark in Scala geschrieben ist. Neben der Scala-API bietet Spark aber auch eine Java- und eine Python-API, sowie eine R-Anbindung1.

RDDs folgen dem Modell der Collection Pipeline. Beginnend mit dem Input führt man eine Sequenz von Transformationen auf den RDDs aus, z.B. filtert man nicht-relevante Daten heraus, wendet Funktionen auf jedes Element an und aggregiert dann. Das Ergebnis wird dann im Dateisystem oder einer Datenbank gespeichert und kann dort von anderen Jobs aufgegriffen oder von Webanwendungen ausgelesen und in Dashboards angezeigt werden.

Spark Cluster

Die Spark-Infrastruktur funktioniert nach dem klassischen Master-Worker-Prinzip - eine Spark App wird zum Master geschickt, wo sie als eine eigene JVM-Instanz gestartet wird und als Driver fungiert. Der Driver selbst führt keine Berechnungen aus sondern koordiniert lediglich die Ausführung des Jobs. Dazu leitet er zunächst aus den RDD-Operationen einen gerichteten Graph ab, den er in Stages aufteilt. Die Stages wiederum werden in einzelne Tasks so zerlegt, dass sie parallel abgearbeitet werden können. Auf den Workern werden dann Executor (wiederum separate JVM-Instanzen) gestartet, an die die Tasks aufgeteilt werden. Auch innerhalb eines Executors werden mehrere Tasks ausgeführt um alle zugewiesenen Kerne auszulasten.

Jeder Task hat einen Teil-Input und generiert einen Teil-Output, den sogenannten Shuffle. Shuffles können wiederum Input für folgende Tasks werden. Tasks werden so verteilt, dass voneinander abhängige Tasks möglichst auf den gleichen Workern laufen, um Netzwerktransfers zu vermeiden (auch Datenlokalität oder Lokalitätslevel genannt). Hier kommt zum Tragen, dass die Tasks in Stages organisiert werden - Stages sind dadurch gekennzeichnet, dass ein Netzwerktransfer statt findet. Innerhalb einer Stage kann ein Task seinen Output direkt an den nächsten Task weitergeben - und wird letztlich zu einem Task zusammengefasst.

Spark kann eigenständig im Stand-Alone-Modus fungieren. Dies ist der einfachste Modus, der es lediglich erlaubt pro Job CPU-Kerne und Speicher zuzuteilen. Sollen die Cluster-Ressourcen mit anderen Komponenten geteilt werden, z.B. Map/Reduce-Jobs, unterstützt Spark auch YARN oder Mesos2.

Besonders hervorzuheben ist die enge Integration mit der Hadoop-Infrastruktur. Spark arbeitet nativ mit dem Hadoop Dateisystem HDFS und allen sein In- und Output-Formaten zusammen. Laufen die Spark-Worker auf den gleichen Knoten wie HDFS DataNodes, kann schon Datenlokalität beim Lesen der Daten gewährleistet werden.

Sparks Besonderheit ist das gezielte Caching von Zwischenergebnissen - entweder im Speicher, oder auch auf Festplatte mit verschiedenen Replikationsgraden. Reicht der Speicher nicht aus, cached Spark nur teilweise, und der Rest wird entweder auf Platte ausgelagert oder bei Bedarf neu berechnet. Das Caching ist besonders dann interessant, wenn berechnete Ergebnisse mehrfach verwendet werden, wie beispielsweise iterative Algorithmen im Machine Learning.

Spark interaktiv

Nachdem Spark auf der Entwicklungsmaschine installiert ist3, stehen die beiden Command Line Tools spark-shell und spark-submit zur Verfügung.

Die spark-shell startet eine Scala-Konsole, in der man interaktiv Spark-Befehle ausführen kann. Möchte man zum Experimentieren keinen Cluster aufsetzen, kann man Spark im Localmode starten. local startet eine einzelne Spark-Instanz, local[n] startet es mit n Cores, local[*] mit allen verfügbaren Kernen.

spark-shell --master local[*]

Automatisch verfügbar ist die Variable sc vom Typ SparkContext - dies ist die Verbindung zur laufenden Spark-App und wird vom Driver zur Verfügung gestellt. Über den Context können wir RDDs erstellen, entweder direkt über parallelize oder indem wir Daten über textFile() einlesen.

// Erstellt ein neues RDD vom Typ Int
scala> val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5))
numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12

// Anzahl der Elemente, wird von Workern berechnet aber an den Driver zurückgegeben, der es dann anzeigt
scala> numbers.count()
INFO org.apache.spark.SparkContext: Starting job: count at <console>:15
...
res0: Long = 5

// Nur die geraden Zahlen behalten, diese mit 2 multiplizieren und summieren
numbers.filter(_ % 2 == 0).map(_ * 2).reduce(_ + _)
res1: Int = 12

Jeder Job, so auch die spark-shell, haben ein eigenes UI. Es wird vom Driver gestartet und läuft per default auf Port 4040 (weitere Jobs inkrementieren dann den Port um 1). Hier lassen sich die Stages und ihre Metriken (Dauer, Zeit im Garbage Collector, Datenlokalität, …) nachvollziehen.

Code-Beispiel

Die spark-shell dient hauptsächlich dem explorativen Arbeiten. Wenn wir Jobs von einem Spark-Cluster ausführen lassen möchten, entweder ad-hoc oder regelmäßig, kann der Job per spark-submit gestartet werden. Alternativ kann auch innerhalb einer Anwendung ein SparkContext gestartet werden.

Ein Anwendungsfall ist das Einlesen und Auswerten von Logfiles. Z.B. Webserverlogs im Common Log Format:

127.0.0.1 - - [10/Mar/2015:13:55:36 -0100] "GET /index.html HTTP/1.1" 200 5679

package example
import org.apache.spark.SparkContext

// Unsere Datenklasse
case class LogEntry(ip: String, date: String, method: String, route: String, status: Int, bytes: Long)

object LogEntry {
  def parseLogEntry(logEntry: String): LogEntry = ... // eine Logzeile in ein LogEntry-Objekt parsen
}

object CountLogErrors extends App {
  val sc = new SparkContext()

  val errors = sc.textFile("/var/log/webserver.log")
    .map(LogEntry.parseLogEntry _)
    .filter(_.status >= 500)
    .cache() // Inhalt für weitere Aufrufe cachen

  val totalErrorCount = errors.count()
  val errorCountByIp = errors.map(entry => (entry.ip, entry)).countByKey()
  val dataSpentOnErrors = errors.map(entry => entry.bytes).reduce(_ + _)
  
  println(s"Total Errors: $totalErrorCount")
  println(s"Errors By Ip: $errorCountByIp")
  println(s"Data Spent On Errors: $dataSpentOnErrors")
}

Das muss man nun als JAR verpacken4 und kann es dann mit spark-submit starten:

spark-submit --master local[*] --class example.CountLogErrors <JarName>.jar

Fazit

Im Vergleich zu Hadoop ist Spark noch relativ jung, daher muss man mit einigen Ecken und Kanten rechnen. Es sich in der Praxis jedoch bereits vielfach bewährt, und ermöglicht durch die schnelle Ausführung von Jobs und das Caching von Daten neue Use Cases im Bereich Big oder Fast Data. Und zuletzt bietet es eine einheitliche API für Tools, die sonst im Hadoop-Umfeld separat betrieben und bedient werden müssten.

  1. Wodurch einmalig die Möglichkeit besteht, R-Code parallelisiert auszuführen.

  2. Tatsächlich wurde Spark als erster Use Case für Mesos entwickelt.

  3. Auf dem Mac mit brew install apache-spark. Ansonsten gibt es diverse Docker Compose Files auf GitHub, mit denen man lokal ein Cluster starten kann.

  4. Sowohl für Maven als auch sbt gibt es entsprechende Assembly-Plugins.

Diesen Post teilen

RSS-Feed

Neue Posts direkt im Newsreader.

RSS-Feed abonnieren

Newsletter

Neue Posts sowie Neuigkeiten rund um Big Data, Spark, und Scala. Maximal eine E-Mail im Monat.

Kommentare