Apache Kafka と Azure Databricks を使用したストリーム処理
この記事では、Azure Databricks で構造化ストリーミング ワークロードを実行するときに、Apache Kafka をソースまたはシンクとして使用する方法について説明します。
Kafka の詳細については、Kafka のドキュメントを参照してください。
Kafka からデータを読み取る
Kafka から読み取られたストリーミングの例を次に示します。
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
)
Azure Databricks では、次の例に示すように、Kafka データ ソースのバッチ読み取りセマンティクスもサポートされています。
df = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
)
増分バッチ読み込みの場合、Databricks では、Trigger.AvailableNow
で Kafka を使用することをお勧めします。 「増分バッチ処理の構成」を参照してください。
Databricks Runtime 13.3 LTS 以降では、Azure Databricks には Kafka データを読み取るための SQL 関数が用意されています。 SQL を使用したストリーミングは、Delta Live Tables または Databricks SQL のストリーミング テーブルでのみサポートされます。 「read_kafka テーブル値関数」を参照してください。
Kafka 構造化ストリーミング リーダーを構成する
Azure Databricks には、Kafka 0.10 以降への接続を構成するためのデータ形式として kafka
キーワードが用意されています。
Kafka の最も一般的な構成を次に示します。
サブスクライブするトピックを指定するには、複数の方法があります。 次のパラメーターのいずれか 1 つのみを指定する必要があります。
オプション | 値 | 説明 |
---|---|---|
subscribe | トピックのコンマ区切りリスト。 | サブスクライブするトピックの一覧。 |
subscribePattern | Java 正規表現文字列。 | トピックのサブスクライブに使用するパターン。 |
assign | JSON 文字列 {"topicA":[0,1],"topic":[2,4]} 。 |
使用する特定の topicPartition。 |
その他の注目すべき構成:
オプション | 値 | Default value | 説明 |
---|---|---|---|
kafka.bootstrap.servers | "ホスト:ポート" のコンマ区切りリストです。 | empty | [必須] Kafka bootstrap.servers の構成。 Kafka からのデータがないことがわかった場合は、まず、ブローカー アドレスの一覧を確認します。 ブローカー アドレスの一覧が正しくない場合、エラーは発生しないことがあります。 これは、Kafka クライアントによって、ブローカーは最終的に利用可能になると想定され、ネットワーク エラーが発生した場合は永久に再試行されるためです。 |
failOnDataLoss | true または false 。 |
true |
[省略可能] データが失われる恐れがある場合にクエリを失敗させるかどうか。 クエリは、トピックの削除、処理前のトピックの切り捨てなど、多くのシナリオが原因で、Kafka からのデータの読み取りに永続的に失敗する可能性があります。 データが失われた可能性があるかどうかについて控えめな推定を試みます。 これにより、誤ってアラームが発生する場合があります。 予期したとおりに動作しない場合、またはデータ損失にかかわらずクエリの処理を続行する場合は、このオプションを false に設定します。 |
minPartitions | >= 0 の整数、0 = 無効。 | 0 (無効) | [省略可能] Kafka から読み取るパーティションの最小数。 minPartitions オプションを使用して、Kafka から読み取るパーティションの最小数に任意の数を指定して Spark を構成することができます。 通常、Spark では、Kafka の topicPartitions と、Kafka から使用する Spark パーティションが 1 対 1 でマッピングされます。 minPartitions オプションを、Kafka の topicPartitions よりも大きな値に設定すると、Spark では、大きな Kafka パーティションが小さな部分に分割されます。 負荷のピーク時、データ スキュー時、ストリームが遅れているときに、このオプションを設定すると、処理速度を上げることができます。 トリガーごとに Kafka コンシューマーを初期化すると、Kafka に接続するときに SSL を使用する場合、パフォーマンスに影響を与える可能性があります。 |
kafka.group.id | Kafka コンシューマー グループ ID。 | 設定しない | [省略可能] Kafka から読み取り中に使用するグループ ID。 注意して使用する必要があります。 既定では、各クエリによって、データを読み取るための一意のグループ ID が生成されます。 これにより、各クエリでは、独自のコンシューマー グループが使用され、他のコンシューマーによる干渉を受けないため、サブスクライブされたトピックのすべてのパーティションを確実に読み取ることができます。 一部のシナリオ (たとえば、Kafka グループ ベースの承認) では、データを読み取るために特定の承認済みグループ ID を使用することが必要な場合があります。 必要に応じて、グループ ID を設定できます。 ただし、予期しない動作が発生する可能性があるため、細心の注意を払って設定してください。 - クエリ (バッチとストリーミングの両方) を同じグループ ID で同時実行すると、相互に干渉し合い、各クエリでデータの一部しか読み取れなくなる可能性があります。 - これは、クエリを立て続けに起動または再起動した場合にも発生する可能性があります。 このような問題を最小限に抑えるには、Kafka コンシューマー構成 session.timeout.ms が非常に小さくなるように設定します。 |
startingOffsets | earliest、latest | latest | [省略可能] クエリが開始されたときの開始点。最も古いオフセットの "earliest" か、各 TopicPartition の開始オフセットを指定する JSON 文字列。 JSON では、オフセットとして -2 を使用して earliest を、-1 で latest を示すことができます。 注: バッチ クエリの場合、latest (暗黙的に、または JSON で -1 を使用して) は許可されません。 ストリーミング クエリの場合、これは新しいクエリが開始されるときにのみ適用され、再開では常にクエリが中止された時点の情報が引き継がれます。 クエリ中に新しく検出されたパーティションは、earliest で開始されます。 |
他の省略可能な構成については、「構造化ストリーミング + Kafka 統合ガイド」を参照してください。
Kafka レコードのスキーマ
Kafka レコードのスキーマは次のとおりです。
列 | Type |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | long |
timestampType | int |
key
と value
は、ByteArrayDeserializer
を使用して常にバイト配列として逆シリアル化されます。 キーと値を明示的に逆シリアル化するには、DataFrame 操作 (cast("string")
など) を使用します。
Kafka にデータを書き込む
Kafka へのストリーミング書き込みの例を次に示します。
(df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.start()
)
Azure Databricks では、次の例に示すように、Kafka データ シンクへのバッチ書き込みセマンティクスもサポートされています。
(df
.write
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.save()
)
Kafka 構造化ストリーミング ライターを構成する
重要
Databricks Runtime 13.3 LTS 以降には、既定でべき等の書き込みを有効にする新しいバージョンの kafka-clients
ライブラリが含まれています。 Kafka シンクでバージョン 2.8.0 以下を使用していて、ACL が構成されているが IDEMPOTENT_WRITE
が有効になっていない場合、書き込みはエラー メッセージ org.apache.kafka.common.KafkaException:
Cannot execute transactional method because we are in an error state
で失敗します。
このエラーを解決するには、Kafka バージョン 2.8.0 以降にアップグレードするか、構造化ストリーミング ライターの構成時に .option(“kafka.enable.idempotence”, “false”)
を設定します。
DataStreamWriter に提供されるスキーマは、Kafka シンクと対話します。 次のフィールドを使用できます。
列名 | 必須または省略可能 | Type |
---|---|---|
key |
省略可能 | STRING または BINARY |
value |
必須 | STRING または BINARY |
headers |
省略可能 | ARRAY |
topic |
省略可能 (topic がライター オプションとして設定されている場合は無視されます) |
STRING |
partition |
省略可能 | INT |
Kafka への書き込み中に設定される一般的なオプションを次に示します。
オプション | 値 | Default value | 説明 |
---|---|---|---|
kafka.boostrap.servers |
<host:port> のコンマ区切りリスト。 |
なし | [必須] Kafka bootstrap.servers の構成。 |
topic |
STRING |
設定しない | [省略可能] すべての行のトピックが書き込まれるように設定します。 このオプションは、データに存在するすべてのトピック列をオーバーライドします。 |
includeHeaders |
BOOLEAN |
false |
[省略可能] 行に Kafka ヘッダーを含めるかどうか。 |
他の省略可能な構成については、「構造化ストリーミング + Kafka 統合ガイド」を参照してください。
Kafka メトリックを取得する
avgOffsetsBehindLatest
、maxOffsetsBehindLatest
、minOffsetsBehindLatest
メトリックを使用して、サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの平均、最小、最大数を取得できます。 「対話形式によるメトリックの読み取り」を参照してください。
注意
Databricks Runtime 9.1 以降で使用できます。
estimatedTotalBytesBehindLatest
の値を調べることで、クエリ プロセスによって消費されていない推定合計バイト数をサブスクライブされたトピックから取得します。 この推定値は、過去 300 秒間に処理されたバッチ数に基づきます。 推定値の基になる期間は、オプション bytesEstimateWindowLength
を別の値に設定することによって変更できます。 たとえば、10 分に設定するには、次のようにします。
df = (spark.readStream
.format("kafka")
.option("bytesEstimateWindowLength", "10m") # m for minutes, you can also use "600s" for 600 seconds
)
ノートブックでストリームを実行している場合、これらのメトリックは、ストリーミング クエリの進行状況ダッシュボードの [生データ] タブに表示されます。
{
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic]]",
"metrics" : {
"avgOffsetsBehindLatest" : "4.0",
"maxOffsetsBehindLatest" : "4",
"minOffsetsBehindLatest" : "4",
"estimatedTotalBytesBehindLatest" : "80.0"
},
} ]
}
SSL を使用して Azure Databricks を Kafka に接続する
Kafka への SSL 接続を有効にするには、Confluent のドキュメント「SSL による暗号化と認証」の手順に従います。 オプションとして、そこで説明されている構成を、プレフィックス kafka.
を付けて指定できます。 たとえば、プロパティ kafka.ssl.truststore.location
で、信頼ストアの場所を指定します。
Databricks では、次のことが推奨されています。
- 証明書をクラウド オブジェクト ストレージに格納します。 証明書へのアクセスは、Kafka にアクセスできるクラスターのみに制限できます。 「Unity Catalog を使用したデータ ガバナンス」を参照してください。
- 証明書のパスワードをシークレットとしてシークレット スコープに格納します。
次の例では、オブジェクト ストレージの場所と Databricks シークレットを使用して SSL 接続を有効にします。
df = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", <truststore-location>)
.option("kafka.ssl.keystore.location", <keystore-location>)
.option("kafka.ssl.keystore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<keystore-password-key-name>))
.option("kafka.ssl.truststore.password", dbutils.secrets.get(scope=<certificate-scope-name>,key=<truststore-password-key-name>))
)
HDInsight 上の Kafka を Azure Databricks に接続する
HDInsight Kafka クラスターを作成します。
手順については、「Azure Virtual Network 経由で HDInsight 上の Apache Kafka に接続する」を参照してください。
正しいアドレスをアドバタイズするように Kafka ブローカーを構成します。
「IP をアドバタイズするように Kafka を構成する」の手順に従います。 Azure Virtual Machines で Kafka を自分で管理する場合、ブローカーの
advertised.listeners
構成がホストの内部 IP に設定されていることを確認してください。Azure Databricks クラスターを作成します。
Kafka クラスターを Azure Databricks クラスターにピアリングします。
「仮想ネットワークをピアリングする」の手順に従います。
Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証
Azure Databricks は、Event Hubs サービスを使用した Spark ジョブの認証をサポートしています。 認証は Microsoft Entra ID によって OAuth で経由で行われます。
Azure Databricks では、以下のコンピューティング環境で、クライアント ID とシークレットを使用した Microsoft Entra ID 認証をサポートしています。
- シングル ユーザー アクセス モードで構成されたコンピューティング上の Databricks Runtime 12.2 LTS 以降。
- 共有アクセス モードで構成されたコンピューティング上の Databricks Runtime 14.3 LTS 以降。
- Unity Catalog を使用せずに構成された Delta Live Tables パイプライン。
Azure Databricks では、コンピューティング環境内または Unity Catalog を使用して構成された Delta Live Tables パイプラインで、証明書を使用した Microsoft Entra ID 認証はサポートされていません。
この認証は、共有クラスターや Unity Catalog の Delta Live Tables では機能しません。
構造化ストリーミング Kafka コネクタの構成
Microsoft Entra ID で認証を実行するには、次の値が必要です。
テナント ID。 これは、Microsoft Entra ID の [サービス] タブにあります。
clientID (アプリケーション ID とも呼ばれます)。
クライアント シークレット。 これを取得したら、シークレットとして Databricks ワークスペースに追加する必要があります。 このシークレットを追加するには、「シークレットの管理」を参照してください。
EventHubs トピック。 トピックの一覧は、特定の Event Hubs 名前空間ページの [エンティティ] セクションの [Event Hubs] セクションにあります。 複数のトピックを操作するために、Event Hubs レベルで IAM ロールを設定できます。
EventHubs サーバー。 これは、特定の Event Hubs 名前空間の概要ページにあります。
さらに、Entra ID を使用するには、OAuth SASL メカニズム (SASL は汎用プロトコルであり、OAuth は SASL "メカニズム" の一種です) を使用するように Kafka に指示する必要があります。
kafka.security.protocol
は、SASL_SSL
である必要があります。kafka.sasl.mechanism
は、OAUTHBEARER
である必要があります。kafka.sasl.login.callback.handler.class
は、シェーディングされた Kafka クラスのログイン コールバック ハンドラーについて値kafkashaded
を持つ Java クラスの完全修飾名にする必要があります。 正確なクラスについては、次の例を参照してください。
例
次に、実行例を見てみましょう。
Python
# This is the only section you need to modify for auth purposes!
# ------------------------------
tenant_id = "..."
client_id = "..."
client_secret = dbutils.secrets.get("your-scope", "your-secret-name")
event_hubs_server = "..."
event_hubs_topic = "..."
# -------------------------------
sasl_config = f'kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
# Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers": f"{event_hubs_server}:9093",
"kafka.sasl.jaas.config": sasl_config,
"kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
"subscribe": event_hubs_topic,
# You should not need to modify these
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.mechanism": "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class": "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
}
df = spark.readStream.format("kafka").options(**kafka_options)
display(df)
Scala
// This is the only section you need to modify for auth purposes!
// -------------------------------
val tenantId = "..."
val clientId = "..."
val clientSecret = dbutils.secrets.get("your-scope", "your-secret-name")
val eventHubsServer = "..."
val eventHubsTopic = "..."
// -------------------------------
val saslConfig = s"""kafkashaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="$clientId" clientSecret="$clientSecret" scope="https://$eventHubsServer/.default" ssl.protocol="SSL";"""
val kafkaOptions = Map(
// Port 9093 is the EventHubs Kafka port
"kafka.bootstrap.servers" -> s"$eventHubsServer:9093",
"kafka.sasl.jaas.config" -> saslConfig,
"kafka.sasl.oauthbearer.token.endpoint.url" -> s"https://login.microsoft.com/$tenantId/oauth2/v2.0/token",
"subscribe" -> eventHubsTopic,
// You should not need to modify these
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "OAUTHBEARER",
"kafka.sasl.login.callback.handler.class" -> "kafkashaded.org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler"
)
val scalaDF = spark.readStream
.format("kafka")
.options(kafkaOptions)
.load()
display(scalaDF)
潜在的なエラーの処理
ストリーミング オプションがサポートされません。
Unity Catalog を使用して構成された Delta Live Tables パイプラインでこの認証メカニズムを使用しようとすると、次のエラーが表示されることがあります。
このエラーを解決するには、サポートされているコンピューティング構成を使用します。 「Microsoft Entra ID と Azure Event Hubs を使用したサービス プリンシパル認証」を参照してください。
新しい
KafkaAdminClient
の作成に失敗しました。これは、次のいずれかの認証オプションが正しくない場合に Kafka がスローする内部エラーです。
- クライアント ID (アプリケーション ID とも呼ばれます)
- テナント ID
- Event Hubs サーバー
エラーを解決するには、これらのオプションの値が正しいことを確認します。
また、この例で既定で提供されている (変更しないように求められた)
kafka.security.protocol
などの構成オプションを変更すると、このエラーが表示される場合があります。返されるレコードがありません
データ フレームを表示または処理しようとしているが結果が得られない場合は、UI に次の情報が表示されます。
このメッセージは、認証は成功したが、Event Hubs がデータを返さなかったことを意味します。 次のような理由が考えられます (ただし、決して網羅的ではありません)。
- 正しくない EventHubs トピックを指定しました。
startingOffsets
の既定の Kafka 構成オプションがlatest
であり、現在、トピック経由でデータをまだ受信していません。 Kafka の最も古いオフセットからデータの読み取りを開始するように、startingOffsetstoearliest
を設定できます。