Spark Jargon voor Starters

Deze blog is om een aantal van de startproblemen op te lossen wanneer een newbie codeert voor Spark distributed computing. Afgezien van het leren van de API’s, moet men zichzelf uit te rusten met cluster details om het beste van de Spark power.

Het startpunt zou zijn Cluster Mode Overview .

En een aantal veel voorkomende vragen die kunnen opduiken zijn:

  1. Je begrijpt nog steeds niet de verschillende processen in de Spark Standalone cluster en het parallellisme.
  2. Je rende de bin\start-slave.sh en ontdekte dat het de werker spawned, wat eigenlijk een JVM is. Is de uitvoerder een JVM-proces of niet?
  3. Volgens de bovenstaande link is een uitvoerder een proces dat wordt gestart voor een toepassing op een werkerknooppunt dat taken uitvoert. Executor is ook een JVM.
  4. Executors zijn per applicatie. Wat is dan de rol van een werker? Coördineert het met de uitvoerder en communiceert het resultaat terug naar de bestuurder? Of praat de bestuurder rechtstreeks met de uitvoerder? Zo ja, wat is dan het doel van de uitvoerder?
  5. Hoe kan het aantal uitvoerders voor een toepassing worden geregeld?
  6. Kunnen de taken parallel worden uitgevoerd in de uitvoerder? Zo ja, hoe kan het aantal threads voor een executor worden geconfigureerd?
  7. Wat is de relatie tussen worker, executors en executor cores ( – total-executor-cores)?
  8. Wat betekent het om meer workers per node te hebben?

Laten we de details van de Spark Cluster modus nog eens bekijken.

Spark maakt gebruik van een master/slave architectuur. Zoals je in de figuur kunt zien, heeft het één centrale coördinator (stuurprogramma) die communiceert met vele gedistribueerde werkers (uitvoerders). Het stuurprogramma en elk van de uitvoerders draaien in hun eigen Java-processen.

Het stuurprogramma is het proces waar de hoofdmethode draait. Eerst zet het het gebruikersprogramma om in taken en daarna plant het de taken op de executors.

Spark Application – -> Driver – -> List of Tasks – -> Scheduler – -> Executors

EXECUTORS

Executors zijn worker nodes’ processen die belast zijn met het uitvoeren van individuele taken in een bepaalde Spark job. Ze worden aan het begin van een Spark-applicatie gestart en draaien meestal gedurende de gehele levensduur van een applicatie. Zodra ze de taak hebben uitgevoerd, sturen ze de resultaten naar de driver. Ze bieden ook in-memory opslag voor RDDs die door gebruikersprogramma’s worden gecached via Block Manager.

Wanneer executors worden gestart, registreren ze zichzelf bij de driver en vanaf dat moment communiceren ze direct. De werkers zijn verantwoordelijk voor het communiceren van de cluster manager de beschikbaarheid van hun middelen.

In een standalone cluster krijg je een executor per werker, tenzij je speelt met spark.executor.cores en een werker heeft genoeg cores om meer dan een executor te houden.

  • Een standalone cluster met 5 worker nodes (elke node heeft 8 cores) Wanneer ik een applicatie start met standaard instellingen.
  • Spark zal gulzig zoveel cores en executors verwerven als er worden aangeboden door de scheduler. Dus uiteindelijk krijg je 5 executors met 8 cores per stuk.

Dit zijn de spark-submit opties om te spelen met het aantal executors:

– executor-memory MEM Geheugen per executor (bijv. 1000M, 2G) (Standaard: 1G).

Spark standalone en YARN alleen:
– executor-cores NUM Aantal cores per executor. (Standaard: 1 in YARN-modus, of alle beschikbare cores op de werker in standalone-modus)

YARN-only:
– num-executors NUM Aantal executors om te starten (Standaard: 2). Als dynamische toewijzing is ingeschakeld, is het aanvankelijke aantal uitvoerders ten minste NUM.

  • Zoals 2 worker-nodes betekent dit één werkstation met 2 werkprocessen?

Een station is een machine, en er is geen goede reden om meer dan één werkstation per machine te gebruiken. Dus twee worker nodes betekent meestal twee machines, elk een Spark worker.

1 Node = 1 Worker process

  • Bevat elke worker instance een executor voor een specifieke applicatie (die de opslag beheert, een taak) of bevat één worker node één executor?

Workers bevatten veel executors, voor veel applicaties. Een toepassing heeft uitvoerders op veel werkers

Een werkerknooppunt kan meerdere uitvoerders (processen) bevatten als het voldoende CPU, geheugen en opslagruimte heeft.

  • BTW, het aantal uitvoerders in een werkstation op een bepaald moment hangt volledig af van de werkbelasting op het cluster en de capaciteit van het station om het aantal uitvoerders uit te voeren.

APPLICATION EXECUTION FLOW

Met dit in gedachten, wanneer u een toepassing bij het cluster indient met spark-submit, is dit wat er intern gebeurt:

  1. Een standalone-toepassing start en instantieert een SparkContext/SparkSession-instantie (en pas daarna kunt u de toepassing een driver noemen).
  2. Het stuurprogramma vraagt middelen aan de clustermanager om executors te starten.
  3. De clustermanager lanceert executors.
  4. Het stuurprogrammaproces loopt door de gebruikersapplicatie. Afhankelijk van de acties en transformaties over RDDs worden taken naar executors gestuurd.
  5. Executors voeren de taken uit en slaan de resultaten op.
  6. Als een worker crasht, worden zijn taken naar verschillende executors gestuurd om opnieuw verwerkt te worden. In het boek “Learning Spark: Lightning-Fast Big Data Analysis” wordt gesproken over Spark en Fault Tolerance:

Spark gaat automatisch om met defecte of langzame machines door defecte of langzame taken opnieuw uit te voeren. Als bijvoorbeeld het knooppunt dat een partitie van een map()-bewerking uitvoert, crasht, voert Spark deze opnieuw uit op een ander knooppunt; en zelfs als het knooppunt niet crasht maar gewoon veel langzamer is dan andere knooppunten, kan Spark preventief een “speculatieve” kopie van de taak op een ander knooppunt lanceren, en het resultaat daarvan overnemen als dat klaar is.

  1. Met SparkContext.stop() van het stuurprogramma of als de hoofdmethode afloopt/crasht, worden alle uitvoerders beëindigd en worden de clusterresources vrijgegeven door de clustermanager.

Als we kijken naar de uitvoering van Spark prospective over een willekeurige resource manager voor een programma, dat twee rdds samenvoegt en een reduceerbewerking uitvoert, dan filtert

De volgende lijst bevat enkele aanbevelingen die u in gedachten moet houden tijdens het configureren ervan:

  • Hadoop/Yarn/OS Deamons: Als we een vonkapplicatie uitvoeren met een clustermanager zoals Yarn, zijn er verschillende daemons die op de achtergrond draaien, zoals NameNode, Secondary NameNode, DataNode, JobTracker en TaskTracker. Dus, bij het specificeren van het aantal uitvoerders, moeten we er zeker van zijn dat we genoeg cores opzij zetten (~1 core per node) om deze daemons soepel te laten draaien.
  • Yarn ApplicationMaster (AM): ApplicationMaster is verantwoordelijk voor het onderhandelen over resources van de ResourceManager en het werken met de NodeManagers om de containers en hun resource-verbruik uit te voeren en te monitoren. Als we draaien spark op garen, dan moeten we budget in de middelen die AM nodig zou hebben (~ 1024MB en 1 Executor).
  • HDFS Throughput: HDFS client heeft moeite met tonnen gelijktijdige threads. Er is waargenomen dat HDFS volledige schrijfdoorvoer bereikt met ~5 taken per uitvoerder. Het is dus goed om het aantal cores per executor onder dat aantal te houden.
  • MemoryOverhead: De volgende afbeelding toont spark-yarn-geheugengebruik.

Twee dingen om op te merken van deze afbeelding:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Dus, als we 20GB per executor aanvragen, krijgt AM in werkelijkheid 20GB + memoryOverhead = 20 + 7% van 20GB = ~23GB geheugen voor ons.

  • Het draaien van executors met te veel geheugen resulteert vaak in buitensporige vertragingen in de vuilnisophaal.
  • Het draaien van kleine executors (met een enkele core en net genoeg geheugen om een enkele taak te draaien, bijvoorbeeld) gooit de voordelen weg die komen van het draaien van meerdere taken in een enkele JVM.

Genoeg theorie…

Nu, laten we eens kijken naar een 10 node cluster met de volgende configuratie en verschillende mogelijkheden van executors-core-memory distributie analyseren:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Eerste benadering: Tiny executors :

Tiny executors betekent in wezen één executor per core. De volgende tabel toont de waarden van onze spar-config params met deze aanpak:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analyse: Met slechts één uitvoerder per kern, zoals we hierboven besproken hebben, zullen we niet in staat zijn om voordeel te halen uit het uitvoeren van meerdere taken in dezelfde JVM. Ook zullen gedeelde/cached variabelen zoals broadcast variabelen en accumulators gerepliceerd worden in elke core van de nodes, wat 16 keer is. Ook laten we niet genoeg geheugen over voor Hadoop/Yarn daemon processen en we tellen niet mee in ApplicationManager. NIET GOED!

Tweede benadering: Fat executors (Eén executor per node):

Fat executors betekent in wezen één executor per node. De volgende tabel toont de waarden van onze spark-config params met deze aanpak:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analyse: Met alle 16 cores per executor, afgezien van ApplicationManager en daemon processen niet worden meegeteld voor, HDFS doorvoer zal pijn doen en het zal resulteren in buitensporige vuilnis resultaten. Ook, NIET GOED!

Derde benadering: Balans tussen Fat (vs) Tiny

Volgens de aanbevelingen die we hierboven bespraken:

  • Gebaseerd op de aanbevelingen hierboven, Laten we 5 core per uitvoerder toewijzen => – uitvoerder-cores = 5 (voor goede HDFS doorvoer)
  • Laat 1 core per node over voor Hadoop/Yarn daemons => Num cores beschikbaar per node = 16-1 = 15
  • Dus, Totaal aantal beschikbare cores in cluster = 15 x 10 = 150
  • Aantal beschikbare executors = (totaal aantal cores / aantal cores per executor) = 150 / 5 = 30
  • Er blijft 1 executor over voor ApplicationManager => – aantal-executors = 29
  • Aantal executors per node = 30/10 = 3
  • Geheugen per executor = 64GB / 3 = 21GB
  • Aftellen van heap overhead = 7% van 21GB = 3GB. Dus, werkelijk – executor-geheugen = 21-3 = 18GB

Dus, aanbevolen configuratie is: 29 executors, 18GB geheugen elk en 5 kernen elk!!

Analyse: Het is duidelijk hoe deze derde benadering de juiste balans heeft gevonden tussen Fat en Tiny benaderingen. Onnodig te zeggen dat het parallellisme van een fat executor en de beste doorvoer van een tiny executor heeft bereikt!

Conclusie:

We hebben gezien:

  • Een paar aanbevelingen om in gedachten te houden bij het configureren van deze params voor een spark-applicatie zoals:
  • Budget in de middelen die Yarn’s Application Manager nodig zou hebben
  • Hoe we enkele kernen moeten sparen voor Hadoop/Yarn/OS deamon processen
  • Leren over spark-yarn-memory-usage
  • Ook, uitgecheckt en geanalyseerd drie verschillende benaderingen om deze params te configureren:
  1. Tiny Executors – Een executor per Core
  2. Fat Executors – Een executor per Node
  3. Aanbevolen aanpak – Juiste balans tussen Tiny (Vs) Fat in combinatie met de aanbevelingen.

– num-executors, – executor-cores en – executor-memory… deze drie params spelen een zeer belangrijke rol in spark prestaties als ze de hoeveelheid CPU & geheugen uw spark applicatie krijgt controleert. Dit maakt het zeer cruciaal voor gebruikers om te begrijpen de juiste manier om ze te configureren. Hoop dat deze blog u geholpen in het krijgen van dat perspectief …