Fan-out/fan-in scenario in Durable Functions - Cloud backup example
Article
Fan-out/fan-in refers to the pattern of executing multiple functions concurrently and then performing some aggregation on the results. This article explains a sample that uses Durable Functions to implement a fan-in/fan-out scenario. The sample is a durable function that backs up all or some of an app's site content into Azure Storage.
Note
Version 4 of the Node.js programming model for Azure Functions is generally available. The new v4 model is designed to have a more flexible and intuitive experience for JavaScript and TypeScript developers. Learn more about the differences between v3 and v4 in the migration guide.
In the following code snippets, JavaScript (PM4) denotes programming model V4, the new experience.
In this sample, the functions upload all files under a specified directory recursively into blob storage. They also count the total number of bytes that were uploaded.
It's possible to write a single function that takes care of everything. The main problem you would run into is scalability. A single function execution can only run on a single virtual machine, so the throughput will be limited by the throughput of that single VM. Another problem is reliability. If there's a failure midway through, or if the entire process takes more than 5 minutes, the backup could fail in a partially completed state. It would then need to be restarted.
A more robust approach would be to write two regular functions: one would enumerate the files and add the file names to a queue, and another would read from the queue and upload the files to blob storage. This approach is better in terms of throughput and reliability, but it requires you to provision and manage a queue. More importantly, significant complexity is introduced in terms of state management and coordination if you want to do anything more, like report the total number of bytes uploaded.
A Durable Functions approach gives you all of the mentioned benefits with very low overhead.
The functions
This article explains the following functions in the sample app:
E2_BackupSiteContent: An orchestrator function that calls E2_GetFileList to obtain a list of files to back up, then calls E2_CopyFileToBlob to back up each file.
E2_GetFileList: An activity function that returns a list of files in a directory.
E2_CopyFileToBlob: An activity function that backs up a single file to Azure Blob Storage.
E2_BackupSiteContent orchestrator function
This orchestrator function essentially does the following:
Takes a rootDirectory value as an input parameter.
Calls a function to get a recursive list of files under rootDirectory.
Makes multiple parallel function calls to upload each file into Azure Blob Storage.
Waits for all uploads to complete.
Returns the sum total bytes that were uploaded to Azure Blob Storage.
Here is the code that implements the orchestrator function:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Notice the await Task.WhenAll(tasks); line. All the individual calls to the E2_CopyFileToBlob function were not awaited, which allows them to run in parallel. When we pass this array of tasks to Task.WhenAll, we get back a task that won't complete until all the copy operations have completed. If you're familiar with the Task Parallel Library (TPL) in .NET, then this is not new to you. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
After awaiting from Task.WhenAll, we know that all function calls have completed and have returned values back to us. Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so calculating the sum total byte count is a matter of adding all those return values together.
The function uses the standard function.json for orchestrator functions.
Here is the code that implements the orchestrator function:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Notice the yield context.df.Task.all(tasks); line. All the individual calls to the E2_CopyFileToBlob function were not yielded, which allows them to run in parallel. When we pass this array of tasks to context.df.Task.all, we get back a task that won't complete until all the copy operations have completed. If you're familiar with Promise.all in JavaScript, then this is not new to you. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
Note
Although tasks are conceptually similar to JavaScript promises, orchestrator functions should use context.df.Task.all and context.df.Task.any instead of Promise.all and Promise.race to manage task parallelization.
After yielding from context.df.Task.all, we know that all function calls have completed and have returned values back to us. Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so calculating the sum total byte count is a matter of adding all those return values together.
Here is the code that implements the orchestrator function:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Notice the yield context.df.Task.all(tasks); line. All the individual calls to the copyFileToBlob function were not yielded, which allows them to run in parallel. When we pass this array of tasks to context.df.Task.all, we get back a task that won't complete until all the copy operations have completed. If you're familiar with Promise.all in JavaScript, then this is not new to you. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
Note
Although Tasks are conceptually similar to JavaScript promises, orchestrator functions should use context.df.Task.all and context.df.Task.any instead of Promise.all and Promise.race to manage task parallelization.
After yielding from context.df.Task.all, we know that all function calls have completed and have returned values back to us. Each call to copyFileToBlob returns the number of bytes uploaded, so calculating the sum total byte count is a matter of adding all those return values together.
The function uses the standard function.json for orchestrator functions.
Here is the code that implements the orchestrator function:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Notice the yield context.task_all(tasks); line. All the individual calls to the E2_CopyFileToBlob function were not yielded, which allows them to run in parallel. When we pass this array of tasks to context.task_all, we get back a task that won't complete until all the copy operations have completed. If you're familiar with asyncio.gather in Python, then this is not new to you. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
Note
Although tasks are conceptually similar to Python awaitables, orchestrator functions should use yield as well as the context.task_all and context.task_any APIs to manage task parallelization.
After yielding from context.task_all, we know that all function calls have completed and have returned values back to us. Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so we can calculate the sum total byte count by adding all the return values together.
Helper activity functions
The helper activity functions, as with other samples, are just regular functions that use the activityTrigger trigger binding.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Note
You might be wondering why you couldn't just put this code directly into the orchestrator function. You could, but this would break one of the fundamental rules of orchestrator functions, which is that they should never do I/O, including local file system access. For more information, see Orchestrator function code constraints.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Note
You will need to install the Microsoft.Azure.WebJobs.Extensions.Storage NuGet package to run the sample code.
The function uses some advanced features of Azure Functions bindings (that is, the use of the Binder parameter), but you don't need to worry about those details for the purpose of this walkthrough.
The function.json file for E2_CopyFileToBlob is similarly simple:
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
The implementation loads the file from disk and asynchronously streams the contents into a blob of the same name in the "backups" container. The return value is the number of bytes copied to storage, that is then used by the orchestrator function to compute the aggregate sum.
Note
This is a perfect example of moving I/O operations into an activityTrigger function. Not only can the work be distributed across many different machines, but you also get the benefits of checkpointing the progress. If the host process gets terminated for any reason, you know which uploads have already completed.
Run the sample
You can start the orchestration, on Windows, by sending the following HTTP POST request.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Alternatively, on a Linux Function App (Python currently only runs on Linux for App Service), you can start the orchestration like so:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Note
The HttpStart function that you are invoking only works with JSON-formatted content. For this reason, the Content-Type: application/json header is required and the directory path is encoded as a JSON string. Moreover, HTTP snippet assumes there is an entry in the host.json file which removes the default api/ prefix from all HTTP trigger functions URLs. You can find the markup for this configuration in the host.json file in the samples.
This HTTP request triggers the E2_BackupSiteContent orchestrator and passes the string D:\home\LogFiles as a parameter. The response provides a link to get the status of the backup operation:
Depending on how many log files you have in your function app, this operation could take several minutes to complete. You can get the latest status by querying the URL in the Location header of the previous HTTP 202 response.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
In this case, the function is still running. You are able to see the input that was saved into the orchestrator state and the last updated time. You can continue to use the Location header values to poll for completion. When the status is "Completed", you see an HTTP response value similar to the following:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Now you can see that the orchestration is complete and approximately how much time it took to complete. You also see a value for the output field, which indicates that around 450 KB of logs were uploaded.
Next steps
This sample has shown how to implement the fan-out/fan-in pattern. The next sample shows how to implement the monitor pattern using durable timers.