Este blog es para aclarar algunos de los problemas iniciales cuando los códigos de novatos para la computación distribuida de Spark. Aparte de aprender las APIs, uno necesita equiparse con los detalles del cluster para obtener lo mejor de la potencia de Spark.
El punto de partida sería la visión general del modo cluster .
Y algunas preguntas comunes que pueden surgir son:
- Todavía no puedes entender los diferentes procesos en el clúster de Spark Standalone y el paralelismo.
- Ejecutaste el bin\start-slave.sh y descubriste que generó el trabajador, que es en realidad una JVM. ¿Es el trabajador un proceso JVM o no?
- Según el enlace anterior, un ejecutor es un proceso lanzado para una aplicación en un nodo trabajador que ejecuta tareas. El ejecutor es también una JVM.
- Los ejecutores son por aplicación. Entonces, ¿cuál es el papel de un trabajador? ¿Se coordina con el ejecutor y comunica el resultado al controlador? o ¿el controlador habla directamente con el ejecutor? Si es así, ¿cuál es el propósito del trabajador?
- ¿Cómo controlar el número de ejecutores para una aplicación?
- ¿Se puede hacer que las tareas se ejecuten en paralelo dentro del ejecutor? Si es así, ¿cómo configurar el número de hilos para un ejecutor?
- ¿Cuál es la relación entre worker, executors y executor cores ( – total-executor-cores)?
- ¿Qué significa tener más workers por nodo?
Volvamos a revisar los detalles del modo Spark Cluster.
Spark utiliza una arquitectura maestro/esclavo. Como puedes ver en la figura, tiene un coordinador central (Driver) que se comunica con muchos trabajadores distribuidos (ejecutores). El driver y cada uno de los ejecutores se ejecutan en sus propios procesos Java.
El driver es el proceso donde se ejecuta el método principal. Primero convierte el programa de usuario en tareas y después programa las tareas en los ejecutores.
Aplicación Spark – -> Controlador – -> Lista de Tareas – -> Programador – -> Ejecutores
- EXECUTORES
- FLUJO DE EJECUCIÓN DE LA APLICACIÓN
- La siguiente lista recoge algunas recomendaciones a tener en cuenta mientras se configuran:
- Suficiente teoría… Vamos a la práctica..
- Primer enfoque: Ejecutores diminutos :
- Segundo enfoque: Ejecutores gordos (Un ejecutor por nodo):
- Tercer Enfoque: Equilibrio entre Fat (vs) Tiny
- Conclusión:
EXECUTORES
Los ejecutores son procesos de nodos trabajadores encargados de ejecutar tareas individuales en un determinado trabajo Spark. Se lanzan al principio de una aplicación Spark y normalmente se ejecutan durante toda la vida de una aplicación. Una vez que han ejecutado la tarea, envían los resultados al controlador. También proporcionan almacenamiento en memoria para los RDDs que son almacenados en caché por los programas de usuario a través de Block Manager.
Cuando se inician los ejecutores se registran con el driver y a partir de ahí se comunican directamente. Los workers se encargan de comunicar al gestor del cluster la disponibilidad de sus recursos.
En un cluster independiente tendrás un ejecutor por worker a no ser que juegues con spark.executor.cores y un worker tenga suficientes núcleos para albergar más de un ejecutor.
- Un cluster independiente con 5 nodos trabajadores (cada nodo tiene 8 núcleos) Cuando inicio una aplicación con la configuración por defecto.
- Spark adquirirá con avidez tantos núcleos y ejecutores como le ofrezca el planificador. Así que al final obtendrá 5 ejecutores con 8 núcleos cada uno.
Las siguientes son las opciones de spark-submit para jugar con el número de ejecutores:
– executor-memory MEM Memoria por ejecutor (por ejemplo, 1000M, 2G) (Por defecto: 1G).
Spark standalone y YARN solamente:
– executor-cores NUM Número de núcleos por ejecutor. (Por defecto: 1 en modo YARN, o todos los núcleos disponibles en el trabajador en modo autónomo)
Sólo YARN:
– num-executors NUM Número de ejecutores a lanzar (Por defecto: 2). Si la asignación dinámica está habilitada, el número inicial de ejecutores será al menos NUM.
- ¿Significa 2 instancias de trabajador un nodo de trabajador con 2 procesos de trabajador?
Un nodo es una máquina, y no hay una buena razón para ejecutar más de un trabajador por máquina. Así que dos nodos de trabajadores suelen significar dos máquinas, cada una de ellas un trabajador de Spark.
1 Nodo = 1 proceso de trabajador
- ¿Cada instancia de trabajador contiene un ejecutor para una aplicación específica (que gestiona el almacenamiento, la tarea) o un nodo de trabajador contiene un ejecutor?
Los trabajadores contienen muchos ejecutores, para muchas aplicaciones. Una aplicación tiene ejecutores en muchos workers
Un nodo worker puede contener múltiples ejecutores (procesos) si tiene suficiente CPU, Memoria y Almacenamiento.
- BTW, el número de ejecutores en un nodo trabajador en un momento dado es totalmente depende de la carga de trabajo en el clúster y la capacidad del nodo para ejecutar cuántos ejecutores.
FLUJO DE EJECUCIÓN DE LA APLICACIÓN
Con esto en mente, cuando envías una aplicación al cluster con spark-submit esto es lo que ocurre internamente:
- Una aplicación independiente se inicia e instala una instancia de SparkContext/SparkSession (y es sólo entonces cuando puedes llamar a la aplicación un controlador).
- El programa del driver pide recursos al gestor del cluster para lanzar ejecutores.
- El gestor del cluster lanza ejecutores.
- El proceso del driver se ejecuta a través de la aplicación de usuario. En función de las acciones y transformaciones sobre RDDs se envían las tareas a los ejecutores.
- Los ejecutores ejecutan las tareas y guardan los resultados.
- Si algún trabajador se bloquea, sus tareas serán enviadas a diferentes ejecutores para ser procesadas de nuevo. En el libro «Learning Spark: Lightning-Fast Big Data Analysis» hablan de Spark y la tolerancia a fallos:
Spark se ocupa automáticamente de las máquinas que fallan o son lentas reejecutando las tareas fallidas o lentas. Por ejemplo, si el nodo que ejecuta una partición de una operación map() se bloquea, Spark la volverá a ejecutar en otro nodo; e incluso si el nodo no se bloquea, sino que simplemente es mucho más lento que otros nodos, Spark puede lanzar de forma preventiva una copia «especulativa» de la tarea en otro nodo, y tomar su resultado si éste termina.
- Con SparkContext.stop() del controlador o si el método principal sale/se cuelga todos los ejecutores serán terminados y los recursos del cluster serán liberados por el administrador del cluster.
Si miramos la ejecución desde la prospectiva de Spark sobre cualquier gestor de recursos para un programa, que une dos rdds y hace alguna operación de reducción entonces filtrar