Değişiklik akışı işlemci kitaplığından Azure Cosmos DB .NET V3 SDK'sına geçiş

UYGULANANLAR: NoSQL

Bu makalede, var olan bir uygulamanın değişiklik akışı işlemci kitaplığını kullanan kodunu .NET SDK'nın en son sürümündeki (.NET V3 SDK olarak da adlandırılır) değişiklik akışı özelliğine geçirmek için gerekli adımlar açıklanmaktadır.

Gerekli kod değişiklikleri

.NET V3 SDK'sı bazı hataya neden olan değişiklikler içerir; uygulamanızı geçirmenin temel adımları şunlardır:

  1. DocumentCollectionInfo örnekleri Container izlenen ve kiralanan kapsayıcılar için başvurulara dönüştürün.
  2. Kullanan WithProcessorOptions özelleştirmeler, aralıklar, WithStartTime başlangıç zamanı ve WithPollInterval en fazla öğe sayısını tanımlamak için kullanılacak WithLeaseConfiguration şekilde WithMaxItems güncelleştirilmelidir.
  3. processorName üzerinde yapılandırılan ChangeFeedProcessorOptions.LeasePrefixdeğerle eşleşecek şekilde açık GetChangeFeedProcessorBuilder değerini ayarlayın veya başka bir şekilde kullanınstring.Empty.
  4. Değişiklikler artık olarak IReadOnlyList<Document>teslim edilmemiştir, burada tanımlamanız gereken bir IReadOnlyCollection<T> T türdür ve artık temel öğe sınıfı yoktur.
  5. Değişiklikleri işlemek için artık uygulamasına IChangeFeedObserverihtiyacınız yoktur. Bunun yerine bir temsilci tanımlamanız gerekir. Temsilci statik bir İşlev olabilir veya yürütmeler arasında durumu korumanız gerekiyorsa kendi sınıfınızı oluşturabilir ve bir örnek yöntemini temsilci olarak geçirebilirsiniz.

Örneğin, değişiklik akışı işlemcisini derlemek için kullanılan özgün kod aşağıdaki gibi görünüyorsa:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Geçirilen kod şöyle görünür:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Temsilci için olayları almak için statik bir yönteminiz olabilir. 'den IChangeFeedObserverContext bilgi kullanıyorsanız, kullanmak ChangeFeedProcessorContextüzere geçirebilirsiniz:

  • ChangeFeedProcessorContext.LeaseToken yerine kullanılabilir IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers yerine kullanılabilir IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics sorun giderme için istek gecikme süresi hakkında ayrıntılı bilgi içerir
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Sistem durumu olayları ve gözlemlenebilirlik

Daha önce kullanıyorsanız IHealthMonitor veya ve IChangeFeedObserver.CloseAsynckullanıyorsanız IChangeFeedObserver.OpenAsync Bildirimler API'sini kullanın.

  • IChangeFeedObserver.OpenAsync ile WithLeaseAcquireNotificationdeğiştirilebilir.
  • IChangeFeedObserver.CloseAsync ile WithLeaseReleaseNotificationdeğiştirilebilir.
  • IHealthMonitor.InspectAsync ile WithErrorNotificationdeğiştirilebilir.

Durum ve kira kapsayıcısı

Değişiklik akışı işlemci kitaplığına benzer şekilde, .NET V3 SDK'daki değişiklik akışı özelliği de durumu depolamak için bir kira kapsayıcısı kullanır. Ancak şemalar farklıdır.

SDK V3 değişiklik akışı işlemcisi, eski kitaplık durumlarını algılar ve geçirilen uygulama kodunun ilk yürütülmesinden sonra otomatik olarak yeni şemaya geçirir.

Eski kodu kullanarak uygulamayı güvenle durdurabilir, kodu yeni sürüme geçirebilir, geçirilen uygulamayı başlatabilir ve uygulama durdurulurken gerçekleşen tüm değişiklikler yeni sürüm tarafından alınır ve işlenir.

Ek kaynaklar

Sonraki adımlar

Şimdi aşağıdaki makalelerde değişiklik akışı işlemcisi hakkında daha fazla bilgi edinebilirsiniz: