Durable Functions のファンアウト/ファンイン シナリオ - クラウド バックアップの例
[アーティクル] 03/25/2023
10 人の共同作成者
フィードバック
この記事の内容
"ファンアウト/ファンイン " は、複数の関数を同時に実行した後、その結果に対して集計を行うパターンを指します。 この記事では、Durable Functions を使用してファンイン/ファンアウト シナリオを実装するサンプルについて説明します。 このサンプルは、アプリのサイトのコンテンツの一部またはすべてを Azure Storage にバックアップする永続関数です。
Note
Azure Functions の Node.js プログラミング モデルのバージョン 4 は一般提供されています。 新しい v4 モデルは、JavaScript と TypeScript の開発者にとって、より柔軟で直感的なエクスペリエンスが得られるように設計されています。 v3 と v4 の違いの詳細については、移行ガイド を参照してください。
次のコード スニペットでは、JavaScript (PM4) は、新しいエクスペリエンスであるプログラミング モデル V4 を示しています。
前提条件
シナリオの概要
このサンプルでは、関数は、指定されたディレクトリの下にあるすべてのファイルを BLOB ストレージに再帰的にアップロードします。 さらに、アップロードされたバイトの合計数をカウントします。
すべてを管理する単一の関数を記述できます。 その際に発生する最大の問題はスケーラビリティ です。 単一の関数は、単一の仮想マシンでのみ実行できるため、スループットは単一の VM のスループットによって制限されます。 別の問題として、信頼性 があります。 途中でエラーが発生した場合、またはプロセス全体が 5 分以上かかる場合、バックアップは部分的に完了した状態で失敗する可能性があります。 これにより、再起動が必要になることがあります。
もっと堅牢な方法は、2 つの標準的な関数 (ファイルを列挙し、ファイル名をキューに追加する関数と、キューからファイルを読み取り、そのファイルを BLOB ストレージにアップロードする関数) を記述することです。 このアプローチにより、スループットと信頼性は向上しますが、キューのプロビジョニングと管理を行う必要があります。 さらに重要なのは、アップロードされた合計バイト数の報告などを行うと、状態管理 と調整 が非常に複雑になることです。
Durable Functions を使用する方法は、上記の利点を非常に少ないオーバーヘッドで実現できます。
関数
この記事では、サンプル アプリで使用されている次の関数について説明します。
E2_BackupSiteContent
:E2_GetFileList
を呼び出してバックアップするファイルのリストを取得してから、E2_CopyFileToBlob
を呼び出して各ファイルをバックアップするオーケストレーター関数 。
E2_GetFileList
:ディレクトリ内のファイルのリストを返すアクティビティ関数 。
E2_CopyFileToBlob
:1 つのファイルを Azure Blob Storage にバックアップするアクティビティ関数。
E2_BackupSiteContent オーケストレーター関数
このオーケストレーター関数は、基本的に次の操作を行います。
rootDirectory
値を入力パラメーターとして使用します。
rootDirectory
の下のファイルの再帰リストを取得する関数を呼び出します。
複数の並列関数を呼び出して、各ファイルを Azure Blob Storage にアップロードします。
すべてのアップロードが完了するまで待機します。
Azure Blob ストレージにアップロードされたバイト数の合計を返します。
オーケストレーター関数を実装するコードを次に示します。
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
await Task.WhenAll(tasks);
行に注目してください。 E2_CopyFileToBlob
関数への個々の呼び出しがすべて待機されていて並列実行が可能なわけではありません 。 このタスク配列を Task.WhenAll
に渡すと、"すべてのコピー操作が完了するまで " 完了することがない 1 つのタスクが戻ります。 .NET のタスク並列ライブラリ (TPL) を知っていれば、これは新しい事柄ではありません。 違いは、これらのタスクが複数の仮想マシンで同時に実行される可能性があることと、Durable Functions 拡張機能によって、プロセスのリサイクルに対してエンド ツー エンドの実行が回復することが保証されることです。
Task.WhenAll
から応答が返ることは、すべての関数呼び出しが完了し、値が戻っていることを意味します。 E2_CopyFileToBlob
への各呼び出しがアップロードしたバイト数を返しているため、バイト数の合計を計算することは、これらの返された値をすべて合計するだけの操作です。
この関数では、オーケストレーター関数の標準的な function.json が使用されます。
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
オーケストレーター関数を実装するコードを次に示します。
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
yield context.df.Task.all(tasks);
行に注目してください。 E2_CopyFileToBlob
関数への個々の呼び出しがすべて中断され、並列実行が可能になるわけではありません 。 このタスク配列を context.df.Task.all
に渡すと、"すべてのコピー操作が完了するまで " 完了することがない 1 つのタスクが戻ります。 JavaScript の Promise.all
に慣れている方には、これは目新しいものではないでしょう。 違いは、これらのタスクが複数の仮想マシンで同時に実行される可能性があることと、Durable Functions 拡張機能によって、プロセスのリサイクルに対してエンド ツー エンドの実行が回復することが保証されることです。
Note
タスクは概念的には JavaScript の Promise に似ていますが、タスクの並列化を管理するために、オーケストレーター関数は Promise.all
と Promise.race
の代わりに context.df.Task.all
と context.df.Task.any
を使用する必要があります。
context.df.Task.all
が中断した後、すべての関数呼び出しが完了し、戻り値が返されたことがわかります。 E2_CopyFileToBlob
への各呼び出しがアップロードしたバイト数を返しているため、バイト数の合計を計算することは、これらの返された値をすべて合計するだけの操作です。
オーケストレーター関数を実装するコードを次に示します。
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
yield context.df.Task.all(tasks);
行に注目してください。 copyFileToBlob
関数への個々の呼び出しがすべて中断され、並列実行が可能になるわけではありません 。 このタスク配列を context.df.Task.all
に渡すと、"すべてのコピー操作が完了するまで " 完了することがない 1 つのタスクが戻ります。 JavaScript の Promise.all
に慣れている方には、これは目新しいものではないでしょう。 違いは、これらのタスクが複数の仮想マシンで同時に実行される可能性があることと、Durable Functions 拡張機能によって、プロセスのリサイクルに対してエンド ツー エンドの実行が回復することが保証されることです。
Note
Task は概念的には JavaScript の Promise に似ていますが、タスクの並列化を管理するために、オーケストレーター関数は Promise.all
と Promise.race
の代わりに context.df.Task.all
と context.df.Task.any
を使用する必要があります。
context.df.Task.all
が中断した後、すべての関数呼び出しが完了し、戻り値が返されたことがわかります。 copyFileToBlob
への各呼び出しがアップロードしたバイト数を返しているため、バイト数の合計を計算することは、これらの返された値をすべて合計するだけの操作です。
この関数では、オーケストレーター関数の標準的な function.json が使用されます。
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
オーケストレーター関数を実装するコードを次に示します。
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
yield context.task_all(tasks);
行に注目してください。 E2_CopyFileToBlob
関数への個々の呼び出しがすべて中断され、並列実行が可能になるわけではありません 。 このタスク配列を context.task_all
に渡すと、"すべてのコピー操作が完了するまで " 完了することがない 1 つのタスクが戻ります。 Python の asyncio.gather
に慣れている方には、これは目新しいものではないでしょう。 違いは、これらのタスクが複数の仮想マシンで同時に実行される可能性があることと、Durable Functions 拡張機能によって、プロセスのリサイクルに対してエンド ツー エンドの実行が回復することが保証されることです。
Note
タスクは、概念的には Python の awaitable に似ていますが、タスクの並列化を管理するために、オーケストレーター関数では yield
と context.task_all
および context.task_any
API を使用する必要があります。
context.task_all
が中断した後、すべての関数呼び出しが完了し、戻り値が返されたことがわかります。 E2_CopyFileToBlob
を呼び出すたびに、アップロードされたバイト数が返されるので、すべての戻り値を合計することによって合計バイト数を計算できます。
ヘルパー アクティビティ関数
ヘルパー アクティビティ関数は、他のサンプルと同じように、activityTrigger
トリガー バインドを使う標準的な関数です。
E2_GetFileList アクティビティ関数
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
E2_GetFileList
の function.json ファイルは次のようになります。
{
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
その実装を次に示します。
const readdirp = require("readdirp");
module.exports = function (context, rootDirectory) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
readdirp(
{ root: rootDirectory, entryType: "all" },
function (fileInfo) {
if (!fileInfo.stat.isDirectory()) {
allFilePaths.push(fileInfo.fullPath);
}
},
function (err, res) {
if (err) {
throw err;
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
context.done(null, allFilePaths);
}
);
};
関数は、readdirp
モジュール (バージョン 2.x) を使用してディレクトリ構造を再帰的に読み取ります。
getFileList
アクティビティ関数の実装を次に示します。
const df = require("durable-functions");
const readdirp = require("readdirp");
const getFileListActivityName = "getFileList";
df.app.activity(getFileListActivityName, {
handler: async function (rootDirectory, context) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
for await (const entry of readdirp(rootDirectory, { type: "files" })) {
allFilePaths.push(entry.fullPath);
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
return allFilePaths;
},
});
この関数は、readdirp
モジュール (バージョン 3.x
) を使用してディレクトリ構造を再帰的に読み取ります。
E2_GetFileList
の function.json ファイルは次のようになります。
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
]
}
その実装を次に示します。
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Note
このコードをオーケストレーター関数に直接配置できないことを疑問に思うかもしれません。 配置することは可能ですが、それを行うと、オーケストレーター関数の基本ルールの 1 つである、ローカル ファイル システムへのアクセスを含めて I/O 操作を行うべきではないというルールを破ることになります。 詳細については、「オーケストレーター関数コードの制約 」を参照してください。
E2_CopyFileToBlob アクティビティ関数
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Note
サンプル コードを実行するには、Microsoft.Azure.WebJobs.Extensions.Storage
NuGet パッケージをインストールする必要があります。
関数は、Azure Functions のバインドの高度な機能を使用します (つまり、Binder
パラメーター の使用) が、このチュートリアルでは、詳細を気にする必要はありません。
E2_CopyFileToBlob
の function.json ファイルも同じように単純です。
{
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
},
{
"name": "out",
"type": "blob",
"path": "",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
],
"disabled": false
}
JavaScript の実装では、Azure Storage SDK for Node を使用して、ファイルを Azure Blob Storage にアップロードします。
const fs = require("fs");
const path = require("path");
const storage = require("azure-storage");
module.exports = function (context, filePath) {
const container = "backups";
const root = path.parse(filePath).root;
const blobPath = filePath.substring(root.length).replace("\\", "/");
const outputLocation = `backups/${blobPath}`;
const blobService = storage.createBlobService();
blobService.createContainerIfNotExists(container, (error) => {
if (error) {
throw error;
}
fs.stat(filePath, function (error, stats) {
if (error) {
throw error;
}
context.log(
`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`
);
const readStream = fs.createReadStream(filePath);
blobService.createBlockBlobFromStream(
container,
blobPath,
readStream,
stats.size,
function (error) {
if (error) {
throw error;
}
context.done(null, stats.size);
}
);
});
});
};
copyFileToBlob
の JavaScript 実装では、Azure Storage 出力バインドを使用して、ファイルを Azure Blob Storage にアップロードします。
const df = require("durable-functions");
const fs = require("fs/promises");
const { output } = require("@azure/functions");
const copyFileToBlobActivityName = "copyFileToBlob";
const blobOutput = output.storageBlob({
path: "backups/{backupPath}",
connection: "StorageConnString",
});
df.app.activity(copyFileToBlobActivityName, {
extraOutputs: [blobOutput],
handler: async function ({ backupPath, filePath }, context) {
const outputLocation = `backups/${backupPath}`;
const stats = await fs.stat(filePath);
context.log(`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`);
const fileContents = await fs.readFile(filePath);
context.extraOutputs.set(blobOutput, fileContents);
return stats.size;
},
});
E2_CopyFileToBlob
の function.json ファイルも同じように単純です。
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
}
]
}
Python の実装では、Azure Storage SDK for Python を使用して、ファイルを Azure Blob Storage にアップロードします。
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
この実装は、ディスクからファイルを読み込み、"backups" コンテナー内の同じ名前の BLOB に内容を非同期でストリーミングします。 戻り値はストレージにコピーされたバイト数であり、この数値がオーケストレーター関数によって集計の合計を計算するために使用されます。
Note
これは、I/O 操作を activityTrigger
関数に移動させる完璧な例です。 作業をさまざまなマシンに分散できるだけではなく、進行状況のチェックポイント処理のメリットも得ることができます。 ホスト プロセスが何らかの理由で終了した場合でも、どのアップロードが完了しているかがわかります。
サンプルを実行する
次の HTTP POST 要求を送信することによって、Windows でオーケストレーションを開始できます。
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
または、Linux Function App で (現在、Python は Linux for App Service でのみ実行されます)、次のようにオーケストレーションを開始することもできます。
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Note
呼び出している HttpStart
関数は、JSON 形式のコンテンツでのみ動作します。 このため、Content-Type: application/json
ヘッダーは必須であり、ディレクトリ パスは JSON 文字列としてエンコードされます。 さらに、HTTP スニペットでは、既定の api/
プレフィックスをすべての HTTP トリガー関数 URL から削除するエントリが host.json
ファイルにあることを想定しています。 この構成のマークアップはサンプルの host.json
ファイルにあります。
この HTTP 要求で E2_BackupSiteContent
オーケストレーターがトリガーされ、文字列 D:\home\LogFiles
がパラメーターとして渡されます。 応答は、このバックアップ操作の状態を取得するためのリンクを提供します。
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
この操作は、関数アプリ内のログ ファイルの数によっては、完了するまで数分かかる場合があります。 前の HTTP 202 応答の Location
ヘッダー内の URL をクエリすることで、最新の状態を取得できます。
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
ここでは、関数はまだ実行中です。 オーケストレーターの状態と最終更新時間に保存された入力を確認できます。 Location
ヘッダーの値を引き続き使用して、完了するまでポーリングできます。 状態が "Completed" になると、次のような HTTP 応答値が表示されます。
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
これで、オーケストレーションが完了したこと、完了までにかかったおおよその時間を確認できます。 output
フィールドの値から、約 450 KB のログがアップロードされたことも確認できます。
次のステップ
このサンプルでは、ファンアウト/ファンイン パターンの実装方法について説明しました。 次のサンプルでは、永続的タイマー を使用して監視パターンを実装する方法を示します。