Spark-jargonia aloittelijoille

Tässä blogissa selvitetään joitakin aloitusvaikeuksia, kun aloitteleva koodaa Spark-hajautettua tietojenkäsittelyä. Sen lisäksi, että oppii API:t, täytyy varustautua klusterin yksityiskohdilla saadakseen parhaan mahdollisen hyödyn Sparkin tehosta.

Aloituskohta olisi Cluster Mode Overview .

Ja joitain yleisiä kysymyksiä, jotka saattavat tulla esiin, ovat:

  1. Et vieläkään ymmärrä Spark Standalone -klusterin eri prosesseja ja rinnakkaisuutta.
  2. Johdatit bin\start-slave.sh:n ja huomasit, että se spawnasi workerin, joka on itse asiassa JVM. Onko worker JVM-prosessi vai ei?
  3. Yllä olevan linkin mukaan executor on worker-solmussa sovellukselle käynnistetty prosessi, joka suorittaa tehtäviä. Executor on myös JVM.
  4. Executorit ovat sovelluskohtaisia. Mikä on sitten workerin rooli? Koordinoiko se suorittajan kanssa ja välittää tuloksen takaisin ohjaimelle vai puhuuko ohjain suoraan suorittajalle? Jos näin on, mikä on workerin tarkoitus?
  5. Miten valvotaan sovelluksen suorittajien määrää?
  6. Voidaanko tehtävät suorittaa rinnakkain suorittajan sisällä? Jos on, miten määritetään suorittajan säikeiden määrä?
  7. Millainen suhde on työntekijän, suorittajien ja suoritinytimien ( – total-executor-cores) välillä?
  8. Mitä tarkoittaa, että solmua kohti on enemmän työntekijöitä?

Palataan vielä Spark Cluster -tilan yksityiskohtiin.

Spark käyttää master/slave-arkkitehtuuria. Kuten kuvasta näkyy, siinä on yksi keskuskoordinaattori (Driver), joka kommunikoi monien hajautettujen työntekijöiden (executors) kanssa. Ajuri ja kukin suorittaja toimivat omissa Java-prosesseissaan.

Ajuri on prosessi, jossa päämetodi suoritetaan. Ensin se muuntaa käyttäjäohjelman tehtäviksi ja sen jälkeen se aikatauluttaa tehtävät suorittajille.

Spark-sovellus – -> Ajuri – -> Tehtäväluettelo – -> Aikatauluttaja – -> Suorittajat

SUORITTAJAT

Suorittimet ovat työläissolmujen prosesseja, joiden tehtävänä on suorittaa yksittäisiä tehtäviä tietyssä Spark-töissä. Ne käynnistetään Spark-sovelluksen alussa ja niitä suoritetaan tyypillisesti koko sovelluksen elinkaaren ajan. Kun ne ovat suorittaneet tehtävän, ne lähettävät tulokset ajurille. Ne tarjoavat myös muistin sisäistä tallennustilaa RDD-tietueille, jotka käyttäjäohjelmat tallentavat välimuistiin Block Managerin kautta.

Kun suorittajat käynnistetään, ne rekisteröivät itsensä ajuriin ja siitä eteenpäin ne kommunikoivat suoraan. Työntekijät vastaavat siitä, että ne viestivät klusterin hallinnoijalle resurssiensa saatavuudesta.

Erillisessä klusterissa saat yhden suorittajan per työntekijä, ellet pelaa spark.executor.cores:lla ja työntekijällä on tarpeeksi ytimiä, jotta siihen mahtuu useampi kuin yksi suorittaja.

  • A standalone cluster with 5 worker node (each node having 8 cores) Kun käynnistän sovelluksen oletusasetuksilla.
  • Spark hankkii ahnaasti niin monta ydintä ja suoritinta kuin scheduler tarjoaa. Joten lopulta saat 5 suoritinta, joissa jokaisessa on 8 ydintä.

Seuraavat ovat spark-submit-asetukset, joilla voit leikkiä suoritinten lukumäärällä:

– executor-memory MEM Muisti per suoritin (esim. 1000M, 2G) (Oletus: 1G).

Spark standalone ja YARN vain:
– executor-cores NUM Ytimien määrä per suoritin. (Oletusarvo: 1 YARN-tilassa tai kaikki työntekijän käytettävissä olevat ytimet standalone-tilassa)

YARN-only:
– num-executors NUM Käynnistettävien suoritinten lukumäärä (Oletusarvo: 2). Jos dynaaminen allokointi on käytössä, suoritinten alkuperäinen määrä on vähintään NUM.

  • Merkitseekö 2 worker-instanssia yhtä worker-solmua, jossa on 2 worker-prosessia?

Solmu on kone, eikä ole hyvää syytä käyttää useampaa kuin yhtä workeria per kone. Joten kaksi worker nodea tarkoittaa tyypillisesti kahta konetta, joissa kummassakin on Spark-työntekijä.

1 Node = 1 Worker-prosessi

  • Pidetäänkö jokaisessa worker-instanssissa tiettyyn sovellukseen liittyvää suorittajaa (joka hallinnoi tallennusta, tehtävää) vai pitääkö yksi worker node sisällään yhden suorittajan?

Workereissa on monta suorittajaa, monille sovelluksille. Yhdellä sovelluksella on toteuttajia monilla työläisillä

Työläissolmulla voi olla useita toteuttajia (prosesseja), jos sillä on riittävästi suorittimia, muistia ja tallennustilaa.

  • BTW, Suorittajien määrä työläissolmussa tietyllä hetkellä riippuu täysin klusterin työkuormituksesta ja solmun kyvystä pyörittää kuinka montaa suorittajaa.

SOVELLUKSEN TÄYTÄNTÖÖNPANOVIRTA

Tässä mielessä, kun lähetät sovelluksen klusteriin spark-submit-ohjelmalla, sisäisesti tapahtuu näin:

  1. Ensinäinen sovellus käynnistyy ja instanttisoi SparkContext/SparkSession-instanssin (ja vasta sen jälkeen voit kutsua sovellusta ajuriksi).
  2. Ajuriohjelma pyytää resursseja klusterinhallinnalta suorittajien käynnistämistä varten.
  3. Klusterinhallinta käynnistää suorittajat.
  4. Ajuriprosessi kulkee käyttäjän sovelluksen kautta. Riippuen RDD:n yli tapahtuvista toimista ja muunnoksista tehtävät lähetetään suorittajille.
  5. Toteuttajat suorittavat tehtävät ja tallentavat tulokset.
  6. Jos jokin työntekijä kaatuu, sen tehtävät lähetetään eri suorittajille uudelleen käsiteltäväksi. Kirjassa ”Learning Spark: Lightning-Fast Big Data Analysis” puhutaan Sparkista ja vikasietoisuudesta:

Spark käsittelee automaattisesti vikaantuneita tai hitaita koneita suorittamalla epäonnistuneet tai hitaat tehtävät uudelleen. Esimerkiksi jos solmu, joka suorittaa map()-operaation osion, kaatuu, Spark suorittaa sen uudelleen toisessa solmussa; ja vaikka solmu ei kaatuisikaan, vaan olisi vain paljon hitaampi kuin muut solmut, Spark voi preemptivisesti käynnistää ”spekulatiivisen” kopion tehtävästä toisessa solmussa ja ottaa sen tuloksen, jos se valmistuu.

  1. Ajurin SparkContext.stop():lla tai jos päämetodi poistuu/kaatuilee, kaikki suorittajat lopetetaan ja klusterinhallinta vapauttaa klusterin resurssit.

Jos tarkastelemme suoritusta Spark-prospektiivista minkä tahansa resurssienhallintaohjelman yli, joka yhdistää kaksi rdds:ää ja tekee jonkin reduktio-operaation, niin suodatin

Oheinen lista kaappaa joitain suosituksia, jotka on syytä pitää mielessä niiden konfiguroinnissa:

  • Hadoop/Yarn/OS Deamons: NameNode, Secondary NameNode, DataNode, JobTracker ja TaskTracker. Joten määrittäessämme num-executors, meidän on varmistettava, että jätämme tarpeeksi ytimiä (~1 ydin per solmu), jotta nämä daemonit voivat toimia sujuvasti.
  • Yarn ApplicationMaster (AM): ApplicationMaster vastaa resurssien neuvottelemisesta ResourceManagerilta ja työskentelee NodeManagerien kanssa konttien suorittamiseksi ja valvomiseksi sekä niiden resurssien kulutuksen seuraamiseksi. Jos ajamme sparkia yarnilla, meidän on budjetoitava AM:n tarvitsemat resurssit (~1024MB ja 1 Executor).
  • HDFS Throughput: HDFS-asiakkaalla on ongelmia tonnien samanaikaisten säikeiden kanssa. Havaittiin, että HDFS saavuttaa täyden kirjoitusläpimenon ~5 tehtävällä per suoritin . Joten on hyvä pitää ytimien määrä per suoritin alle tämän luvun.
  • MemoryOverhead: Seuraava kuva kuvaa spark-yarn-muistinkäyttöä.

Kaksi huomioitavaa asiaa tästä kuvasta:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Jos siis pyydämme 20 Gt muistia per executor, AM saa todellisuudessa 20 Gt + memoryOverhead = 20 + 7 %:a 20 Gt:n muistin osuudesta +7 %:a 20 Gt:n muistin osuudesta = ~23 Gt muistia meille.

  • Toteuttajien ajaminen liian suurella muistimäärällä johtaa usein liiallisiin roskienkeräysviiveisiin.
  • Pienienien toteuttajien ajaminen (esimerkiksi yhdellä ytimellä ja vain sen verran muistia, mitä tarvitaan yhden tehtävän suorittamiseen) heittää pois hyödyt, joita saadaan useiden tehtävien suorittamisesta yhdessä JVM:ssä.

Ei riitä teoria… Mennään käytännönläheisesti…

Katsotaan nyt 10 solmun klusteria seuraavalla konfiguraatiolla ja analysoidaan eri mahdollisuuksia suoritin-core-muistijakaumalle:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Ensimmäinen lähestymistapa:

Pienet suorittajat tarkoittavat lähinnä yhtä suorittajaa per ydin. Seuraava taulukko kuvaa spar-config-parametriemme arvoja tällä lähestymistavalla:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analyysi: Vain yksi suoritin per ydin, kuten edellä keskustelimme, emme pysty hyödyntämään useiden tehtävien suorittamista samassa JVM:ssä. Lisäksi jaetut/välimuistiin tallennetut muuttujat, kuten broadcast-muuttujat ja akkumulaattorit, kopioidaan jokaisessa solmun ytimessä, mikä on 16 kertaa. Emme myöskään jätä riittävästi muistin päällekkäistä muistia Hadoop/Yarn-demoniprosesseille, emmekä ota huomioon ApplicationManageria. EI HYVÄ!

Toinen lähestymistapa: Fat executors (One Executor per node):

Fat executors tarkoittaa periaatteessa yhtä executoria per solmu. Seuraava taulukko kuvaa spark-config-parametriemme arvoja tällä lähestymistavalla:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analyysi: Kun kaikki 16 ydintä per suoritin, lukuun ottamatta ApplicationManageria ja daemon-prosesseja ei lasketa, HDFS:n läpäisykyky kärsii ja se johtaa liiallisiin roskatuloksiin. Lisäksi EI HYVÄ!

Kolmas lähestymistapa: Tasapaino Fat (vs) Tiny

Yllämainittujen suositusten mukaisesti:

  • Based on the recommendations mentioned above, Let’s assign 5 core per executors => – executor-cores = 5 (for good HDFS throughput)
  • Lave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16-1 = 15
  • So, Käytettävissä olevien ydinten kokonaismäärä klusterissa = 15 x 10 = 150
  • Käytettävissä olevien toteuttajien määrä = (ydinten kokonaismäärä / ydinten määrä per toteuttaja) = 150 / 5 = 30
  • Jättää 1 toteuttaja ApplicationManagerille => – num-executors = 29
  • Toteuttajien määrä per solmu = 30/10 = 3
  • Muisti per toteuttaja = 64GB / 3 = 21GB
  • Lasketaan pois heap overhead = 7 % 21GB:sta = 3GB. Eli todellinen – suorittaja-muisti = 21-3 = 18GB

Suositeltu konfiguraatio on siis: 29 suorittajaa, 18GB muistia kukin ja 5 ydintä kukin!!!

Analyysi: On selvää, miten tämä kolmas lähestymistapa on löytänyt oikean tasapainon Fat vs. Tiny -lähestymistapojen välillä. Sanomattakin on selvää, että sillä saavutettiin fat-toteuttajan rinnakkaisuus ja tiny-toteuttajan parhaat läpimenot!!!

Johtopäätökset:

Olemme nähneet:

  • Pari suositusta, jotka on syytä pitää mielessä, kun konfiguroidaan näitä parametreja spark-sovellusta varten:
  • Budjetoida resursseja, joita Yarnin sovellushallinta tarvitsisi
  • Miten meidän pitäisi säästää joitain ytimiä Hadoop/Yarn/OS deamon-prosesseja varten
  • Oppia spark-yarn-muistin käytöstä
  • Myös tarkistettu ja analysoitu kolmea erilaista tapaa konfiguroida näitä parameja:
  1. Tiny Executors – One Executor per Core
  2. Fat Executors – One executor per Node
  3. Suositeltu lähestymistapa – Oikea tasapaino Tiny:n (Vs) Fat:n välillä yhdistettynä suosituksiin.

– num-executors, – executor-cores ja – executor-memory.. näillä kolmella parametrilla on erittäin tärkeä rooli spark-suorituskyvyssä, koska ne kontrolloivat sitä, kuinka paljon CPU & muistia spark-sovelluksesi saa. Tämän vuoksi käyttäjien on erittäin tärkeää ymmärtää oikea tapa määrittää ne. Toivottavasti tämä blogi auttoi sinua saamaan tämän näkökulman…