Orleans クライアント

クライアントを使用すると、グレイン以外のコードで Orleans クラスターと対話できます。 クライアントにより、クラスターでホスティングされているグレインやストリームとアプリケーション コードで通信することができます。 クライアント コードがホスティングされている場所に応じて、クライアントを取得する方法は 2 つあります。サイロと同じプロセスを使うか、別のプロセスを使うかです。 この記事では両方のオプションについて説明します。まず、推奨されるオプション (グレイン コードと同じプロセスでクライアント コードを共同ホスティングする) から始めます。

共同ホスティングされたクライアント

クライアント コードがグレイン コードと同じプロセスでホスティングされている場合、クライアントはホスティングするアプリケーションの依存関係挿入コンテナーから直接取得できます。 この場合、クライアントは接続されているサイロと直接通信し、サイロがクラスターに関して持っている追加の知識を活用できます。

これには、ネットワークと CPU のオーバーヘッドの削減、待機時間の短縮、スループットと信頼性の向上など、いくつかの利点があります。 クライアントは、クラスターのトポロジと状態に関するサイロの知識を利用し、別のゲートウェイを使用する必要はありません。 これにより、ネットワーク ホップとシリアル化および逆シリアル化のラウンド トリップを回避できます。 また、これにより、クライアントとグレインの間に必要なノードの数が最小限に抑えられるため、信頼性も向上します。 グレインがステートレス ワーカー グレインである場合、またはクライアントがホスティングされているサイロでアクティブ化される場合は、シリアル化やネットワーク通信を実行する必要はまったくなく、クライアントはパフォーマンスと信頼性に関する追加の利点を得ることができます。 クライアントとグレイン コードを共同ホスティングする場合、2 つの異なるアプリケーション バイナリをデプロイして監視する必要がなくなるため、デプロイとアプリケーション トポロジも簡素化されます。

このアプローチには欠点もあります。それは主に、グレイン コードがクライアント プロセスから分離されなくなるという点です。 そのため、クライアント コードにおける問題 (IO のブロックやロックの競合によってスレッドが枯渇するなど) が、グレイン コードのパフォーマンスに影響を与えるおそれがあります。 前述のようなコードの欠陥がなくても、クライアント コードをグレイン コードと同じプロセッサで実行するだけで、CPU キャッシュに追加の負担がかかり、ローカル リソース全般の競合が増えて、"うるさい隣人" の影響が発生するおそれがあります。 さらに、監視システムで論理的にクライアント コードとグレイン コードを区別できないため、これらの問題の原因を特定することがより困難になります。

これらの欠点にもかかわらず、クライアント コードとグレイン コードの共同ホスティングは一般的なオプションであり、ほとんどのアプリケーションに推奨されるアプローチです。 詳しく説明すると、前述の欠点は、次の理由から、実際には最小限になります。

  • 多くの場合、クライアント コードは非常に "細く" なります (たとえば、受信 HTTP 要求をグレイン呼び出しに変換するなど)。そのため、"うるさい隣人" の影響は最小限になり、それ以外の場合に必要になるゲートウェイと同等のコストになります。
  • パフォーマンス上の問題が発生した場合、開発者の一般的なワークフローでは、CPU プロファイラーやデバッガーなどのツールを使います。これらは、クライアント コードとグレイン コードの両方を同じプロセスで実行していても、問題の原因をすばやく特定するのに引き続き有効です。 つまり、メトリックはより粗くなり、問題の原因を正確に特定できなくなりますが、より詳細なツールは引き続き有効です。

ホストからクライアントを取得する

.NET での汎用ホストを使ってホスティングする場合、クライアントはホストの依存関係の挿入コンテナーで自動的に使用できるようになり、ASP.NET コントローラーIHostedService の実装などのサービスに挿入できます。

または、IGrainFactoryIClusterClient などのクライアント インターフェイスを、ISiloHost から取得することもできます。

var client = host.Services.GetService<IClusterClient>();
await client.GetGrain<IMyGrain>(0).Ping();

外部クライアント

クライアント コードは、グレイン コードがホストされている Orleans クラスターの外部で実行できます。 そのため、外部クライアントは、クラスターとアプリケーションのすべてのグレインに対するコネクタまたはパイプ役として機能します。 通常、クライアントは、ビジネス ロジックを実行するグレインを含む中間層として機能する Orleans クラスターに接続するために、フロントエンド Web サーバー上で使用されます。

一般的なセットアップでは、フロントエンド Web サーバーは:

  • Web 要求を受信します
  • 必要な認証と認可の検証を実行します
  • どのグレインが要求を処理するかを決定します
  • Microsoft.Orleans.Client NuGet パッケージを使用して、グレインに対して 1 つ以上のメソッド呼び出しを行います。
  • グレイン呼び出しの正常な完了または失敗と、返された値を処理します
  • Web 要求に応答を送信します

グレイン クライアントの初期化

グレイン クライアントは、Orleans クラスターでホスティングされているグレインに対する呼び出しに使用される前に、構成および初期化され、クラスターに接続されている必要があります。

構成は、UseOrleansClient といくつかの補助オプション クラスを使って行います。それには、クライアントをプログラムによって構成するための構成プロパティの階層が含まれています。 詳細については、「クライアント構成」を参照してください。

次のクライアント構成の例を考えてみましょう。

// Alternatively, call Host.CreateDefaultBuilder(args) if using the 
// Microsoft.Extensions.Hosting NuGet package.
using IHost host = new HostBuilder()
    .UseOrleansClient(clientBuilder =>
    {
        clientBuilder.Configure<ClusterOptions>(options =>
        {
            options.ClusterId = "my-first-cluster";
            options.ServiceId = "MyOrleansService";
        });

        clientBuilder.UseAzureStorageClustering(
            options => options.ConfigureTableServiceClient(connectionString))
    })
    .Build();

host が開始されると、クライアントが構成され、構築されたサービス プロバイダー インスタンスを介して使用できるようになります。

構成は、ClientBuilder といくつかの補助オプション クラスを使って行います。それには、クライアントをプログラムによって構成するための構成プロパティの階層が含まれています。 詳細については、「クライアント構成」を参照してください。

クライアント構成の例:

var client = new ClientBuilder()
    .Configure<ClusterOptions>(options =>
    {
        options.ClusterId = "my-first-cluster";
        options.ServiceId = "MyOrleansService";
    })
    .UseAzureStorageClustering(
        options => options.ConnectionString = connectionString)
    .ConfigureApplicationParts(
        parts => parts.AddApplicationPart(typeof(IValueGrain).Assembly))
    .Build();

最後に、構築したクライアント オブジェクト上で Connect() メソッドを呼び出して、Orleans クラスターに接続する必要があります。 これは Task を返す非同期メソッドです。 そのため、await または .Wait() でそれが完了するまで待つ必要があります。

await client.Connect();

グレインへの呼び出しを行う

クライアントからグレインへの呼び出しを行うのは、グレイン コード内からそのような呼び出しを行うのと変わりません。 どちらの場合も、同じ IGrainFactory.GetGrain<TGrainInterface>(Type, Guid) メソッド (T は対象のグレイン インターフェイス) を使って、グレイン参照を取得します。 違いは、どのファクトリ オブジェクトで IGrainFactory.GetGrain が呼び出されるかです。 クライアント コードでは、次の例に示すように、接続されたクライアント オブジェクトを使ってこれを行います。

IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
Task joinGameTask = player.JoinGame(game)

await joinGameTask;

グレイン メソッドの呼び出しは、グレイン インターフェイスの規則によって要求される Task または Task<TResult> を返します。 クライアントでは、await キーワードを使ってスレッドをブロックすることなく返された Task を非同期的に待機するか、場合によっては Wait() メソッドを使って現在の実行スレッドをブロックすることができます。

グレインへの呼び出しをクライアント コードから行う場合と、別のグレイン内から行う場合の主な違いは、グレインのシングル スレッド実行モデルです。 グレインは Orleans ランタイムによってシングル スレッドに制約されますが、クライアントはマルチスレッドである可能性があります。 Orleans によってクライアント側でそのような保証は提供されないため、環境に適した同期コンストラクト (ロック、イベント、Tasks) を使用してコンカレンシーを管理するのはクライアントの責任となります。

通知の受信

シンプルな要求 - 応答のパターンでは十分でなく、クライアントが非同期通知を受信することが必要になる場合があります。 たとえば、あるユーザーが、自分がフォローしているユーザーが新しいメッセージを発行したときに通知を受け取りたいと思うことがあります。

Observer の使用は、そのようなメカニズムの 1 つです。クライアント側オブジェクトをグレインに似たターゲットとして公開し、グレインによって呼び出すことができます。 オブザーバーへの呼び出しは、一方向のベスト エフォート メッセージとして送信されるため、成功または失敗を示すものではありません。 そのため、必要に応じてオブザーバーに高レベルの信頼性メカニズムを構築するのは、アプリケーション コードの責任です。

非同期メッセージをクライアントに配信するために使用できるもう 1 つのメカニズムは、Streams です。 ストリームでは、個々のメッセージ配信の成功または失敗が示されるため、クライアントへの信頼性の高い通信が可能になります。

クライアント接続

クラスター クライアントで接続に関する問題が発生するシナリオは 2 つあります。

  • クライアントがサイロに接続しようとする場合
  • 接続されたクラスター クライアントから取得したグレイン参照への呼び出しを行うとき。

最初のケースでは、クライアントはサイロへの接続を試みます。 クライアントがサイロに接続できない場合は、問題が発生したことを示す例外がスローされます。 IClientConnectionRetryFilter を登録して例外を処理し、再試行するかどうかを決定できます。 再試行フィルターを指定しない場合、または再試行フィルターから false が返される場合、クライアントは試行を放棄します。

using Orleans.Runtime;

internal sealed class ClientConnectRetryFilter : IClientConnectionRetryFilter
{
    private int _retryCount = 0;
    private const int MaxRetry = 5;
    private const int Delay = 1_500;

    public async Task<bool> ShouldRetryConnectionAttempt(
        Exception exception,
        CancellationToken cancellationToken)
    {
        if (_retryCount >= MaxRetry)
        {
            return false;
        }

        if (!cancellationToken.IsCancellationRequested &&
            exception is SiloUnavailableException siloUnavailableException)
        {
            await Task.Delay(++ _retryCount * Delay, cancellationToken);
            return true;
        }

        return false;
    }
}

クラスター クライアントで接続に関する問題が発生するシナリオは 2 つあります。

  • IClusterClient.Connect() メソッドを最初に呼び出すとき。
  • 接続されたクラスター クライアントから取得したグレイン参照への呼び出しを行うとき。

最初のケースでは、Connect メソッドによって問題の内容を示す例外がスローされます。 これは通常 (必ずしもそうであるとは限りませんが)、SiloUnavailableException です。 これが発生した場合、クラスター クライアント インスタンスは使用できないため、破棄する必要があります。 必要に応じて、再試行フィルター関数を Connect メソッドに指定できます。これにより、たとえば、指定した期間待機してから別の試行を行うことができます。 再試行フィルターを指定しない場合、または再試行フィルターから false が返される場合、クライアントは試行を放棄します。

Connect が正常に返された場合、クラスター クライアントは破棄されるまで使用可能であることが保証されます。 つまり、クライアントで接続に関する問題が発生しても、無期限に回復が試みられます。 正確な回復動作は、ClientBuilder によって提供される GatewayOptions オブジェクトで構成できます。例:

var client = new ClientBuilder()
    // ...
    .Configure<GatewayOptions>(
        options =>                         // Default is 1 min.
        options.GatewayListRefreshPeriod = TimeSpan.FromMinutes(10))
    .Build();

グレイン呼び出し中に接続の問題が発生する 2 番目のケースでは、クライアント側で SiloUnavailableException がスローされます。 これは次のように処理できます。

IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);

try
{
    await player.JoinGame(game);
}
catch (SiloUnavailableException)
{
    // Lost connection to the cluster...
}

この状況ではグレイン参照は無効になりません。後で接続が再確立されたかもしれないときに、同じ参照で呼び出しを再試行できます。

依存関係の挿入

.NET 汎用ホストを使用するプログラムで外部クライアントを作成するための推奨される方法は、依存関係の挿入を使って IClusterClient シングルトン インスタンスを挿入することです。これはその後、ホスティングされたサービスや ASP.NET コントローラーなどで、コンストラクター パラメーターとして指定できます。

Note

Orleans サイロを接続時と同じプロセスで共同ホスティングする場合、クライアントを手動で作成する必要は "ありません"。Orleans によってクライアントが自動的に提供され、その有効期間が適切に管理されます。

別のプロセス (別のマシン上) でクラスターに接続する場合、一般的なパターンは、次のようなホステッド サービスを作成することです。

using Microsoft.Extensions.Hosting;

namespace Client;

public sealed class ClusterClientHostedService : IHostedService
{
    private readonly IClusterClient _client;

    public ClusterClientHostedService(IClusterClient client)
    {
        _client = client;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Use the _client to consume grains...

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
        => Task.CompletedTask;
}
public class ClusterClientHostedService : IHostedService
{
    private readonly IClusterClient _client;

    public ClusterClientHostedService(IClusterClient client)
    {
        _client = client;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // A retry filter could be provided here.
        await _client.Connect();
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _client.Close();

        _client.Dispose();
    }
}

その後、このサービスを次のように登録します。

await Host.CreateDefaultBuilder(args)
    .UseOrleansClient(builder =>
    {
        builder.UseLocalhostClustering();
    })
    .ConfigureServices(services => 
    {
        services.AddHostedService<ClusterClientHostedService>();
    })
    .RunConsoleAsync();

上記のクライアント アプリケーションの例の拡張バージョンを次に示します。このクライアント アプリケーションでは、Orleans への接続やプレーヤー アカウントの検索を行うことができるだけではなく、オブザーバーを使用してプレーヤーが参加しているゲーム セッションの更新を受信登録したり、プログラムが手動で終了されるまで通知を出力したりすることができます。

try
{
    using IHost host = Host.CreateDefaultBuilder(args)
        .UseOrleansClient((context, client) =>
        {
            client.Configure<ClusterOptions>(options =>
            {
                options.ClusterId = "my-first-cluster";
                options.ServiceId = "MyOrleansService";
            })
            .UseAzureStorageClustering(
                options => options.ConfigureTableServiceClient(
                    context.Configuration["ORLEANS_AZURE_STORAGE_CONNECTION_STRING"]));
        })
        .UseConsoleLifetime()
        .Build();

    await host.StartAsync();

    IGrainFactory client = host.Services.GetRequiredService<IGrainFactory>();

    // Hardcoded player ID
    Guid playerId = new("{2349992C-860A-4EDA-9590-000000000006}");
    IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
    IGameGrain? game = null;
    while (game is null)
    {
        Console.WriteLine(
            $"Getting current game for player {playerId}...");

        try
        {
            game = await player.GetCurrentGame();
            if (game is null) // Wait until the player joins a game
            {
                await Task.Delay(TimeSpan.FromMilliseconds(5_000));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Exception: {ex.GetBaseException()}");
        }
    }

    Console.WriteLine(
        $"Subscribing to updates for game {game.GetPrimaryKey()}...");

    // Subscribe for updates
    var watcher = new GameObserver();
    await game.ObserveGameUpdates(
        client.CreateObjectReference<IGameObserver>(watcher));

    Console.WriteLine(
        "Subscribed successfully. Press <Enter> to stop.");
}
catch (Exception e)
{
    Console.WriteLine(
        $"Unexpected Error: {e.GetBaseException()}");
}
await RunWatcherAsync();

// Block the main thread so that the process doesn't exit.
// Updates arrive on thread pool threads.
Console.ReadLine();

static async Task RunWatcherAsync()
{
    try
    {
        var client = new ClientBuilder()
            .Configure<ClusterOptions>(options =>
            {
                options.ClusterId = "my-first-cluster";
                options.ServiceId = "MyOrleansService";
            })
            .UseAzureStorageClustering(
                options => options.ConnectionString = connectionString)
            .ConfigureApplicationParts(
                parts => parts.AddApplicationPart(typeof(IValueGrain).Assembly))
            .Build();

            // Hardcoded player ID
            Guid playerId = new("{2349992C-860A-4EDA-9590-000000000006}");
            IPlayerGrain player = client.GetGrain<IPlayerGrain>(playerId);
            IGameGrain game = null;
            while (game is null)
            {
                Console.WriteLine(
                    $"Getting current game for player {playerId}...");

                try
                {
                    game = await player.GetCurrentGame();
                    if (game is null) // Wait until the player joins a game
                    {
                        await Task.Delay(TimeSpan.FromMilliseconds(5_000));
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Exception: {ex.GetBaseException()}");
                }
            }

            Console.WriteLine(
                $"Subscribing to updates for game {game.GetPrimaryKey()}...");

            // Subscribe for updates
            var watcher = new GameObserver();
            await game.SubscribeForGameUpdates(
                await client.CreateObjectReference<IGameObserver>(watcher));

            Console.WriteLine(
                "Subscribed successfully. Press <Enter> to stop.");

            Console.ReadLine(); 
        }
        catch (Exception e)
        {
            Console.WriteLine(
                $"Unexpected Error: {e.GetBaseException()}");
        }
    }
}

/// <summary>
/// Observer class that implements the observer interface.
/// Need to pass a grain reference to an instance of
/// this class to subscribe for updates.
/// </summary>
class GameObserver : IGameObserver
{
    public void UpdateGameScore(string score)
    {
        Console.WriteLine("New game score: {0}", score);
    }
}