Spark Performance-Optimierung - RDDs, Shuffles, Partitionen

Motivation

Spark bietet zunächst eine sehr einfach zu benutzende API, Erfolgserlebnisse mit den ersten geschriebenen Jobs stellen sich schnell ein. Doch sobald man anfängt sowohl komplexere Jobs zu schreiben als auch größere Datenmengen zu verarbeiten, laufen die Jobs plötzlich inakzeptabel langsam. Um sie optimieren zu können hilft es, einige Konzepte von Spark genauer zu verstehen.

Im ersten Teil dieser Übersicht schauen wir uns diese Spark-Grundlagen an: RDDs, Partitionen, Abhängigkeiten, und Shuffles.

Die Basics: RDDs

Wie im Einführungsartikel zu Spark erwähnt, wird der Spark-Code, den man schreibt, in einen gerichteten Graphen überführt. Ausgehend vom Input erzeugt jede Transformation ein neues RDD, das die jeweils auf die Daten anzuwendende Funktion enthält.

// Alle Codebeispiele sind in der spark-shell nachvollziehbar

scala> val input = sc.parallelize(Seq(1,2,3,4,5))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val mapped = input.map(_ * 3)
mapped: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26

scala> val filtered = mapped.filter(_ % 2 == 0)
filtered: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:28

Das Input-RDD entspricht der Quelle, z.B. HadoopRDD, KafkaRDD oder hier im Beispiel ParallelCollectionRDD. Alle nachfolgendenen RDDs sind Transformationen, von denen viele eine Funktion enthalten. Transformationen führen zum jeweils passenden Typ von RDD, wobei viele Transformationen mit MapPartitionsRDD oder ShuffledRDD abgebildet werden können. Gestartet wird eine Berechnung dann über eine Action, etwa take, collect, oder in realen Jobs eher saveAsTextFiles, saveAsNewAPIHadoopFile, oder einen Datenbank-Write in foreach.

Ein RDD besteht aus diesen Komponenten, auf die im folgenden eingegangen wird:

  • Aufteilung in Partitionen, deren Anzahl über die Methode getPartitions ermittelt wird
  • Eine oder mehrere Abhängigkeiten zu Eltern-RDDs (Input-RDDs haben keine Eltern)
  • einen Partitionierer, der bei Key-Value-RDDs bestimmt, wie die Partitionen nach Key in einem Shuffle verteilt werden (optional)
  • Zuordnung der Partitionen zu einem Host über getPreferredLocations (optional)

Partitionen

Das Parallelisieren in Spark geschieht durch Partitionierung der RDDs. Das bedeutet, dass die Daten (im Spark Job UI Records genannt, zu deutsch Einträge oder Datensätze), die in einem RDD enthalten sind oder verarbeitet werden, in Blöcke aufgeteilt werden. Diese Blöcke werden Partitionen genannt.

Eine Datei, die aus dem HDFS gelesen wird, wird beispielsweise durch die HDFS-Blockgröße aufgeteilt. Bei einer Blockgröße von 128 MB würde eine 384 MB Datei gleichmäßig in drei Partitionen aufgeteilt.

RDDs vs Collections

Diese Aufteilung dient primär der Parallelisierung von Transformationen. Die Tasks, die aus den Transformationen abgeleitet werden, werden zusammen mit Partitionen auf die einzelnen Executors verteilt. Bei vier Partitionen und zwei Executors (und gleichmäßiger Verteilung) würde jeder Executor zwei Partitionen bekommen. Wenn jeder Executor zwei Cores hat, können also alle vier Partitionen parallel abgearbeitet werden. Da ein Executor einer JVM-Instanz entspricht, wird durch die Partitionierung auch der Speicherverbrauch beschränkt.

Performance-Regel: Zu wenig Partitionen verhindern, dass Tasks sowhl im Cluster als auch innerhalb des Executors auf CPUs verteilt werden können. Zu wenig Partitionen können außerdem zu Speicherproblemen führen, wenn in einer Partition zu viele Daten enthalten sind. Zu viele Partitionen können wiederum dazu führen, dass der Overhead des Ausführens von Tasks zu groß wird, wenn nur wenige Records enthalten sind. Als ersten Anhaltspunkt kann man 2-3 Partitionen pro Core vergeben.

// 8 Partitionen, da mein Rechner 8 vCores hat
scala> input.partitions.size
res0: Int = 8

// Die Partitionen enthalten natürlich noch keine Daten, im Moment definieren wir lediglich den Ablauf des Programmes
scala> input.partitions.map(p => p.index.toString).reduce((a,b) => s"$a, $b")
res1: String = 0, 1, 2, 3, 4, 5, 6, 7

// Wenn wir das Programm auswerten (collect), sehen wir, dass unsere 5 Inputs wie erwartet da sind
scala> input.map(_.toString).collect().reduce((a,b) => s"$a, $b")
res2: String = 1, 2, 3, 4, 5

// Interessant wird es, wenn wir direkt auf den Partitionen arbeiten - die überschüssigen Partitionen bleiben leer
input.mapPartitions(p => if (p.nonEmpty) p.map(_.toString) else Iterator("-leer-")).collect().reduce((a,b) => s"$a, $b")
res3: String = -leer-, 1, -leer-, 2, 3, -leer-, 4, 5

Abhängigkeiten

Ein RDD zeichnet sich weiterhin dadurch aus, dass es Abhängigkeiten zu seinen Eltern-RDDs enthält, genauer gesagt auf die Partitionen seines Eltern-RDDs. Um die eigene Funktion auszuführen, wird zunächst das Eltern-RDD angefragt, um dessen Daten zu erhalten. Somit wird der Graph von unten nach oben aufgerollt.

Wird eine Action ausgelöst, so startet sie beim untersten RDD und führt dessen compute-Methode für jede Partition aus. Die in der Transformation enthaltene Funktion wird also einmal pro Partition aufgerufen. Dabei werden ihr nicht direkt die Daten übergeben, sondern es werden die Daten des Eltern-RDDs angefragt, die dann zuerst ausgewertet werden, und so weiter.

Hier wird auch klar, warum es MapPartitionsRDD heißt. map(), flatMap() usw. rufen ihre map-Funktion auf den Partitionen auf. Mit der Transformation mapPartitions können wir das auch selber tun, und alle anderen MapPartitionsRDD-Transformationen basieren auf diesem Aufruf. In welchen Situationen man mapPartitions selbst aufruft, sehen wir im nächsten Teil.

// Abhängigkeiten sind vom Typ org.apache.spark.Dependency.
scala> input.dependencies
res0: Seq[org.apache.spark.Dependency[_]] = List()

// mapped hat eine Dependency auf input
scala> mapped.dependencies
res1: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@69ab1abc)
scala> mapped.dependencies.head.rdd
res2: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[0] at parallelize at <console>:24

// filtered hat eine Dependency auf mapped
scala> filtered.dependencies
res3: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@19fc48a0)
scala> filtered.dependencies.map(_.rdd)
res4: Seq[org.apache.spark.rdd.RDD[_]] = List(MapPartitionsRDD[2] at map at <console>:26)

Die RDD-Methode toDebugString gibt den ermittelten Graphen aus. Hier sehen wir, dass es sich um die erwarteten RDDs handelt und die Auswertung in umgekehrter Reihenfolge geschieht.

scala> filtered.toDebugString
res5: String =
(8) MapPartitionsRDD[2] at filter at <console>:28 []
 |  MapPartitionsRDD[1] at map at <console>:26 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

Enge und weite Abhängigkeiten

Abhängigkeiten werden weiterhin eingeteilt in eng (narrow) und weit (wide).

Eng bedeutet, dass eine Partition im RDD von genau einer Partition im Eltern-RDD abhängt, oder bei mehreren Partitionen, dass diese zur Compilezeit bereits bekannt sind. Die Daten liegen sozusagen in gleicher Form vor.

Enge Abhängigkeiten leiten von der abstrakten Klasse NarrowDependency ab und werden von map, mapPartitions, flatMap, filter (OneToOneDependency) und coalesce (interne Dependency) erzeugt. filter kann zwar zu leeren Partitionen führen, aber ändert nichts an der Zahl der Partitionen. coalesce ist ein Sonderfall, da es die Zahl der Partitionen ändert, aber nur reduziert und keinen Shuffle ausführt. Bei der RangeDependency schließlich hängen mehrere Partitionen des Kindes von mehreren Partitionen des Eltern-RDDs ab. Dies zählt als enge Abhängigkeit, da es nicht erst zur Laufzeit ermittelt werden muss, und wird vor allem durch ein union mehrerer RDDs erzeugt.

Narrow Dependencies

Umgekehrt ist es bei weiten Abhängigkeiten erst zur Laufzeit klar, auf wieviele Partitionen zugegriffen wird.

Aus weiten Abhängigkeiten entstehen die Shuffle Files. Allgemein ist der Shuffle der Task-Output, der auf Platte geschrieben wird. Der Name deutet jedoch schon an, dass, wenn er als Input für den nächsten Task dient, ein Netzwerktransfer zwischen den Nodes nötig sein kann. Ein Netzwerktransfer innerhalb des Clusters kann heutzutage recht schnell sein, abhängig davon wie viele und welche anderen Services auf den Nodes aktiv sind (also ob es sich um einen noisy Cluster handelt).

Wide Dependencies

Weite Abhängigkeiten (Typ ShuffleDependency) begründen eine neue Stage, der Rest der Stage besteht nur aus Transformationen mit engen Abhängigkeiten.

Die Transformationen einer Stage werden zu einem Task zusammengefasst, der dann von den Executors ausgeführt wird. Innerhalb eines Jobs arbeitet Spark nur eine Stage ab, da die nächste auf die Shuffles warten muss. Das schränkt wiederum die Parallelität ein.

// Dem sort sieht man den Shuffle erstmal nicht an, den es handelt sich augenscheinlich um ein normales MapPartitionsRDD
val sorted = filtered.sortBy(x => -x)
sorted: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[51] at sortBy at <console>:30

// In den Dependencies erkennen wir aber, dass sich ein ShuffledRDD eingeschlichen hat
scala> sorted.dependencies.map(_.rdd)
res0: Seq[org.apache.spark.rdd.RDD[_]] = List(ShuffledRDD[50] at sortBy at <console>:30)

// der Graph lässt keinen Zweifel zu, es findet ein Shuffle statt; an der Einrückung erkennen wir auch die Trennung in Stages
scala> sorted.toDebugString
res1: String =
(6) MapPartitionsRDD[5] at sortBy at <console>:30 []
 |  ShuffledRDD[4] at sortBy at <console>:30 []
 +-(8) MapPartitionsRDD[3] at sortBy at <console>:30 []
    |  MapPartitionsRDD[2] at filter at <console>:28 []
    |  MapPartitionsRDD[1] at map at <console>:26 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

Performance-Regel: Möglichst wenige Operationen verwenden, die Shuffles erzeugen, um den Netzwerktraffic zu minimieren und gleichzeitig die Parallelität zu maximieren.

Key-Value-RDDs

Eine besondere Rolle in Spark haben die PairRDD, das sind RDDs, die als Typparameter TupleN aus Scala haben. Der erste Wert wird dabei immer als Key interpretiert. In Scala können Tuple2 mit der Notation a -> b, oder mit Klammern (a,b) erzeugt werden. Alle anderen Typen werden mit Klammern erzeugt. Der Typ wird dabei als (A, B) dargestellt, was aber nur Syntaxzucker für Tuple2[A, B] ist. Dabei geht es von Tuple1 bis Tuple22.

In diesem Beispiel erzeugen wir aus einem RDD von Städten ein RDD mit Tuples von String zu Int, wobei jeder Stadt die Zahl 1 zugeordnet wird. Für Spark ist nun der Städtename der Key, für den wir diverse byKey-Funktionen aufrufen können. reduceByKey sammelt alle Werte mit dem gleichen Key (basierend auf dem Hash) und führt eine Reducer-Funktionen auf den Werten aus. Daraus können wir die Häufigkeit jeder Stadt in der Liste ermitteln.

scala> val cities = sc.parallelize(Seq("Köln","Bremen","Hamburg","Berlin","Köln","Hamburg","Berlin","Hannover","Köln"))
cities: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val mapped = cities.map(city => city -> 1)
mapped: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:27

scala> val counted = mapped.reduceByKey((a,b) => a + b)
counted: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:29

scala> counted.collect
res1: Array[(String, Int)] = Array((Bremen,1), (Berlin,2), (Hamburg,2), (Köln,3), (Hannover,1))

Wir wir sehen, handelt sich bei PairRDDs um keinen eigenen Typen (mehr), sondern nur um eine Bezeichnung von RDDs mit Tuples, für die bestimmte Funktionen vorhanden sind1.

Interessant für Performancebetrachtungen ist, dass reduceByKey() direkt ein ShuffledRDD zurückgibt, also eine neue Stage erzeugt.

scala> counted.toDebugString
res0: String =
(8) ShuffledRDD[2] at reduceByKey at <console>:28 []
 +-(8) MapPartitionsRDD[1] at map at <console>:26 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

Das ist auch leicht nachvollziehbar. Unsere Daten sind über die Partitionen verteilt. Wenn Spark nun alle Werte eines Keys zusammenfassen will, müssen alle Partitionen mit den gleichen Keys auf die gleiche Node verschoben werden - ein Netzwerk-Shuffle findet für die Partitionen statt, die zusammengehören, sich aber nicht auf den gleichen Nodes befinden.

Partitionierer

Die Records in den Partitionen werden bei einem Shuffle von einem Partitionierer auf Nodes verteilt. Der Standard-Partitionierer, der auch von reduceByKey verwendet wird, ist der HashPartitioner - er ermittelt den Hash jedes Keys, nimmt ihn mit Modulo der Anzahl der Partitionen und erzeugt damit eine gleichmäßige Verteilung auf die Nodes. Einige Partitionen können jetzt leer sein, daher kann es sinnvoll sein, die Anzahl der Partitionen bei einer solchen Operation zu ändern. Allen Shuffle-Operationen kann man die Zahl der gewünschten Partitionen oder direkt einen Partitionierer übergeben.

Jedem RDD kann optional ein Partitionierer zugewiesen werden, entweder durch die Methode repartition oder bei PairRDDs zusätzlich durch partitionBy. Dies erzeugt zwar direkt einen Shuffle, verhindert den Shuffle aber bei nachfolgenden Operationen, die sonst einen Shuffle erzeugen würden und den gleichen Partitionierer nutzen. Kurzgesagt nutzen Shuffle-Operationen einen bereits vorhanden Partitionierer.

Im folgenden Beispiel sieht man, dass die Daten durch partitionBy partitioniert werden und dadurch ein ShuffledRDD erzeugt wird. Das nachfolgende reduceByKey erzeugt nur noch ein MapPartitionsRDD.

scala> val mappedAndPartitioned = mapped.partitionBy(new org.apache.spark.HashPartitioner(8))
mappedAndPartitioned: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at partitionBy at <console>:28

scala> mappedAndPartitioned.reduceByKey((a,b) => a + b)
res0: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at reduceByKey at <console>:31

scala> res0.toDebugString
res1: String =
(8) MapPartitionsRDD[3] at reduceByKey at <console>:31 []
 |  ShuffledRDD[2] at partitionBy at <console>:28 []
 +-(8) MapPartitionsRDD[1] at map at <console>:26 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:24 []

In diesem Fall ist dadurch natürlich nichts gewonnen, sinnvoll ist es bei mehreren folgenden Shuffle-Operationen, wie es bei iterativen Algorithmen der Fall ist. Das Gleiche gilt für Joins - wenn beide RDDs gleich partitioniert sind, ist kein Shuffle mehr nötig um sie zu joinen.

Performance-Regel: Shuffles können vermieden werden, wenn Daten bereits vorpartitioniert sind.

Ungleiche Partitionen

Selbt wenn die genau richtige Anzahl von Partitionen gewählt wurde, kann es immer noch sein, dass der Job langsam ausgeführt wird, wenn die Daten in den Partition ungleich verteilt sind (der sogenannte partition skew).

Das passiert vor allem nach Shuffles, wenn Daten nach einem bestimmten Schlüssel verteilt werden (key skew).

Verarbeitet man beispielsweise die Seitenzugriffe seines internationalen Onlineshops und teilt seine Daten nach Shop auf, so kann es ein Problem geben, wenn ein Land viel mehr Zugriffe bekommt als die anderen. Die Partitionen der weniger besuchten Länder werden schneller abgearbeitet und die Executors warten dann auf das Verarbeiten des beliebtesten Landes. Hier gilt es einen Schlüssel zu finden, der die Daten besser verteilt.

Ein Alternative dazu ist der RangePartitioner, der die Records in etwa gleichgroße Partitionen einteilt.

Man kann zudem auch einen eigenen Partitionierer entwickeln, der durch fachliche Kenntnis der Daten die Partitionen sinnvoll einteilt.

Performance-Regel: Die Größe der Partitionen (d.h. die Anzahl der Records in ihnen) sollte nicht übermäßig voneinander abweichen, um eine bessere Auslastung von Ressourcen zu gewährleisten.

Datenlokalität

Die (optionale) Methode getPreferredLocations in einem RDD enthält die bevorzugten Hosts pro Partition. Wenn nun ein Task auf einer Partition gestartet wird, kann Spark diesen Task bevorzugt (nicht garantiert, z.B. bei Retries) auf der bervorzugten Node2 ausführen. Enthält die bevorzugte Node die Daten der Partition, wird damit Netzwerk-Traffic vermieden. Das wird Datenlokalität genannt.

Dies ist nur für das initiale Laden der Daten relevant, die Shuffles werden auf Platte geschrieben und die entsprechenden Tasks von Spark automatisch an richtiger Stelle gestartet. HadoopRDD, KafkaRDD, der Cassandra-Treiber und viele weitere unterstützen dieses Feature automatisch.

Performance-Regel: In einem Cluster mit niedriger Netzwerk-Bandbreite oder einem sehr aktiven Netzwerktraffic sollten Spark-Nodes auf den gleichen Nodes der Datenhaltung laufen um direkt von der Festplatte lesen zu können.

Bei Datenbanken ist natürlich abzuwägen, ob Spark hier den Betrieb nicht zu sehr stört. Hier bietet es sich an, solche Analysen auf einem separaten, replizierten Cluster auszuführen.

Zusammenfassung

Diese Grundlagen sollte man beim Schreiben von Spark-Jobs beachten, um erste Performance-Probleme zu vermeiden. Im nächsten Post geht es dann um weitere Details der Partitionierung, GC-Probleme und wie man die Bottlenecks im UI identizifieren kann.

Sind Ihre Spark-Jobs immer noch zu langsam? Wenn Sie Unterstützung bei der Optimierung Ihrer Spark-Jobs benötigen, kontaktieren Sie mich gerne.

  1. In Scala wird das mit sogenannten implicits erreicht. Implicits haben diverse Funktionen, u.a. können sie vorhanden Klassen Methoden hinzufügen.

  2. Natürlich nur falls die Node Teil des Spark-Clusters ist.

Diesen Post teilen

RSS-Feed

Neue Posts direkt im Newsreader.

RSS-Feed abonnieren

Newsletter

Neue Posts sowie Neuigkeiten rund um Big Data, Spark, und Scala. Maximal eine E-Mail im Monat.

Kommentare