Spark Jargon for Starters

このブログは、初心者がSpark分散コンピューティングのコードを書くときに、最初に困ることを解消するためのものです。 API の学習とは別に、Spark のパワーを最大限に活用するために、クラスタの詳細について学ぶ必要があります。

  1. Spark Standalone クラスターでのさまざまなプロセスと並列処理をまだ理解していません。
  2. binstart-slave.sh を実行して、実際には JVM であるワーカーがスポーンされることがわかりました。 worker は JVM プロセスですか。
  3. 上記のリンクにあるように、executor はワーカー ノード上でアプリケーション用に起動されたプロセスであり、タスクを実行します。
  4. エグゼキュータは、アプリケーションごとにあります。 では、ワーカーの役割とは何でしょうか。 それともドライバが直接エクゼキュータと対話するのでしょうか?
  5. アプリケーションのエグゼキュータの数を制御するにはどうしたらよいですか。
  6. Worker、Executor、Executor Core ( – total-executor-cores) の関係はどうなっていますか。
  7. 1ノードあたりのWorkerの数を増やすとはどういうことですか。

Spark Cluster モードの詳細を再確認しましょう。

Spark ではマスター/スレーブのアーキテクチャを使用しています。 図にあるように、1つの中央コーディネータ(Driver)を持ち、多くの分散ワーカー(Executor)と通信します。 ドライバと各エクゼキュータはそれぞれ独自の Java プロセスで実行されます。

ドライバは、メイン メソッドが実行されるプロセスです。

ドライバはメインメソッドが動作するプロセスで、まずユーザープログラムをタスクに変換し、その後エグゼキュータ上でタスクをスケジュールします。

Spark Application – -> Driver – -> List of Tasks – -> Scheduler – -> Executors

EXECUTORS

エグゼキュータは、与えられたスパークジョブの個々のタスク実行担当である、ワーカーノードのプロセスです。 彼らはSparkアプリケーションの開始時に起動され、通常、アプリケーションの寿命の全体にわたって実行されます。 タスクの実行後は、その結果をドライバに送信する。 また、ブロック マネージャーを通じてユーザー プログラムによってキャッシュされる RDD のインメモリー ストレージも提供します。

エグゼキューターは起動時にドライバーに登録し、それ以降、直接通信します。 ワーカーはクラスターマネージャーにリソースの可用性を伝える役割を果たします。

スタンドアロン クラスタでは、spark.executor.cores を使用してワーカーが複数のエグゼキューターを保持するのに十分なコアを持たない限り、ワーカーごとに 1 つのエグゼキューターを取得します。

  • 5 つのワーカー ノードを持つスタンドアロン クラスター (各ノードは 8 コア) で、デフォルト設定でアプリケーションを開始すると、
  • Spark はスケジューラーが提供するコアとエグゼキュータを欲張りに取得しようとします。

Following are the spark-submit options to play around with number of executors:

– executor-memory MEM Memory per executor (ex. 1000M, 2G) (Default: 1G).

Spark standalone and YARN only:
– executor-cores NUM executor per core number of cores.The core per executor.The core number of core with 8 coore. (デフォルト: YARN モードでは 1、スタンドアロン モードでは Worker 上の利用可能なすべてのコア)

YARN-only:
– num-executors NUM 起動するエグゼキューターの数 (デフォルト: 2)。 動的割り当てが有効な場合、最初のエグゼキュータ数は少なくとも NUM になります。

  • 2 worker インスタンスは、2 ワーカープロセスを持つ 1 ワーカーノードを意味しますか。

ノードはマシンであり、マシンごとに複数のワーカーを実行する良い理由はないでしょう。

1 ノード = 1 ワーカー プロセス

  • 各ワーカー インスタンスは、特定のアプリケーション (ストレージやタスクを管理する) のエグゼキューターを保持しますか、それとも 1 ワーカー ノードが 1 エグゼキューターを保持しますか。

ワーカーノードは、多くのアプリケーションのために、多くのエグゼキュータを保持します。

  • ところで、ある時点のワーカーノードの実行数は完全にクラスタの作業負荷と何個の実行を実行できるノードの性能に依存します。

APPLICATION EXECUTION FLOW

これを念頭に置いて、spark-submit でアプリケーションをクラスターに送信すると、内部では次のことが起こります。

  1. スタンドアロン アプリケーションが起動し、SparkContext/SPARKSession インスタンスが作成されます (そして、そのとき初めて、アプリケーションをドライバーと呼び出すことができる)。
  2. ドライバプログラムは、クラスタマネージャにリソースを要求して、エグゼキュータを起動します。
  3. クラスタマネージャはエグゼキュータを起動します。
  4. ワーカーがクラッシュした場合、そのタスクは別のワーカーに送られ、再度処理されます。 書籍「Learning Spark: 4122>

Spark は、失敗したタスクや低速のタスクを再実行することで、失敗したマシンや低速のマシンに自動的に対処します。 たとえば、map() 操作のパーティションを実行しているノードがクラッシュした場合、Spark はそれを別のノードで再実行します。また、ノードがクラッシュせず、単に他のノードよりはるかに遅い場合でも、Spark は別のノードでタスクの「投機」コピーを先行して起動し、それが終了したらその結果を取得することができます。

  1. With SparkContext.stop() from the driver or if the main method exits/crashes all the executors will be terminated and the cluster resources will be released by the cluster manager.ドライバから、またはメイン メソッドが終了/クラッシュした場合、すべてのエグゼキュータは終了し、クラスタ リソースはクラスタ マネージャーによって解放されます。

2 つの RD を結合して何らかの削減処理を行うプログラムについて、任意のリソース マネージャー上の Spark プロスペクティブから実行を見ると、フィルター

以下のリストは、構成中に覚えておくべきいくつかの推奨事項をまとめたものです。

  • Hadoop/Yarn/OS Deamons: Yarn のようなクラスタマネージャを使って spark アプリケーションを実行すると、NameNode、Secondary NameNode、DataNode、JobTracker、TaskTracker などのデーモンがバックグラウンドで実行されるようになります。 したがって、num-executors を指定する際に、これらのデーモンがスムーズに実行できるように十分なコア (ノードあたり ~1 コア) を残しておく必要があります。 ApplicationMaster は、ResourceManager からリソースをネゴシエートし、NodeManager と連携してコンテナとそのリソース消費を実行および監視する役割を担います。 yarn 上で spark を実行している場合、AM が必要とするリソース (~1024MB と 1 Executor) を予算化する必要があります。 HDFS クライアントは、大量の同時スレッドに悩まされています。 HDFS は、1 つのエグゼキューターあたり ~5 つのタスクで完全な書き込みスループットを達成することが観察されました。 そのため、実行者あたりのコア数をそれ以下に保つのがよいでしょう。
  • MemoryOverhead:

この画像から注意すべき点が2つあります。

Full memory requested to yarn per executor =
spark-executor-memory + spark.yarn.executor.memoryOverhead.
spark.yarn.executor.memoryOverhead =
Max(384MB, 7% of spark.executor-memory)

つまり、もし我々が1台のexecutorに20GBを要求すると、実際には20GB + memoryOverhead = 20 + 20% = ~23GBメモリを取得できることになります。

  • あまりに多くのメモリでエグゼキュータを実行すると、しばしば過剰なガベージ コレクション遅延が発生します。
  • (1 つのコアと、たとえば単一のタスクを実行するのに必要なだけのメモリで)小さなエグゼキュータを実行すると、単一の JVM で複数のタスクを実行することによる利点が失われます。

    さて、次の構成で 10 ノード クラスターを考え、エグゼキュータ、コア、メモリ分布のさまざまな可能性を分析しましょう。

    小さなエグゼキュータとは、基本的に1コアに1つのエグゼキュータを意味します。 以下の表は、このアプローチにおけるspar-configのパラメータ値を示しています:

    - `--num-executors` = `In this approach, we'll assign one executor per core`
    = `total-cores-in-cluster`
    = `num-cores-per-node * total-nodes-in-cluster`
    = 16 x 10 = 160
    - `--executor-cores` = 1 (one executor per core)
    - `--executor-memory` = `amount of memory per executor`
    = `mem-per-node/num-executors-per-node`
    = 64GB/16 = 4GB

    分析。 コアあたり1つのエグゼキュータだけでは、上で説明したように、同じJVMで複数のタスクを実行する利点を生かすことができません。 また、ブロードキャスト変数やアキュムレーターなどの共有/キャッシュされた変数は、ノードの各コアで 16 回複製されることになります。 また、Hadoop/Yarnのデーモンプロセスのための十分なメモリのオーバーヘッドを残していませんし、ApplicationManagerでカウントされていません。 良くない!

    2 番目のアプローチ。 Fat executors (One Executor per node):

    Fat-executorsとは、基本的にノードあたり1つのexecutorを意味します。 次の表は、このアプローチでの spark-config パラメータの値を示しています:

    - `--num-executors` = `In this approach, we'll assign one executor per node`
    = `total-nodes-in-cluster`
    = 10
    - `--executor-cores` = `one executor per node means all the cores of the node are assigned to one executor`
    = `total-cores-in-a-node`
    = 16
    - `--executor-memory` = `amount of memory per executor`
    = `mem-per-node/num-executors-per-node`
    = 64GB/1 = 64GB

    解析結果です。 ApplicationManagerとデーモンプロセスを除いて、1つのエグゼキュータにつき16個のコアを使用すると、HDFSのスループットが低下し、過剰なガーベッジ結果が発生することになります。 また、良くないことです!

    3 番目のアプローチ。 ファット (対タイニー) のバランス

    上記で説明した推奨事項に従って、ファット (対タイニー) のバランスをとることにしました。

    • 上記の推奨事項に基づいて、実行者ごとに 5 コアを割り当てましょう => – 実行者-コア = 5 (HDFS スループットを良くする)
    • Hadoop/Yarn デーモン用にノードあたり 1 コア残しておきます => ノードあたり使用できるコア = 16-1 = 15
    • つまり、次のようになります。 クラスタで利用可能なコアの合計 = 15 x 10 = 150
    • 利用可能なエグゼキュータの数 = (コア合計 / エグゼキュータあたりのコア数) = 150 / 5 = 30
    • アプリケーションマネージャ用に1エグゼキュータを残す => – num-
    • アプリケーションマネージャ用のエグゼキュータとして1コアを残す。executors = 29
    • Number of executors per node = 30/10 = 3
    • Memory per executor = 64GB / 3 = 21GB
    • heap overhead = 7% of 21GB = 3GBを計算。 つまり、実際-実行者-メモリ=21-3=18GB

    そこで、推奨構成は:29実行者、各18GBメモリ、各5コア!!

    解析してみました。 この 3 番目のアプローチは、Fat と Tiny のアプローチの間の適切なバランスをどのように見つけたかについて、明らかです。 言うまでもなく、ファットエクゼキュータの並列性とタイニーエクゼキュータの最高のスループットを達成しました!

    結論:

    私たちは、

    • これらのパラメーターをスパーク アプリケーション用に設定する際に念頭に置いておくべき推奨事項をいくつか見てきました。
    • Yarn のアプリケーション マネージャーが必要とするリソースの予算
    • Hadoop/Yarn/OS deamon プロセス用にいくつかのコアを確保する方法
    • Learnt about spark-yarn-memory-usage
    • また、3 つの異なるアプローチをチェックアウトして分析し、これらのパラメーターを構成することができました。
    1. Tiny Executors – One Executor per Core
    2. Fat Executors – One executor per Node
    3. Recommended approach – Right Balance between Tiny (Vs) Fat coupled with the Recommendations.

    – num-executors, – executor-cores and – executor-memory… この3つのパラメータは、spark アプリケーションが取得する CPU & メモリの量を制御するため、spark のパフォーマンスで非常に重要な役割を果たします。 このため、ユーザはこれらのパラメータを正しく設定することが非常に重要になります。 このブログがそのような視点を得るために役立つことを願っています…