Jargonul Spark pentru începători

Acest blog este pentru a clarifica unele dintre problemele de început atunci când un începător codifică pentru calcul distribuit Spark. În afară de învățarea API-urilor, cineva trebuie să se echipeze cu detalii despre cluster pentru a obține cele mai bune rezultate din puterea Spark.

Punctul de plecare ar fi Cluster Mode Overview .

Și câteva întrebări comune care ar putea apărea sunt:

  1. Încă nu puteți înțelege diferitele procese din clusterul Spark Standalone și paralelismul.
  2. Ați rulat bin\start-slave.sh și ați constatat că a generat lucrătorul, care este de fapt un JVM. Este lucrătorul un proces JVM sau nu?
  3. Conform linkului de mai sus, un executor este un proces lansat pentru o aplicație pe un nod lucrător care execută sarcini. Executorul este, de asemenea, un JVM.
  4. Executorii sunt per aplicație. Atunci care este rolul unui lucrător? Se coordonează cu executorul și comunică rezultatul înapoi către driver? sau driverul vorbește direct cu executorul? În acest caz, care este scopul lucrătorului?
  5. Cum se poate controla numărul de executori pentru o aplicație?
  6. Se poate face ca sarcinile să ruleze în paralel în cadrul executorului? Dacă da, cum se configurează numărul de fire de execuție pentru un executor?
  7. Care este relația dintre worker, executori și nucleele executorilor ( – total-executor-cores)?
  8. Ce înseamnă să ai mai mulți workers pe nod?

Să revenim asupra detaliilor privind modul Spark Cluster.

Spark folosește o arhitectură master/slave. După cum puteți vedea în figură, are un coordonator central (Driver) care comunică cu mulți lucrători distribuiți (executori). Șoferul și fiecare dintre executori rulează în propriile procese Java.

Șoferul este procesul în care se execută metoda principală. Mai întâi convertește programul utilizatorului în sarcini și după aceea programează sarcinile pe executori.

Aplicația Spark – -> Driver – -> Listă de sarcini – -> Planificator – -> Executori

EXECUTORI

Executorii sunt procese ale nodurilor de lucru însărcinate cu rularea sarcinilor individuale dintr-o anumită sarcină Spark. Aceștia sunt lansați la începutul unei aplicații Spark și rulează de obicei pe toată durata de viață a unei aplicații. După ce au executat sarcina, aceștia trimit rezultatele către driver. De asemenea, ele asigură stocarea în memorie a RDD-urilor care sunt stocate în memoria cache de către programele utilizatorului prin Block Manager.

Când executorii sunt lansați, ei se înregistrează cu driverul și de aici încolo comunică direct. Lucrătorii sunt însărcinați să comunice managerului clusterului disponibilitatea resurselor lor.

Într-un cluster autonom veți obține un singur executor per lucrător, cu excepția cazului în care vă jucați cu spark.executor.cores și un lucrător are suficiente nuclee pentru a deține mai mult de un executor.

  • Un cluster de sine stătător cu 5 noduri de lucrători (fiecare nod având 8 nuclee) Când pornesc o aplicație cu setările implicite.
  • Spark va achiziționa cu lăcomie atâtea nuclee și executori câte sunt oferite de planificator. Deci, în final, veți obține 5 executori cu 8 nuclee fiecare.

În cele ce urmează sunt opțiunile spark-submit pentru a vă juca cu numărul de executori:

– executor-memory MEM Memorie pentru fiecare executor (de exemplu, 1000M, 2G) (implicit: 1G).

Spark standalone și YARN numai:
– executor-cores NUM Numărul de nuclee pentru fiecare executor. (Implicit: 1 în modul YARN, sau toate nucleele disponibile pe lucrător în modul standalone)

YARN-only:
– num-executors NUM Numărul de executori de lansat (Implicit: 2). Dacă este activată alocarea dinamică, numărul inițial de executori va fi cel puțin NUM.

  • Omul 2 worker instance înseamnă un nod worker cu 2 procese worker?

Un nod este o mașină și nu există un motiv întemeiat pentru a rula mai mult de un worker pe mașină. Deci, două noduri de lucrător înseamnă de obicei două mașini, fiecare un lucrător Spark.

1 nod = 1 proces de lucrător

  • Care instanță de lucrător deține un executor pentru o aplicație specifică (care gestionează stocarea, sarcina) sau un nod de lucrător deține un executor?

Lucrătorii dețin mulți executori, pentru multe aplicații. O aplicație are executori pe mai mulți lucrători

Un nod de lucrător poate deține mai mulți executori (procese) dacă are suficient CPU, memorie și stocare.

  • BTW, numărul de executori dintr-un nod de lucru la un moment dat depinde în întregime de sarcina de lucru din cluster și de capacitatea nodului de a rula câți executori.

FLOW DE EXECUȚIE A APLICAȚIEI

Din acest punct de vedere, atunci când trimiteți o aplicație la cluster cu spark-submit, iată ce se întâmplă pe plan intern:

  1. O aplicație independentă pornește și instanțiază o instanță SparkContext/SparkSession (și abia atunci puteți numi aplicația un driver).
  2. Programul driverului cere resurse managerului de cluster pentru a lansa executori.
  3. Managerul de cluster lansează executori.
  4. Procesul driverului se execută prin intermediul aplicației utilizatorului. În funcție de acțiunile și transformările asupra RDD-urilor, sarcinile sunt trimise către executori.
  5. Executorii execută sarcinile și salvează rezultatele.
  6. În cazul în care un lucrător se blochează, sarcinile sale vor fi trimise către executori diferiți pentru a fi procesate din nou. În cartea „Learning Spark: Lightning-Fast Big Data Analysis” se vorbește despre Spark și toleranța la erori:

Spark se ocupă automat de mașinile care au eșuat sau sunt lente prin re-executarea sarcinilor eșuate sau lente. De exemplu, dacă nodul care execută o partiție a unei operații map() se prăbușește, Spark o va reexecuta pe un alt nod; și chiar dacă nodul nu se prăbușește, dar este pur și simplu mult mai lent decât alte noduri, Spark poate lansa în mod preemptiv o copie „speculativă” a sarcinii pe un alt nod și poate lua rezultatul acesteia dacă aceasta se termină.

  1. Cu SparkContext.stop() din driver sau dacă metoda principală iese/se blochează, toți executorii vor fi terminați și resursele clusterului vor fi eliberate de către managerul clusterului.

Dacă ne uităm la execuția din perspectiva Spark peste orice manager de resurse pentru un program, care unește două rdds și face o operațiune de reducere, atunci filtrul

Lista de mai jos surprinde câteva recomandări de care trebuie să se țină cont în timpul configurării lor:

  • Hadoop/Yarn/OS Deamons: Atunci când rulăm o aplicație Spark folosind un manager de cluster precum Yarn, vor exista mai multe daemoni care vor rula în fundal, cum ar fi NameNode, Secondary NameNode, DataNode, JobTracker și TaskTracker. Așadar, în timp ce specificăm num-executori, trebuie să ne asigurăm că lăsăm deoparte suficiente nuclee (~1 nucleu pe nod) pentru ca aceste demoni să ruleze fără probleme.
  • Yarn ApplicationMaster (AM): ApplicationMaster este responsabil pentru negocierea resurselor de la ResourceManager și lucrează cu NodeManagers pentru a executa și monitoriza containerele și consumul lor de resurse. Dacă rulăm spark pe yarn, atunci trebuie să bugetăm resursele de care AM ar avea nevoie (~1024MB și 1 Executor).
  • HDFS Throughput: Clientul HDFS are probleme cu tone de fire concurente. S-a observat că HDFS atinge debitul complet de scriere cu ~5 sarcini pe executor . Deci este bine să se mențină numărul de nuclee pe executor sub acest număr.
  • MemoryOverhead: Următoarea imagine descrie utilizarea memoriei pentru spark-yarn.

Două lucruri de reținut din această imagine:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Atunci, dacă solicităm 20GB pe executor, AM va primi de fapt 20GB + memoryOverhead = 20 + 7% din 20GB = ~23GB memorie pentru noi.

  • Executarea executorilor cu prea multă memorie duce adesea la întârzieri excesive de colectare a gunoiului.
  • Executarea executorilor mici (cu un singur nucleu și doar atâta memorie necesară pentru a rula o singură sarcină, de exemplu) aruncă pe apa sâmbetei beneficiile care vin din rularea mai multor sarcini într-o singură JVM.

Suficientă teorie… Să trecem la treabă…

Acum, să luăm în considerare un cluster de 10 noduri cu următoarea configurație și să analizăm diferite posibilități de distribuție a executorilor, a nucleului și a memoriei:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Prima abordare: Executori mici :

Executori mici înseamnă, în esență, un executor pe nucleu. Tabelul următor descrie valorile parametrilor noștri spar-config cu această abordare:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analiză: Cu un singur executor pe nucleu, așa cum am discutat mai sus, nu vom putea profita de rularea mai multor sarcini în aceeași JVM. De asemenea, variabilele partajate/cache, cum ar fi variabilele de difuzare și acumulatorii, vor fi replicate în fiecare nucleu al nodurilor, ceea ce înseamnă de 16 ori. De asemenea, nu lăsăm suficient overhead de memorie pentru procesele Hadoop/Yarn daemon și nu contăm în ApplicationManager. NU ESTE BUN!

A doua abordare: Executori grași (Un executor pe nod):

Executori grași înseamnă, în esență, un executor pe nod. Tabelul următor descrie valorile parametrilor noștri spark-config cu această abordare:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analiză: Cu toate cele 16 nuclee pe executor, în afară de ApplicationManager și procesele de demon nu sunt luate în considerare, debitul HDFS va fi afectat și va duce la rezultate excesive de gunoi. De asemenea, NU ESTE BUN!

A treia abordare: Echilibrul între Fat (vs) Tiny

În conformitate cu recomandările pe care le-am discutat mai sus:

  • Bazându-ne pe recomandările menționate mai sus, Să alocăm 5 nuclee pentru executori => – nuclee-executori = 5 (pentru un randament bun al HDFS)
  • Lăsați 1 nucleu pe nod pentru demonii Hadoop/Yarn =>Număr de nuclee disponibile pe nod = 16-1 = 15
  • Deci, Totalul de nuclee disponibile în cluster = 15 x 10 = 150
  • Numărul de executori disponibili = (total nuclee / num-cores-per-executor) = 150 / 5 = 30
  • Lăsând 1 executor pentru ApplicationManager => – num.de executori = 29
  • Numărul de executori pe nod = 30/10 = 3
  • Memorie pe executor = 64GB / 3 = 21GB
  • Contribuie la calculul de overhead heap = 7% din 21GB = 3GB. Deci, memorie reală – executor-memorie = 21-3 = 18GB

Deci, configurația recomandată este: 29 de executori, 18GB memorie fiecare și 5 nuclee fiecare!!!

Analiză: Este evident modul în care această a treia abordare a găsit echilibrul corect între abordările Fat vs. Tiny. Inutil să mai spunem că a obținut paralelismul unui executor gras și cele mai bune debite ale unui executor mic!!!

Concluzie:

Am văzut:

  • Câteva recomandări de care trebuie să ținem cont la configurarea acestor parametri pentru o aplicație Spark, cum ar fi:
  • Buget în resursele de care ar avea nevoie managerul de aplicații Yarn
  • Cum ar trebui să economisim câteva nuclee pentru procesele deamon Hadoop/Yarn/OS deamon
  • Învățați despre spark-yarn-memory-usage
  • De asemenea, am verificat și analizat trei abordări diferite pentru a configura acești parametri:
  1. Tiny Executors – Un executor pe nucleu
  2. Fat Executors – Un executor pe nod
  3. Abordare recomandată – Echilibrul corect între Tiny (Vs) Fat cuplat cu recomandările.

– num-executors, – executor-cores și – executor-memory… acești trei parametri joacă un rol foarte important în performanța Spark, deoarece controlează cantitatea de memorie & CPU pe care o primește aplicația Spark. Acest lucru face ca pentru utilizatori să fie foarte important să înțeleagă modul corect de a le configura. Sper că acest blog v-a ajutat să obțineți această perspectivă…

.