Questo blog è per chiarire alcuni dei problemi di partenza quando un principiante si occupa di codici per il calcolo distribuito Spark. A parte l’apprendimento delle API, si devono conoscere i dettagli del cluster per sfruttare al meglio la potenza di Spark.
Il punto di partenza sarebbe la panoramica della modalità Cluster.
E alcune domande comuni che potrebbero sorgere sono:
- Non riesci ancora a capire i diversi processi nel cluster Spark Standalone e il parallelismo.
- Hai eseguito il bin\start-slave.sh e hai scoperto che ha generato il worker, che è effettivamente una JVM. Il worker è un processo JVM o no?
- Come da link sopra, un executor è un processo lanciato per un’applicazione su un nodo worker che esegue compiti. L’esecutore è anche una JVM.
- Gli esecutori sono per applicazione. Allora qual è il ruolo di un worker? Si coordina con l’esecutore e comunica il risultato al driver o il driver parla direttamente con l’esecutore? Se è così, qual è lo scopo del worker allora?
- Come controllare il numero di executor per un’applicazione?
- Può essere fatto in modo che i compiti vengano eseguiti in parallelo all’interno dell’executor? Se sì, come configurare il numero di thread per un executor?
- Qual è la relazione tra worker, executor e executor cores ( – total-executor-cores)?
- cosa significa avere più worker per nodo?
Ripercorriamo i dettagli della modalità Spark Cluster.
![](https://miro.medium.com/max/60/0*wgv-t-TXGvAzZYRm.png?q=20)
Spark utilizza un’architettura master/slave. Come potete vedere nella figura, ha un coordinatore centrale (Driver) che comunica con molti lavoratori distribuiti (esecutori). Il driver e ciascuno degli esecutori vengono eseguiti nei propri processi Java.
Il driver è il processo dove viene eseguito il metodo principale. Prima converte il programma utente in compiti e poi pianifica i compiti sugli esecutori.
Applicazione Spark – -> Driver – -> Elenco dei compiti – -> Scheduler – -> Esecutori
- EXECUTORS
- APPLICATION EXECUTION FLOW
- L’elenco seguente cattura alcune raccomandazioni da tenere a mente mentre le configura:
- Basta teoria… Andiamo sul pratico..
- Primo approccio: Tiny executors :
- Secondo approccio: Fat executors (Un esecutore per nodo):
- Terzo approccio: Equilibrio tra Fat (vs) Tiny
- Conclusione:
EXECUTORS
Gli esecutori sono processi di nodi lavoratori incaricati di eseguire singoli compiti in un dato lavoro Spark. Sono lanciati all’inizio di un’applicazione Spark e tipicamente vengono eseguiti per tutta la durata dell’applicazione. Una volta che hanno eseguito il compito, inviano i risultati al driver. Forniscono anche lo storage in-memory per gli RDD che vengono messi in cache dai programmi utente attraverso Block Manager.
Quando gli esecutori vengono avviati si registrano con il driver e da quel momento in poi comunicano direttamente. I worker hanno il compito di comunicare al gestore del cluster la disponibilità delle loro risorse.
In un cluster standalone si avrà un solo executor per worker a meno che non si giochi con spark.executor.cores e un worker abbia abbastanza core per contenere più di un executor.
- Un cluster autonomo con 5 nodi worker (ogni nodo ha 8 core) Quando avvio un’applicazione con impostazioni predefinite.
- Spark acquisirà avidamente tanti core ed esecutori quanti sono offerti dallo scheduler. Quindi alla fine si otterranno 5 esecutori con 8 core ciascuno.
Seguono le opzioni di spark-submit per giocare con il numero di esecutori:
– executor-memory MEM Memoria per esecutore (es. 1000M, 2G) (Predefinito: 1G).
Spark standalone e YARN solo:
– executor-cores NUM Numero di core per esecutore. (Predefinito: 1 in modalità YARN, o tutti i core disponibili sul worker in modalità standalone)
YARN-only:
– num-executors NUM Numero di executor da lanciare (Predefinito: 2). Se l’allocazione dinamica è abilitata, il numero iniziale di esecutori sarà almeno NUM.
- Istanza 2 worker significa un nodo worker con 2 processi worker?
Un nodo è una macchina, e non c’è una buona ragione per eseguire più di un worker per macchina. Quindi due nodi worker tipicamente significano due macchine, ognuna delle quali è un worker Spark.
1 nodo = 1 processo worker
- Ogni istanza worker contiene un executor per una specifica applicazione (che gestisce lo storage, il task) o un nodo worker contiene un executor?
I worker contengono molti executor, per molte applicazioni. Un’applicazione ha esecutori su molti lavoratori
Un nodo lavoratore può contenere più esecutori (processi) se ha sufficiente CPU, memoria e archiviazione.
![](https://miro.medium.com/max/60/0*WzKBVT6XKRqjVXvy.png?q=20)
- BTW, il numero di esecutori in un nodo lavoratore in un dato momento dipende interamente dal carico di lavoro sul cluster e dalla capacità del nodo di eseguire quanti esecutori.
APPLICATION EXECUTION FLOW
Con questo in mente, quando si invia un’applicazione al cluster con spark-submit questo è ciò che accade internamente:
- Un’applicazione standalone inizia e istanzia un’istanza SparkContext/SparkSession (ed è solo allora che si può chiamare l’applicazione driver).
- Il programma del driver chiede risorse al gestore del cluster per lanciare gli esecutori.
- Il gestore del cluster lancia gli esecutori.
- Il processo del driver gira attraverso l’applicazione utente. A seconda delle azioni e delle trasformazioni su RDD, i compiti vengono inviati agli esecutori.
- Gli esecutori eseguono i compiti e salvano i risultati.
- Se un lavoratore si blocca, i suoi compiti saranno inviati a diversi esecutori per essere elaborati nuovamente. Nel libro “Learning Spark: Lightning-Fast Big Data Analysis” si parla di Spark e Fault Tolerance:
Spark affronta automaticamente le macchine fallite o lente rieseguendo i compiti falliti o lenti. Per esempio, se il nodo che esegue una partizione di un’operazione map() si blocca, Spark la rieseguirà su un altro nodo; e anche se il nodo non si blocca ma è semplicemente molto più lento di altri nodi, Spark può lanciare preventivamente una copia “speculativa” del task su un altro nodo, e prenderne il risultato se questo finisce.
- Con SparkContext.stop() dal driver o se il metodo principale esce/crasha tutti gli esecutori saranno terminati e le risorse del cluster saranno rilasciate dal gestore del cluster.
Se guardiamo l’esecuzione da Spark prospect su qualsiasi gestore di risorse per un programma, che unisce due rdds e fa qualche operazione di riduzione allora filtra
![](https://miro.medium.com/max/60/0*W-NmTW4OfZ512mNH.jpg?q=20)
L’elenco seguente cattura alcune raccomandazioni da tenere a mente mentre le configura:
- Hadoop/Yarn/OS Deamons: Quando eseguiamo un’applicazione spark utilizzando un cluster manager come Yarn, ci saranno diversi demoni che verranno eseguiti in background come NameNode, Secondary NameNode, DataNode, JobTracker e TaskTracker. Quindi, mentre specifichiamo il numero di esecutori, dobbiamo assicurarci di lasciare da parte abbastanza core (~1 core per nodo) perché questi demoni funzionino senza problemi.
- Yarn ApplicationMaster (AM): ApplicationMaster è responsabile della negoziazione delle risorse dal ResourceManager e lavora con i NodeManager per eseguire e monitorare i container e il loro consumo di risorse. Se stiamo eseguendo spark su yarn, allora dobbiamo preventivare le risorse di cui AM avrebbe bisogno (~1024MB e 1 Executor).
- HDFS Throughput: Il client HDFS ha problemi con tonnellate di thread concorrenti. È stato osservato che HDFS raggiunge il pieno throughput di scrittura con ~ 5 compiti per esecutore. Quindi è bene mantenere il numero di core per esecutore al di sotto di quel numero.
- MemoryOverhead: L’immagine seguente mostra il consumo di memoria di spark-yarn.
![](https://miro.medium.com/max/60/0*vMOcWLXzWGg6_RB6.png?q=20)
Due cose da notare da questa immagine:
Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)
Quindi, se richiediamo 20GB per executor, AM otterrà effettivamente 20GB + memoryOverhead = 20 + 7% di 20GB = ~23GB di memoria per noi.
- Eseguire gli esecutori con troppa memoria spesso porta a ritardi eccessivi nella garbage collection.
- Eseguire esecutori minuscoli (con un singolo core e solo la memoria necessaria per eseguire un singolo compito, per esempio) butta via i benefici che vengono dall’esecuzione di più compiti in una singola JVM.
Basta teoria… Andiamo sul pratico..
Ora, consideriamo un cluster di 10 nodi con la seguente configurazione e analizziamo diverse possibilità di distribuzione di esecutori-core-memoria:
**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node
Primo approccio: Tiny executors :
Tiny executors significa essenzialmente un executor per core. La seguente tabella mostra i valori dei nostri parametri spar-config con questo approccio:
- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB
Analisi: Con un solo esecutore per core, come abbiamo discusso sopra, non saremo in grado di trarre vantaggio dall’esecuzione di più compiti nella stessa JVM. Inoltre, le variabili condivise/cache come le variabili broadcast e gli accumulatori saranno replicati in ogni core dei nodi che è 16 volte. Inoltre, non stiamo lasciando abbastanza overhead di memoria per i processi del demone Hadoop/Yarn e non stiamo contando in ApplicationManager. NON VA BENE!
Secondo approccio: Fat executors (Un esecutore per nodo):
Fat executors significa essenzialmente un esecutore per nodo. La seguente tabella mostra i valori dei nostri parametri spark-config con questo approccio:
- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB
Analisi: Con tutti i 16 core per esecutore, a parte ApplicationManager e i processi daemon non vengono contati, il throughput di HDFS ne risentirà e si tradurrà in risultati eccessivi di garbage. Inoltre, NON VA BENE!
Terzo approccio: Equilibrio tra Fat (vs) Tiny
Secondo le raccomandazioni che abbiamo discusso sopra:
- In base alle raccomandazioni menzionate sopra, assegniamo 5 core per esecutori => – executor-cores = 5 (per un buon throughput HDFS)
- Lascia 1 core per nodo per i demoni Hadoop/Yarn => Num core disponibili per nodo = 16-1 = 15
- So, Totale disponibile di core nel cluster = 15 x 10 = 150
- Numero di esecutori disponibili = (core totali / num-cores-per-esecutore) = 150 / 5 = 30
- Lasciare 1 esecutore per ApplicationManager => – num-esecutori = 29
- Numero di esecutori per nodo = 30/10 = 3
- Memoria per esecutore = 64GB / 3 = 21GB
- Contando l’overhead di heap = 7% di 21GB = 3GB. Perciò, la memoria effettiva – executor-memoria = 21-3 = 18GB
Quindi, la configurazione raccomandata è: 29 executor, 18GB di memoria ciascuno e 5 core ciascuno!
Analisi: È ovvio come questo terzo approccio abbia trovato il giusto equilibrio tra gli approcci Fat e Tiny. Inutile dire che ha raggiunto il parallelismo di un fat executor e i migliori throughput di un tiny executor!
Conclusione:
Abbiamo visto:
- Un paio di raccomandazioni da tenere a mente quando si configurano questi parametri per un’applicazione spark come:
- Budget nelle risorse che l’Application Manager di Yarn avrebbe bisogno
- Come dovremmo risparmiare alcuni core per i processi deamon di Hadoop/Yarn/OS
- Imparare su spark-yarn-memory-usage
- Inoltre, abbiamo controllato e analizzato tre diversi approcci per configurare questi parametri:
- Tiny Executors – Un esecutore per Core
- Fat Executors – Un esecutore per Nodo
- Approccio consigliato – Giusto equilibrio tra Tiny (Vs) Fat abbinato alle raccomandazioni.
– num-executors, – executor-cores e – executor-memory… questi tre parametri giocano un ruolo molto importante nelle prestazioni di spark in quanto controllano la quantità di memoria della CPU & della tua applicazione spark. Questo rende molto cruciale per gli utenti capire il modo giusto per configurarli. Spero che questo blog vi abbia aiutato a ottenere questa prospettiva…
.