Żargon Sparka dla początkujących

Ten blog ma na celu wyjaśnienie niektórych kłopotów początkowych, gdy początkujący koduje rozproszone obliczenia Sparka. Oprócz nauki API, trzeba wyposażyć się w szczegóły klastra, aby uzyskać najlepsze z mocy Sparka.

Punktem wyjścia będzie Przegląd trybu klastra .

A niektóre częste pytania, które mogą się pojawić, to:

  1. Nadal nie możesz zrozumieć różnych procesów w klastrze Spark Standalone i równoległości.
  2. Uruchamiałeś bin\start-slave.sh i odkryłeś, że wywołał on robotnika, który jest w rzeczywistości maszyną JVM. Czy worker jest procesem JVM, czy nie?
  3. Jak wynika z powyższego linku, executor jest procesem uruchomionym dla aplikacji na węźle robotniczym, który wykonuje zadania. Executor jest również JVM.
  4. Executors są na aplikację. Jaka jest zatem rola robotnika? Czy koordynuje on pracę z executorem i przekazuje wynik z powrotem do sterownika, czy też sterownik bezpośrednio rozmawia z executorem? Jeśli tak, to jaki jest cel worker’a?
  5. Jak kontrolować liczbę executorów dla aplikacji?
  6. Czy zadania mogą być wykonywane równolegle wewnątrz executora? Jeśli tak, to jak skonfigurować liczbę wątków dla executora?
  7. Jaka jest relacja między workerami, executorami i rdzeniami executora ( – total-executor-cores)?
  8. Co to znaczy mieć więcej workerow na węzeł?

Lets revisit the Spark Cluster mode details.

Spark używa architektury master/slave. Jak widać na rysunku, posiada on jeden centralny koordynator (Driver), który komunikuje się z wieloma rozproszonymi robotnikami (executorami). Sterownik i każdy z executorów działają we własnych procesach Java.

Sterownik jest procesem, w którym działa główna metoda. Najpierw konwertuje program użytkownika na zadania, a następnie rozplanowuje zadania na executorach.

Aplikacja Spark – -> Driver – -> Lista zadań – -> Scheduler – -> Executors

EXECUTORS

Executors to procesy węzłów robotniczych odpowiedzialne za uruchamianie poszczególnych zadań w danym zadaniu Spark. Są one uruchamiane na początku aplikacji Spark i zazwyczaj działają przez cały czas jej istnienia. Po wykonaniu zadania wysyłają wyniki do sterownika. Zapewniają również przechowywanie w pamięci dla RDD, które są buforowane przez programy użytkownika za pośrednictwem Block Managera.

Gdy executory są uruchamiane, rejestrują się ze sterownikiem i od tego momentu komunikują się bezpośrednio. Pracownicy są odpowiedzialni za przekazywanie menedżerowi klastra informacji o dostępności ich zasobów.

W samodzielnym klastrze otrzymasz jeden executor na pracownika, chyba że grasz z spark.executor.cores i pracownik ma wystarczająco dużo rdzeni, aby pomieścić więcej niż jeden executor.

  • Samodzielny klaster z 5 węzłami robotniczymi (każdy węzeł ma 8 rdzeni) Kiedy uruchamiam aplikację z domyślnymi ustawieniami.
  • Spark chciwie zdobędzie tyle rdzeni i executorów, ile oferuje scheduler. Więc w końcu otrzymasz 5 executorów z 8 rdzeniami każdy.

Następujące są opcje spark-submit do zabawy z liczbą executorów:

– executor-memory MEM Pamięć na executor (np. 1000M, 2G) (Domyślnie: 1G).

Spark standalone i tylko YARN:
– executor-cores NUM Liczba rdzeni na executor. (Domyślnie: 1 w trybie YARN, lub wszystkie dostępne rdzenie na robotniku w trybie autonomicznym)

YARN-only:
– num-executors NUM Liczba executorów do uruchomienia (Domyślnie: 2). Jeśli włączona jest alokacja dynamiczna, początkowa liczba executorów będzie wynosić co najmniej NUM.

  • Czy 2 instancje worker oznacza jeden węzeł z dwoma procesami worker?

Węzeł to maszyna i nie ma dobrego powodu, aby uruchamiać więcej niż jednego worker na maszynę. Tak więc dwa węzły robotnicze zazwyczaj oznaczają dwie maszyny, z których każda jest robotnikiem Sparka.

1 Węzeł = 1 Proces robotniczy

  • Czy każda instancja robotnicza posiada executor dla konkretnej aplikacji (który zarządza przechowywaniem, zadaniami) lub jeden węzeł robotniczy posiada jeden executor?

Workery posiadają wiele executorów, dla wielu aplikacji. Jedna aplikacja ma executory na wielu robotnikach

Węzeł robotniczy może trzymać wiele executorów (procesów), jeśli ma wystarczający CPU, Pamięć i Pamięć masową.

  • BTW, Liczba executorów w węźle robotniczym w danym momencie jest całkowicie zależna od obciążenia klastra i możliwości węzła do uruchomienia ilu executorów.

PŁYN WYKONANIA APELACJI

Mając to na uwadze, kiedy wysyłasz aplikację do klastra za pomocą spark-submit, oto co dzieje się wewnętrznie:

  1. Samodzielna aplikacja uruchamia i instancjonuje instancję SparkContext/SparkSession (i dopiero wtedy można nazwać aplikację sterownikiem).
  2. Program sterownika zwraca się o zasoby do menedżera klastra w celu uruchomienia executorów.
  3. Menedżer klastra uruchamia executory.
  4. Proces sterownika przebiega przez aplikację użytkownika. W zależności od działań i transformacji nad RDD zadania są wysyłane do executorów.
  5. Executory uruchamiają zadania i zapisują wyniki.
  6. Jeśli jakikolwiek worker ulegnie awarii, jego zadania zostaną wysłane do różnych executorów w celu ponownego przetworzenia. W książce „Learning Spark: Lightning-Fast Big Data Analysis” mówią o Spark and Fault Tolerance:

Spark automatycznie radzi sobie z niedziałającymi lub powolnymi maszynami, wykonując ponownie niedziałające lub powolne zadania. Na przykład, jeśli węzeł wykonujący partycję operacji map() ulegnie awarii, Spark wykona ją ponownie na innym węźle; a nawet jeśli węzeł nie ulegnie awarii, ale jest po prostu znacznie wolniejszy niż inne węzły, Spark może z wyprzedzeniem uruchomić „spekulatywną” kopię zadania na innym węźle i pobrać jej wynik, jeśli ta się zakończy.

  1. Z SparkContext.stop() ze sterownika lub jeśli główna metoda zakończy się/przerwie wszystkie executory zostaną zakończone, a zasoby klastra zostaną zwolnione przez menedżera klastra.

Jeśli spojrzymy na wykonanie ze Spark prospective nad dowolnym menedżerem zasobów dla programu, który dołącza do dwóch rdds i wykonuje jakąś operację redukcji, to filtr

Następująca lista ujmuje niektóre zalecenia, o których należy pamiętać podczas ich konfigurowania:

  • Deamony Hadoop/Yarn/OS: Kiedy uruchamiamy aplikację iskrową używając menedżera klastra jak Yarn, będzie kilka demonów, które będą działać w tle jak NameNode, Secondary NameNode, DataNode, JobTracker i TaskTracker. Tak więc, podczas określania liczby wykonawców, musimy upewnić się, że pozostawiamy wystarczająco dużo rdzeni (~1 rdzeń na węzeł), aby te demony działały płynnie.
  • Yarn ApplicationMaster (AM): ApplicationMaster jest odpowiedzialny za negocjowanie zasobów z ResourceManager i współpracę z NodeManagerami w celu wykonania i monitorowania kontenerów i ich zużycia zasobów. Jeśli używamy spark na yarn, to musimy zaplanować zasoby, których AM będzie potrzebował (~1024MB i 1 Executor).
  • Przepustowość HDFS: Klient HDFS ma problemy z dużą ilością współbieżnych wątków. Zauważono, że HDFS osiąga pełną przepustowość zapisu przy ~5 zadaniach na executor. Więc dobrze jest utrzymywać liczbę rdzeni na executor poniżej tej liczby.
  • MemoryOverhead: Poniższy obraz przedstawia spark-yarn-memory-usage.

Dwie rzeczy do zanotowania z tego obrazu:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Więc, jeśli zażądamy 20GB na executor, AM faktycznie otrzyma 20GB + memoryOverhead = 20 + 7% z 20GB = ~23GB pamięci dla nas.

  • Uruchamianie executorów ze zbyt dużą ilością pamięci często powoduje nadmierne opóźnienia w zbieraniu śmieci.
  • Uruchamianie malutkich executorów (z jednym rdzeniem i tylko tyle pamięci potrzebnej do uruchomienia pojedynczego zadania, na przykład) odrzuca korzyści, które pochodzą z uruchamiania wielu zadań w pojedynczej maszynie JVM.

Dość teorii…

Teraz rozważmy klaster 10 węzłów z następującą konfiguracją i przeanalizujmy różne możliwości dystrybucji executorów-rdzenia-pamięci:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Pierwsze podejście: Malutkie executory :

Malutkie executory zasadniczo oznaczają jeden executor na rdzeń. Poniższa tabela przedstawia wartości naszych parametrów spar-config przy tym podejściu:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analiza: Z tylko jednym executorem na rdzeń, jak omówiliśmy powyżej, nie będziemy w stanie skorzystać z możliwości uruchamiania wielu zadań w tej samej maszynie JVM. Również współdzielone / buforowane zmienne, takie jak zmienne rozgłoszeniowe i akumulatory będą replikowane w każdym rdzeniu węzłów, co jest 16 razy. Ponadto nie pozostawiamy wystarczającej ilości pamięci nad głową dla procesów demona Hadoop/Yarn i nie liczymy w ApplicationManager. NIE DOBRZE!

Podejście drugie: Fat executors (One Executor per node):

Fat executors zasadniczo oznacza jeden executor na węzeł. Poniższa tabela przedstawia wartości naszych parametrów spark-config przy tym podejściu:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analiza: Przy wszystkich 16 rdzeniach na executor, oprócz ApplicationManager i daemon procesy nie są liczone, przepustowość HDFS będzie bolała i spowoduje to nadmierne zaśmiecanie wyników. Także, NIE DOBRZE!

Trzecie podejście: Balans między Fat (vs) Tiny

Zgodnie z zaleceniami, które omówiliśmy powyżej:

  • Based on the recommendations mentioned above, Let’s assign 5 core per executors => – executor-cores = 5 (for good HDFS throughput)
  • Leave 1 core per node for Hadoop/Yarn daemons => Num cores available per node = 16-1 = 15
  • So, Całkowita dostępna liczba rdzeni w klastrze = 15 x 10 = 150
  • Liczba dostępnych executorów = (total cores / num cores-per-executor) = 150 / 5 = 30
  • Zostawienie 1 executora dla ApplicationManager => – num-executorów = 29
  • Liczba executorów na węzeł = 30/10 = 3
  • Pamięć na executora = 64GB / 3 = 21GB
  • Odliczając koszty ogólne sterty = 7% z 21GB = 3GB. Tak więc, rzeczywisty – executor-memory = 21-3 = 18GB

Więc, zalecany config to: 29 executorów, 18GB pamięci każdy i 5 rdzeni każdy!!

Analiza: Jest oczywiste, jak to trzecie podejście znalazło właściwą równowagę pomiędzy podejściami Fat vs Tiny. Nie trzeba dodawać, że osiągnięto równoległość grubego executora i najlepszą przepustowość malutkiego executora!!!

Konkluzja:

Widzieliśmy:

  • Kilka zaleceń, o których należy pamiętać konfigurując te parametry dla aplikacji iskrowych takich jak:
  • Budżet w zasobach, których potrzebowałby menedżer aplikacji Yarn
  • Jak powinniśmy oszczędzić kilka rdzeni dla procesów deamon Hadoop/Yarn/OS
  • Poznaj spark-yarn-memory-usage
  • Sprawdziliśmy również i przeanalizowaliśmy trzy różne podejścia do konfiguracji tych parametrów:
  1. Tiny Executors – One Executor per Core
  2. Fat Executors – One executor per Node
  3. Recommended approach – Right balance between Tiny (Vs) Fat coupled with the recommendations.

– num-executors, – executor-cores i – executor-memory… te trzy parametry odgrywają bardzo ważną rolę w wydajności iskry, ponieważ kontrolują ilość pamięci CPU &, którą dostaje twoja aplikacja iskry. To sprawia, że jest to bardzo ważne dla użytkowników, aby zrozumieć właściwy sposób, aby je skonfigurować. Mam nadzieję, że ten blog pomógł ci w uzyskaniu tej perspektywy…