Spark-Jargon für Einsteiger

Dieser Blog soll einige der Startschwierigkeiten beseitigen, wenn Neulinge für Spark Distributed Computing programmieren. Abgesehen vom Erlernen der APIs muss man sich mit den Cluster-Details ausstatten, um das Beste aus der Spark-Power herauszuholen.

Der Startpunkt wäre der Überblick über den Cluster-Modus.

Und einige häufige Fragen, die auftauchen könnten, sind:

  1. Sie können die verschiedenen Prozesse im Spark Standalone-Cluster und die Parallelität immer noch nicht verstehen.
  2. Sie haben die Datei bin\start-slave.sh ausgeführt und festgestellt, dass sie den Worker, der eigentlich eine JVM ist, erzeugt hat. Ist der Worker ein JVM-Prozess oder nicht?
  3. Wie im obigen Link beschrieben, ist ein Executor ein Prozess, der für eine Anwendung auf einem Worker-Knoten gestartet wird und Aufgaben ausführt. Executor ist auch eine JVM.
  4. Executors sind pro Anwendung. Was ist dann die Rolle eines Workers? Koordiniert er sich mit dem Executor und meldet das Ergebnis an den Treiber zurück? oder spricht der Treiber direkt mit dem Executor? Wenn ja, was ist dann der Zweck des Workers?
  5. Wie lässt sich die Anzahl der Executors für eine Anwendung steuern?
  6. Können die Aufgaben innerhalb des Executors parallel ausgeführt werden? Wenn ja, wie konfiguriert man die Anzahl der Threads für einen Executor?
  7. Was ist das Verhältnis zwischen Worker, Executors und Executor-Cores ( – total-executor-cores)?
  8. Was bedeutet es, mehr Worker pro Knoten zu haben?

Werfen wir einen Blick auf die Details des Spark-Cluster-Modus.

Spark verwendet eine Master/Slave-Architektur. Wie Sie in der Abbildung sehen können, gibt es einen zentralen Koordinator (Driver), der mit vielen verteilten Arbeitern (Executors) kommuniziert. Der Treiber und jeder der Executors laufen in eigenen Java-Prozessen.

Der Treiber ist der Prozess, in dem die Hauptmethode läuft. Er wandelt zunächst das Benutzerprogramm in Tasks um und plant dann die Tasks auf den Executors ein.

Spark-Anwendung – -> Treiber – -> Liste der Tasks – -> Scheduler – -> Executors

EXECUTORS

Executors sind die Prozesse der Worker-Knoten, die für die Ausführung einzelner Tasks in einem bestimmten Spark-Job zuständig sind. Sie werden zu Beginn einer Spark-Anwendung gestartet und laufen in der Regel während der gesamten Lebensdauer einer Anwendung. Sobald sie die Aufgabe ausgeführt haben, senden sie die Ergebnisse an den Treiber. Sie bieten auch In-Memory-Speicher für RDDs, die von Benutzerprogrammen über den Block Manager zwischengespeichert werden.

Wenn Executors gestartet werden, registrieren sie sich beim Treiber und kommunizieren von da an direkt. Die Worker sind dafür zuständig, dem Clustermanager die Verfügbarkeit ihrer Ressourcen mitzuteilen.

In einem Standalone-Cluster gibt es einen Executor pro Worker, es sei denn, man spielt mit spark.executor.cores und ein Worker hat genug Cores, um mehr als einen Executor zu halten.

  • Ein Standalone-Cluster mit 5 Worker-Knoten (jeder Knoten hat 8 Kerne) Wenn ich eine Anwendung mit den Standardeinstellungen starte,
  • wird Spark gierig so viele Kerne und Executoren erwerben, wie vom Scheduler angeboten werden. Am Ende erhält man also 5 Executors mit je 8 Cores.

Nachfolgend die spark-submit Optionen, um mit der Anzahl der Executors zu spielen:

– executor-memory MEM Speicher pro Executor (z.B. 1000M, 2G) (Standard: 1G).

Spark standalone und nur YARN:
– executor-cores NUM Anzahl der Cores pro Executor. (Standard: 1 im YARN-Modus oder alle verfügbaren Kerne auf dem Worker im Standalone-Modus)

Nur YARN:
– num-executors NUM Anzahl der zu startenden Executors (Standard: 2). Wenn die dynamische Zuweisung aktiviert ist, wird die anfängliche Anzahl der Executors mindestens NUM sein.

  • Bedeutet 2 Worker-Instanzen einen Worker-Knoten mit 2 Worker-Prozessen?

Ein Knoten ist eine Maschine, und es gibt keinen guten Grund, mehr als einen Worker pro Maschine laufen zu lassen. Zwei Worker-Knoten bedeuten also typischerweise zwei Maschinen, jede ein Spark-Worker.

1 Knoten = 1 Worker-Prozess

  • Hält jede Worker-Instanz einen Executor für eine bestimmte Anwendung (der den Speicher, die Aufgabe verwaltet) oder hält ein Worker-Knoten einen Executor?

Worker halten viele Executors für viele Anwendungen. Eine Anwendung hat Executors auf vielen Workern

Ein Worker-Knoten kann mehrere Executors (Prozesse) enthalten, wenn er über ausreichend CPU, Speicher und Storage verfügt.

  • BTW, die Anzahl der Executors in einem Worker Node zu einem bestimmten Zeitpunkt hängt vollständig von der Arbeitslast auf dem Cluster und der Fähigkeit des Nodes ab, wie viele Executors auszuführen.

AUSFÜHRUNGSFLUSS DER ANWENDUNG

Wenn Sie also eine Anwendung mit spark-submit an den Cluster übermitteln, geschieht intern Folgendes:

  1. Eine eigenständige Anwendung startet und instanziiert eine SparkContext/SparkSession-Instanz (und erst dann kann die Anwendung als Treiber bezeichnet werden).
  2. Das Treiberprogramm fordert beim Clustermanager Ressourcen an, um Executors zu starten.
  3. Der Clustermanager startet Executors.
  4. Der Treiberprozess läuft durch die Benutzeranwendung. Abhängig von den Aktionen und Transformationen über RDDs werden Aufgaben an Executors gesendet.
  5. Executors führen die Aufgaben aus und speichern die Ergebnisse.
  6. Wenn ein Worker abstürzt, werden seine Aufgaben an andere Executors gesendet, um erneut verarbeitet zu werden. In dem Buch „Learning Spark: Lightning-Fast Big Data Analysis“ wird über Spark und Fehlertoleranz gesprochen:

Spark geht automatisch mit ausgefallenen oder langsamen Maschinen um, indem es ausgefallene oder langsame Aufgaben erneut ausführt. Wenn beispielsweise der Knoten, auf dem eine Partition einer map()-Operation ausgeführt wird, abstürzt, führt Spark sie auf einem anderen Knoten erneut aus; und selbst wenn der Knoten nicht abstürzt, sondern einfach nur viel langsamer als andere Knoten ist, kann Spark präemptiv eine „spekulative“ Kopie der Aufgabe auf einem anderen Knoten starten und deren Ergebnis übernehmen, wenn diese beendet wird.

  1. Mit SparkContext.stop() vom Treiber oder wenn die Hauptmethode beendet wird/abstürzt, werden alle Executors beendet und die Clusterressourcen vom Clustermanager freigegeben.

Wenn wir uns die Ausführung von Spark perspektivisch über einen beliebigen Ressourcenmanager für ein Programm ansehen, das zwei rdds verbindet und eine Reduktionsoperation durchführt, dann filter

Die folgende Liste enthält einige Empfehlungen, die bei der Konfiguration zu beachten sind:

  • Hadoop/Yarn/OS Deamons: Wenn wir eine Spark-Anwendung mit einem Clustermanager wie Yarn ausführen, gibt es mehrere Daemons, die im Hintergrund laufen, wie NameNode, Secondary NameNode, DataNode, JobTracker und TaskTracker. Während wir also die Anzahl der Ausführer angeben, müssen wir sicherstellen, dass wir genügend Kerne beiseite lassen (~1 Kern pro Knoten), damit diese Daemons reibungslos laufen können.
  • Yarn ApplicationMaster (AM): Der ApplicationMaster ist dafür verantwortlich, Ressourcen vom ResourceManager auszuhandeln und mit den NodeManagern zusammenzuarbeiten, um die Container und deren Ressourcenverbrauch auszuführen und zu überwachen. Wenn wir Spark auf Yarn laufen lassen, dann müssen wir die Ressourcen einplanen, die AM benötigen würde (~1024MB und 1 Executor).
  • HDFS-Durchsatz: Der HDFS-Client hat Probleme mit vielen gleichzeitigen Threads. Es wurde beobachtet, dass HDFS den vollen Schreibdurchsatz mit ~5 Tasks pro Executor erreicht. Es ist also gut, die Anzahl der Kerne pro Executor unter dieser Zahl zu halten.
  • MemoryOverhead: Das folgende Bild zeigt den Spark-Yarn-Speicherverbrauch.

Zwei Dinge, die man aus diesem Bild beachten sollte:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Wenn wir also 20GB pro Executor anfordern, bekommt AM tatsächlich 20GB + memoryOverhead = 20 + 7% von 20GB = ~23GB Speicher für uns.

  • Das Ausführen von Executors mit zu viel Speicher führt oft zu übermäßigen Verzögerungen bei der Garbage Collection.
  • Das Ausführen von winzigen Executors (mit einem einzigen Kern und gerade genug Speicher, um z.B. eine einzige Aufgabe auszuführen) macht die Vorteile zunichte, die sich aus der Ausführung mehrerer Aufgaben in einer einzigen JVM ergeben.

Genug Theorie… Gehen wir in die Praxis…

Betrachten wir nun einen 10-Knoten-Cluster mit folgender Konfiguration und analysieren wir verschiedene Möglichkeiten der Executor-Core-Memory-Verteilung:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Erster Ansatz: Winzige Executors :

Winzige Executors bedeuten im Wesentlichen einen Executor pro Kern. Die folgende Tabelle zeigt die Werte unserer spar-config-Parameter bei diesem Ansatz:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analyse: Mit nur einem Executor pro Kern können wir, wie oben beschrieben, die Vorteile der Ausführung mehrerer Tasks in derselben JVM nicht nutzen. Außerdem werden gemeinsam genutzte/gepufferte Variablen wie Broadcast-Variablen und Akkumulatoren in jedem Kern der Knoten repliziert, also 16 Mal. Außerdem bleibt nicht genug Speicherplatz für Hadoop/Yarn-Daemon-Prozesse übrig, und der ApplicationManager wird nicht mitgerechnet. NICHT GUT!

Zweiter Ansatz: Fat Executors (ein Executor pro Knoten):

Fat Executors bedeutet im Wesentlichen ein Executor pro Knoten. Die folgende Tabelle zeigt die Werte unserer spark-config-Parameter bei diesem Ansatz:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analyse: Mit allen 16 Kernen pro Executor, abgesehen von ApplicationManager und Daemon-Prozessen, die nicht mitgezählt werden, wird der HDFS-Durchsatz leiden und es wird zu übermäßigen Garbage-Ergebnissen führen. Also, NICHT GUT!

Dritter Ansatz: Balance zwischen Fat (vs) Tiny

Nach den Empfehlungen, die wir oben diskutiert haben:

  • Basierend auf den oben erwähnten Empfehlungen, weisen wir 5 Kerne pro Executor zu => – Executor-Kerne = 5 (für einen guten HDFS-Durchsatz)
  • Lassen Sie 1 Kern pro Knoten für Hadoop/Yarn-Daemons => Anzahl der verfügbaren Kerne pro Knoten = 16-1 = 15
  • So, Gesamtzahl der verfügbaren Kerne im Cluster = 15 x 10 = 150
  • Anzahl der verfügbaren Executors = (Gesamtzahl der Kerne / Anzahl der Kerne pro Executor) = 150 / 5 = 30
  • Abzüglich 1 Executor für ApplicationManager => – Anzahl der-Executors = 29
  • Anzahl der Executors pro Knoten = 30/10 = 3
  • Speicher pro Executor = 64GB / 3 = 21GB
  • Abzüglich Heap-Overhead = 7% von 21GB = 3GB. Also, tatsächlicher – Executor-Speicher = 21-3 = 18GB

Die empfohlene Konfiguration ist also: 29 Executors, je 18GB Speicher und je 5 Kerne!!!

Analyse: Es ist offensichtlich, dass dieser dritte Ansatz die richtige Balance zwischen den Ansätzen Fat und Tiny gefunden hat. Unnötig zu sagen, dass er die Parallelität eines Fat Executors und den besten Durchsatz eines Tiny Executors erreicht hat!!!

Fazit:

Wir haben gesehen:

  • Ein paar Empfehlungen, die man bei der Konfiguration dieser Parameter für eine Spark-Anwendung beachten sollte:
  • Ressourcen einplanen, die der Yarn Application Manager benötigen würde
  • Wie wir einige Kerne für Hadoop/Yarn/OS Deamon-Prozesse schonen sollten
  • Erfahren Sie mehr über spark-yarn-memory-usage
  • Außerdem haben wir drei verschiedene Ansätze zur Konfiguration dieser Parameter untersucht und analysiert:
  1. Tiny Executors – Ein Executor pro Core
  2. Fat Executors – Ein Executor pro Node
  3. Empfohlener Ansatz – Richtige Balance zwischen Tiny (Vs) Fat in Verbindung mit den Empfehlungen.

– num-executors, – executor-cores und – executor-memory… diese drei Parameter spielen eine sehr wichtige Rolle für die Spark-Performance, da sie die Menge an CPU &-Speicher steuern, die die Spark-Anwendung erhält. Daher ist es für Benutzer sehr wichtig, den richtigen Weg zu verstehen, um sie zu konfigurieren. Ich hoffe, dieser Blog hat Ihnen geholfen, diese Perspektive zu bekommen…