Durable Functions のモニター シナリオ - 天気ウォッチャーのサンプル

モニター パターンは、ワークフローの柔軟な "繰り返し" プロセスを参照します。たとえば、特定の条件が満たされるまでポーリングします。 この記事では、Durable Functions を使って監視を実装するサンプルを説明します。

前提条件

シナリオの概要

このサンプルでは、ある場所の現在の気象条件を監視し、空が晴れたときに SMS でユーザーに通知します。 定期的にタイマーでトリガーされる関数を使って、天気を確認し、アラートを送信できます。 ただし、このアプローチの 1 つの問題は有効期間管理です。 送信する必要があるアラートが 1 つだけの場合は、晴天が検出された後でモニターを無効にする必要があります。 特にメリットがあるのは、監視パターンがそれ自体の実行を終了できることです。

  • モニターは、スケジュールではなく一定間隔で実行されます。タイマーは "実行" を毎時トリガーし、モニターはアクション間で 1 時間 "待機" します。 モニターのアクションは、指定されない限りは重複しません。これは実行時間の長いタスクで重要となる可能性があります。
  • モニターの間隔は動的にすることができます。待機時間は、いくつかの条件に基づいて変化することがあります。
  • モニターは、ある条件が満たされたときに終了することも、別のプロセスによって終了することもできます。
  • モニターは、パラメーターを受け取ることができます。 サンプルでは、要求された場所と電話番号に同じ気象監視プロセスを適用する方法を示します。
  • モニターはスケーラブルです。 各モニターはオーケストレーション インスタンスであるため、新しい関数を作成したり、コードをさらに定義したりしなくても、複数のモニターを作成できます。
  • モニターは、より大規模なワークフローと簡単に統合できます。 モニターは、より複雑なオーケストレーション関数の 1 つのセクション、つまりサブ オーケストレーションにすることができます。

構成

Twilio 統合の構成

このサンプルでは、Twilio サービスを使って、携帯電話に SMS メッセージを送信します。 Azure Functions では Twilio バインディングにより Twilio が既にサポートされており、サンプルではその機能を使います。

最初に必要なものは Twilio アカウントです。 https://www.twilio.com/try-twilio で無料で作成できます。 アカウントを作成したら、次の 3 つのアプリ設定を関数アプリに追加します。

アプリ設定の名前 値の説明
TwilioAccountSid Twilio アカウントの SID
TwilioAuthToken Twilio アカウントの認証トークン
TwilioPhoneNumber Twilio アカウントに関連付けられている電話番号。 これは、SMS メッセージの送信に使われます。

Weather Underground 統合の構成

このサンプルでは、Weather Underground API を使って、ある場所の現在の気象条件をチェックします。

まず必要なのは、Weather Underground アカウントです。 https://www.wunderground.com/signup で無料で作成できます。 アカウントを作成したら、API キーを取得する必要があります。 これは、https://www.wunderground.com/weather/api にアクセスし、キー設定を選択することで行います。 Stratus Developer プランは無料で、このサンプルを実行するのに十分です。

API キーを入手したら、次のアプリ設定を関数アプリに追加します。

アプリ設定の名前 値の説明
WeatherUndergroundApiKey Weather Underground API キー。

関数

この記事では、サンプル アプリで使用されている次の関数について説明します。

  • E3_Monitor:E3_GetIsClear を定期的に呼び出すオーケストレーター関数です。 E3_GetIsClear が true を返した場合に E3_SendGoodWeatherAlert を呼び出します。
  • E3_GetIsClear:ある場所の現在の気象条件を確認するアクティビティ関数です。
  • E3_SendGoodWeatherAlert:Twilio 経由で SMS メッセージを送信するアクティビティ関数です。

E3_Monitor オーケストレーター関数

[FunctionName("E3_Monitor")]
public static async Task Run([OrchestrationTrigger] IDurableOrchestrationContext monitorContext, ILogger log)
{
    MonitorRequest input = monitorContext.GetInput<MonitorRequest>();
    if (!monitorContext.IsReplaying) { log.LogInformation($"Received monitor request. Location: {input?.Location}. Phone: {input?.Phone}."); }

    VerifyRequest(input);

    DateTime endTime = monitorContext.CurrentUtcDateTime.AddHours(6);
    if (!monitorContext.IsReplaying) { log.LogInformation($"Instantiating monitor for {input.Location}. Expires: {endTime}."); }

    while (monitorContext.CurrentUtcDateTime < endTime)
    {
        // Check the weather
        if (!monitorContext.IsReplaying) { log.LogInformation($"Checking current weather conditions for {input.Location} at {monitorContext.CurrentUtcDateTime}."); }

        bool isClear = await monitorContext.CallActivityAsync<bool>("E3_GetIsClear", input.Location);

        if (isClear)
        {
            // It's not raining! Or snowing. Or misting. Tell our user to take advantage of it.
            if (!monitorContext.IsReplaying) { log.LogInformation($"Detected clear weather for {input.Location}. Notifying {input.Phone}."); }

            await monitorContext.CallActivityAsync("E3_SendGoodWeatherAlert", input.Phone);
            break;
        }
        else
        {
            // Wait for the next checkpoint
            var nextCheckpoint = monitorContext.CurrentUtcDateTime.AddMinutes(30);
            if (!monitorContext.IsReplaying) { log.LogInformation($"Next check for {input.Location} at {nextCheckpoint}."); }

            await monitorContext.CreateTimer(nextCheckpoint, CancellationToken.None);
        }
    }

    log.LogInformation($"Monitor expiring.");
}

[Deterministic]
private static void VerifyRequest(MonitorRequest request)
{
    if (request == null)
    {
        throw new ArgumentNullException(nameof(request), "An input object is required.");
    }

    if (request.Location == null)
    {
        throw new ArgumentNullException(nameof(request.Location), "A location input is required.");
    }

    if (string.IsNullOrEmpty(request.Phone))
    {
        throw new ArgumentNullException(nameof(request.Phone), "A phone number input is required.");
    }
}

オーケストレーターには、監視する場所と、その場所での天気が判明したときにメッセージを送信する先の電話番号が必要です。 このデータは、厳密に型指定された MonitorRequest オブジェクトとしてオーケストレーターに渡されます。

このオーケストレーター関数は、次のアクションを行います。

  1. 監視する "場所" と SMS 通知の送信先の "電話番号" から構成される MonitorRequest を取得します。
  2. 監視の有効期限を決定します。 サンプルでは、簡略化のためにハード コーディングされた値を使います。
  3. E3_GetIsClear を呼び出し、要求された場所が晴れているかどうかを判断します。
  4. 天気が晴れの場合は、E3_SendGoodWeatherAlert を呼び出して、要求された電話番号に SMS 通知を送信します。
  5. 持続的タイマーを作成して、次のポーリング間隔でオーケストレーションを再開します。 サンプルでは、簡略化のためにハード コーディングされた値を使います。
  6. 現在の UTC 時間がモニターの有効期限を経過するか SMS アラートが送信されるまで、実行を続けます。

オーケストレーター関数を複数回呼び出すことによって、複数のオーケストレーター インスタンスを同時に実行できます。 監視する場所と SMS アラートを送信する電話番号を指定することができます。 最後に、オーケストレーター関数は、タイマーを待機している間は実行されて "いない" ため、その間は課金が発生しないことに注意してください。

E3_GetIsClear アクティビティ関数

他のサンプルと同様に、ヘルパー アクティビティ関数は、activityTrigger トリガー バインドを使う標準的な関数です。 E3_GetIsClear 関数は、Weather Underground API を使って現在の気象条件を取得し、晴れているかどうかを判断します。

[FunctionName("E3_GetIsClear")]
public static async Task<bool> GetIsClear([ActivityTrigger] Location location)
{
    var currentConditions = await WeatherUnderground.GetCurrentConditionsAsync(location);
    return currentConditions.Equals(WeatherCondition.Clear);
}

E3_SendGoodWeatherAlert アクティビティ関数

E3_SendGoodWeatherAlert 関数は、Twilio バインディングを使って、散歩に適した時間であることをエンド ユーザーに通知する SMS メッセージを送信します。

    [FunctionName("E3_SendGoodWeatherAlert")]
    public static void SendGoodWeatherAlert(
        [ActivityTrigger] string phoneNumber,
        ILogger log,
        [TwilioSms(AccountSidSetting = "TwilioAccountSid", AuthTokenSetting = "TwilioAuthToken", From = "%TwilioPhoneNumber%")]
            out CreateMessageOptions message)
    {
        message = new CreateMessageOptions(new PhoneNumber(phoneNumber));
        message.Body = $"The weather's clear outside! Go take a walk!";
    }

internal class WeatherUnderground
{
    private static readonly HttpClient httpClient = new HttpClient();
    private static IReadOnlyDictionary<string, WeatherCondition> weatherMapping = new Dictionary<string, WeatherCondition>()
    {
        { "Clear", WeatherCondition.Clear },
        { "Overcast", WeatherCondition.Clear },
        { "Cloudy", WeatherCondition.Clear },
        { "Clouds", WeatherCondition.Clear },
        { "Drizzle", WeatherCondition.Precipitation },
        { "Hail", WeatherCondition.Precipitation },
        { "Ice", WeatherCondition.Precipitation },
        { "Mist", WeatherCondition.Precipitation },
        { "Precipitation", WeatherCondition.Precipitation },
        { "Rain", WeatherCondition.Precipitation },
        { "Showers", WeatherCondition.Precipitation },
        { "Snow", WeatherCondition.Precipitation },
        { "Spray", WeatherCondition.Precipitation },
        { "Squall", WeatherCondition.Precipitation },
        { "Thunderstorm", WeatherCondition.Precipitation },
    };

    internal static async Task<WeatherCondition> GetCurrentConditionsAsync(Location location)
    {
        var apiKey = Environment.GetEnvironmentVariable("WeatherUndergroundApiKey");
        if (string.IsNullOrEmpty(apiKey))
        {
            throw new InvalidOperationException("The WeatherUndergroundApiKey environment variable was not set.");
        }

        var callString = string.Format("http://api.wunderground.com/api/{0}/conditions/q/{1}/{2}.json", apiKey, location.State, location.City);
        var response = await httpClient.GetAsync(callString);
        var conditions = await response.Content.ReadAsAsync<JObject>();

        JToken currentObservation;
        if (!conditions.TryGetValue("current_observation", out currentObservation))
        {
            JToken error = conditions.SelectToken("response.error");

            if (error != null)
            {
                throw new InvalidOperationException($"API returned an error: {error}.");
            }
            else
            {
                throw new ArgumentException("Could not find weather for this location. Try being more specific.");
            }
        }

        return MapToWeatherCondition((string)(currentObservation as JObject).GetValue("weather"));
    }

    private static WeatherCondition MapToWeatherCondition(string weather)
    {
        foreach (var pair in weatherMapping)
        {
            if (weather.Contains(pair.Key))
            {
                return pair.Value;
            }
        }

        return WeatherCondition.Other;
    }
}

Note

サンプル コードを実行するには、Microsoft.Azure.WebJobs.Extensions.Twilio NuGet パッケージをインストールする必要があります。

サンプルを実行する

サンプルに含まれる HTTP によってトリガーされる関数を使って、次の HTTP POST 要求を送信することによりオーケストレーションを開始できます。

POST https://{host}/orchestrators/E3_Monitor
Content-Length: 77
Content-Type: application/json

{ "location": { "city": "Redmond", "state": "WA" }, "phone": "+1425XXXXXXX" }
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Location: https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={SystemKey}
RetryAfter: 10

{"id": "f6893f25acf64df2ab53a35c09d52635", "statusQueryGetUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "sendEventPostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/raiseEvent/{eventName}?taskHub=SampleHubVS&connection=Storage&code={systemKey}", "terminatePostUri": "https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason={text}&taskHub=SampleHubVS&connection=Storage&code={systemKey}"}

E3_Monitor インスタンスが起動し、要求された場所の現在の気象条件のクエリを実行します。 天気が晴れの場合はアラートを送信するアクティビティ関数を呼び出し、そうでない場合はタイマーを設定します。 タイマーが切れると、オーケストレーションが再開されます。

オーケストレーションのアクティビティは、Azure Functions ポータルで関数ログを調べることで確認できます。

2018-03-01T01:14:41.649 Function started (Id=2d5fcadf-275b-4226-a174-f9f943c90cd1)
2018-03-01T01:14:42.741 Started orchestration with ID = '1608200bb2ce4b7face5fc3b8e674f2e'.
2018-03-01T01:14:42.780 Function completed (Success, Id=2d5fcadf-275b-4226-a174-f9f943c90cd1, Duration=1111ms)
2018-03-01T01:14:52.765 Function started (Id=b1b7eb4a-96d3-4f11-a0ff-893e08dd4cfb)
2018-03-01T01:14:52.890 Received monitor request. Location: Redmond, WA. Phone: +1425XXXXXXX.
2018-03-01T01:14:52.895 Instantiating monitor for Redmond, WA. Expires: 3/1/2018 7:14:52 AM.
2018-03-01T01:14:52.909 Checking current weather conditions for Redmond, WA at 3/1/2018 1:14:52 AM.
2018-03-01T01:14:52.954 Function completed (Success, Id=b1b7eb4a-96d3-4f11-a0ff-893e08dd4cfb, Duration=189ms)
2018-03-01T01:14:53.226 Function started (Id=80a4cb26-c4be-46ba-85c8-ea0c6d07d859)
2018-03-01T01:14:53.808 Function completed (Success, Id=80a4cb26-c4be-46ba-85c8-ea0c6d07d859, Duration=582ms)
2018-03-01T01:14:53.967 Function started (Id=561d0c78-ee6e-46cb-b6db-39ef639c9a2c)
2018-03-01T01:14:53.996 Next check for Redmond, WA at 3/1/2018 1:44:53 AM.
2018-03-01T01:14:54.030 Function completed (Success, Id=561d0c78-ee6e-46cb-b6db-39ef639c9a2c, Duration=62ms)

オーケストレーションは、タイムアウトになるか晴天が検出されると完了します。 別の関数内で terminate API を使用することも、上の 202 応答内で参照されている terminatePostUri HTTP POST Webhook を呼び出すこともできます。 Webhook を使用するには、{text} を早期終了の理由に置き換えます。 HTTP POST URL は、大体次のようなものになります。

POST https://{host}/runtime/webhooks/durabletask/instances/f6893f25acf64df2ab53a35c09d52635/terminate?reason=Because&taskHub=SampleHubVS&connection=Storage&code={systemKey}

次のステップ

このサンプルでは、Durable Functions と持続的タイマーと条件付きロジックを使用して外部ソースの状態を監視する方法を示しました。 次のサンプルでは、外部イベントと持続的タイマーを使用してユーザーの操作を処理する方法について説明します。