Durable Functions での外部イベントの処理 (Azure Functions)
[アーティクル] 05/10/2024
12 人の共同作成者
フィードバック
この記事の内容
オーケストレーター関数には、外部イベントを待ち受ける機能があります。 Durable Functions のこの機能は、多くの場合、人による操作または他の外部トリガーを処理するときに便利です。
Note
外部イベントは、一方向の非同期操作です。 それらは、イベントを送信するクライアントが、オーケストレーター関数からの同期応答を必要とする状況には適していません。
イベントを待つ
オーケストレーション トリガー バインド の "wait-for-external-event" API を使用すると、オーケストレーター関数で外部クライアントから配信されたイベントを非同期に待機してリッスンできるようになります。 リスニング オーケストレーター関数は、受信するイベントの "名前 " と "データのシェイプ " を宣言します。
[FunctionName("BudgetApproval")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
bool approved = await context.WaitForExternalEvent<bool>("Approval");
if (approved)
{
// approval granted - do the approved action
}
else
{
// approval denied - send a notification
}
}
Note
前記の C# コードは Durable Functions 2.x 用です。 Durable Functions 1.x の場合、IDurableOrchestrationContext
の代わりに DurableOrchestrationContext
を使用する必要があります。 バージョン間の相違点の詳細については、Durable Functions のバージョン に関する記事を参照してください。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const approved = yield context.df.waitForExternalEvent("Approval");
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
approved = yield context.wait_for_external_event('Approval')
if approved:
# approval granted - do the approved action
else:
# approval denied - send a notification
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$approved = Start-DurableExternalEventListener -EventName "Approval"
if ($approved) {
# approval granted - do the approved action
} else {
# approval denied - send a notification
}
@FunctionName("WaitForExternalEvent")
public void waitForExternalEvent(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
boolean approved = ctx.waitForExternalEvent("Approval", boolean.class).await();
if (approved) {
// approval granted - do the approved action
} else {
// approval denied - send a notification
}
}
前の例では、特定の 1 つのイベントをリッスンし、受信したときにアクションを実行します。
次の例に示すように、複数のイベントを同時にリッスンできます。この例では、発生する可能性がある 3 つのイベント通知のいずれかを待っています。
[FunctionName("Select")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var event1 = context.WaitForExternalEvent<float>("Event1");
var event2 = context.WaitForExternalEvent<bool>("Event2");
var event3 = context.WaitForExternalEvent<int>("Event3");
var winner = await Task.WhenAny(event1, event2, event3);
if (winner == event1)
{
// ...
}
else if (winner == event2)
{
// ...
}
else if (winner == event3)
{
// ...
}
}
Note
前記の C# コードは Durable Functions 2.x 用です。 Durable Functions 1.x の場合、IDurableOrchestrationContext
の代わりに DurableOrchestrationContext
を使用する必要があります。 バージョン間の相違点の詳細については、Durable Functions のバージョン に関する記事を参照してください。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const event1 = context.df.waitForExternalEvent("Event1");
const event2 = context.df.waitForExternalEvent("Event2");
const event3 = context.df.waitForExternalEvent("Event3");
const winner = yield context.df.Task.any([event1, event2, event3]);
if (winner === event1) {
// ...
} else if (winner === event2) {
// ...
} else if (winner === event3) {
// ...
}
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
event1 = context.wait_for_external_event('Event1')
event2 = context.wait_for_external_event('Event2')
event3 = context.wait_for_external_event('Event3')
winner = yield context.task_any([event1, event2, event3])
if winner == event1:
# ...
elif winner == event2:
# ...
elif winner == event3:
# ...
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$event1 = Start-DurableExternalEventListener -EventName "Event1" -NoWait
$event2 = Start-DurableExternalEventListener -EventName "Event2" -NoWait
$event3 = Start-DurableExternalEventListener -EventName "Event3" -NoWait
$winner = Wait-DurableTask -Task @($event1, $event2, $event3) -Any
if ($winner -eq $event1) {
# ...
} else if ($winner -eq $event2) {
# ...
} else if ($winner -eq $event3) {
# ...
}
@FunctionName("Select")
public void selectOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
Task<Void> event1 = ctx.waitForExternalEvent("Event1");
Task<Void> event2 = ctx.waitForExternalEvent("Event2");
Task<Void> event3 = ctx.waitForExternalEvent("Event3");
Task<?> winner = ctx.anyOf(event1, event2, event3).await();
if (winner == event1) {
// ...
} else if (winner == event2) {
// ...
} else if (winner == event3) {
// ...
}
}
前の例では、複数のイベントの "いずれか " をリッスンします。 "すべて " のイベントを待つこともできます。
[FunctionName("NewBuildingPermit")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string applicationId = context.GetInput<string>();
var gate1 = context.WaitForExternalEvent("CityPlanningApproval");
var gate2 = context.WaitForExternalEvent("FireDeptApproval");
var gate3 = context.WaitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
await Task.WhenAll(gate1, gate2, gate3);
await context.CallActivityAsync("IssueBuildingPermit", applicationId);
}
Note
前のコードは Durable Functions 2.x 用です。 Durable Functions 1.x の場合、IDurableOrchestrationContext
の代わりに DurableOrchestrationContext
を使用する必要があります。 バージョン間の相違点の詳細については、Durable Functions のバージョン に関する記事を参照してください。
.NET では、イベント ペイロードを想定された型 T
に変換できない場合、例外がスローされます。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const applicationId = context.df.getInput();
const gate1 = context.df.waitForExternalEvent("CityPlanningApproval");
const gate2 = context.df.waitForExternalEvent("FireDeptApproval");
const gate3 = context.df.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
yield context.df.Task.all([gate1, gate2, gate3]);
yield context.df.callActivity("IssueBuildingPermit", applicationId);
});
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
application_id = context.get_input()
gate1 = context.wait_for_external_event('CityPlanningApproval')
gate2 = context.wait_for_external_event('FireDeptApproval')
gate3 = context.wait_for_external_event('BuildingDeptApproval')
yield context.task_all([gate1, gate2, gate3])
yield context.call_activity('IssueBuildingPermit', application_id)
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$applicationId = $Context.Input
$gate1 = Start-DurableExternalEventListener -EventName "CityPlanningApproval" -NoWait
$gate2 = Start-DurableExternalEventListener -EventName "FireDeptApproval" -NoWait
$gate3 = Start-DurableExternalEventListener -EventName "BuildingDeptApproval" -NoWait
Wait-DurableTask -Task @($gate1, $gate2, $gate3)
Invoke-ActivityFunction -FunctionName 'IssueBuildingPermit' -Input $applicationId
@FunctionName("NewBuildingPermit")
public void newBuildingPermit(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String applicationId = ctx.getInput(String.class);
Task<Void> gate1 = ctx.waitForExternalEvent("CityPlanningApproval");
Task<Void> gate2 = ctx.waitForExternalEvent("FireDeptApproval");
Task<Void> gate3 = ctx.waitForExternalEvent("BuildingDeptApproval");
// all three departments must grant approval before a permit can be issued
ctx.allOf(List.of(gate1, gate2, gate3)).await();
ctx.callActivity("IssueBuildingPermit", applicationId).await();
}
"wait-for-external-event" API は、何らかの入力を無期限に待機します。 関数アプリは、待機中に安全にアンロードすることができます。 これは、このオーケストレーション インスタンスのイベントが到着すると、自動的に起動され、すぐにそのイベントを処理します。
Note
関数アプリに従量課金プランを使用している場合、オーケストレーター関数が外部イベントのタスクを待っている間は、待ち時間がどんなに長くなっても課金されません。
アクティビティ関数と同様に、外部イベントには "1 回以上" の配信保証があります。 つまり、特定の条件 (再起動、スケーリング、クラッシュなど) で、アプリケーションが同じ外部イベントを重複して受け取ることがあります。 そのため、オーケストレーターで手動で重複を解除できるように、外部イベントに何らかの ID を含めることをお勧めします。
送信イベント
オーケストレーション クライアント バインドによって定義された "raise-event" API を使用して、オーケストレーションに外部イベントを送信できます。 組み込みの raise event HTTP API を使用して、外部のイベントをオーケストレーションに送信することもできます。
生成されたイベントには、instance ID 、eventName 、eventData パラメーターとして含まれます。 オーケストレーター関数は、"wait-for-external-event" API を使用してこれらのイベントを処理します。 イベントが処理されるためには、送信側と受信側の両方で、eventName が一致している必要があります。 また、イベント データは、JSON でシリアル化できる必要があります。
内部的には、"raise event" メカニズムが、待機中のオーケストレーター関数によって取得されるメッセージをエンキューします。 指定した "イベント名 " でインスタンスが待機していない場合、イベント メッセージがインメモリ キューに追加されます。 オーケストレーション インスタンスが後でその "イベント名 " のリッスンを開始した場合、キューにイベント メッセージがあるかどうかを確認します。
Note
指定した "インスタンス ID " のオーケストレーション インスタンスが存在しない場合、イベント メッセージは破棄されます。
次の例は、キューによってトリガーされる関数で、"承認" イベントをオーケストレーター関数インスタンスに送信します。 オーケストレーション インスタンス ID は、キュー メッセージの本文から取得されます。
[FunctionName("ApprovalQueueProcessor")]
public static async Task Run(
[QueueTrigger("approval-queue")] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
await client.RaiseEventAsync(instanceId, "Approval", true);
}
Note
前記の C# コードは Durable Functions 2.x 用です。 Durable Functions 1.x では、DurableClient
属性の代わりに OrchestrationClient
属性を使用する必要があります。また、IDurableOrchestrationClient
ではなく DurableOrchestrationClient
パラメーター型を使用する必要があります。 バージョン間の相違点の詳細については、Durable Functions のバージョン に関する記事を参照してください。
const df = require("durable-functions");
module.exports = async function(context, instanceId) {
const client = df.getClient(context);
await client.raiseEvent(instanceId, "Approval", true);
};
import azure.functions as func
import azure.durable_functions as df
async def main(instance_id:str, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
await client.raise_event(instance_id, 'Approval', True)
param($instanceId)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "Approval"
@FunctionName("ApprovalQueueProcessor")
public void approvalQueueProcessor(
@QueueTrigger(name = "instanceID", queueName = "approval-queue") String instanceID,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
durableContext.getClient().raiseEvent(instanceID, "Approval", true);
}
内部的には、"raise event" API が、待機中のオーケストレーター関数によって取得されるメッセージをエンキューします。 インスタンスが指定した "イベント名" で待機していない場合、イベント メッセージがインメモリ バッファーに追加されます。 オーケストレーション インスタンスが後でその "イベント名" のリッスンを開始すると、バッファー内でイベント メッセージの存在が確認され、待機していたタスクがトリガーされます。
Note
指定した "インスタンス ID " のオーケストレーション インスタンスが存在しない場合、イベント メッセージは破棄されます。
HTTP
次に示したのは、オーケストレーション インスタンスに "Approval" イベントを発生させる HTTP 要求の例です。
POST /runtime/webhooks/durabletask/instances/MyInstanceId/raiseEvent/Approval&code=XXX
Content-Type: application/json
"true"
このケースでは、インスタンス ID が MyInstanceId としてハードコーディングされています。
次のステップ