Jargão do Park para Iniciantes

Este blog é para limpar alguns dos problemas iniciais quando novos códigos para Spark computação distribuída. Além de aprender as APIs, é preciso se equipar com detalhes de cluster para obter o melhor da Spark power.

O ponto de partida seria a Visão Geral do Modo Cluster .

E algumas questões comuns que podem aparecer são:

  1. Você ainda não consegue entender os diferentes processos no cluster Standalone Spark e o paralelismo.
  2. Você rodou o bin\start-slave.sh e descobriu que ele gerou o trabalhador, que na verdade é um JVM. O worker é ou não um processo JVM?
  3. Como pelo link acima, um executor é um processo lançado para uma aplicação em um nó de worker que executa tarefas. Executor também é um JVM.
  4. Executors são por aplicação. Então qual é o papel de um worker? Ele coordena com o executor e comunica o resultado ao driver? ou o driver fala diretamente com o executor? Se sim, qual é o propósito do worker então?
  5. Como controlar o número de executores para uma aplicação?
  6. As tarefas podem ser feitas para rodar em paralelo dentro do executor? Se sim, como configurar o número de threads para um executor?
  7. Qual é a relação entre worker, executors e núcleos do executor ( – total-executor-cores)?
  8. O que significa ter mais workers por nó?

Vamos rever os detalhes do modo Spark Cluster.

>

Spark usa uma arquitectura mestre/escravo. Como você pode ver na figura, ele tem um coordenador central (Motorista) que se comunica com muitos trabalhadores distribuídos (executores). O driver e cada um dos executores rodam em seus próprios processos Java.

O driver é o processo onde o método principal roda. Primeiro ele converte o programa do usuário em tarefas e depois agenda as tarefas nos executores.

Aplicação Spark – -> Driver – -> Lista de tarefas – -> Scheduler – -> Executores

EXECUTORES

Executores são processos de nós de trabalhadores encarregados de executar tarefas individuais em um dado trabalho Spark. Eles são lançados no início de uma aplicação Spark e normalmente rodam por toda a vida útil de uma aplicação. Uma vez que eles tenham executado a tarefa, eles enviam os resultados para o motorista. Eles também fornecem armazenamento em memória para RDDs que são armazenados em cache por programas do usuário através do Block Manager.

Quando os executores são iniciados, eles se registram com o driver e a partir daí se comunicam diretamente. Os trabalhadores estão encarregados de comunicar ao gerenciador de cluster a disponibilidade de seus recursos.

Em um cluster autônomo você terá um executor por trabalhador a menos que você brinque com spark.executor.cores e um trabalhador tenha núcleos suficientes para manter mais de um executor.

  • Um cluster autônomo com 5 nós de trabalhadores (cada nó tem 8 núcleos) Quando eu inicio uma aplicação com configurações padrão.
  • Spark irá avidamente adquirir tantos núcleos e executores quantos forem oferecidos pelo agendador. Então no final você irá obter 5 executores com 8 núcleos cada.

Sigam as opções spark-submit para jogar com o número de executores:

– executor-memória MEM Memória por executor (ex. 1000M, 2G) (Default: 1G).

Spark autônomo e apenas YARN:
– executor-cores NUM Número de núcleos por executor. (Default: 1 em modo YARN, ou todos os núcleos disponíveis no worker em modo autônomo)

Apenas YARN:
– num-executors NUM Número de executores a serem iniciados (Default: 2). Se a alocação dinâmica estiver habilitada, o número inicial de executores será pelo menos NUM.

  • A instância 2 worker significa um nó worker com 2 processos worker?

Um nó é uma máquina, e não há uma boa razão para executar mais de um worker por máquina. Então dois nós operários tipicamente significam duas máquinas, cada uma com um operário Spark.

1 Nó = 1 processo operário

  • Cada instância operária possui um executor para uma aplicação específica (que gerencia o armazenamento, tarefa) ou um nó operário possui um executor?

Os operários possuem muitos executores, para muitas aplicações. Uma aplicação tem executores em muitos trabalhadores

Um nó de trabalhador pode ter vários executores (processos) se ele tiver CPU, Memória e Armazenamento suficientes.

  • BTW, O número de executores em um nó operário em um determinado momento depende inteiramente da carga de trabalho no cluster e da capacidade do nó para executar quantos executores.

FLOW EXECUÇÃO DE APLICAÇÃO

Com isto em mente, quando você submete uma aplicação ao cluster com spark-submit isto é o que acontece internamente:

  1. Uma aplicação autônoma inicia e instancia uma instância SparkContext/SparkSession (e é somente então quando você pode chamar a aplicação de driver).
  2. O programa driver pede recursos ao gerenciador de cluster para iniciar executores.
  3. O gerenciador de cluster inicia executores.
  4. O processo do driver é executado através da aplicação do usuário. Dependendo das ações e transformações sobre a tarefa RDDs são enviadas para executores.
  5. Executores executam as tarefas e salvam os resultados.
  6. Se algum funcionário trava, suas tarefas serão enviadas para executores diferentes para serem processadas novamente. No livro “Learning Spark: Lightning-Fast Big Data Analysis” eles falam sobre Spark e Tolerância a Falhas:

Spark lida automaticamente com máquinas falhadas ou lentas, reexecutando tarefas falhadas ou lentas. Por exemplo, se o nó rodando uma partição de uma operação de mapa() trava, Spark irá reexecutá-la em outro nó; e mesmo se o nó não trava mas é simplesmente muito mais lento que outros nós, Spark pode lançar uma cópia “especulativa” da tarefa em outro nó, e tomar seu resultado se isso terminar.

  1. Com SparkContext.stop() do driver ou se o método principal sair/crashes todos os executores serão terminados e os recursos do cluster serão liberados pelo gerenciador do cluster.

Se olharmos a execução de Spark prospective sobre qualquer gerenciador de recursos para um programa, que junte dois rdds e faça alguma redução de operação, então filtre

Lista de acompanhamento captura algumas recomendações para ter em mente enquanto as configura:

>

  • Hadoop/Yarn/OS Deamons: Quando rodamos a aplicação spark usando um gerenciador de cluster como Yarn, haverá vários daemons que serão executados em segundo plano como NameNode, Secondary NameNode, DataNode, JobTracker e TaskTracker. Então, enquanto especificamos os daemons num-executors, precisamos ter certeza que deixamos de lado núcleos suficientes (~1 núcleo por nó) para que esses daemons rodem suavemente.
  • Yarn ApplicationMaster (AM): O ApplicationMaster é responsável por negociar recursos do ResourceManager e trabalhar com os NodeManagers para executar e monitorar os containers e seu consumo de recursos. Se estamos rodando faísca no fio, então precisamos orçar nos recursos que AM precisaria (~1024MB e 1 Executor).
  • HDFS Throughput: O cliente HDFS tem problemas com toneladas de fios simultâneos. Foi observado que o HDFS atinge uma taxa de transferência de escrita completa com ~5 tarefas por executor . Então é bom manter o número de núcleos por executor abaixo desse número.
  • MemoryOverhead: A figura seguinte mostra a utilização de spark-yarn-memory-usage.
>

Duas coisas para anotar a partir desta figura:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Então, se solicitarmos 20GB por executor, AM irá realmente obter 20GB + memoryOverhead = 20 + 7% de 20GB = ~23GB de memória para nós.

  • Executar executores com muita memória frequentemente resulta em atrasos excessivos na coleta de lixo.
  • Executar pequenos executores (com um único núcleo e apenas memória suficiente para executar uma única tarefa, por exemplo) joga fora os benefícios que vêm da execução de múltiplas tarefas em um único JVM.

Suficiente teoria… Vamos lá, vamos lá, hands-on…

Agora, vamos considerar um cluster de 10 nós com a seguinte configuração e analisar diferentes possibilidades de distribuição de executores-core-memória:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Primeira Abordagem: Executores minúsculos:

Exploradores minúsculos significam essencialmente um executor por núcleo. A tabela a seguir mostra os valores dos nossos parâmetros spar-config com esta abordagem:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Análise: Com apenas um executor por núcleo, como discutimos acima, não poderemos tirar vantagem de executar várias tarefas na mesma JVM. Além disso, variáveis compartilhadas/em cache como variáveis de broadcast e acumuladores serão replicadas em cada núcleo dos nós, o que é 16 vezes. Além disso, não estamos deixando sobrecarga de memória suficiente para os processos Hadoop/Yarn daemon e não estamos contando no ApplicationManager. NOT GOOD!

Second Approach: Executores gordos (Um Executor por nó):

Executadores gordos significam essencialmente um executor por nó. A tabela a seguir mostra os valores dos nossos parâmetros spark-config com esta abordagem:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Análise: Com todos os 16 núcleos por executor, além dos processos ApplicationManager e daemon não são contados, o rendimento do HDFS irá doer e resultará em resultados excessivos de lixo. Também,NÃO É BOM!

Terceira Abordagem: Equilíbrio entre Gordura (vs) Minúscula

De acordo com as recomendações que discutimos acima:

  • Baseado nas recomendações mencionadas acima, Vamos atribuir 5 núcleos por executores => – núcleos de executores = 5 (para bom rendimento do HDFS)
  • Deixar 1 núcleo por nó para os daemons Hadoop/Yarn => Números de núcleos disponíveis por nó = 16-1 = 15
  • Então, Total disponível de núcleos em cluster = 15 x 10 = 150
  • Número de executores disponíveis = (total de núcleos / num-cores por executor) = 150 / 5 = 30
  • Leaving 1 executor para ApplicationManager => – num-executores = 29
  • Número de executores por nó = 30/10 = 3
  • Memória por executor = 64GB / 3 = 21GB
  • Contagem das despesas gerais por pilha = 7% de 21GB = 3GB. Então, real – memória do executor = 21-3 = 18GB

Então, a configuração recomendada é: 29 executores, 18GB de memória cada e 5 núcleos cada!!

Análise: É óbvio como esta terceira abordagem encontrou o equilíbrio certo entre as abordagens Fat vs Tiny. Escusado será dizer que alcançou o paralelismo de um executor gordo e os melhores resultados de um executor minúsculo!!

Conclusão:

Vimos:

  • Par de recomendações para ter em mente quais as configurações destes parâmetros para uma aplicação tipo spark-application:
  • Orçamento nos recursos que o Gestor de Aplicações de Fios precisaria
  • Como devemos poupar alguns núcleos para processos Hadoop/Yarn/OS deamon
  • Saprender sobre a utilização de spark-yarn-memory-usage
  • Tambem, verificámos e analisámos três abordagens diferentes para configurar estes params:
  1. Executadores minúsculos – Um Executor por Núcleo
  2. Executadores de gordura – Um executor por Nó
  3. Aximação recomendada – Equilíbrio certo entre Tiny (Vs) Fat acoplado com as recomendações.

– num-executors, – executor-cores e – executor-memória… estes três parâmetros têm um papel muito importante no desempenho spark pois eles controlam a quantidade de CPU & memória que sua aplicação spark obtém. Isto faz com que seja muito crucial para os usuários entenderem a maneira correta de configurá-los. Espero que este blog o tenha ajudado a obter essa perspectiva…