Spark-jargong för nybörjare

Den här bloggen är till för att reda ut några av de problem som uppstår när nybörjare kodar för Spark distributed computing. Förutom att lära sig API:erna måste man utrusta sig med klusterdetaljer för att få ut det bästa av Spark-kraften.

Startpunkten är Cluster Mode Overview .

Och några vanliga frågor som kan dyka upp är:

  1. Du kan fortfarande inte förstå de olika processerna i Spark Standalone-klustret och parallelliteten.
  2. Du körde bin\start-slave.sh och upptäckte att den skapade arbetaren, som faktiskt är en JVM. Är arbetaren en JVM-process eller inte?
  3. Enligt ovanstående länk är en exekutor en process som startas för ett program på en arbetarnod som kör uppgifter. Exekutor är också en JVM.
  4. Exekutorer är per program. Vad har då en arbetare för roll? Samordnar den med utföraren och meddelar resultatet till föraren eller talar föraren direkt med utföraren? Om så är fallet, vad är då arbetarens syfte?
  5. Hur kan man kontrollera antalet utförare för ett program?
  6. Kan man få uppgifterna att köras parallellt i utföraren? Om så är fallet, hur konfigurerar man antalet trådar för en exekutor?
  7. Vad är förhållandet mellan arbetare, exekutorer och exekutorkärnor ( – total-executor-cores)?
  8. Vad innebär det att ha fler arbetare per nod?

Låt oss se över detaljerna om Spark Cluster-läget.

Spark använder en master/slave-arkitektur. Som du kan se i figuren har den en central samordnare (Driver) som kommunicerar med många distribuerade arbetare (executors). Föraren och var och en av utförarna körs i egna Java-processer.

Föraren är den process där huvudmetoden körs. Först omvandlar den användarprogrammet till uppgifter och därefter schemalägger den uppgifterna på utförarna.

Spark Application – – -> Driver – -> List of Tasks – -> Scheduler – – -> Executors

EXECUTORS

Executors är processer i arbetarnoderna som ansvarar för att köra enskilda uppgifter i ett givet Spark-arbete. De startas i början av en Spark-applikation och körs vanligtvis under hela applikationens livstid. När de har kört uppgiften skickar de resultaten till drivrutinen. De tillhandahåller också minneslagring för RDD:er som cachelagras av användarprogram genom Block Manager.

När exekutorer startas registrerar de sig hos drivrutinen och från och med då kommunicerar de direkt. Arbetarna ansvarar för att meddela klusterledaren om tillgången på deras resurser.

I ett fristående kluster får du en exekutor per arbetare om du inte leker med spark.executor.cores och en arbetare har tillräckligt många kärnor för att rymma mer än en exekutor.

  • Ett fristående kluster med 5 arbetarnoder (varje nod har 8 kärnor) När jag startar ett program med standardinställningar.
  • Spark kommer girigt att förvärva så många kärnor och exekutorer som erbjuds av schemaläggaren. Så i slutändan får du 5 exekutorer med 8 kärnor vardera.

Följande är spark-submit-alternativen för att leka med antalet exekutorer:

– executor-memory MEM Minne per exekutor (t.ex. 1000M, 2G) (Standard: 1G).

Spark standalone and YARN only:
– executor-cores NUM Antal kärnor per exekutor. (Standard: 1 i YARN-läge, eller alla tillgängliga kärnor på arbetaren i fristående läge)

YARN-only:
– num-executors NUM Antal exekutorer som ska startas (Standard: 2). Om dynamisk allokering är aktiverad kommer det initiala antalet utförare att vara minst NUM.

  • Bär 2 arbetarinstans en arbetarnod med 2 arbetsprocesser?

En nod är en maskin, och det finns ingen bra anledning att köra mer än en arbetare per maskin. Så två arbetarnoder innebär vanligtvis två maskiner med varsin Spark-arbetare.

1 nod = 1 arbetsprocess

  • Håller varje arbetarinstans en verkställare för en specifik tillämpning (som hanterar lagring, uppgift) eller håller en arbetarnod en verkställare?

Arbetsare håller många verkställare, för många tillämpningar. Ett program har utförare på många arbetare

En arbetarnod kan ha flera utförare (processer) om den har tillräckligt med processor, minne och lagringsutrymme.

  • BTW, Antalet utförare i en arbetsnod vid en viss tidpunkt beror helt och hållet på arbetsbelastningen på klustret och nodens förmåga att köra hur många utförare.

APPLIKATIONSUTFÖRANDEFLÖDE

Med detta i åtanke, när du skickar en ansökan till klustret med spark-submit är det här vad som händer internt:

  1. En fristående tillämpning startar och instansierar en SparkContext/SparkSession-instans (och det är först då som du kan kalla tillämpningen för en drivrutin).
  2. Drivrutinprogrammet ber om resurser till klusterhanteraren för att starta utförare.
  3. Klusterhanteraren startar utförare.
  4. Drivrutinprocessen körs genom användarprogrammet. Beroende på åtgärder och omvandlingar över RDD:er skickas uppgifter till utförare.
  5. Utförare kör uppgifterna och sparar resultaten.
  6. Om någon utförare kraschar skickas uppgifterna till olika utförare för att behandlas på nytt. I boken ”Learning Spark: Lightning-Fast Big Data Analysis” talar de om Spark och feltolerans:

Spark hanterar automatiskt misslyckade eller långsamma maskiner genom att utföra misslyckade eller långsamma uppgifter på nytt. Om t.ex. noden som kör en partition av en map()-operation kraschar kommer Spark att köra om den på en annan nod, och även om noden inte kraschar utan bara är mycket långsammare än andra noder kan Spark starta en ”spekulativ” kopia av uppgiften på en annan nod och ta dess resultat om den avslutas.

  1. Med SparkContext.stop() från drivrutinen eller om huvudmetoden avslutas/kraschar kommer alla utförare att avslutas och klusterresurserna kommer att frigöras av klusterhanteraren.

Om vi tittar på utförandet från Spark prospective över någon resurshanterare för ett program, som sammanfogar två rdds och gör någon reduktionsoperation så filter

Följande lista fångar några rekommendationer som man ska tänka på när man konfigurerar dem:

  • Hadoop/Yarn/OS Deamons: När vi kör spark-applikationen med hjälp av en klusterhanterare som Yarn kommer det att finnas flera daemons som körs i bakgrunden, t.ex. NameNode, Secondary NameNode, DataNode, JobTracker och TaskTracker. Så när vi anger num-executors måste vi se till att vi lämnar tillräckligt många kärnor åt sidan (~1 kärna per nod) för att dessa daemoner ska kunna köras smidigt.
  • Yarn ApplicationMaster (AM): ApplicationMaster ansvarar för att förhandla resurser från ResourceManager och arbetar med NodeManagers för att exekvera och övervaka behållarna och deras resursförbrukning. Om vi kör spark på yarn måste vi budgetera med de resurser som AM skulle behöva (~1024MB och 1 Executor).
  • HDFS Throughput: HDFS-klienten har problem med massor av samtidiga trådar. Det observerades att HDFS uppnår full genomströmning vid skrivning med ~5 uppgifter per exekutor . Det är därför bra att hålla antalet kärnor per exekutor under detta antal.
  • MemoryOverhead: Följande bild visar spark-yarn-memory-usage.

Två saker att notera från den här bilden:

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

Så, om vi begär 20GB per exekutor, kommer AM faktiskt att få 20GB + memoryOverhead = 20 + 7% av 20GB = ~23GB minne för oss.

  • Körning av exekutorer med för mycket minne resulterar ofta i alltför stora förseningar vid sophämtning.
  • Körning av små exekutorer (med en enda kärna och bara tillräckligt med minne för att köra en enskild uppgift, t.ex.) gör att man förlorar fördelarna med att köra flera uppgifter i en enda JVM.

Tillräckligt med teori… Låt oss gå till praktisk handling.

Nu kan vi betrakta ett kluster med 10 noder med följande konfiguration och analysera olika möjligheter till fördelning av utförare, kärna och minne:

**Cluster Config:**
10 Nodes
16 cores per Node
64GB RAM per Node

Första tillvägagångssättet: Små exekutorer :

Tiny exekutorer innebär i huvudsak en exekutor per kärna. Följande tabell visar värdena för våra spar-config-parametrar med detta tillvägagångssätt:

- `--num-executors` = `In this approach, we'll assign one executor per core`
= `total-cores-in-cluster`
= `num-cores-per-node * total-nodes-in-cluster`
= 16 x 10 = 160
- `--executor-cores` = 1 (one executor per core)
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/16 = 4GB

Analys: Med endast en exekutor per kärna, som vi diskuterade ovan, kommer vi inte att kunna dra nytta av att köra flera uppgifter i samma JVM. Dessutom kommer delade/cachade variabler som sändningsvariabler och ackumulatorer att replikeras i varje kärna i noderna, vilket är 16 gånger. Dessutom lämnar vi inte tillräckligt med minnesoverhead för Hadoop/Yarn daemonprocesser och vi räknar inte in ApplicationManager. INTE BRA!

Den andra metoden: Fat executors (en exekutor per nod):

Fat executors innebär i huvudsak en exekutor per nod. Följande tabell visar värdena för våra spark-config params med detta tillvägagångssätt:

- `--num-executors` = `In this approach, we'll assign one executor per node`
= `total-nodes-in-cluster`
= 10
- `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
= `total-cores-in-a-node`
= 16
- `--executor-memory` = `amount of memory per executor`
= `mem-per-node/num-executors-per-node`
= 64GB/1 = 64GB

Analys: Med alla 16 kärnor per exekutor, bortsett från ApplicationManager och daemonprocesser som inte räknas med, kommer HDFS-genomströmningen att skadas och det kommer att resultera i överdrivna skräpresultat. Också,NOT GOOD!

Tredje tillvägagångssättet: Balans mellan Fat (vs) Tiny

Enligt rekommendationerna som vi diskuterade ovan:

  • Baserat på rekommendationerna som nämns ovan, låt oss tilldela 5 kärnor per exekutor => – exekutor-kärnor = 5 (för god HDFS-genomströmning)
  • Lämna 1 kärna per nod för Hadoop/Yarn-demoner => Antal tillgängliga kärnor per nod = 16-1 = 15
  • Så, Totalt antal tillgängliga kärnor i klustret = 15 x 10 = 150
  • Antal tillgängliga utförare = (totalt antal kärnor / antal kärnor per utförare) = 150 / 5 = 30
  • Lämna 1 utförare till ApplicationManager => – num-exekutorer = 29
  • Antal exekutorer per nod = 30/10 = 3
  • Minne per exekutor = 64GB / 3 = 21GB
  • Medräknat heap overhead = 7% av 21GB = 3GB. Så det faktiska utförarminnet = 21-3 = 18 GB

Den rekommenderade konfigurationen är alltså: 29 utförare, 18 GB minne per utförare och 5 kärnor per utförare!

Analys: Det är uppenbart att detta tredje tillvägagångssätt har hittat den rätta balansen mellan Fat och Tiny. Det är onödigt att säga att den uppnådde parallellism av en fet exekutor och bästa genomströmning av en liten exekutor!!!

Slutsats:

Vi har sett:

  • Flera rekommendationer att tänka på när man konfigurerar dessa parametrar för en spark-applikation som:
  • Budgetera in de resurser som Yarns Application Manager skulle behöva
  • Hur vi bör spara några kärnor för Hadoop/Yarn/OS deamon-processer
  • Lär dig om spark-yarn-memory-usage
  • Också, kollat upp och analyserat tre olika tillvägagångssätt för att konfigurera dessa params:
  1. Tiny Executors – En exekutor per kärna
  2. Fat Executors – En exekutor per nod
  3. Rekommenderat tillvägagångssätt – Rätt balans mellan Tiny (Vs) Fat kopplat till rekommendationerna.

– num-executors, – executor-cores och – executor-memory… Dessa tre parametrar spelar en mycket viktig roll för sparkprestanda eftersom de styr hur mycket processorminne & ditt sparkprogram får. Detta gör det mycket viktigt för användare att förstå rätt sätt att konfigurera dem. Hoppas att den här bloggen hjälpte dig att få det perspektivet…