範囲結合の最適化
"範囲結合" は、2 つのリレーションが間隔内のポイントまたは間隔の重複条件を使用して結合されるときに実行されます。 Databricks Runtime での範囲結合の最適化のサポートにより、クエリのパフォーマンスを大幅に向上させることができますが、慎重に手動で調整する必要があります。
Databricks では、パフォーマンスが低い場合に範囲結合に結合ヒントを使用することをお勧めします。
間隔内のポイントによる範囲結合
"間隔内のポイントによる範囲結合" は、一方のリレーションの値が、もう一方のリレーションの 2 つの値の間にあることを示す述語が条件に含まれる結合です。 次に例を示します。
-- using BETWEEN expressions
SELECT *
FROM points JOIN ranges ON points.p BETWEEN ranges.start and ranges.end;
-- using inequality expressions
SELECT *
FROM points JOIN ranges ON points.p >= ranges.start AND points.p < ranges.end;
-- with fixed length interval
SELECT *
FROM points JOIN ranges ON points.p >= ranges.start AND points.p < ranges.start + 100;
-- join two sets of point values within a fixed distance from each other
SELECT *
FROM points1 p1 JOIN points2 p2 ON p1.p >= p2.p - 10 AND p1.p <= p2.p + 10;
-- a range condition together with other join conditions
SELECT *
FROM points, ranges
WHERE points.symbol = ranges.symbol
AND points.p >= ranges.start
AND points.p < ranges.end;
間隔の重複による範囲結合
"間隔の重複による範囲結合" は、各リレーションの 2 つの値の間隔が重複することを示す述語が条件に含まれる結合です。 次に例を示します。
-- overlap of [r1.start, r1.end] with [r2.start, r2.end]
SELECT *
FROM r1 JOIN r2 ON r1.start < r2.end AND r2.start < r1.end;
-- overlap of fixed length intervals
SELECT *
FROM r1 JOIN r2 ON r1.start < r2.start + 100 AND r2.start < r1.start + 100;
-- a range condition together with other join conditions
SELECT *
FROM r1 JOIN r2 ON r1.symbol = r2.symbol
AND r1.start <= r2.end
AND r1.end >= r2.start;
範囲結合の最適化
範囲結合の最適化は、次のような結合に対して実行されます。
- 間隔内のポイントまたは間隔の重複による範囲結合として解釈できる条件が含まれる。
- 範囲結合の条件に含まれるすべての値が、数値型 (整数、浮動小数点、10進数)、
DATE
、またはTIMESTAMP
である。 - 範囲結合の条件に含まれるすべての値が同じ型である。 また 10 進型の場合、値の小数点以下桁数と有効桁数が同じである必要がある。
INNER JOIN
である。また、間隔内のポイントによる範囲結合の場合、左側にポイント値を含むLEFT OUTER JOIN
があるか、右側にポイント値を含むRIGHT OUTER JOIN
がある。- ビン サイズのチューニング パラメーターがある。
ビンのサイズ
"ビンのサイズ" は、範囲条件の値ドメインを、同じサイズの複数の "ビン" に分割する数値チューニング パラメーターです。 たとえば、ビン サイズが 10 の場合、最適化によって、ドメインは長さ 10 の間隔のビンに分割されます。
p BETWEEN start AND end
という範囲内のポイント条件があり、start
が 8 で、end
が 22 の場合、この値間隔は、長さ 10 の 3 つのビンに重なります。1 番目のビンは 0 から 10、2 番目のビンは 10 から 20、3 番目のビンは 20 から 30 です。 同じ 3 つのビンに分類されるポイントのみ、その間隔の結合一致候補と見なす必要があります。 たとえば、p
が 32 の場合、30 から 40 のビンに分類されるため、start
8 から end
22 までの分類では除外できます。
注意
DATE
値では、ビン サイズの値は日数として解釈されます。 たとえば、ビン サイズ値 7 は 1 週間を表します。TIMESTAMP
値では、ビン サイズの値は秒数として解釈されます。 1 秒未満の値が必要な場合、小数値を使用できます。 たとえば、ビン サイズ値 60 は 1 分を表し、ビン サイズ値 0.1 は 100 ミリ秒を表します。
ビン サイズを指定するには、クエリで範囲結合ヒントを使用するか、セッション構成パラメーターを設定します。 範囲結合の最適化は、ビン サイズを手動で指定した "場合にのみ" 適用されます。 「ビン サイズを選択する」セクションで、最適なビン サイズを選択する方法について説明します。
範囲結合ヒントを使用して範囲結合を有効にする
SQL クエリで範囲結合の最適化を有効にするために、"範囲結合ヒント" を使用してビン サイズを指定できます。 このヒントには、結合対象のいずれかのリレーションのリレーション名と、数値ビン サイズ パラメーターを含める必要があります。 リレーション名として、テーブル、ビュー、またはサブクエリを使用できます。
SELECT /*+ RANGE_JOIN(points, 10) */ *
FROM points JOIN ranges ON points.p >= ranges.start AND points.p < ranges.end;
SELECT /*+ RANGE_JOIN(r1, 0.1) */ *
FROM (SELECT * FROM ranges WHERE ranges.amount < 100) r1, ranges r2
WHERE r1.start < r2.start + 100 AND r2.start < r1.start + 100;
SELECT /*+ RANGE_JOIN(c, 500) */ *
FROM a
JOIN b ON (a.b_key = b.id)
JOIN c ON (a.ts BETWEEN c.start_time AND c.end_time)
注意
3 番目の例では、ヒントを c
に配置する "必要" があります。
これは、結合が左結合であり、そのためクエリが (a JOIN b) JOIN c
として解釈され、a
のヒントが、c
との結合ではなく、a
と b
の結合に適用されるためです。
#create minute table
minutes = spark.createDataFrame(
[(0, 60), (60, 120)],
"minute_start: int, minute_end: int"
)
#create events table
events = spark.createDataFrame(
[(12, 33), (0, 120), (33, 72), (65, 178)],
"event_start: int, event_end: int"
)
#Range_Join with "hint" on the from table
(events.hint("range_join", 60)
.join(minutes,
on=[events.event_start < minutes.minute_end,
minutes.minute_start < events.event_end])
.orderBy(events.event_start,
events.event_end,
minutes.minute_start)
.show()
)
#Range_Join with "hint" on the join table
(events.join(minutes.hint("range_join", 60),
on=[events.event_start < minutes.minute_end,
minutes.minute_start < events.event_end])
.orderBy(events.event_start,
events.event_end,
minutes.minute_start)
.show()
)
また、結合対象のいずれかの DataFrame に範囲結合ヒントを配置することもできます。 その場合、ヒントには、数値ビン サイズ パラメーターのみ含めます。
val df1 = spark.table("ranges").as("left")
val df2 = spark.table("ranges").as("right")
val joined = df1.hint("range_join", 10)
.join(df2, $"left.type" === $"right.type" &&
$"left.end" > $"right.start" &&
$"left.start" < $"right.end")
val joined2 = df1
.join(df2.hint("range_join", 0.5), $"left.type" === $"right.type" &&
$"left.end" > $"right.start" &&
$"left.start" < $"right.end")
セッション構成を使用して範囲結合を有効にする
クエリを変更したくない場合、構成パラメーターとしてビン サイズを指定できます。
SET spark.databricks.optimizer.rangeJoin.binSize=5
この構成パラメーターは、範囲条件を含む結合に適用されます。 ただし、範囲結合ヒントで設定した異なるビン サイズの方が、パラメーターで設定したビン サイズよりも常に優先されます。
ビンのサイズを選択する
範囲結合の最適化の効果は、適切なビン サイズを選択するかどうかによって決まります。
ビン サイズを小さくすると、より多くのビンが生成されます。これは、一致候補のフィルター処理に役立ちます。
ただし、ビン サイズが検出された値間隔よりも大幅に小さく、値間隔が複数の "ビン" 間隔と重なる場合は、非効率的になります。 たとえば、start
が 1,000,000 で、end
が 1,999,999 で、ビン サイズが 10 である条件 p BETWEEN start AND end
では、値間隔は 100,000 個のビンに重なります。
間隔の長さが判明しており、きわめて均一である場合、ビン サイズを、通常予想される値間隔の長さに設定することをお勧めします。 ただし、間隔の長さが多様で不均一である場合は、短い間隔を効率的にフィルター処理し、一方で長い間隔が非常に多くのビンに重ならないようにするビン サイズを設定するようにバランスを取る必要があります。テーブル ranges
で、列 start
と end
の間に間隔がある場合、次のクエリを使用して、不均一な間隔長値のさまざまなパーセンタイルを特定できます。
SELECT APPROX_PERCENTILE(CAST(end - start AS DOUBLE), ARRAY(0.5, 0.9, 0.99, 0.999, 0.9999)) FROM ranges
ビン サイズの推奨設定は、最大として 90 番目のパーセンタイルの値、99 番目のパーセンタイルの値を 10 で割った値、99.9 番目のパーセンタイルの値を 100 で割った値などです。 理由を次に示します。
- 90 番目のパーセンタイルの値がビン サイズの場合、値間隔長の 10% のみ、ビン間隔よりも長くなり、そのため、2 つの隣接するビン間隔を超えた範囲に及びます。
- 99 番目のパーセンタイルの値がビン サイズの場合、値間隔長の 1% だけが 11 個の隣接するビン間隔を超えた範囲に及びます。
- 99.9 番目のパーセンタイルの値がビン サイズの場合、値間隔長の 0.1% だけが 101 個の隣接するビン間隔を超えた範囲に及びます。
- 必要に応じて、99.99 番目、99.999 番目のパーセンタイルの値などにも同じことを繰り返すことができます。
説明されている方法では、複数のビン間隔に重なる、不均一な長さの値間隔の量が制限されます。 この方法で取得したビン サイズ値は、微調整のための開始ポイントに過ぎません。実際の結果は、特定のワークロードによって異なる場合があります。