read_pubsub
ストリーミングテーブル値関数
適用対象: Databricks SQL Databricks Runtime 13.3 LTS 以降
トピックの Pub/Sub から読み取られたレコードを含むテーブルを返します。 ストリーミング クエリのみをサポートします。
構文
read_pubsub( { parameter => value } [, ...])
引数
read_pubsub
には、名前付きパラメーター呼び出しが必要です。
必須の引数は subscriptionId
、projectId
、および topicId
のみです。 他の引数はすべて省略可能です。
詳細な引数の説明については、「Pub/Sub ストリーミング読み取りのオプションを構成する」を参照してください。
Databricks では、承認オプションを提供するときにシークレットを使用することをお勧めします。 「secret 関数」を参照してください。
Pub/Sub へのアクセスの構成の詳細については、「Pub/Sub へのアクセスを構成する」を参照してください。
パラメーター | 型 | 説明 |
---|---|---|
subscriptionId |
STRING |
必須。Pub/Sub サブスクリプションに割り当てられた一意識別子。 |
projectId |
STRING |
必須。Pub/Sub トピックに関連付けられている Google Cloud プロジェクト ID。 |
topicId |
STRING |
必須。サブスクライブする Pub/Sub トピックの ID または名前。 |
clientEmail |
STRING |
認証用のサービス アカウントに関連付けられているメール アドレス。 |
clientId |
STRING |
認証用のサービス アカウントに関連付けられているクライアント ID。 |
privateKeyId |
STRING |
サービス アカウントに関連付けられている秘密キーの ID。 |
privateKey |
STRING |
認証用のサービス アカウントに関連付けられている秘密キー。 |
これらの引数は、Pub/Sub から読み取るときにさらに微調整するために使用されます。
パラメーター | 型 | 説明 |
---|---|---|
numFetchPartitions |
STRING |
省略可能。Executor の既定の数。 サブスクリプションからレコードをフェッチする並列 Spark タスクの数。 |
deleteSubscriptionOnStreamStop |
BOOLEAN |
省略可能。既定値は false 。 true に設定すると、ストリームに渡されたサブスクリプションが、ストリーミング ジョブの終了時に削除されます。 |
maxBytesPerTrigger |
STRING |
トリガーされる各マイクロバッチの間に処理されるバッチ サイズのソフト制限。 既定値は ‘none’ です。 |
maxRecordsPerFetch |
STRING |
レコードを処理する前にタスクごとにフェッチするレコードの数。 既定値は ‘1000’ です。 |
maxFetchPeriod |
STRING |
レコードを処理する前に取得する各タスクの時間。 既定値は '10s' です。 |
返品
次のスキーマを持つ Pub/Sub レコードのテーブル。 属性列は null の可能性がありますが、他のすべての列は null ではありません。
名前 | データ型 | Nullable | Standard | 説明 |
---|---|---|---|---|
messageId |
STRING |
いいえ | Pub/Sub メッセージの一意識別子。 | |
payload |
BINARY |
いいえ | Pub/Sub メッセージのコンテンツ。 | |
attributes |
STRING |
はい | Pub/Sub メッセージの属性を表すキーと値のペア。 これは json でエンコードされた文字列です。 | |
publishTimestampInMillis |
BIGINT |
いいえ | メッセージが発行されたときのタイムスタンプ (ミリ秒単位)。 | |
sequenceNumber |
BIGINT |
いいえ | シャード内のレコードの一意識別子。 |
例
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
さらに分析するには、testing.streaming_table
に対してクエリを実行してデータを入手する必要があります。
誤ったクエリ:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);