Jerga de Spark para principiantes

Este blog es para aclarar algunos de los problemas iniciales cuando los códigos de novatos para la computación distribuida de Spark. Aparte de aprender las APIs, uno necesita equiparse con los detalles del cluster para obtener lo mejor de la potencia de Spark.

El punto de partida sería la visión general del modo cluster .

Y algunas preguntas comunes que pueden surgir son:

  1. Todavía no puedes entender los diferentes procesos en el clúster de Spark Standalone y el paralelismo.
  2. Ejecutaste el bin\start-slave.sh y descubriste que generó el trabajador, que es en realidad una JVM. ¿Es el trabajador un proceso JVM o no?
  3. Según el enlace anterior, un ejecutor es un proceso lanzado para una aplicación en un nodo trabajador que ejecuta tareas. El ejecutor es también una JVM.
  4. Los ejecutores son por aplicación. Entonces, ¿cuál es el papel de un trabajador? ¿Se coordina con el ejecutor y comunica el resultado al controlador? o ¿el controlador habla directamente con el ejecutor? Si es así, ¿cuál es el propósito del trabajador?
  5. ¿Cómo controlar el número de ejecutores para una aplicación?
  6. ¿Se puede hacer que las tareas se ejecuten en paralelo dentro del ejecutor? Si es así, ¿cómo configurar el número de hilos para un ejecutor?
  7. ¿Cuál es la relación entre worker, executors y executor cores ( – total-executor-cores)?
  8. ¿Qué significa tener más workers por nodo?

Volvamos a revisar los detalles del modo Spark Cluster.

Spark utiliza una arquitectura maestro/esclavo. Como puedes ver en la figura, tiene un coordinador central (Driver) que se comunica con muchos trabajadores distribuidos (ejecutores). El driver y cada uno de los ejecutores se ejecutan en sus propios procesos Java.

El driver es el proceso donde se ejecuta el método principal. Primero convierte el programa de usuario en tareas y después programa las tareas en los ejecutores.

Aplicación Spark – -> Controlador – -> Lista de Tareas – -> Programador – -> Ejecutores

EXECUTORES

Los ejecutores son procesos de nodos trabajadores encargados de ejecutar tareas individuales en un determinado trabajo Spark. Se lanzan al principio de una aplicación Spark y normalmente se ejecutan durante toda la vida de una aplicación. Una vez que han ejecutado la tarea, envían los resultados al controlador. También proporcionan almacenamiento en memoria para los RDDs que son almacenados en caché por los programas de usuario a través de Block Manager.

Cuando se inician los ejecutores se registran con el driver y a partir de ahí se comunican directamente. Los workers se encargan de comunicar al gestor del cluster la disponibilidad de sus recursos.

En un cluster independiente tendrás un ejecutor por worker a no ser que juegues con spark.executor.cores y un worker tenga suficientes núcleos para albergar más de un ejecutor.

  • Un cluster independiente con 5 nodos trabajadores (cada nodo tiene 8 núcleos) Cuando inicio una aplicación con la configuración por defecto.
  • Spark adquirirá con avidez tantos núcleos y ejecutores como le ofrezca el planificador. Así que al final obtendrá 5 ejecutores con 8 núcleos cada uno.

Las siguientes son las opciones de spark-submit para jugar con el número de ejecutores:

– executor-memory MEM Memoria por ejecutor (por ejemplo, 1000M, 2G) (Por defecto: 1G).

Spark standalone y YARN solamente:
– executor-cores NUM Número de núcleos por ejecutor. (Por defecto: 1 en modo YARN, o todos los núcleos disponibles en el trabajador en modo autónomo)

Sólo YARN:
– num-executors NUM Número de ejecutores a lanzar (Por defecto: 2). Si la asignación dinámica está habilitada, el número inicial de ejecutores será al menos NUM.

  • ¿Significa 2 instancias de trabajador un nodo de trabajador con 2 procesos de trabajador?

Un nodo es una máquina, y no hay una buena razón para ejecutar más de un trabajador por máquina. Así que dos nodos de trabajadores suelen significar dos máquinas, cada una de ellas un trabajador de Spark.

1 Nodo = 1 proceso de trabajador

  • ¿Cada instancia de trabajador contiene un ejecutor para una aplicación específica (que gestiona el almacenamiento, la tarea) o un nodo de trabajador contiene un ejecutor?

Los trabajadores contienen muchos ejecutores, para muchas aplicaciones. Una aplicación tiene ejecutores en muchos workers

Un nodo worker puede contener múltiples ejecutores (procesos) si tiene suficiente CPU, Memoria y Almacenamiento.

  • BTW, el número de ejecutores en un nodo trabajador en un momento dado es totalmente depende de la carga de trabajo en el clúster y la capacidad del nodo para ejecutar cuántos ejecutores.

FLUJO DE EJECUCIÓN DE LA APLICACIÓN

Con esto en mente, cuando envías una aplicación al cluster con spark-submit esto es lo que ocurre internamente:

  1. Una aplicación independiente se inicia e instala una instancia de SparkContext/SparkSession (y es sólo entonces cuando puedes llamar a la aplicación un controlador).
  2. El programa del driver pide recursos al gestor del cluster para lanzar ejecutores.
  3. El gestor del cluster lanza ejecutores.
  4. El proceso del driver se ejecuta a través de la aplicación de usuario. En función de las acciones y transformaciones sobre RDDs se envían las tareas a los ejecutores.
  5. Los ejecutores ejecutan las tareas y guardan los resultados.
  6. Si algún trabajador se bloquea, sus tareas serán enviadas a diferentes ejecutores para ser procesadas de nuevo. En el libro «Learning Spark: Lightning-Fast Big Data Analysis» hablan de Spark y la tolerancia a fallos:

Spark se ocupa automáticamente de las máquinas que fallan o son lentas reejecutando las tareas fallidas o lentas. Por ejemplo, si el nodo que ejecuta una partición de una operación map() se bloquea, Spark la volverá a ejecutar en otro nodo; e incluso si el nodo no se bloquea, sino que simplemente es mucho más lento que otros nodos, Spark puede lanzar de forma preventiva una copia «especulativa» de la tarea en otro nodo, y tomar su resultado si éste termina.

  1. Con SparkContext.stop() del controlador o si el método principal sale/se cuelga todos los ejecutores serán terminados y los recursos del cluster serán liberados por el administrador del cluster.

Si miramos la ejecución desde la prospectiva de Spark sobre cualquier gestor de recursos para un programa, que une dos rdds y hace alguna operación de reducción entonces filtrar

La siguiente lista recoge algunas recomendaciones a tener en cuenta mientras se configuran:

  • Hadoop/Yarn/OS Deamons: Cuando ejecutamos una aplicación spark utilizando un gestor de clústeres como Yarn, habrá varios daemons que se ejecutarán en segundo plano como NameNode, Secondary NameNode, DataNode, JobTracker y TaskTracker. Por lo tanto, al especificar el número de ejecutores, debemos asegurarnos de dejar suficientes núcleos (~1 núcleo por nodo) para que estos demonios se ejecuten sin problemas.
  • Yarn ApplicationMaster (AM): El ApplicationMaster se encarga de negociar los recursos desde el ResourceManager y trabajar con los NodeManagers para ejecutar y monitorizar los contenedores y su consumo de recursos. Si estamos ejecutando spark en yarn, entonces necesitamos presupuestar los recursos que AM necesitaría (~1024MB y 1 Executor).
  • HDFS Throughput: El cliente HDFS tiene problemas con toneladas de hilos concurrentes. Se observó que HDFS alcanza un rendimiento de escritura completo con ~5 tareas por ejecutor . Así que es bueno mantener el número de núcleos por ejecutor por debajo de ese número.
  • MemoryOverhead: La siguiente imagen muestra el uso de memoria de spark-yarn.

Dos cosas a tener en cuenta de esta imagen:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Así que, si solicitamos 20GB por ejecutor, AM obtendrá realmente 20GB + memoryOverhead = 20 + 7% de 20GB = ~23GB de memoria para nosotros.

  • Ejecutar ejecutores con demasiada memoria suele provocar retrasos excesivos en la recolección de basura.
  • Ejecutar ejecutores minúsculos (con un solo núcleo y la memoria justa necesaria para ejecutar una sola tarea, por ejemplo) echa por tierra los beneficios que se obtienen al ejecutar múltiples tareas en una sola JVM.

Suficiente teoría… Vamos a la práctica..

Ahora, consideremos un cluster de 10 nodos con la siguiente configuración y analicemos diferentes posibilidades de distribución de ejecutores-núcleo-memoria:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Primer enfoque: Ejecutores diminutos :

Ejecutores diminutos significa esencialmente un ejecutor por núcleo. La siguiente tabla muestra los valores de nuestros parámetros spar-config con este enfoque:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Análisis: Con un solo ejecutor por núcleo, como hemos comentado anteriormente, no podremos aprovechar la ejecución de múltiples tareas en la misma JVM. Además, las variables compartidas/en caché como las variables de difusión y los acumuladores se replicarán en cada núcleo de los nodos que son 16 veces. Además, no estamos dejando suficiente sobrecarga de memoria para los procesos daemon de Hadoop/Yarn y no estamos contando en ApplicationManager. NO ES BUENO!

Segundo enfoque: Ejecutores gordos (Un ejecutor por nodo):

Ejecutores gordos significa esencialmente un ejecutor por nodo. La siguiente tabla muestra los valores de nuestros parámetros de spark-config con este enfoque:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Análisis: Con todos los 16 núcleos por ejecutor, aparte de ApplicationManager y los procesos de demonio no se cuentan para, el rendimiento de HDFS se verá afectado y que va a dar lugar a resultados excesivos de basura. Además, ¡NO ES BUENO!

Tercer Enfoque: Equilibrio entre Fat (vs) Tiny

Según las recomendaciones que comentamos anteriormente:

  • En base a las recomendaciones mencionadas anteriormente, Vamos a asignar 5 núcleos por ejecutores => – executor-cores = 5 (para un buen rendimiento de HDFS)
  • Dejar 1 núcleo por nodo para los demonios Hadoop/Yarn => Num cores disponibles por nodo = 16-1 = 15
  • Así, Total de núcleos disponibles en el clúster = 15 x 10 = 150
  • Número de ejecutores disponibles = (total de núcleos / número de núcleos por ejecutor) = 150 / 5 = 30
  • Dejando 1 ejecutor para ApplicationManager => – num-ejecutores = 29
  • Número de ejecutores por nodo = 30/10 = 3
  • Memoria por ejecutor = 64GB / 3 = 21GB
  • Contando la sobrecarga de la pila = 7% de 21GB = 3GB. Por lo tanto, real – ejecutor-memoria = 21-3 = 18GB

Así, la configuración recomendada es: 29 ejecutores, 18GB de memoria cada uno y 5 núcleos cada uno!!!

Análisis: Es obvio como este tercer enfoque ha encontrado el equilibrio correcto entre los enfoques Fat vs Tiny. No hace falta decir que ha conseguido el paralelismo de un ejecutor gordo y los mejores rendimientos de un ejecutor diminuto.

Conclusión:

Hemos visto:

  • Un par de recomendaciones a tener en cuenta a la hora de configurar estos parámetros para una aplicación Spark como:
  • Presupuestar en los recursos que necesitaría el gestor de aplicaciones de Yarn
  • Cómo deberíamos prescindir de algunos núcleos para los procesos de Hadoop/Yarn/OS deamon
  • Aprender sobre spark-yarn-memory-usage
  • También, comprobar y analizar tres enfoques diferentes para configurar estos params:
  1. Ejecutores diminutos – Un ejecutor por núcleo
  2. Ejecutores gordos – Un ejecutor por nodo
  3. Enfoque recomendado – Equilibrio correcto entre Tiny (Vs) Fat junto con las recomendaciones.

– num-executors, – executor-cores y – executor-memory.. estos tres parámetros juegan un papel muy importante en el rendimiento de spark ya que controlan la cantidad de CPU &memoria que obtiene su aplicación spark. Esto hace que sea muy crucial para los usuarios entender la forma correcta de configurarlos. Espero que este blog te haya ayudado a tener esa perspectiva…