Spark-jargon for begyndere

Denne blog har til formål at rydde op i nogle af de problemer, der opstår, når nybegyndere koder for Spark distributed computing. Ud over at lære API’erne skal man udstyre sig selv med klyngedetaljer for at få det bedste ud af Spark-kraften.

Startpunktet vil være Cluster Mode Overview .

Og nogle almindelige spørgsmål, der kan dukke op, er:

  1. Du kan stadig ikke forstå de forskellige processer i Spark Standalone-klyngen og parallelismen.
  2. Du kørte bin\start-slave.sh og fandt ud af, at den spawnede arbejderen, som faktisk er en JVM. Er worker en JVM-proces eller ej?
  3. Som angivet i ovenstående link er en eksekutor en proces, der lanceres for en applikation på en arbejderknude, som udfører opgaver. Executor er også en JVM.
  4. Executorerne er pr. program. Hvad er så rollen for en arbejdstager? Koordinerer den med eksekutoren og kommunikerer resultatet tilbage til føreren? eller taler føreren direkte med eksekutoren? Hvis det er tilfældet, hvad er så arbejdstagerens formål?
  5. Hvordan styres antallet af eksekutorer for en applikation?
  6. Kan opgaverne bringes til at køre parallelt i eksekutoren? Hvis ja, hvordan konfigureres antallet af tråde til en eksekutor?
  7. Hvad er forholdet mellem worker, eksekutorer og eksekutor-kerner ( – total-executor-cores)?
  8. Hvad betyder det at have flere workers pr. node?

Lader os genbesøge detaljerne om Spark Cluster-tilstand.

Spark anvender en master/slave-arkitektur. Som du kan se på figuren, har den en central koordinator (Driver), der kommunikerer med mange distribuerede arbejdere (executorer). Driveren og hver af eksekutorerne kører i deres egne Java-processer.

Driveren er den proces, hvor hovedmetoden kører. Først konverterer den brugerprogrammet til opgaver, og derefter planlægger den opgaverne på eksekutorerne.

Spark-applikation – -> Driver – -> Liste over opgaver – – -> Scheduler – – -> Eksekutorer

EXECUTORER

Eksekutorer er processer i arbejderknudepunkter, der er ansvarlige for at køre de enkelte opgaver i et givet Spark-job. De startes i begyndelsen af en Spark-applikation og kører typisk i hele applikationens levetid. Når de har kørt opgaven, sender de resultaterne til driveren. De leverer også in-memory-lagring til RDD’er, der lagres i cache af brugerprogrammer via Block Manager.

Når eksekutorer startes, registrerer de sig selv hos driveren, og fra da af kommunikerer de direkte. Arbejderne er ansvarlige for at kommunikere klyngelederen om tilgængeligheden af deres ressourcer.

I en standalone klynge får du én eksekutor pr. arbejder, medmindre du leger med spark.executor.cores, og en arbejder har nok kerner til at rumme mere end én eksekutor.

  • En standalone klynge med 5 arbejderknuder (hver knude har 8 kerner) Når jeg starter et program med standardindstillinger.
  • Spark vil grådigt erhverve så mange kerner og eksekutorer som tilbydes af scheduleren. Så i sidste ende får du 5 eksekutorer med 8 kerner hver.

Følgende er spark-submit indstillingerne for at lege med antallet af eksekutorer:

– executor-memory MEM Hukommelse pr. eksekutor (f.eks. 1000M, 2G) (Standard: 1G).

Spark standalone og YARN only:
– executor-cores NUM Antal kerner pr. eksekutor. (Standard: 1 i YARN-tilstand, eller alle tilgængelige kerner på medarbejderen i standalone-tilstand)

YARN-only:
– num-executors NUM Antal eksekutorer, der skal startes (Standard: 2). Hvis dynamisk allokering er aktiveret, vil det oprindelige antal eksekutorer være mindst NUM.

  • Betyder 2 arbejderinstans en arbejderknude med 2 arbejderprocesser?

En knude er en maskine, og der er ikke en god grund til at køre mere end én arbejder pr. maskine. Så to arbejderknuder betyder typisk to maskiner med hver en Spark-arbejder.

1 knude = 1 arbejderproces

  • Holder hver arbejderinstans en eksekutor for specifik applikation (som administrerer lagring, opgave) eller holder en arbejderknude en eksekutor?

Arbejdere holder mange eksekutorer, for mange applikationer. Et program har eksekutorer på mange arbejdere

En arbejderknude kan have flere eksekutorer (processer), hvis den har tilstrækkelig CPU, hukommelse og lagerplads.

  • BTW, Antallet af eksekutorer i en arbejderknude på et givet tidspunkt afhænger helt af arbejdsbyrden på klyngen og knodens evne til at køre hvor mange eksekutorer.

APPLIKATIONSUDFØRELSESFLOW

Med dette i tankerne, når du sender en applikation til klyngen med spark-submit, er dette, hvad der sker internt:

  1. En standalone-applikation starter og instantierer en SparkContext/SparkSession-instans (og det er først derefter, at du kan kalde applikationen en driver).
  2. Driverprogrammet beder om ressourcer til klyngeadministratoren for at starte eksekutorer.
  3. Klyngeadministratoren lancerer eksekutorer.
  4. Driverprocessen kører gennem brugerprogrammet. Afhængigt af handlinger og transformationer over RDD’er sendes opgaver til eksekutorer.
  5. Eksekutorer kører opgaverne og gemmer resultaterne.
  6. Hvis en arbejdstager går ned, sendes dens opgaver til forskellige eksekutorer for at blive behandlet igen. I bogen “Learning Spark: Lightning-Fast Big Data Analysis” taler de om Spark og fejltolerance:

Spark håndterer automatisk fejlslagne eller langsomme maskiner ved at genudføre fejlslagne eller langsomme opgaver. Hvis f.eks. den knude, der kører en partition af en map()-operation, går ned, vil Spark genudføre den på en anden knude; og selv hvis knuden ikke går ned, men blot er meget langsommere end andre knuder, kan Spark præemptivt starte en “spekulativ” kopi af opgaven på en anden knude og tage dens resultat, hvis den bliver færdig.

  1. Med SparkContext.stop() fra driveren, eller hvis hovedmetoden afsluttes/nedbrydes, vil alle eksekutorer blive afsluttet, og klyngeressourcerne vil blive frigivet af klyngeadministratoren.

Hvis vi ser udførelsen fra Spark prospektivt over en ressourceforvalter for et program, som føjer to rdds sammen og udfører nogle reduktionsoperationer, så filter

Følgende liste indfanger nogle anbefalinger, der skal holdes i tankerne, mens du konfigurerer dem:

  • Hadoop/Yarn/OS Deamons: Når vi kører spark-applikation ved hjælp af en cluster manager som Yarn, vil der være flere daemons, der kører i baggrunden som NameNode, Secondary NameNode, DataNode, JobTracker og TaskTracker. Så mens vi angiver num-executors, skal vi sikre os, at vi afsætter nok kerner (~1 kerne pr. node) til, at disse dæmoner kan køre problemfrit.
  • Yarn ApplicationMaster (AM): ApplicationMaster er ansvarlig for at forhandle ressourcer fra ResourceManager og arbejde sammen med NodeManagers for at udføre og overvåge containerne og deres ressourceforbrug. Hvis vi kører spark på yarn, skal vi budgettere med de ressourcer, som AM ville have brug for (~1024MB og 1 Executor).
  • HDFS Throughput: HDFS-klient har problemer med tonsvis af samtidige tråde. Det blev observeret, at HDFS opnår fuld skrivegennemstrømning med ~5 opgaver pr. eksekutor . Så det er godt at holde antallet af kerner pr. eksekutor under dette antal.
  • MemoryOverhead: Følgende billede viser spark-yarn-memory-usage:

To ting at gøre opmærksom på fra dette billede:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Så, hvis vi anmoder om 20 GB pr. eksekutor, vil AM faktisk få 20 GB + memoryOverhead = 20 + 7% af 20 GB = ~23 GB hukommelse til os.

  • Kørsel af eksekutorer med for meget hukommelse resulterer ofte i for store forsinkelser i forbindelse med garbage collection.
  • Kørsel af små eksekutorer (med en enkelt kerne og kun nok hukommelse til at køre en enkelt opgave, f.eks.) smider de fordele væk, der følger af at køre flere opgaver i en enkelt JVM.

Så er det nok med teori… Lad os gå praktisk til værks..

Lad os nu overveje en klynge med 10 knudepunkter med følgende konfiguration og analysere forskellige muligheder for fordeling af eksekutorer-kerner-hukommelse:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Første fremgangsmåde: Tiny executors :

Tiny executors betyder i bund og grund en executor pr. kerne. Følgende tabel viser værdierne for vores spar-config-params med denne tilgang:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analyse: Med kun én eksekutor pr. kerne, som vi diskuterede ovenfor, vil vi ikke kunne drage fordel af at køre flere opgaver i den samme JVM. Desuden vil delte/cachede variabler som broadcast-variabler og akkumulatorer blive replikeret i hver kerne af knudepunkterne, hvilket er 16 gange. Desuden efterlader vi ikke nok hukommelsesoverhead til Hadoop/Yarn-dæmonprocesser, og vi tæller ikke med i ApplicationManager. IKKE GODT!

Second Approach: Fede eksekutorer (én eksekutor pr. knude):

Fede eksekutorer betyder i bund og grund én eksekutor pr. knude. Følgende tabel viser værdierne for vores spark-config-params med denne tilgang:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analyse: Med alle 16 kerner pr. eksekutor, bortset fra ApplicationManager og dæmonprocesser er ikke talt med, vil HDFS-gennemstrømning skade, og det vil resultere i overdrevne garbage-resultater. Også,IKKE GODT!

Den tredje tilgang: Balance mellem Fat (vs) Tiny

I overensstemmelse med de anbefalinger, som vi diskuterede ovenfor:

  • Baseret på de anbefalinger, der er nævnt ovenfor, Lad os tildele 5 kerne pr. eksekutorer => – eksekutor-kerner = 5 (for god HDFS-gennemstrømning)
  • Lad 1 kerne pr. knude til Hadoop/Yarn-dæmoner => Num cores available per node = 16-1 = 15
  • Så:
    • Samlet antal kerner til rådighed i klyngen = 15 x 10 = 150
    • Antal af tilgængelige eksekutorer = (total kerner / antal kerner pr. eksekutor) = 150 / 5 = 30
    • Lader 1 eksekutor til ApplicationManager => – antal kerner pr. eksekutoreksekutorer = 29
    • Antal eksekutorer pr. knude = 30/10 = 3
    • Hukommelse pr. eksekutor = 64 GB / 3 = 21 GB
    • Afregning af heap-overhead = 7 % af 21 GB = 3 GB. Så faktisk – eksekutor-hukommelse = 21-3 = 18 GB

    Så den anbefalede konfiguration er: 29 eksekutorer, 18 GB hukommelse hver og 5 kerner hver!!!

    Analyse: Det er indlysende, hvordan denne tredje tilgang har fundet den rette balance mellem Fat vs Tiny-tilgange. Det er unødvendigt at sige, at den har opnået parallelitet som en fed eksekutor og de bedste gennemstrømninger som en lille eksekutor!!!

    Slutning:

    Vi har set:

    • Et par anbefalinger at huske på, som konfigurerer disse params for en spark-applikation som:
    • Budget i de ressourcer, som Yarns Application Manager ville have brug for
    • Hvordan vi bør spare nogle kerner til Hadoop/Yarn/OS deamon-processer
    • Lær om spark-yarn-memory-usage
    • Også, tjekket og analyseret tre forskellige tilgange til at konfigurere disse params:
    1. Tiny Executors – One Executor per Core
    2. Fat Executors – One Executor per Node
    3. Anbefalet tilgang – Rigtig balance mellem Tiny (Vs) Fat koblet med anbefalingerne.

    – num-executors, – executor-cores og – executor-memory… disse tre params spiller en meget vigtig rolle i spark ydeevne, da de styrer mængden af CPU & hukommelse, som din spark applikation får. Dette gør det meget afgørende for brugerne at forstå den rigtige måde at konfigurere dem på. Håber denne blog hjalp dig med at få dette perspektiv …