クエリのシャッフル

shuffle クエリは、 shuffle 戦略をサポートする一連の演算子で使用されるセマンティック保持変換です。 関連するデータに応じて、shuffle 戦略を使用してクエリを実行すると、パフォーマンスが向上する可能性があります。 shuffle キー (join キー、summarize キー、make-series キー、または partition キー) のカーディナリティが高く、通常の演算子クエリがクエリ制限に達する場合は、シャッフル クエリ戦略を使用することをお勧めします。

次の演算子をシャッフル コマンドと一緒に使用できます。

shuffle クエリ戦略を使用するには、 式 hint.strategy = shuffle または hint.shufflekey = <key> を追加します。 hint.strategy=shuffle を使用すると、すべてのキーによって演算子データがシャッフルされます。 複合キーが一意であるが、各キーが十分に一意でない場合は、この式を使用します。そのため、シャッフルされた演算子のすべてのキーを使用してデータをシャッフルします。

シャッフル戦略を使用してデータをパーティション分割すると、すべてのクラスター ノードでデータの負荷が共有されます。 各ノードは、データの 1 つのパーティションを処理します。 既定のパーティション数は、クラスター ノードの数と同じになります。

パーティション番号は、パーティションの数を制御する構文 hint.num_partitions = total_partitions を使用してオーバーライドできます。 これは、クラスターのクラスター ノード数が少ない場合に、既定のパーティション番号が小さかったり、クエリが失敗したり、実行時間が長かったりする場合に便利です。

Note

多数のパーティションを使用すると、より多くのクラスター リソースが消費され、パフォーマンスが低下する可能性があります。 hint.strategy = shuffle から始め、パーティション番号を慎重に選択し、パーティションを徐々に増やします。

場合によっては、 hint.strategy = shuffle は無視され、クエリは shuffle 戦略で実行されません。 これは、次の場合に発生することがあります。

  • join 演算子には、左側または右側に別の shuffle の互換性のある演算子 (joinsummarizemake-series または partition) があります。
  • summarize 演算子は、クエリ内の別 shuffle の互換性のある演算子 (joinsummarizemake-series または partition) の後に表示されます。

構文

With hint.strategy = shuffle

T | DataExpression | join hint.strategy = shuffle ( DataExpression )

T | summarize hint.strategy = shuffle DataExpression

T | Query | パーティション hint.strategy = shuffle ( SubQuery )

With hint.shufflekey = key

T | DataExpression | join hint.shufflekey = key ( DataExpression )

T | summarize hint.shufflekey = key DataExpression

T | make-series hint.shufflekey = key DataExpression

T | Query | partition hint.shufflekey = key ( SubQuery )

構文規則について詳しく知る。

パラメーター

件名 タイプ Required 説明
T string ✔️ 演算子によって処理されるデータを含む表形式のソース。
DataExpression string 暗黙的または明示的な表形式変換式。
クエリ string 変換式は、 T のレコードに対して実行されます。
キー string join キー、summarize キー、make-series キー、またはpartition キーを使用します。
SubQuery string 変換式。

Note

DataExpressionまたはQueryは、選択した構文に応じて指定する必要があります。

シャッフルで集計を使用する

summarize演算子を使用したshuffle戦略クエリは、すべてのクラスター ノードの負荷を共有します。この場合、各ノードはデータの 1 つのパーティションを処理します。

StormEvents
| summarize hint.strategy = shuffle count(), avg(InjuriesIndirect) by State
| count 

出力

カウント
67

シャッフルで結合を使用する

StormEvents
| where State has "West"
| where EventType has "Flood"
| join hint.strategy=shuffle 
    (
    StormEvents
    | where EventType has "Hail"
    | project EpisodeId, State, DamageProperty
    )
    on State
| count

出力

カウント
103

シャッフルで make-series を使用する

StormEvents
| where State has "North"
| make-series hint.shufflekey = State sum(DamageProperty) default = 0 on StartTime in range(datetime(2007-01-01 00:00:00.0000000), datetime(2007-01-31 23:59:00.0000000), 15d) by State

出力

都道府県 sum_DamageProperty StartTime
ノースダコタ [60000,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]
ノースカロライナ [20000,0,1000] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]
北大西洋 [0,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]

シャッフルでパーティションを使用する

StormEvents
| partition hint.strategy=shuffle by EpisodeId
(
    top 3 by DamageProperty
    | project EpisodeId, State, DamageProperty
)
| count

出力

カウント
22345

hint.strategy=shuffle と hint.shufflekey=key を比較する

hint.strategy=shuffle を使用すると、すべてのキーによってシャッフル済みの演算子データがシャッフルされます。 次の例では、クエリはキーとして EpisodeIdEventId の両方を使用してデータをシャッフルします。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
| count

出力

カウント
14

次のクエリでは hint.shufflekey = key を使用します。 上記のクエリは、このクエリと同等です。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.shufflekey = EpisodeId hint.shufflekey = EventId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

出力

カウント
14

複数のキーを使用してデータをシャッフルする

場合によっては、 hint.strategy=shuffle は無視され、クエリはシャッフル戦略では実行されません。 たとえば、次の例では、結合の左側に集計があるため、 hint.strategy=shuffle を使用してもクエリにシャッフル戦略は適用されません。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

出力

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

この問題を克服し、シャッフル戦略で実行するには、 summarize および join 操作に共通するキーを選択します。 この場合、このキーは EpisodeId になります。 ヒント hint.shufflekey を使用して、 join のシャッフル キーを hint.shufflekey = EpisodeId に指定します。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.shufflekey=EpisodeId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

出力

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

集計とシャッフルを使用してパフォーマンスを向上させる

この例では、summarize 演算子を shuffle 戦略と一緒に使用して、パフォーマンスを向上させます。 ソース テーブルには 1 億 5,000 万のレコードが含まれ、キーによるグループのカーディナリティは 10M であり、10 クラスター ノードにまたがっています。

次のように、shuffle 戦略なしの summarize 演算子を使用すると、クエリは 1:08 より後に終了し、メモリ使用量のピークは最大 3 GB です。

orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

出力

カウント
1086

次のように、summarizeshuffle 戦略を使用している間、クエリは約 7 秒後に終了し、メモリ使用量のピークは 0.43 GB です。

orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

出力

カウント
1086

次の例は、2 つのクラスター ノードを持つクラスターのパフォーマンスを示しており、6,000 万のレコードを持つテーブルでは、キーによるグループのカーディナリティは 2M です。

次のように、hint.num_partitions なしでクエリを実行すると、(クラスター ノード番号として) 2 つのパーティションだけが使用され、次のクエリには最大 1 時間 10 分かかる場合があります。

lineitem 
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

次のように、パーティション番号を 10 に設定すると、クエリは 23 秒後に終了します。

lineitem 
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

結合とシャッフルを使用してパフォーマンスを向上させる

次の例は、 join 演算子で shuffle 戦略を使用してパフォーマンスを向上させる方法を示しています。

この例では、データがこれらすべてのノードに広がっている 10 のノードを含むクラスターでサンプリングされました。

クエリの左側のソース テーブルには、join キーのカーディナリティが最大 14M である 1 億 5,000 万のレコードがあります。 クエリの右側のソースには 1 億 5,000 万のレコードが含まれ、join キーのカーディナリティは 10M です。 次のように、クエリは最大 28 秒後に終了し、メモリ使用量のピークは 1.43 GB です。

customer
| join
    orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

次のように、join 演算子で shuffle 戦略を使用する場合、クエリは最大 4 秒後に終了し、メモリ使用量のピークは 0.3 GB です。

customer
| join
    hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

別の例では、次の条件を使用して、より大きなデータセットに対して同じクエリを試します。

  • join の左側のソースは 150M、キーのカーディナリティは 148M です。
  • join の右側のソースは 1.5B、キーのカーディナリティは 最大 100M です。

join演算子のみを含むクエリは、4 分後に制限とタイムアウトに達します。 ただし、join 演算子で shuffle 戦略を使用する場合、クエリは最大 34 秒後に終了し、メモリ使用量のピークは 1.23 GB です。

次の例は、2 つのクラスター ノードを持つクラスターの機能強化を示しています。テーブルは 6,000 万のレコードがあり、join キーのカーディナリティは 2M です。 次のように、hint.num_partitions なしでクエリを実行すると、(クラスター ノード番号として) 2 つのパーティションだけが使用され、次のクエリには最大 1 時間 10 分かかる場合があります。

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey   part
on $left.l_partkey == $right.p_partkey
| consume

次のように、パーティション番号を 10 に設定する場合、クエリは 23 秒後に終了します。

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey  hint.num_partitions = 10    part
on $left.l_partkey == $right.p_partkey
| consume