Durable Functions とは
Durable Functions は、サーバーレス コンピューティング環境でステートフル関数を記述できる Azure Functions の機能です。 この拡張機能では、Azure Functions プログラミング モデルを使用して、オーケストレーター関数を記述することでステートフル ワークフローを定義でき、エンティティ関数を記述することでステートフル エンティティを定義できます。 拡張機能によって状態、チェックポイント、再起動がバックグラウンドで管理されるため、ユーザーはビジネス ロジックに専念できます。
サポートされている言語
Durable Functions は、Azure Functions のすべてのプログラミング言語で動作するように設計されていますが、各言語によって最小要件が異なる場合があります。 次の表は、サポートされているアプリの最小構成を示しています。
言語スタック | Azure Functions Runtime バージョン | 言語ワーカー バージョン | 最小バンドル バージョン |
---|---|---|---|
.NET / C# / F# | Functions 1.0 以上 | インプロセス アウトプロセス |
該当なし |
JavaScript/TypeScript (V3 prog. モデル) | Functions 2.0 以上 | Node 8 以上 | 2.x バンドル |
JavaScript/TypeScript (V4 prog. モデル) | Functions 4.25 以上 | Node 18+ | 3.15 以上のバンドル |
Python | Functions 2.0 以上 | Python 3.7 以上 | 2.x バンドル |
Python (V2 プログラミング モデル) | Functions 4.0 以上 | Python 3.7 以上 | 3.15 以上のバンドル |
PowerShell | Functions 3.0 以上 | PowerShell 7 以上 | 2.x バンドル |
Java | Functions 4.0 以上 | Java 8 以上 | 4.x バンドル |
重要
この記事では、タブを使用して、Node.js プログラミング モデルの複数のバージョンに対応しています。 v4 モデルは一般提供されており、JavaScript と TypeScript の開発者にとって、より柔軟で直感的なエクスペリエンスが得られるように設計されています。 v4 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。 v3 と v4 の違いの詳細については、移行ガイドを参照してください。
重要
この記事では、タブを使用し、Python プログラミング モデルの複数のバージョンに対応しています。 v2 モデルは一般提供されており、デコレーターを使用して関数を作成するための、コード中心の方法を提供するように設計されています。 v2 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。
Azure Functions と同様に、Visual Studio、Visual Studio Code、および Azure portal を使用して Durable Functions を開発するのに役立つテンプレートがあります。
アプリケーション パターン
Durable Functions の主なユース ケースは、サーバーレス アプリケーションにおける複雑でステートフルな調整の要件をシンプルにすることです。 以下のセクションでは、Durable Functions を使用することでメリットがある、典型的なアプリケーションのパターンを示します。
パターン #1: 関数チェーン
関数チェーン パターンでは、一連の関数が特定の順序で実行されます。 このパターンでは、ある関数の出力が、別の関数の入力に適用されます。 各関数間でキューを使用すると、ある関数から次の関数への制御フローがあるとしても、システムは永続的でスケーラブルな状態を維持できます。
次の例に示すように、Durable Functions を使用して、関数チェーン パターンを簡潔に実装できます。
この例では、F1
、F2
、F3
、および F4
という値が、同じ関数アプリ内の他の関数の名前です。 通常の命令型のコーディング構造を使用して、制御フローを実装できます。 コードは、上から下に実行されます。 コードに条件文やループなどの既存言語の制御フロー セマンティクスを含めることができます。 try
/catch
/finally
ブロックに、エラー処理ロジックを含めることができます。
[FunctionName("Chaining")]
public static async Task<object> Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
try
{
var x = await context.CallActivityAsync<object>("F1", null);
var y = await context.CallActivityAsync<object>("F2", x);
var z = await context.CallActivityAsync<object>("F3", y);
return await context.CallActivityAsync<object>("F4", z);
}
catch (Exception)
{
// Error handling or compensation goes here.
}
}
context
パラメーターを使用して、他の関数を名前で呼び出し、パラメーターを渡して、関数の出力を返すことができます。 コードが await
を呼び出すたびに Durable Functions フレームワークは、現在の関数インスタンスの進行状況にチェックポイントを設定します。 プロセスまたは仮想マシンが実行途中でリサイクルされる場合、関数インスタンスは直前の await
呼び出しから再開されます。 詳細については、次のセクション (パターン #2: ファンアウト/ファンイン) を参照してください。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
try {
const x = yield context.df.callActivity("F1");
const y = yield context.df.callActivity("F2", x);
const z = yield context.df.callActivity("F3", y);
return yield context.df.callActivity("F4", z);
} catch (error) {
// Error handling or compensation goes here.
}
});
context.df
オブジェクトを使用して、他の関数を名前で呼び出し、パラメーターを渡して、関数の出力を返すことができます。 コードが yield
を呼び出すたびに Durable Functions フレームワークは、現在の関数インスタンスの進行状況にチェックポイントを設定します。 プロセスまたは仮想マシンが実行途中でリサイクルされる場合、関数インスタンスは直前の yield
呼び出しから再開されます。 詳細については、次のセクション (パターン #2: ファンアウト/ファンイン) を参照してください。
Note
JavaScript の context
オブジェクトは、関数コンテキスト全体を表しています。 Durable Functions コンテキストには、メイン コンテキストの df
プロパティを使用してアクセスします。
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
x = yield context.call_activity("F1", None)
y = yield context.call_activity("F2", x)
z = yield context.call_activity("F3", y)
result = yield context.call_activity("F4", z)
return result
main = df.Orchestrator.create(orchestrator_function)
context
オブジェクトを使用して、他の関数を名前で呼び出し、パラメーターを渡して、関数の出力を返すことができます。 コードが yield
を呼び出すたびに Durable Functions フレームワークは、現在の関数インスタンスの進行状況にチェックポイントを設定します。 プロセスまたは仮想マシンが実行途中でリサイクルされる場合、関数インスタンスは直前の yield
呼び出しから再開されます。 詳細については、次のセクション (パターン #2: ファンアウト/ファンイン) を参照してください。
Note
Python の context
オブジェクトは、オーケストレーション コンテキストを表します。 メインの Azure Functions コンテキストには、オーケストレーション コンテキストの function_context
プロパティを使用してアクセスします。
param($Context)
$X = Invoke-DurableActivity -FunctionName 'F1'
$Y = Invoke-DurableActivity -FunctionName 'F2' -Input $X
$Z = Invoke-DurableActivity -FunctionName 'F3' -Input $Y
Invoke-DurableActivity -FunctionName 'F4' -Input $Z
Invoke-DurableActivity
コマンドを使用して他の関数を名前で呼び出し、パラメーターを渡して、関数の出力を返すことができます。 コードで NoWait
スイッチを用いずに Invoke-DurableActivity
を呼び出すたびに、Durable Functions フレームワークによって、現在の関数インスタンスの進行状況に対するチェックポイントが設定されます。 プロセスまたは仮想マシンが実行途中でリサイクルされる場合、関数インスタンスは直前の Invoke-DurableActivity
呼び出しから再開されます。 詳細については、次のセクション (パターン #2: ファンアウト/ファンイン) を参照してください。
@FunctionName("Chaining")
public double functionChaining(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
String input = ctx.getInput(String.class);
int x = ctx.callActivity("F1", input, int.class).await();
int y = ctx.callActivity("F2", x, int.class).await();
int z = ctx.callActivity("F3", y, int.class).await();
return ctx.callActivity("F4", z, double.class).await();
}
ctx
オブジェクトを使用して、他の関数を名前で呼び出し、パラメーターを渡して、関数の出力を返すことができます。 これらのメソッドの出力は Task<V>
オブジェクトです。ここで、V
は呼び出された関数によって返されるデータの型です。 Task<V>.await()
を呼び出すたびに Durable Functions フレームワークによって、現在の関数インスタンスの進行状況のチェックポイントが記録されます。 プロセスが実行途中で予期せずにリサイクルされると、関数インスタンスは直前の Task<V>.await()
呼び出しから再開されます。 詳細については、次のセクション (パターン #2: ファンアウト/ファンイン) を参照してください。
パターン #2: ファンアウト/ファンイン
ファンアウト/ファンイン パターンでは、複数の関数を並列で実行し、すべての関数が完了するまで待機します。 複数の関数から返される結果に基づいて集計作業が行われることは、よくあることです。
通常の関数では、複数のメッセージを 1 つのキューに送信することでファンアウトできます。 ファンインして戻ることはずっと難しくなります。 通常の関数でファンインするには、キューによってトリガーされた関数の終了を追跡した後、関数の出力を格納するコードを記述する必要があります。
Durable Functions 拡張機能では、比較的単純なコードでこのパターンを処理します。
[FunctionName("FanOutFanIn")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
var parallelTasks = new List<Task<int>>();
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("F1", null);
for (int i = 0; i < workBatch.Length; i++)
{
Task<int> task = context.CallActivityAsync<int>("F2", workBatch[i]);
parallelTasks.Add(task);
}
await Task.WhenAll(parallelTasks);
// Aggregate all N outputs and send the result to F3.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("F3", sum);
}
ファンアウト作業は、F2
関数の複数のインスタンスに分散されます。 動的タスク リストを使用して、この作業が追跡されます。 Task.WhenAll
が呼び出され、すべての呼び出された関数が終了するまで待機します。 その後、F2
関数の出力が動的タスク リストから集計され、F3
関数に渡されます。
Task.WhenAll
の await
呼び出しの際に設定される自動チェックポイントによって、実行途中でクラッシュや再起動が発生した場合でも、既に完了したすべてのタスクをやり直す必要がなくなります。
const df = require("durable-functions");
module.exports = df.orchestrator(function*(context) {
const parallelTasks = [];
// Get a list of N work items to process in parallel.
const workBatch = yield context.df.callActivity("F1");
for (let i = 0; i < workBatch.length; i++) {
parallelTasks.push(context.df.callActivity("F2", workBatch[i]));
}
yield context.df.Task.all(parallelTasks);
// Aggregate all N outputs and send the result to F3.
const sum = parallelTasks.reduce((prev, curr) => prev + curr, 0);
yield context.df.callActivity("F3", sum);
});
ファンアウト作業は、F2
関数の複数のインスタンスに分散されます。 動的タスク リストを使用して、この作業が追跡されます。 context.df.Task.all
API が呼び出され、すべての呼び出された関数が終了するまで待機します。 その後、F2
関数の出力が動的タスク リストから集計され、F3
関数に渡されます。
context.df.Task.all
の yield
呼び出しの際に設定される自動チェックポイントによって、実行途中でクラッシュや再起動が発生した場合でも、既に完了したすべてのタスクをやり直す必要がなくなります。
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
# Get a list of N work items to process in parallel.
work_batch = yield context.call_activity("F1", None)
parallel_tasks = [ context.call_activity("F2", b) for b in work_batch ]
outputs = yield context.task_all(parallel_tasks)
# Aggregate all N outputs and send the result to F3.
total = sum(outputs)
yield context.call_activity("F3", total)
main = df.Orchestrator.create(orchestrator_function)
ファンアウト作業は、F2
関数の複数のインスタンスに分散されます。 動的タスク リストを使用して、この作業が追跡されます。 context.task_all
API が呼び出され、すべての呼び出された関数が終了するまで待機します。 その後、F2
関数の出力が動的タスク リストから集計され、F3
関数に渡されます。
context.task_all
の yield
呼び出しの際に設定される自動チェックポイントによって、実行途中でクラッシュや再起動が発生した場合でも、既に完了したすべてのタスクをやり直す必要がなくなります。
param($Context)
# Get a list of work items to process in parallel.
$WorkBatch = Invoke-DurableActivity -FunctionName 'F1'
$ParallelTasks =
foreach ($WorkItem in $WorkBatch) {
Invoke-DurableActivity -FunctionName 'F2' -Input $WorkItem -NoWait
}
$Outputs = Wait-ActivityFunction -Task $ParallelTasks
# Aggregate all outputs and send the result to F3.
$Total = ($Outputs | Measure-Object -Sum).Sum
Invoke-DurableActivity -FunctionName 'F3' -Input $Total
ファンアウト作業は、F2
関数の複数のインスタンスに分散されます。 F2
関数の呼び出しで NoWait
スイッチが使用されていることに注意してください。このスイッチを使用すると、オーケストレーターはアクティビティの完了を待たなくても F2
を呼び出すことができます。 動的タスク リストを使用して、この作業が追跡されます。 Wait-ActivityFunction
コマンドが呼び出され、呼び出されたすべての関数が終了するまで待機します。 その後、F2
関数の出力が動的タスク リストから集計され、F3
関数に渡されます。
Wait-ActivityFunction
呼び出しの際に設定される自動チェックポイントによって、実行途中でのクラッシュや再起動が発生した場合でも、既に完了したタスクをやり直す必要がなくなります。
@FunctionName("FanOutFanIn")
public Integer fanOutFanInOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
// Get the list of work-items to process in parallel
List<?> batch = ctx.callActivity("F1", List.class).await();
// Schedule each task to run in parallel
List<Task<Integer>> parallelTasks = batch.stream()
.map(item -> ctx.callActivity("F2", item, Integer.class))
.collect(Collectors.toList());
// Wait for all tasks to complete, then return the aggregated sum of the results
List<Integer> results = ctx.allOf(parallelTasks).await();
return results.stream().reduce(0, Integer::sum);
}
ファンアウト作業は、F2
関数の複数のインスタンスに分散されます。 動的タスク リストを使用して、この作業が追跡されます。 ctx.allOf(parallelTasks).await()
が呼び出され、すべての呼び出された関数が終了するまで待機します。 次に、F2
関数の出力が動的タスク リストから集計され、オーケストレーター関数の出力として返されます。
ctx.allOf(parallelTasks)
の .await()
呼び出し時に発生する自動チェックポイントによって、予期しないプロセスのリサイクルが発生しても、既に完了したタスクを再起動する必要がなくなります。
Note
まれな状況ですが、アクティビティ関数の完了後、完了がオーケストレーション履歴に保存される前の時間帯にクラッシュが発生する可能性があります。 この場合、アクティビティ関数は、プロセス復旧後に最初から再実行されます。
パターン #3: 非同期 HTTP API
非同期 HTTP API パターンでは、外部クライアントとの間の実行時間の長い操作の状態を調整するという問題に対処します。 このパターンを実装する一般的な方法は、HTTP エンドポイントによって実行時間の長いアクションをトリガーすることです。 その後、ポーリングによって操作が完了したことを認識できる状態エンドポイントにクライアントをリダイレクトします。
Durable Functions には、このパターン向けの組み込みサポートが用意されており、長時間の関数の実行を操作するために記述する必要があるコードを簡略化できます。または、そのようなコードの記述が不要になります。 たとえば、Durable Functions のクイック スタート サンプル (C#、JavaScript、TypeScript、Python、PowerShell、および Java) に、新しいオーケストレーター関数のインスタンスを開始するために使用できる、シンプルな REST コマンドが示されています。 インスタンスが開始されると、この拡張機能によって、オーケストレーター関数の状態をクエリする Webhook HTTP API が公開されます。
次の例は、オーケストレーターを開始し、その状態をクエリする REST コマンドを示しています。 わかりやすくするため、この例ではプロトコルの細部をいくらか省略しています。
> curl -X POST https://myfunc.azurewebsites.net/api/orchestrators/DoWork -H "Content-Length: 0" -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"id":"b79baf67f717453ca9e86c5da21e03ec", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 202 Accepted
Content-Type: application/json
Location: https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec
{"runtimeStatus":"Running","lastUpdatedTime":"2019-03-16T21:20:47Z", ...}
> curl https://myfunc.azurewebsites.net/runtime/webhooks/durabletask/instances/b79baf67f717453ca9e86c5da21e03ec -i
HTTP/1.1 200 OK
Content-Length: 175
Content-Type: application/json
{"runtimeStatus":"Completed","lastUpdatedTime":"2019-03-16T21:20:57Z", ...}
状態は Durable Functions ランタイムによって自動的に管理されるため、自分で状態追跡メカニズムを実装する必要はありません。
Durable Functions 拡張機能には、長時間実行されるオーケストレーションを管理する組み込みの HTTP API が公開されています。 または、独自の関数トリガー (HTTP、キュー、Azure Event Hubs など) と永続的なクライアント バインドを使用して、このパターンを自分で実装できます。 たとえば、キュー メッセージを使用して終了をトリガーできます。 または、生成されたキーを認証に使う組み込みの HTTP API の代わりに、Microsoft Entra の認証ポリシーによって保護される HTTP トリガーを使用できます。
詳細については、HTTP 機能の記事を参照してください。その記事では、Durable Functions 拡張機能を使用して、非同期の長時間プロセスを HTTP 経由で公開する方法について説明しています。
パターン #4: モニター
監視パターンは、ワークフロー内の柔軟な繰り返しプロセスを指します。 一例は、特定の条件が満たされるまでポーリングすることです。 通常のタイマー トリガーを使用して、定期的なクリーンアップ ジョブなどの基本的なシナリオに対処できますが、その間隔は静的であり、インスタンスの有効期間の管理は複雑になります。 Durable Functions を使用して、柔軟な繰り返し間隔の作成、タスクの有効期間の管理、単一のオーケストレーションからの複数の監視プロセスの作成を実行できます。
監視パターンの一例は、前述の非同期 HTTP API シナリオを逆にすることです。 外部クライアント用のエンドポイントを公開して実行時間の長い操作を監視する代わりに、長時間実行される監視で外部エンドポイントを使用して、状態が変化するまで待機します。
数行のコードで、Durable Functions を使用して任意のエンドポイントを観察する複数のモニターを作成できます。 条件が満たされたときにこれらのモニターによって実行を終了するか、別の関数から持続的オーケストレーション クライアントを使用してモニターを終了できます。 特定の条件に基づいてモニターの wait
間隔を変更できます (指数関数的バックオフなど)。
次のコードでは、基本的なモニターが実装されます。
[FunctionName("MonitorJobStatus")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
int jobId = context.GetInput<int>();
int pollingInterval = GetPollingInterval();
DateTime expiryTime = GetExpiryTime();
while (context.CurrentUtcDateTime < expiryTime)
{
var jobStatus = await context.CallActivityAsync<string>("GetJobStatus", jobId);
if (jobStatus == "Completed")
{
// Perform an action when a condition is met.
await context.CallActivityAsync("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
var nextCheck = context.CurrentUtcDateTime.AddSeconds(pollingInterval);
await context.CreateTimer(nextCheck, CancellationToken.None);
}
// Perform more work here, or let the orchestration end.
}
const df = require("durable-functions");
const moment = require("moment");
module.exports = df.orchestrator(function*(context) {
const jobId = context.df.getInput();
const pollingInterval = getPollingInterval();
const expiryTime = getExpiryTime();
while (moment.utc(context.df.currentUtcDateTime).isBefore(expiryTime)) {
const jobStatus = yield context.df.callActivity("GetJobStatus", jobId);
if (jobStatus === "Completed") {
// Perform an action when a condition is met.
yield context.df.callActivity("SendAlert", jobId);
break;
}
// Orchestration sleeps until this time.
const nextCheck = moment.utc(context.df.currentUtcDateTime).add(pollingInterval, 's');
yield context.df.createTimer(nextCheck.toDate());
}
// Perform more work here, or let the orchestration end.
});
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
job = json.loads(context.get_input())
job_id = job["jobId"]
polling_interval = job["pollingInterval"]
expiry_time = job["expiryTime"]
while context.current_utc_datetime < expiry_time:
job_status = yield context.call_activity("GetJobStatus", job_id)
if job_status == "Completed":
# Perform an action when a condition is met.
yield context.call_activity("SendAlert", job_id)
break
# Orchestration sleeps until this time.
next_check = context.current_utc_datetime + timedelta(seconds=polling_interval)
yield context.create_timer(next_check)
# Perform more work here, or let the orchestration end.
main = df.Orchestrator.create(orchestrator_function)
param($Context)
$output = @()
$jobId = $Context.Input.JobId
$machineId = $Context.Input.MachineId
$pollingInterval = New-TimeSpan -Seconds $Context.Input.PollingInterval
$expiryTime = $Context.Input.ExpiryTime
while ($Context.CurrentUtcDateTime -lt $expiryTime) {
$jobStatus = Invoke-DurableActivity -FunctionName 'GetJobStatus' -Input $jobId
if ($jobStatus -eq "Completed") {
# Perform an action when a condition is met.
$output += Invoke-DurableActivity -FunctionName 'SendAlert' -Input $machineId
break
}
# Orchestration sleeps until this time.
Start-DurableTimer -Duration $pollingInterval
}
# Perform more work here, or let the orchestration end.
$output
@FunctionName("Monitor")
public String monitorOrchestrator(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
JobInfo jobInfo = ctx.getInput(JobInfo.class);
String jobId = jobInfo.getJobId();
Instant expiryTime = jobInfo.getExpirationTime();
while (ctx.getCurrentInstant().compareTo(expiryTime) < 0) {
String status = ctx.callActivity("GetJobStatus", jobId, String.class).await();
// Perform an action when a condition is met
if (status.equals("Completed")) {
// send an alert and exit
ctx.callActivity("SendAlert", jobId).await();
break;
}
// wait N minutes before doing the next poll
Duration pollingDelay = jobInfo.getPollingDelay();
ctx.createTimer(pollingDelay).await();
}
return "done";
}
要求が受信されると、そのジョブ ID 用の新しいオーケストレーション インスタンスが作成されます。 インスタンスは、条件が満たされるか、タイムアウトが期限切れになるまで状態をポーリングします。 ポーリング間隔は、永続タイマーによって制御されます。 その後、さらに作業を実行するか、オーケストレーションを終了できます。
パターン #5: 人による操作
多くの自動化されたプロセスには、何らかの人による操作が含まれます。 自動化されたプロセスに人による操作が含まれる場合に問題になるのが、人は必ずしもクラウド サービスのように可用性と応答性が高くないということです。 自動化されたプロセスでは、タイムアウトと補正ロジックを使用して、この操作を許容する必要があります。
承認プロセスは、人による操作を含むビジネス プロセスの一例です。 特定の金額を超えた経費報告書は、上司の承認が必要な場合があります。 上司が (休暇中などで) その経費報告書を 72 時間以内に承認しない場合は、他の誰か (おそらくは上司の上司) から承認を得るためにエスカレーション プロセスを開始します。
オーケストレーター関数を使用して、この例のパターンを実装できます。 オーケストレーターが永続タイマーを使用して承認を要求します。 オーケストレーターは、タイムアウトが発生した場合はエスカレーションを行います。 オーケストレーターは、外部イベント (人による操作によって生成される通知など) が発生するまで待機します。
次の例では、人による操作パターンを示すための承認プロセスを作成しています。
[FunctionName("ApprovalWorkflow")]
public static async Task Run(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
await context.CallActivityAsync("RequestApproval", null);
using (var timeoutCts = new CancellationTokenSource())
{
DateTime dueTime = context.CurrentUtcDateTime.AddHours(72);
Task durableTimeout = context.CreateTimer(dueTime, timeoutCts.Token);
Task<bool> approvalEvent = context.WaitForExternalEvent<bool>("ApprovalEvent");
if (approvalEvent == await Task.WhenAny(approvalEvent, durableTimeout))
{
timeoutCts.Cancel();
await context.CallActivityAsync("ProcessApproval", approvalEvent.Result);
}
else
{
await context.CallActivityAsync("Escalate", null);
}
}
}
永続タイマーを作成するために、context.CreateTimer
が呼び出されます。 通知は context.WaitForExternalEvent
が受け取ります。 その後、エスカレーションする (タイムアウトが先に発生した場合) か承認を処理する (タイムアウト前に承認を得た場合) かを決定するために、Task.WhenAny
が呼び出されます。
const df = require("durable-functions");
const moment = require('moment');
module.exports = df.orchestrator(function*(context) {
yield context.df.callActivity("RequestApproval");
const dueTime = moment.utc(context.df.currentUtcDateTime).add(72, 'h');
const durableTimeout = context.df.createTimer(dueTime.toDate());
const approvalEvent = context.df.waitForExternalEvent("ApprovalEvent");
const winningEvent = yield context.df.Task.any([approvalEvent, durableTimeout]);
if (winningEvent === approvalEvent) {
durableTimeout.cancel();
yield context.df.callActivity("ProcessApproval", approvalEvent.result);
} else {
yield context.df.callActivity("Escalate");
}
});
永続タイマーを作成するために、context.df.createTimer
が呼び出されます。 通知は context.df.waitForExternalEvent
が受け取ります。 その後、エスカレーションする (タイムアウトが先に発生した場合) か承認を処理する (タイムアウト前に承認を得た場合) かを決定するために、context.df.Task.any
が呼び出されます。
import azure.durable_functions as df
import json
from datetime import timedelta
def orchestrator_function(context: df.DurableOrchestrationContext):
yield context.call_activity("RequestApproval", None)
due_time = context.current_utc_datetime + timedelta(hours=72)
durable_timeout_task = context.create_timer(due_time)
approval_event_task = context.wait_for_external_event("ApprovalEvent")
winning_task = yield context.task_any([approval_event_task, durable_timeout_task])
if approval_event_task == winning_task:
durable_timeout_task.cancel()
yield context.call_activity("ProcessApproval", approval_event_task.result)
else:
yield context.call_activity("Escalate", None)
main = df.Orchestrator.create(orchestrator_function)
永続タイマーを作成するために、context.create_timer
が呼び出されます。 通知は context.wait_for_external_event
が受け取ります。 その後、エスカレーションする (タイムアウトが先に発生した場合) か承認を処理する (タイムアウト前に承認を得た場合) かを決定するために、context.task_any
が呼び出されます。
param($Context)
$output = @()
$duration = New-TimeSpan -Seconds $Context.Input.Duration
$managerId = $Context.Input.ManagerId
$output += Invoke-DurableActivity -FunctionName "RequestApproval" -Input $managerId
$durableTimeoutEvent = Start-DurableTimer -Duration $duration -NoWait
$approvalEvent = Start-DurableExternalEventListener -EventName "ApprovalEvent" -NoWait
$firstEvent = Wait-DurableTask -Task @($approvalEvent, $durableTimeoutEvent) -Any
if ($approvalEvent -eq $firstEvent) {
Stop-DurableTimerTask -Task $durableTimeoutEvent
$output += Invoke-DurableActivity -FunctionName "ProcessApproval" -Input $approvalEvent
}
else {
$output += Invoke-DurableActivity -FunctionName "EscalateApproval"
}
$output
永続タイマーを作成するために、Start-DurableTimer
が呼び出されます。 通知は Start-DurableExternalEventListener
が受け取ります。 その後、エスカレーションする (タイムアウトが先に発生した場合) か承認を処理する (タイムアウト前に承認を得た場合) かを決定するために、Wait-DurableTask
が呼び出されます。
@FunctionName("ApprovalWorkflow")
public void approvalWorkflow(
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
ApprovalInfo approvalInfo = ctx.getInput(ApprovalInfo.class);
ctx.callActivity("RequestApproval", approvalInfo).await();
Duration timeout = Duration.ofHours(72);
try {
// Wait for an approval. A TaskCanceledException will be thrown if the timeout expires.
boolean approved = ctx.waitForExternalEvent("ApprovalEvent", timeout, boolean.class).await();
approvalInfo.setApproved(approved);
ctx.callActivity("ProcessApproval", approvalInfo).await();
} catch (TaskCanceledException timeoutEx) {
ctx.callActivity("Escalate", approvalInfo).await();
}
}
ctx.waitForExternalEvent(...).await()
メソッド呼び出しは、boolean
ペイロードを持つ ApprovalEvent
という名前のイベントを受信するまでオーケストレーションを一時停止します。 イベントを受信すると、承認結果を処理するアクティビティ関数が呼び出されます。 ただし、timeout
(72 時間) が期限切れになる前にこのようなイベントが受信されない場合は、TaskCanceledException
が発生し、Escalate
アクティビティ関数が呼び出されます。
Note
従量課金プランで実行するときに、外部イベントの待機に費やされた時間は課金されません。
外部クライアントでは、組み込みの HTTP API を使用して、待機中のオーケストレーター関数にイベント通知を配信できます。
curl -d "true" http://localhost:7071/runtime/webhooks/durabletask/instances/{instanceId}/raiseEvent/ApprovalEvent -H "Content-Type: application/json"
同じ関数アプリ内の別の関数から持続的オーケストレーション クライアントを使用してイベントを発生させることもできます。
[FunctionName("RaiseEventToOrchestration")]
public static async Task Run(
[HttpTrigger] string instanceId,
[DurableClient] IDurableOrchestrationClient client)
{
bool isApproved = true;
await client.RaiseEventAsync(instanceId, "ApprovalEvent", isApproved);
}
import azure.durable_functions as df
async def main(client: str):
durable_client = df.DurableOrchestrationClient(client)
is_approved = True
await durable_client.raise_event(instance_id, "ApprovalEvent", is_approved)
Send-DurableExternalEvent -InstanceId $InstanceId -EventName "ApprovalEvent" -EventData "true"
@FunctionName("RaiseEventToOrchestration")
public void raiseEventToOrchestration(
@HttpTrigger(name = "instanceId") String instanceId,
@DurableClientInput(name = "durableContext") DurableClientContext durableContext) {
DurableTaskClient client = durableContext.getClient();
client.raiseEvent(instanceId, "ApprovalEvent", true);
}
パターン #6:アグリゲーター (ステートフル エンティティ)
6 番目のパターンは、ある期間のイベント データを 1 つのアドレス可能なエンティティ に集計することに関連しています。 このパターンでは、集計されるデータは、複数のソースから取得されるか、バッチで配信されるか、または長期間にわたって分散される可能性があります。 アグリゲーターがイベント データの到着時にイベント データに対してアクションを行ったり、外部クライアントが集計されたデータをクエリする必要が生じたりする場合があります。
このパターンを通常のステートレス関数で実装しようとする際に注意が必要な点は、同時実行制御が大きな課題となることです。 複数のスレッドが同時に同じデータを変更することに注意する必要があるだけでなく、アグリゲーターが一度に 1 つの VM 上でのみ実行されるようにすることも注意する必要があります。
持続エンティティを使用すると、このパターンを単一の関数として簡単に実装できます。
[FunctionName("Counter")]
public static void Counter([EntityTrigger] IDurableEntityContext ctx)
{
int currentValue = ctx.GetState<int>();
switch (ctx.OperationName.ToLowerInvariant())
{
case "add":
int amount = ctx.GetInput<int>();
ctx.SetState(currentValue + amount);
break;
case "reset":
ctx.SetState(0);
break;
case "get":
ctx.Return(currentValue);
break;
}
}
持続エンティティは、.NET のクラスとしてモデル化することもできます。 このモデルは、操作の一覧が固定されていて、サイズが大きくなった場合に役立ちます。 次の例は、.NET クラスおよびメソッドを使用した Counter
エンティティの同等の実装です。
public class Counter
{
[JsonProperty("value")]
public int CurrentValue { get; set; }
public void Add(int amount) => this.CurrentValue += amount;
public void Reset() => this.CurrentValue = 0;
public int Get() => this.CurrentValue;
[FunctionName(nameof(Counter))]
public static Task Run([EntityTrigger] IDurableEntityContext ctx)
=> ctx.DispatchAsync<Counter>();
}
const df = require("durable-functions");
module.exports = df.entity(function(context) {
const currentValue = context.df.getState(() => 0);
switch (context.df.operationName) {
case "add":
const amount = context.df.getInput();
context.df.setState(currentValue + amount);
break;
case "reset":
context.df.setState(0);
break;
case "get":
context.df.return(currentValue);
break;
}
});
import azure.functions as func
import azure.durable_functions as df
def entity_function(context: df.DurableOrchestrationContext):
current_value = context.get_state(lambda: 0)
operation = context.operation_name
if operation == "add":
amount = context.get_input()
current_value += amount
context.set_result(current_value)
elif operation == "reset":
current_value = 0
elif operation == "get":
context.set_result(current_value)
context.set_state(current_value)
main = df.Entity.create(entity_function)
注意
現在、持続エンティティは PowerShell ではサポートされていません。
注意
持続エンティティは現在、Java でサポートされていません。
クライアントは、エンティティ クライアント バインディングを使用して、エンティティ関数の操作をエンキューすることができます ("シグナル通知" とも呼ばれます)。
[FunctionName("EventHubTriggerCSharp")]
public static async Task Run(
[EventHubTrigger("device-sensor-events")] EventData eventData,
[DurableClient] IDurableEntityClient entityClient)
{
var metricType = (string)eventData.Properties["metric"];
var delta = BitConverter.ToInt32(eventData.Body, eventData.Body.Offset);
// The "Counter/{metricType}" entity is created on-demand.
var entityId = new EntityId("Counter", metricType);
await entityClient.SignalEntityAsync(entityId, "add", delta);
}
Note
動的に生成されたプロキシは、.NET においてタイプセーフな方法でシグナル通知エンティティに対して使用することもできます。 シグナル通知に加えて、クライアントは、オーケストレーション クライアント バインディングでタイプ セーフのメソッドを使用して、エンティティ関数の状態をクエリすることもできます。
import azure.functions as func
import azure.durable_functions as df
async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
client = df.DurableOrchestrationClient(starter)
entity_id = df.EntityId("Counter", "myCounter")
instance_id = await client.signal_entity(entity_id, "add", 1)
return func.HttpResponse("Entity signaled")
エンティティ関数は、Durable Functions 2.0 以降の C#、JavaScript、Python で使用できます。
テクノロジ
Durable Functions 拡張機能の背後には、コードでワークフローを構築するために使用される GitHub のオープン ソース ライブラリである Durable Task Framework があり、この拡張機能はその上に構築されています。 Azure Functions が Azure WebJobs のサーバーレスな進化形であるのと同じように、Durable Functions は Durable Task Framework のサーバーレスな進化形です。 Microsoft や他の組織は、ミッション クリティカルなプロセスを自動化するために、Durable Task Framework を広範囲にわたって使用しています。 サーバーレスな Azure Functions 環境には自然に適合します。
コードの制約
信頼性の高い長時間の実行の保証を提供するために、オーケストレーター関数には、従う必要がある一連のコーディング規則があります。 詳細については、「オーケストレーター関数コードの制約」の記事を参照してください。
課金
Durable Functions は Azure Functions と同じように課金されます。 詳細については、Azure Functions の価格に関するページを参照してください。 Azure Functions の従量課金プランでオーケストレーター関数を実行する場合は、注意する必要がある課金動作がいくつかあります。 これらの動作の詳細については、「Durable Functions の課金」の記事を参照してください。
すぐに始める
次の言語固有のクイック スタート チュートリアルのいずれかを完了することで、10 分足らずで Durable Functions を使い始めることができます。
- C# と Visual Studio 2019 を使用する場合
- Visual Studio Code と JavaScript を使用する場合
- Visual Studio Code と TypeScript を使用する場合
- Visual Studio Code と Python を使用する場合
- Visual Studio Code と PowerShell を使用する場合
- Maven と Java を使用する場合
これらのクイックスタートでは、"hello world" という持続的関数をローカルで作成してテストします。 その後、関数コードを Azure に発行します。 作成した関数は、他の関数の呼び出しを調整し、連結します。
パブリケーション
Durable Functions は、Microsoft Research と共同で開発されました。 これを受けて、Durable Functions チームは研究論文と成果物を積極的に作成しています。これには、次のようなものがあります。
- Durable Functions: ステートフル サーバーレスのセマンティクス (OOPSLA'21)
- Durable Functions と Netherite を使用したサーバーレス ワークフロー "(査読前の原稿)"
詳細情報
次のビデオでは、Durable Functions の利点を取り上げています。
Durable Functions は Azure Functionsの高度な拡張機能であるため、すべてのアプリケーションに適しているわけではありません。 他の Azure オーケストレーション テクノロジとの比較については、「Azure Functions と Azure Logic Apps の比較」を参照してください。