Jargon Spark pour les débutants

Ce blog a pour but d’éclaircir certains des problèmes de départ lorsque le débutant code pour l’informatique distribuée Spark. Outre l’apprentissage des API, il faut s’équiper des détails du cluster pour tirer le meilleur de la puissance de Spark.

Le point de départ serait l’aperçu du mode cluster .

Et certaines questions courantes qui pourraient surgir sont :

  1. Vous ne pouvez toujours pas comprendre les différents processus dans le cluster Spark Standalone et le parallélisme.
  2. Vous avez exécuté le bin\start-slave.sh et constaté qu’il a spawné le worker, qui est en fait une JVM. Le worker est-il un processus JVM ou non ?
  3. Selon le lien ci-dessus, un exécuteur est un processus lancé pour une application sur un nœud worker qui exécute des tâches. L’exécuteur est aussi une JVM.
  4. Les exécuteurs sont par application. Alors quel est le rôle d’un worker ? Est-ce qu’il se coordonne avec l’exécuteur et communique le résultat en retour au pilote ? ou est-ce que le pilote parle directement à l’exécuteur ? Si oui, quel est alors le but du worker ?
  5. Comment contrôler le nombre d’exécuteurs pour une application ?
  6. Peut-on faire en sorte que les tâches s’exécutent en parallèle à l’intérieur de l’exécuteur ? Si oui, comment configurer le nombre de threads pour un exécuteur ?
  7. Quelle est la relation entre le travailleur, les exécuteurs et les cœurs d’exécuteurs ( – total-executor-cores) ?
  8. Qu’est-ce que cela signifie d’avoir plus de travailleurs par nœud ?

Revisitons les détails du mode cluster de Spark.

Spark utilise une architecture maître/esclave. Comme vous pouvez le voir sur la figure, il possède un coordinateur central (Driver) qui communique avec de nombreux travailleurs distribués (executeurs). Le pilote et chacun des exécuteurs s’exécutent dans leurs propres processus Java.

Le pilote est le processus où s’exécute la méthode principale. Il convertit d’abord le programme utilisateur en tâches et après cela, il planifie les tâches sur les exécuteurs.

Application Spark – -> Pilote – -> Liste de tâches – -> Planificateur – -> Exécuteurs

EXECUTEURS

Les exécuteurs sont des processus de nœuds travailleurs chargés d’exécuter des tâches individuelles dans un travail Spark donné. Ils sont lancés au début d’une application Spark et fonctionnent généralement pendant toute la durée de vie d’une application. Une fois qu’ils ont exécuté la tâche, ils envoient les résultats au pilote. Ils fournissent également un stockage en mémoire pour les RDD qui sont mis en cache par les programmes utilisateurs via Block Manager.

Lorsque les exécuteurs sont lancés, ils s’enregistrent auprès du pilote et à partir de là, ils communiquent directement. Les workers sont chargés de communiquer au gestionnaire du cluster la disponibilité de leurs ressources.

Dans un cluster autonome, vous aurez un exécuteur par worker, à moins que vous ne jouiez avec spark.executor.cores et qu’un worker ait suffisamment de cœurs pour contenir plus d’un exécuteur.

  • Un cluster autonome avec 5 nœuds travailleurs (chaque nœud ayant 8 cœurs) Lorsque je démarre une application avec les paramètres par défaut.
  • Spark va acquérir avec avidité autant de cœurs et d’exécuteurs que ceux offerts par l’ordonnanceur. Donc, à la fin, vous obtiendrez 5 exécuteurs avec 8 cœurs chacun.

Voici les options de spark-submit pour jouer avec le nombre d’exécuteurs:

– executor-memory MEM Mémoire par exécuteur (par exemple 1000M, 2G) (Défaut : 1G).

Spark standalone et YARN seulement:
– executor-cores NUM Nombre de cœurs par exécuteur. (Par défaut : 1 en mode YARN, ou tous les cœurs disponibles sur le worker en mode autonome)

YARN-only:
– num-executors NUM Nombre d’exécuteurs à lancer (par défaut : 2). Si l’allocation dynamique est activée, le nombre initial d’exécuteurs sera au moins NUM.

  • Does 2 worker instance mean one worker node with 2 worker processes?

Un nœud est une machine, et il n’y a pas de bonne raison d’exécuter plus d’un worker par machine. Donc deux nœuds de travailleurs signifient généralement deux machines, chacune un travailleur Spark.

1 nœud = 1 processus de travailleur

  • Chaque instance de travailleur détient-elle un exécuteur pour une application spécifique (qui gère le stockage, la tâche) ou un nœud de travailleur détient-il un exécuteur ?

Les travailleurs détiennent de nombreux exécuteurs, pour de nombreuses applications. Une application a des exécuteurs sur de nombreux travailleurs

Un nœud de travailleur peut contenir plusieurs exécuteurs (processus) s’il a suffisamment de CPU, de mémoire et de stockage.

  • BTW, Le nombre d’exécuteurs dans un worker node à un moment donné dépend entièrement de la charge de travail sur le cluster et de la capacité du nœud à exécuter combien d’exécuteurs.

FLOW D’EXECUTION DE L’APPLICATION

Avec ceci en tête, lorsque vous soumettez une application au cluster avec spark-submit voici ce qui se passe en interne :

  1. Une application autonome démarre et instancie une instance de SparkContext/SparkSession (et c’est seulement à ce moment-là que vous pouvez appeler l’application un pilote).
  2. Le programme pilote demande des ressources au gestionnaire de cluster pour lancer les exécuteurs.
  3. Le gestionnaire de cluster lance les exécuteurs.
  4. Le processus pilote s’exécute à travers l’application utilisateur. Selon les actions et les transformations sur les RDDs, les tâches sont envoyées aux exécuteurs.
  5. Les exécuteurs exécutent les tâches et enregistrent les résultats.
  6. Si un travailleur se plante, ses tâches seront envoyées à différents exécuteurs pour être traitées à nouveau. Dans le livre « Learning Spark : Lightning-Fast Big Data Analysis », ils parlent de Spark et de la tolérance aux pannes :

Spark traite automatiquement les machines défaillantes ou lentes en réexécutant les tâches défaillantes ou lentes. Par exemple, si le nœud qui exécute une partition d’une opération map() se plante, Spark la réexécutera sur un autre nœud ; et même si le nœud ne se plante pas mais est simplement beaucoup plus lent que les autres nœuds, Spark peut lancer de manière préemptive une copie « spéculative » de la tâche sur un autre nœud, et prendre son résultat si celui-ci se termine.

  1. Avec SparkContext.stop() du pilote ou si la méthode principale sort/crash, tous les exécuteurs seront terminés et les ressources du cluster seront libérées par le gestionnaire du cluster.

Si nous regardons l’exécution de Spark prospective sur n’importe quel gestionnaire de ressources pour un programme, qui joint deux rdds et fait une certaine opération de réduction alors filtre

La liste suivante capture quelques recommandations à garder à l’esprit tout en les configurant :

  • Deamons Hadoop/Yarn/OS : Lorsque nous exécutons une application spark en utilisant un gestionnaire de cluster comme Yarn, il y aura plusieurs démons qui fonctionneront en arrière-plan comme NameNode, Secondary NameNode, DataNode, JobTracker et TaskTracker. Ainsi, en spécifiant le nombre d’exécuteurs, nous devons nous assurer que nous laissons de côté suffisamment de cœurs (~1 cœur par nœud) pour que ces démons fonctionnent sans problème.
  • Yarn ApplicationMaster (AM) : ApplicationMaster est chargé de négocier les ressources auprès du ResourceManager et de travailler avec les NodeManagers pour exécuter et surveiller les conteneurs et leur consommation de ressources. Si nous exécutons spark sur yarn, alors nous devons budgétiser les ressources dont AM aurait besoin (~1024MB et 1 exécuteur).
  • Débit HDFS : Le client HDFS a des problèmes avec des tonnes de threads simultanés. Il a été observé que HDFS atteint le plein débit d’écriture avec ~5 tâches par exécuteur . Il est donc bon de garder le nombre de cœurs par exécuteur en dessous de ce nombre.
  • MemoryOverhead : L’image suivante décrit l’utilisation de la mémoire de spark-yarn.

Deux choses à noter à partir de cette image :

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Donc, si nous demandons 20GB par exécuteur, AM obtiendra en fait 20GB + memoryOverhead = 20 + 7% de 20GB = ~23GB de mémoire pour nous.

  • L’exécution d’exécuteurs avec trop de mémoire entraîne souvent des délais excessifs de collecte des ordures.
  • L’exécution d’exécuteurs minuscules (avec un seul cœur et juste assez de mémoire nécessaire pour exécuter une seule tâche, par exemple) jette les avantages qui proviennent de l’exécution de plusieurs tâches dans une seule JVM.

Assez de théorie…. Allons-y concrètement…

Maintenant, considérons un cluster de 10 nœuds avec la configuration suivante et analysons différentes possibilités de distribution exécuteurs-cœur-mémoire :

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Première approche : Minuscules exécuteurs :

Les minuscules exécuteurs signifient essentiellement un exécuteur par cœur. Le tableau suivant décrit les valeurs de nos paramètres spar-config avec cette approche :

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analyse : Avec un seul exécuteur par noyau, comme nous l’avons discuté ci-dessus, nous ne serons pas en mesure de tirer parti de l’exécution de plusieurs tâches dans la même JVM. De plus, les variables partagées/cachées comme les variables de diffusion et les accumulateurs seront répliquées dans chaque cœur des nœuds, ce qui représente 16 fois. De plus, nous ne laissons pas assez de surcharge mémoire pour les processus du démon Hadoop/Yarn et nous ne comptons pas dans ApplicationManager. PAS BON!

Seconde approche : Fat executors (Un exécuteur par nœud):

Fat executors signifie essentiellement un exécuteur par nœud. Le tableau suivant décrit les valeurs de nos paramètres spark-config avec cette approche:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analyse : Avec tous les 16 cœurs par exécuteur, à part ApplicationManager et les processus de démon ne sont pas comptés pour, le débit HDFS va souffrir et il en résultera des résultats excessifs de garbage. Aussi, PAS BON!

Troisième approche : Équilibre entre le gros (vs) le minuscule

Selon les recommandations que nous avons discutées ci-dessus :

  • Selon les recommandations mentionnées ci-dessus, assignons 5 cœurs par exécuteurs => – executor-cores = 5 (pour un bon débit HDFS)
  • Laissons 1 cœur par nœud pour les démons Hadoop/Yarn => Num cores disponibles par nœud = 16-1 = 15
  • So, Total des cœurs disponibles dans le cluster = 15 x 10 = 150
  • Nombre d’exécuteurs disponibles = (total des cœurs / num-cœurs par exécuteur) = 150 / 5 = 30
  • Laissant 1 exécuteur pour ApplicationManager => – num.exécuteurs = 29
  • Nombre d’exécuteurs par noeud = 30/10 = 3
  • Mémoire par exécuteur = 64GB / 3 = 21GB
  • Compte tenu de la surcharge du tas = 7% de 21GB = 3GB. Donc, réel – exécuteur-mémoire = 21-3 = 18GB

Donc, la configuration recommandée est : 29 exécuteurs, 18GB de mémoire chacun et 5 cœurs chacun !!

Analyse : Il est évident quant à la façon dont cette troisième approche a trouvé le bon équilibre entre les approches Fat vs Tiny. Inutile de dire qu’elle a atteint le parallélisme d’un exécuteur gras et les meilleurs débits d’un exécuteur minuscule ! !!

Conclusion:

Nous avons vu:

  • Couple de recommandations à garder à l’esprit qui configurer ces paramètres pour une application spark comme :
  • Budgéter dans les ressources dont le gestionnaire d’application de Yarn aurait besoin
  • Comment nous devrions épargner quelques cœurs pour les processus Hadoop/Yarn/OS deamon
  • Apprendre à propos de spark-yarn-memory-usage
  • Aussi, vérifié et analysé trois approches différentes pour configurer ces paramètres :
  1. Tiny Executors – Un exécuteur par Core
  2. Fat Executors – Un exécuteur par Node
  3. Approche recommandée – Bon équilibre entre Tiny (Vs) Fat couplé avec les recommandations.

– num-exécuteurs, – executor-cores et – executor-memory… ces trois paramètres jouent un rôle très important dans les performances de spark car ils contrôlent la quantité de mémoire CPU & que votre application spark obtient. Cela rend très crucial pour les utilisateurs de comprendre la bonne façon de les configurer. J’espère que ce blog vous a aidé à obtenir cette perspective…