Azure Functions の Apache Kafka 出力バインド
出力バインドを使用すると、Azure Functions アプリで Kafka トピックにメッセージを書き込むことができます。
重要
Kafka バインドは、エラスティック Premium プランおよび 専用 (App Service) プランの Functions でのみ使用できます。 これらは、バージョン 3.x 以降のバージョンの Functions ランタイムでのみサポートされます。
例
バインドの使用方法は、拡張機能パッケージのバージョンと、関数アプリで使用される C# のモダリティによって異なり、次のいずれかになります。
分離ワーカー プロセス クラス ライブラリでコンパイルされた C# 関数は、ランタイムから分離されたプロセスで実行されます。
使用する属性は、個別のイベント プロバイダーによって異なります。
次の例には、HTTP 応答と Kafka 出力で構成されるカスタム戻り値の型 MultipleOutputType
があります。
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
クラス MultipleOutputType
では、Kevent
が Kafka バインドの出力バインド変数です。
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
イベントのバッチを送信するには、次の例に示すように、出力の種類に文字列配列を渡します。
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
文字列配列は、出力バインドが定義されているクラスの Kevents
プロパティとして定義されます。
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
次の関数は、Kafka 出力データにヘッダーを追加します。
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
動作する .NET の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
注意
同等の TypeScript の例については、Kafka 拡張機能リポジトリを参照してください
function.json ファイルの個別のプロパティは、イベント プロバイダーによって異なります。ここにあげた例では Confluent または Azure Event Hubs のいずれかです。 次の例は、HTTP 要求によってトリガーされ、要求から Kafka トピックにデータを送信する関数の Kafka 出力バインドを示しています。
この function.json では、次の例の特定のプロバイダーのトリガーを定義します。
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
次に、以下のコードで、トピックにメッセージが送信されます。
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
次のコードは、複数のメッセージを配列として同じトピックに送信します。
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
次の例に、ヘッダーを含むイベント メッセージを同じ Kafka トピックに送信する方法を示します。
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
動作する JavaScript の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
function.json ファイルの個別のプロパティは、イベント プロバイダーによって異なります。ここにあげた例では Confluent または Azure Event Hubs のいずれかです。 次の例は、HTTP 要求によってトリガーされ、要求から Kafka トピックにデータを送信する関数の Kafka 出力バインドを示しています。
この function.json では、次の例の特定のプロバイダーのトリガーを定義します。
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
次に、以下のコードで、トピックにメッセージが送信されます。
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
次のコードは、複数のメッセージを配列として同じトピックに送信します。
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
次の例に、ヘッダーを含むイベント メッセージを同じ Kafka トピックに送信する方法を示します。
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
動作する PowerShell の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
function.json ファイルの個別のプロパティは、イベント プロバイダーによって異なります。ここにあげた例では Confluent または Azure Event Hubs のいずれかです。 次の例は、HTTP 要求によってトリガーされ、要求から Kafka トピックにデータを送信する関数の Kafka 出力バインドを示しています。
この function.json では、次の例の特定のプロバイダーのトリガーを定義します。
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
次に、以下のコードで、トピックにメッセージが送信されます。
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
次のコードは、複数のメッセージを配列として同じトピックに送信します。
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
次の例に、ヘッダーを含むイベント メッセージを同じ Kafka トピックに送信する方法を示します。
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
動作する Python の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
出力バインドの構成に使用する注釈は、個別のイベント プロバイダーによって異なります。
次の関数は、Kafka トピックにメッセージを送信します。
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
次の例は、Kafka トピックに複数のメッセージを送信する方法を示しています。
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
この例では、出力バインド パラメーターが文字列配列に変更されています。
最後の例では、これらの KafkaEntity
クラスと KafkaHeader
クラスを使用します。
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
次の関数の例では、ヘッダーを含むメッセージを Kafka トピックに送信します。
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Confluent で動作する Java の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
属性
インプロセスと分離ワーカー プロセスの C# ライブラリは、どちらも Kafka
属性を使用して関数トリガーを定義します。
次の表では、この属性を使用して設定できるプロパティについて説明します。
パラメーター | 説明 |
---|---|
BrokerList | (必須) 出力の送信先となる Kafka ブローカーのリスト。 詳細については、「接続」を参照してください。 |
トピック | (必須) 出力の送信先となるトピック。 |
AvroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 |
MaxMessageBytes | (省略可能) 送信される出力メッセージの最大サイズ (MB 単位)、既定値は 1 。 |
BatchSize | (省略可能) 1 つのメッセージ セットでバッチ処理されるメッセージの最大数。既定値は 10000 。 |
EnableIdempotence | (省略可能) true に設定すると、メッセージが 1 回だけ元の生成順序で正常に生成される。既定値は false 。 |
MessageTimeoutMs | (省略可能) ローカル メッセージのタイムアウト (ミリ秒単位)。 この値はローカルでのみ適用され、生成されたメッセージが正常に配信されるまで待機する時間を制限します。既定値は 300000 です。 時間 0 は無限です。 この値は、メッセージの配信に使用される最大時間 (再試行を含む) です。 配信エラーは、再試行回数またはメッセージのタイムアウトを超えた場合に発生します。 |
RequestTimeoutMs | (省略可能) 出力要求の受信確認タイムアウト (ミリ秒単位)。既定値は 5000 。 |
MaxRetries | (省略可能) 失敗したメッセージの送信を再試行する回数。既定値は 2 。 EnableIdempotence が true に設定されている場合を除き、再試行すると、並べ替えが発生する場合があります。 |
AuthenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、Gssapi 、Plain (既定値)、ScramSha256 、ScramSha512 です。 |
ユーザー名 | (省略可能) SASL 認証のユーザー名。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
パスワード | (省略可能) SASL 認証のパスワード。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
プロトコル | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、ssl 、sasl_plaintext 、sasl_ssl です。 |
SslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
SslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
SslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
SslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
注釈
注釈 KafkaOutput
を使用すると、特定のトピックに書き込む関数を作成できます。 サポートされるオプションには、次の要素が含まれます。
要素 | 説明 |
---|---|
name | 関数コード内のブローカー データを表す変数の名前。 |
brokerList | (必須) 出力の送信先となる Kafka ブローカーのリスト。 詳細については、「接続」を参照してください。 |
topic | (必須) 出力の送信先となるトピック。 |
dataType | Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。 |
avroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 (現在、Java ではサポートされていません)。 |
maxMessageBytes | (省略可能) 送信される出力メッセージの最大サイズ (MB 単位)、既定値は 1 。 |
batchSize | (省略可能) 1 つのメッセージ セットでバッチ処理されるメッセージの最大数。既定値は 10000 。 |
enableIdempotence | (省略可能) true に設定すると、メッセージが 1 回だけ元の生成順序で正常に生成される。既定値は false 。 |
messageTimeoutMs | (省略可能) ローカル メッセージのタイムアウト (ミリ秒単位)。 この値はローカルでのみ適用され、生成されたメッセージが正常に配信されるまで待機する時間を制限します。既定値は 300000 です。 時間 0 は無限です。 これは、メッセージの配信に使用される最大時間 (再試行を含む) です。 配信エラーは、再試行回数またはメッセージのタイムアウトを超えた場合に発生します。 |
requestTimeoutMs | (省略可能) 出力要求の受信確認タイムアウト (ミリ秒単位)。既定値は 5000 。 |
maxRetries | (省略可能) 失敗したメッセージの送信を再試行する回数。既定値は 2 。 EnableIdempotence が true に設定されている場合を除き、再試行すると、並べ替えが発生する場合があります。 |
authenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、Gssapi 、Plain (既定値)、ScramSha256 、ScramSha512 です。 |
username | (省略可能) SASL 認証のユーザー名。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
password | (省略可能) SASL 認証のパスワード。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
protocol | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、ssl 、sasl_plaintext 、sasl_ssl です。 |
sslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
sslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
sslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
sslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
構成
次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。
function.json のプロパティ | 説明 |
---|---|
type | kafka に設定する必要があります。 |
direction | out に設定する必要があります。 |
name | 関数コード内のブローカー データを表す変数の名前。 |
brokerList | (必須) 出力の送信先となる Kafka ブローカーのリスト。 詳細については、「接続」を参照してください。 |
topic | (必須) 出力の送信先となるトピック。 |
avroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 |
maxMessageBytes | (省略可能) 送信される出力メッセージの最大サイズ (MB 単位)、既定値は 1 。 |
batchSize | (省略可能) 1 つのメッセージ セットでバッチ処理されるメッセージの最大数。既定値は 10000 。 |
enableIdempotence | (省略可能) true に設定すると、メッセージが 1 回だけ元の生成順序で正常に生成される。既定値は false 。 |
messageTimeoutMs | (省略可能) ローカル メッセージのタイムアウト (ミリ秒単位)。 この値はローカルでのみ適用され、生成されたメッセージが正常に配信されるまで待機する時間を制限します。既定値は 300000 です。 時間 0 は無限です。 これは、メッセージの配信に使用される最大時間 (再試行を含む) です。 配信エラーは、再試行回数またはメッセージのタイムアウトを超えた場合に発生します。 |
requestTimeoutMs | (省略可能) 出力要求の受信確認タイムアウト (ミリ秒単位)。既定値は 5000 。 |
maxRetries | (省略可能) 失敗したメッセージの送信を再試行する回数。既定値は 2 。 EnableIdempotence が true に設定されている場合を除き、再試行すると、並べ替えが発生する場合があります。 |
authenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされる値は、Gssapi 、Plain (既定値)、ScramSha256 、ScramSha512 です。 |
username | (省略可能) SASL 認証のユーザー名。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
password | (省略可能) SASL 認証のパスワード。 AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
protocol | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされる値は、plaintext (既定値)、ssl 、sasl_plaintext 、sasl_ssl です。 |
sslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
sslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
sslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
sslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
使用法
イベントのオフセット、パーティション、タイムスタンプは実行時に生成されます。 関数内で設定できるのは、値とヘッダーだけです。 トピックは function.json で設定されます。
書き込もうとしている Kafka トピックにアクセスできることを確認します。 Kafka トピックへのアクセスおよび接続資格情報を使用してバインドを構成します。
Premium プランでは、Kafka 出力のランタイム スケール監視を有効にして、複数のインスタンスにスケール アウトできるようにする必要があります。 詳細については、「ランタイム スケールを有効にする」を参照してください。
Kafka トリガーでサポートされている host.json 設定の完全なセットについては、「host.json 設定」を参照してください。
接続
トリガーとバインドに必要なすべての接続情報は、コード内のバインド定義ではなく、アプリケーション設定に保持する必要があります。 これは、コードに格納してはならない資格情報にも言えることです。
重要
資格情報の設定はアプリケーション設定を参照する必要があります。 コードや構成のファイル内に資格情報をハードコーディングしないでください。 ローカルで実行する場合は、資格情報に local.settings.json ファイルを使用します。local.settings.json ファイルは公開しないでください。
Azure の Confluent によって提供されるマネージド Kafka クラスターに接続する場合は、Confluent Cloud 環境の次の認証資格情報がトリガーまたはバインドに設定されていることを確認します。
設定 | 推奨値 | 説明 |
---|---|---|
BrokerList | BootstrapServer |
BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。 |
ユーザー名 | ConfluentCloudUsername |
ConfluentCloudUsername という名前のアプリ設定には、Confluent Cloud Web サイトからの API アクセス キーが含まれています。 |
パスワード | ConfluentCloudPassword |
ConfluentCloudPassword という名前のアプリ設定には、Confluent Cloud Web サイトから取得した API シークレットが含まれています。 |
これらの設定に使用する文字列値は、Azure のアプリケーション設定として、またはローカル開発中に local.settings.json ファイル内の Values
コレクションに存在する必要があります。
また、バインド定義で Protocol
、AuthenticationMode
、SslCaLocation
を設定する必要があります。