Gergo Spark per i principianti

Questo blog è per chiarire alcuni dei problemi di partenza quando un principiante si occupa di codici per il calcolo distribuito Spark. A parte l’apprendimento delle API, si devono conoscere i dettagli del cluster per sfruttare al meglio la potenza di Spark.

Il punto di partenza sarebbe la panoramica della modalità Cluster.

E alcune domande comuni che potrebbero sorgere sono:

  1. Non riesci ancora a capire i diversi processi nel cluster Spark Standalone e il parallelismo.
  2. Hai eseguito il bin\start-slave.sh e hai scoperto che ha generato il worker, che è effettivamente una JVM. Il worker è un processo JVM o no?
  3. Come da link sopra, un executor è un processo lanciato per un’applicazione su un nodo worker che esegue compiti. L’esecutore è anche una JVM.
  4. Gli esecutori sono per applicazione. Allora qual è il ruolo di un worker? Si coordina con l’esecutore e comunica il risultato al driver o il driver parla direttamente con l’esecutore? Se è così, qual è lo scopo del worker allora?
  5. Come controllare il numero di executor per un’applicazione?
  6. Può essere fatto in modo che i compiti vengano eseguiti in parallelo all’interno dell’executor? Se sì, come configurare il numero di thread per un executor?
  7. Qual è la relazione tra worker, executor e executor cores ( – total-executor-cores)?
  8. cosa significa avere più worker per nodo?

Ripercorriamo i dettagli della modalità Spark Cluster.

Spark utilizza un’architettura master/slave. Come potete vedere nella figura, ha un coordinatore centrale (Driver) che comunica con molti lavoratori distribuiti (esecutori). Il driver e ciascuno degli esecutori vengono eseguiti nei propri processi Java.

Il driver è il processo dove viene eseguito il metodo principale. Prima converte il programma utente in compiti e poi pianifica i compiti sugli esecutori.

Applicazione Spark – -> Driver – -> Elenco dei compiti – -> Scheduler – -> Esecutori

EXECUTORS

Gli esecutori sono processi di nodi lavoratori incaricati di eseguire singoli compiti in un dato lavoro Spark. Sono lanciati all’inizio di un’applicazione Spark e tipicamente vengono eseguiti per tutta la durata dell’applicazione. Una volta che hanno eseguito il compito, inviano i risultati al driver. Forniscono anche lo storage in-memory per gli RDD che vengono messi in cache dai programmi utente attraverso Block Manager.

Quando gli esecutori vengono avviati si registrano con il driver e da quel momento in poi comunicano direttamente. I worker hanno il compito di comunicare al gestore del cluster la disponibilità delle loro risorse.

In un cluster standalone si avrà un solo executor per worker a meno che non si giochi con spark.executor.cores e un worker abbia abbastanza core per contenere più di un executor.

  • Un cluster autonomo con 5 nodi worker (ogni nodo ha 8 core) Quando avvio un’applicazione con impostazioni predefinite.
  • Spark acquisirà avidamente tanti core ed esecutori quanti sono offerti dallo scheduler. Quindi alla fine si otterranno 5 esecutori con 8 core ciascuno.

Seguono le opzioni di spark-submit per giocare con il numero di esecutori:

– executor-memory MEM Memoria per esecutore (es. 1000M, 2G) (Predefinito: 1G).

Spark standalone e YARN solo:
– executor-cores NUM Numero di core per esecutore. (Predefinito: 1 in modalità YARN, o tutti i core disponibili sul worker in modalità standalone)

YARN-only:
– num-executors NUM Numero di executor da lanciare (Predefinito: 2). Se l’allocazione dinamica è abilitata, il numero iniziale di esecutori sarà almeno NUM.

  • Istanza 2 worker significa un nodo worker con 2 processi worker?

Un nodo è una macchina, e non c’è una buona ragione per eseguire più di un worker per macchina. Quindi due nodi worker tipicamente significano due macchine, ognuna delle quali è un worker Spark.

1 nodo = 1 processo worker

  • Ogni istanza worker contiene un executor per una specifica applicazione (che gestisce lo storage, il task) o un nodo worker contiene un executor?

I worker contengono molti executor, per molte applicazioni. Un’applicazione ha esecutori su molti lavoratori

Un nodo lavoratore può contenere più esecutori (processi) se ha sufficiente CPU, memoria e archiviazione.

  • BTW, il numero di esecutori in un nodo lavoratore in un dato momento dipende interamente dal carico di lavoro sul cluster e dalla capacità del nodo di eseguire quanti esecutori.

APPLICATION EXECUTION FLOW

Con questo in mente, quando si invia un’applicazione al cluster con spark-submit questo è ciò che accade internamente:

  1. Un’applicazione standalone inizia e istanzia un’istanza SparkContext/SparkSession (ed è solo allora che si può chiamare l’applicazione driver).
  2. Il programma del driver chiede risorse al gestore del cluster per lanciare gli esecutori.
  3. Il gestore del cluster lancia gli esecutori.
  4. Il processo del driver gira attraverso l’applicazione utente. A seconda delle azioni e delle trasformazioni su RDD, i compiti vengono inviati agli esecutori.
  5. Gli esecutori eseguono i compiti e salvano i risultati.
  6. Se un lavoratore si blocca, i suoi compiti saranno inviati a diversi esecutori per essere elaborati nuovamente. Nel libro “Learning Spark: Lightning-Fast Big Data Analysis” si parla di Spark e Fault Tolerance:

Spark affronta automaticamente le macchine fallite o lente rieseguendo i compiti falliti o lenti. Per esempio, se il nodo che esegue una partizione di un’operazione map() si blocca, Spark la rieseguirà su un altro nodo; e anche se il nodo non si blocca ma è semplicemente molto più lento di altri nodi, Spark può lanciare preventivamente una copia “speculativa” del task su un altro nodo, e prenderne il risultato se questo finisce.

  1. Con SparkContext.stop() dal driver o se il metodo principale esce/crasha tutti gli esecutori saranno terminati e le risorse del cluster saranno rilasciate dal gestore del cluster.

Se guardiamo l’esecuzione da Spark prospect su qualsiasi gestore di risorse per un programma, che unisce due rdds e fa qualche operazione di riduzione allora filtra

L’elenco seguente cattura alcune raccomandazioni da tenere a mente mentre le configura:

  • Hadoop/Yarn/OS Deamons: Quando eseguiamo un’applicazione spark utilizzando un cluster manager come Yarn, ci saranno diversi demoni che verranno eseguiti in background come NameNode, Secondary NameNode, DataNode, JobTracker e TaskTracker. Quindi, mentre specifichiamo il numero di esecutori, dobbiamo assicurarci di lasciare da parte abbastanza core (~1 core per nodo) perché questi demoni funzionino senza problemi.
  • Yarn ApplicationMaster (AM): ApplicationMaster è responsabile della negoziazione delle risorse dal ResourceManager e lavora con i NodeManager per eseguire e monitorare i container e il loro consumo di risorse. Se stiamo eseguendo spark su yarn, allora dobbiamo preventivare le risorse di cui AM avrebbe bisogno (~1024MB e 1 Executor).
  • HDFS Throughput: Il client HDFS ha problemi con tonnellate di thread concorrenti. È stato osservato che HDFS raggiunge il pieno throughput di scrittura con ~ 5 compiti per esecutore. Quindi è bene mantenere il numero di core per esecutore al di sotto di quel numero.
  • MemoryOverhead: L’immagine seguente mostra il consumo di memoria di spark-yarn.

Due cose da notare da questa immagine:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Quindi, se richiediamo 20GB per executor, AM otterrà effettivamente 20GB + memoryOverhead = 20 + 7% di 20GB = ~23GB di memoria per noi.

  • Eseguire gli esecutori con troppa memoria spesso porta a ritardi eccessivi nella garbage collection.
  • Eseguire esecutori minuscoli (con un singolo core e solo la memoria necessaria per eseguire un singolo compito, per esempio) butta via i benefici che vengono dall’esecuzione di più compiti in una singola JVM.

Basta teoria… Andiamo sul pratico..

Ora, consideriamo un cluster di 10 nodi con la seguente configurazione e analizziamo diverse possibilità di distribuzione di esecutori-core-memoria:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Primo approccio: Tiny executors :

Tiny executors significa essenzialmente un executor per core. La seguente tabella mostra i valori dei nostri parametri spar-config con questo approccio:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analisi: Con un solo esecutore per core, come abbiamo discusso sopra, non saremo in grado di trarre vantaggio dall’esecuzione di più compiti nella stessa JVM. Inoltre, le variabili condivise/cache come le variabili broadcast e gli accumulatori saranno replicati in ogni core dei nodi che è 16 volte. Inoltre, non stiamo lasciando abbastanza overhead di memoria per i processi del demone Hadoop/Yarn e non stiamo contando in ApplicationManager. NON VA BENE!

Secondo approccio: Fat executors (Un esecutore per nodo):

Fat executors significa essenzialmente un esecutore per nodo. La seguente tabella mostra i valori dei nostri parametri spark-config con questo approccio:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analisi: Con tutti i 16 core per esecutore, a parte ApplicationManager e i processi daemon non vengono contati, il throughput di HDFS ne risentirà e si tradurrà in risultati eccessivi di garbage. Inoltre, NON VA BENE!

Terzo approccio: Equilibrio tra Fat (vs) Tiny

Secondo le raccomandazioni che abbiamo discusso sopra:

  • In base alle raccomandazioni menzionate sopra, assegniamo 5 core per esecutori => – executor-cores = 5 (per un buon throughput HDFS)
  • Lascia 1 core per nodo per i demoni Hadoop/Yarn => Num core disponibili per nodo = 16-1 = 15
  • So, Totale disponibile di core nel cluster = 15 x 10 = 150
  • Numero di esecutori disponibili = (core totali / num-cores-per-esecutore) = 150 / 5 = 30
  • Lasciare 1 esecutore per ApplicationManager => – num-esecutori = 29
  • Numero di esecutori per nodo = 30/10 = 3
  • Memoria per esecutore = 64GB / 3 = 21GB
  • Contando l’overhead di heap = 7% di 21GB = 3GB. Perciò, la memoria effettiva – executor-memoria = 21-3 = 18GB

Quindi, la configurazione raccomandata è: 29 executor, 18GB di memoria ciascuno e 5 core ciascuno!

Analisi: È ovvio come questo terzo approccio abbia trovato il giusto equilibrio tra gli approcci Fat e Tiny. Inutile dire che ha raggiunto il parallelismo di un fat executor e i migliori throughput di un tiny executor!

Conclusione:

Abbiamo visto:

  • Un paio di raccomandazioni da tenere a mente quando si configurano questi parametri per un’applicazione spark come:
  • Budget nelle risorse che l’Application Manager di Yarn avrebbe bisogno
  • Come dovremmo risparmiare alcuni core per i processi deamon di Hadoop/Yarn/OS
  • Imparare su spark-yarn-memory-usage
  • Inoltre, abbiamo controllato e analizzato tre diversi approcci per configurare questi parametri:
  1. Tiny Executors – Un esecutore per Core
  2. Fat Executors – Un esecutore per Nodo
  3. Approccio consigliato – Giusto equilibrio tra Tiny (Vs) Fat abbinato alle raccomandazioni.

– num-executors, – executor-cores e – executor-memory… questi tre parametri giocano un ruolo molto importante nelle prestazioni di spark in quanto controllano la quantità di memoria della CPU & della tua applicazione spark. Questo rende molto cruciale per gli utenti capire il modo giusto per configurarli. Spero che questo blog vi abbia aiutato a ottenere questa prospettiva…

.