Dağıtılmış sistemlerde güvenilir mesajlaşma uygulamak zor olabilir. Bu makalede, etkili ileti işlemeyi desteklemenin önemli bir parçası olan olayların güvenilir mesajlaşması ve garantili teslimi için İşlem Giden Kutusu düzeninin nasıl kullanılacağı açıklanmaktadır. Bunu başarmak için Azure Cosmos DB işlem toplu işlemlerini ve değişiklik akışını Azure Service Bus ile birlikte kullanacaksınız.
Genel bakış
Mikro hizmet mimarileri giderek daha popüler hale geliyor ve özellikle büyük uygulamalarda ölçeklenebilirlik, bakım ve çeviklik gibi sorunların çözümünde söz veriyor. Ancak bu mimari desen, veri işleme konusunda da güçlükler ortaya çıkar. Dağıtılmış uygulamalarda her hizmet, ayrılmış bir hizmete ait veri deposunda çalışması için gereken verileri bağımsız olarak tutar. Böyle bir senaryoyu desteklemek için genellikle bir hizmetten uygulamanın diğer hizmetlerine veri (olaylar) dağıtan RabbitMQ, Kafka veya Azure Service Bus gibi bir mesajlaşma çözümü kullanırsınız. İç veya dış tüketiciler daha sonra bu iletilere abone olabilir ve veriler işlendiği anda değişikliklerden haberdar olabilir.
Bu alandaki iyi bilinen bir örnek bir sipariş sistemidir: Kullanıcı sipariş oluşturmak istediğinde, Ordering
bir hizmet rest uç noktası aracılığıyla bir istemci uygulamasından veri alır. Verileri doğrulamak için yükü bir Order
nesnenin iç gösterimiyle eşler. Veritabanına başarılı bir işlemeden sonra, ileti veri yolu için bir OrderCreated
olay yayımlar. Yeni siparişlerle (örneğin bir Inventory
veya Invoicing
hizmet) ilgilenen diğer tüm hizmetler iletilere OrderCreated
abone olur, bunları işler ve kendi veritabanında depolar.
Aşağıdaki sahte kod, bu işlemin genellikle hizmet açısından nasıl göründüğünü Ordering
gösterir:
CreateNewOrder(CreateOrderDto order){
// Validate the incoming data.
...
// Apply business logic.
...
// Save the object to the database.
var result = _orderRespository.Create(order);
// Publish the respective event.
_messagingService.Publish(new OrderCreatedEvent(result));
return Ok();
}
Bu yaklaşım, order nesnesini kaydetme ve karşılık gelen olayı yayımlama arasında bir hata oluşana kadar düzgün çalışır. Olay gönderme işlemi şu noktada birçok nedenden dolayı başarısız olabilir:
- Ağ hataları
- İleti hizmeti kesintisi
- Konak hatası
Hata her ne olursa olsun, sonuç olayın ileti veri yolu üzerinde yayımlanamaz olmasıdır OrderCreated
. Diğer hizmetlere bir siparişin oluşturulduğu bildirilmeyecek. Hizmetin Ordering
artık gerçek iş süreciyle ilgili olmayan çeşitli işlemleri gerçekleştirmesi gerekiyor. Yeniden çevrimiçi olur olmaz ileti veri yoluna koyulması gereken olayları izlemesi gerekir. En kötü durum bile olabilir: kayıp olaylar nedeniyle uygulamadaki veri tutarsızlıkları.
Çözüm
Bu durumlardan kaçınmanıza yardımcı olabilecek İşlemSel Giden Kutusu adlı iyi bilinen bir desen vardır. Olayların bir ileti aracısına gönderilmeden önce bir veri deposuna (genellikle veritabanınızdaki bir Giden Kutusu tablosuna) kaydedilmesini sağlar. İş nesnesi ve buna karşılık gelen olaylar aynı veritabanı işlemi içinde kaydedilirse hiçbir verinin kaybolmayacağı garanti edilir. Her şey işlenir veya hata olduğunda her şey geri alınır. Sonunda olayı yayımlamak için farklı bir hizmet veya çalışan işlemi, işlenmeyen girdiler için Giden Kutusu tablosunu sorgular, olayları yayımlar ve işlendi olarak işaretler. Bu düzen, bir iş nesnesi oluşturulduktan veya değiştirildikten sonra olayların kaybolmamasını sağlar.
Bu mimarinin bir Visio dosyasını indirin.
İlişkisel bir veritabanında, desenin uygulanması basittir. Örneğin hizmet Entity Framework Core kullanıyorsa, veritabanı işlemi oluşturmak, iş nesnesini ve olayı kaydetmek ve işlemi işlemek için bir Entity Framework bağlamı kullanır veya geri alma işlemi yapar. Ayrıca, olayları işleyen çalışan hizmetinin uygulanması kolaydır: Yeni girdiler için Giden Kutusu tablosunu düzenli aralıklarla sorgular, yeni eklenen olayları ileti veri yolu için yayımlar ve son olarak bu girdileri işlendi olarak işaretler.
Pratikte, işler ilk bakışta göründüğü kadar kolay değildir. En önemlisi, bir olayın bir olaydan önce OrderCreated
yayımlanmaması için olayların sırasının korundığından OrderUpdated
emin olmanız gerekir.
Azure Cosmos DB'de uygulama
Bu bölümde, Azure Cosmos DB değişiklik akışı ve Service Bus yardımıyla farklı hizmetler arasında güvenilir ve sıralı mesajlaşma elde etmek için Azure Cosmos DB'de İşlem Giden Kutusu düzeninin nasıl uygulaneceği gösterilmektedir. Nesneleri (, , Email
, Company
bilgileri vb.) yöneten Contact
örnek LastName
bir hizmet gösterir.FirstName
Komut ve Sorgu Sorumluluğu Ayrım (CQRS) desenini kullanır ve temel etki alanı odaklı tasarım (DDD) kavramlarını izler. Uygulamanın örnek kodunu GitHub'da bulabilirsiniz.
Örnek hizmetteki bir Contact
nesne aşağıdaki yapıya sahiptir:
{
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "johndoe@contoso.com",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false
}
Bir Contact
oluşturulur veya güncelleştirilir oluşturulmaz, geçerli değişiklik hakkında bilgi içeren olayları yayar. Etki alanı olayları arasında aşağıdakiler de olabilir:
ContactCreated
. Kişi eklendiğinde oluşturulur.ContactNameUpdated
. veyaLastName
değiştirildiğindeFirstName
yükseltilir.ContactEmailUpdated
. E-posta adresi güncelleştirildiğinde oluşturulur.ContactCompanyUpdated
. Şirket özelliklerinden herhangi biri değiştirildiğinde oluşturulur.
İşlem toplu işlemleri
Bu düzeni uygulamak için iş nesnesinin Contact
ve ilgili olayların aynı veritabanı işlemine kaydedildiğinden emin olmanız gerekir. Azure Cosmos DB'de işlemler ilişkisel veritabanı sistemlerinde olduğundan farklı çalışır. İşlem toplu işlemleri olarak adlandırılan Azure Cosmos DB işlemleri tek bir mantıksal bölümde çalışır, böylece Bölünmezlik, Tutarlılık, Yalıtım ve Dayanıklılık (ACID) özelliklerini garanti eder. İşlem toplu işlemindeki iki belgeyi farklı kapsayıcılara veya mantıksal bölümlere kaydedemezsiniz. Örnek hizmet için bu, hem iş nesnesinin hem de olayın veya olayların aynı kapsayıcıya ve mantıksal bölüme yerleştirileceği anlamına gelir.
Bağlam, depolar ve UnitOfWork
Örnek uygulamanın temeli, aynı işlem toplu işlemine kaydedilen nesneleri izleyen bir kapsayıcı bağlamıdır . Oluşturulan ve değiştirilen nesnelerin listesini tutar ve tek bir Azure Cosmos DB kapsayıcısı üzerinde çalışır. Arabirimi şöyle görünür:
public interface IContainerContext
{
public Container Container { get; }
public List<IDataObject<Entity>> DataObjects { get; }
public void Add(IDataObject<Entity> entity);
public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
public void Reset();
}
Kapsayıcı bağlamı bileşenindeki liste ve DomainEvent
nesneleri izlerContact
. Her ikisi de aynı kapsayıcıya yerleştirilir. Bu, birden çok nesne türünün aynı Azure Cosmos DB kapsayıcısında depolandığı ve bir Type
iş nesnesi ile olayı ayırt etmek için bir özellik kullandığı anlamına gelir.
Her tür için veri erişimini tanımlayan ve uygulayan ayrılmış bir depo vardır. Contact
Depo arabirimi şu yöntemleri sağlar:
public interface IContactsRepository
{
public void Create(Contact contact);
public Task<(Contact, string)> ReadAsync(Guid id, string etag);
public Task DeleteAsync(Guid id, string etag);
public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
public void Update(Contact contact, string etag);
}
Event
Depo benzer görünür, ancak depoda yeni olaylar oluşturan tek bir yöntem vardır:
public interface IEventRepository
{
public void Create(ContactDomainEvent e);
}
Her iki depo arabiriminin de uygulamaları, her ikisinin de aynı Azure Cosmos DB bağlamında çalıştığından emin olmak için tek IContainerContext
bir örneğe bağımlılık ekleme yoluyla bir başvuru alır.
Son bileşen, örnekte tutulan değişiklikleri Azure Cosmos DB'ye IContainerContext
işleyen bileşenidirUnitOfWork
:
public class UnitOfWork : IUnitOfWork
{
private readonly IContainerContext _context;
public IContactRepository ContactsRepo { get; }
public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
{
_context = ctx;
ContactsRepo = cRepo;
}
public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
{
return _context.SaveChangesAsync(cancellationToken);
}
}
Olay işleme: Oluşturma ve yayımlama
Bir Contact
nesne her oluşturulduğunda, değiştirildiğinde veya (geçici-) silindiğinde, hizmet karşılık gelen bir olayı tetikler. Sağlanan çözümün temeli, etki alanı odaklı tasarım (DDD) ve Jimmy Bogard tarafından önerilen aracı deseninin bir bileşimidir. Asıl nesneyi veritabanına kaydetmeden önce etki alanı nesnesinde yapılan değişiklikler nedeniyle gerçekleşen olayların listesinin tutulmasını ve bu olayların yayımlanmasını önerir.
Değişikliklerin listesi etki alanı nesnesinde tutulur, böylece başka hiçbir bileşen olay zincirini değiştiremez. Etki alanı nesnesinde olayları (IEvent
örnekleri) koruma davranışı bir arabirim IEventEmitter<IEvent>
aracılığıyla tanımlanır ve soyut DomainEntity
bir sınıfta uygulanır:
public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
private readonly List<IEvent> _events = new();
[JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();
public virtual void AddEvent(IEvent domainEvent)
{
var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
if (i < 0)
{
_events.Add(domainEvent);
}
else
{
_events.RemoveAt(i);
_events.Insert(i, domainEvent);
}
}
[...]
[...]
}
Contact
nesnesi etki alanı olaylarını oluşturur. Varlık Contact
, etki alanı özelliklerinin ayarlayıcılarını özel olarak yapılandırarak temel DDD kavramlarını izler. Sınıfında genel ayarlayıcı yok. Bunun yerine iç durumu işlemek için yöntemler sunar. Bu yöntemlerde, belirli bir değişiklik (örneğin ContactNameUpdated
veya ContactEmailUpdated
) için uygun olaylar oluşturulabilir.
Bir kişinin adıyla ilgili güncelleştirmeler için bir örnek aşağıda verilmiştır. (Olay, yöntemin sonunda oluşturulur.)
public void SetName(string firstName, string lastName)
{
if (string.IsNullOrWhiteSpace(firstName) ||
string.IsNullOrWhiteSpace(lastName))
{
throw new ArgumentException("FirstName or LastName cannot be empty");
}
Name = new Name(firstName, lastName);
if (IsNew) return;
AddEvent(new ContactNameUpdatedEvent(Id, Name));
ModifiedAt = DateTimeOffset.UtcNow;
}
Değişiklikleri izleyen karşılık gelen ContactNameUpdatedEvent
aşağıdaki gibi görünür:
public class ContactNameUpdatedEvent : ContactDomainEvent
{
public Name Name { get; }
public ContactNameUpdatedEvent(Guid contactId, Name contactName) :
base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
{
Name = contactName;
}
}
Şu ana kadar olaylar yalnızca etki alanı nesnesine kaydedilir ve hiçbir şey veritabanına kaydedilmez ve hatta ileti aracısında yayımlanır. Önerinin ardından, olay listesi iş nesnesi veri deposuna kaydedilmeden hemen önce işlenir. Bu durumda, özel bir yöntemde SaveChangesAsync
uygulanan örneğin yönteminde IContainerContext
gerçekleşir.RaiseDomainEvents
(dObjs
kapsayıcı bağlamının izlenen varlıklarının listesidir.)
private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
var eventEmitters = new List<IEventEmitter<IEvent>>();
// Get all EventEmitters.
foreach (var o in dObjs)
if (o.Data is IEventEmitter<IEvent> ee)
eventEmitters.Add(ee);
// Raise events.
if (eventEmitters.Count <= 0) return;
foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
_mediator.Publish(evt);
}
Son satırda , C# dilinde aracı düzeninin bir uygulaması olan MediatR paketi, uygulama içinde bir olay yayımlamak için kullanılır. MediatR paketinin arabirimini INotification
uygulamak gibi ContactNameUpdatedEvent
tüm olaylar nedeniyle bunu yapmak mümkündür.
Bu olayların ilgili işleyici tarafından işlenmesi gerekir. IEventsRepository
Burada uygulama devreye girer. Olay işleyicisinin örneği aşağıda verilmişti NameUpdated
:
public class ContactNameUpdatedHandler :
INotificationHandler<ContactNameUpdatedEvent>
{
private IEventRepository EventRepository { get; }
public ContactNameUpdatedHandler(IEventRepository eventRepo)
{
EventRepository = eventRepo;
}
public Task Handle(ContactNameUpdatedEvent notification,
CancellationToken cancellationToken)
{
EventRepository.Create(notification);
return Task.CompletedTask;
}
}
Oluşturucu aracılığıyla işleyici sınıfına bir IEventRepository
örnek eklenir. hizmette Handle
bir ContactNameUpdatedEvent
yayımlanır yayımlanmaz yöntemi çağrılır ve bir bildirim nesnesi oluşturmak için olay deposu örneğini kullanır. Bu bildirim nesnesi de nesnedeki izlenen nesneler IContainerContext
listesine eklenir ve aynı işlem toplu işleminde kaydedilen nesneleri Azure Cosmos DB'ye ekler.
Kapsayıcı bağlamı şu ana kadar hangi nesnelerin işlenmek üzere olduğunu biliyor. Sonunda izlenen nesneleri Azure Cosmos DB'de IContainerContext
kalıcı hale getirmek için uygulama işlem toplu işlemini oluşturur, tüm ilgili nesneleri ekler ve işlemi veritabanına karşı çalıştırır. Açıklanan işlem yöntemi tarafından çağrılan yönteminde SaveChangesAsync
işlenirSaveInTransactionalBatchAsync
.
uygulamanın işlem toplu işlemini oluşturmak ve çalıştırmak için ihtiyaç duyduğunuz önemli bölümleri şunlardır:
private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
CancellationToken cancellationToken)
{
if (DataObjects.Count > 0)
{
var pk = new PartitionKey(DataObjects[0].PartitionKey);
var tb = Container.CreateTransactionalBatch(pk);
DataObjects.ForEach(o =>
{
TransactionalBatchItemRequestOptions tro = null;
if (!string.IsNullOrWhiteSpace(o.Etag))
tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };
switch (o.State)
{
case EntityState.Created:
tb.CreateItem(o);
break;
case EntityState.Updated or EntityState.Deleted:
tb.ReplaceItem(o.Id, o, tro);
break;
}
});
var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
}
// Return copy of current list as result.
var result = new List<IDataObject<Entity>>(DataObjects);
// Work has been successfully done. Reset DataObjects list.
DataObjects.Clear();
return result;
}
Burada, işlemin şu ana kadar nasıl çalıştığına ilişkin genel bir bakış bulabilirsiniz (kişi nesnesinde adı güncelleştirmek için):
- İstemci bir kişinin adını güncelleştirmek istiyor. yöntemi
SetName
kişi nesnesinde çağrılır ve özellikler güncelleştirilir. - Olay
ContactNameUpdated
, etki alanı nesnesindeki olaylar listesine eklenir. - Kişi deposunun
Update
yöntemi çağrılır ve bu da etki alanı nesnesini kapsayıcı bağlamlarına ekler. Nesne artık izlenir. CommitAsync
örneğinde çağrılırUnitOfWork
ve kapsayıcı bağlamını da çağırırSaveChangesAsync
.- içinde
SaveChangesAsync
, etki alanı nesnesi listesindeki tüm olaylar birMediatR
örnek tarafından yayımlanır ve olay deposu aracılığıyla aynı kapsayıcı bağlamı içine eklenir. - içinde
SaveChangesAsync
birTransactionalBatch
oluşturulur. Hem kişi nesnesini hem de olayı barındıracaktır. TransactionalBatch
Çalıştırmalar ve veriler Azure Cosmos DB'ye işlenir.SaveChangesAsync
veCommitAsync
başarıyla geri dönün.
Kalıcılık
Yukarıdaki kod parçacıklarında görebileceğiniz gibi Azure Cosmos DB'ye kaydedilen tüm nesneler bir DataObject
örnekte sarmalanmıştır. Bu nesne ortak özellikler sağlar:
ID
.PartitionKey
.Type
.State
. gibiCreated
,Updated
Azure Cosmos DB'de kalıcı olmaz.Etag
. İyimser kilitleme için.TTL
. Eski belgelerin otomatik olarak temizlenmesi için Yaşam Süresi özelliği.Data
. Genel veri nesnesi.
Bu özellikler, depolar ve kapsayıcı bağlamı tarafından çağrılan IDataObject
ve kullanılan genel bir arabirimde tanımlanır:
public interface IDataObject<out T> where T : Entity
{
string Id { get; }
string PartitionKey { get; }
string Type { get; }
T Data { get; }
string Etag { get; set; }
int Ttl { get; }
EntityState State { get; set; }
}
Bir DataObject
örnekte sarmalanan ve veritabanına kaydedilen nesneler şu örneğe (Contact
ve ContactNameUpdatedEvent
) benzer olacaktır:
// Contact document/object. After creation.
{
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "contact",
"data": {
"name": {
"firstName": "John",
"lastName": "Doe"
},
"description": "This is a contact",
"email": "johndoe@contoso.com",
"company": {
"companyName": "Contoso",
"street": "Street",
"houseNumber": "1a",
"postalCode": "092821",
"city": "Palo Alto",
"country": "US"
},
"createdAt": "2021-09-22T11:07:37.3022907+02:00",
"deleted": false,
"id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
},
"ttl": -1,
"_etag": "\"180014cc-0000-1500-0000-614455330000\"",
"_ts": 1632301657
}
// After setting a new name, this is how an event document looks.
{
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"type": "domainEvent",
"data": {
"name": {
"firstName": "Jane",
"lastName": "Doe"
},
"contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
"action": "ContactNameUpdatedEvent",
"id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
"createdAt": "2021-09-22T11:37:37.3022907+02:00"
},
"ttl": 120,
"_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
"_ts": 1632303457
}
ve ContactNameUpdatedEvent
(tür domainEvent
) belgelerinin Contact
aynı bölüm anahtarına sahip olduğunu ve her iki belgenin de aynı mantıksal bölümde kalıcı olacağını görebilirsiniz.
Değişiklik akışı işleme
Olay akışını okumak ve bir ileti aracısına göndermek için hizmet, Azure Cosmos DB değişiklik akışını kullanır.
Değişiklik akışı, kapsayıcınızdaki değişikliklerin kalıcı bir günlüğüdür. Arka planda çalışır ve değişiklikleri izler. Tek bir mantıksal bölümde değişikliklerin sırası garanti edilir. Değişiklik akışını okumanın en kullanışlı yolu, Azure Cosmos DB tetikleyicisiyle bir Azure işlevi kullanmaktır. Bir diğer seçenek de değişiklik akışı işlemci kitaplığını kullanmaktır. Web API'nizdeki değişiklik akışı işlemeyi arka plan hizmeti olarak tümleştirmenize olanak tanır (arabirim aracılığıyla IHostedService
). Buradaki örnek, .NET Core uygulamalarında uzun süre çalışan arka plan görevlerini barındırmak için BackgroundService soyut sınıfını uygulayan basit bir konsol uygulaması kullanır.
Değişiklikleri Azure Cosmos DB değişiklik akışından almak için bir ChangeFeedProcessor
nesne örneği oluşturmanız, ileti işleme için bir işleyici yöntemi kaydetmeniz ve değişiklikleri dinlemeye başlamanız gerekir:
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
var changeFeedProcessor = _container
.GetChangeFeedProcessorBuilder<ExpandoObject>(
_configuration.GetSection("Cosmos")["ProcessorName"],
HandleChangesAsync)
.WithInstanceName(Environment.MachineName)
.WithLeaseContainer(_leaseContainer)
.WithMaxItems(25)
.WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
.WithPollInterval(TimeSpan.FromSeconds(3))
.Build();
_logger.LogInformation("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
_logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
return changeFeedProcessor;
}
bir işleyici yöntemi (HandleChangesAsync
burada) iletileri işler. Bu örnekte olaylar, ölçeklenebilirlik için bölümlenmiş ve yinelenenleri kaldırma özelliğinin etkinleştirildiği bir Service Bus konusuna yayımlanır. Nesnelerdeki değişikliklerle Contact
ilgilenen tüm hizmetler bu Service Bus konusuna abone olabilir ve değişiklikleri kendi bağlamı için alıp işleyebilir.
Oluşturulan Service Bus iletilerinin bir SessionId
özelliği vardır. Service Bus'ta oturumları kullandığınızda, iletilerin sırasının korunduğundan emin olursunuz (ilk önce, ilk çıkış (FIFO)). Bu kullanım örneği için siparişin korunması gerekir.
Değişiklik akışındaki iletileri işleyen kod parçacığı aşağıdadır:
private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
_logger.LogInformation($"Received {changes.Count} document(s).");
var eventsCount = 0;
Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();
foreach (var document in changes as dynamic)
{
if (!((IDictionary<string, object>)document).ContainsKey("type") ||
!((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.
if (document.type == EVENT_TYPE) // domainEvent.
{
string json = JsonConvert.SerializeObject(document.data);
var sbMessage = new ServiceBusMessage(json)
{
ContentType = "application/json",
Subject = document.data.action,
MessageId = document.id,
PartitionKey = document.partitionKey,
SessionId = document.partitionKey
};
// Create message batch per partitionKey.
if (partitionedMessages.ContainsKey(document.partitionKey))
{
partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
}
else
{
partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
}
eventsCount++;
}
}
if (partitionedMessages.Count > 0)
{
_logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");
// Loop over each partition.
foreach (var partition in partitionedMessages)
{
// Create batch for partition.
using var messageBatch =
await _topicSender.CreateMessageBatchAsync(cancellationToken);
foreach (var msg in partition.Value)
if (!messageBatch.TryAddMessage(msg))
throw new Exception();
_logger.LogInformation(
$"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");
try
{
await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
}
catch (Exception e)
{
_logger.LogError(e.Message);
throw;
}
}
}
else
{
_logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
}
}
Hata işleme
Değişiklikler işlenirken bir hata oluşursa, değişiklik akışı kitaplığı son toplu işlemi başarıyla işlediği konumda okuma iletilerini yeniden başlatır. Örneğin, uygulama 10.000 iletiyi başarıyla işlediyse, şimdi 10.001 ile 10.025 arasında toplu işlem üzerinde çalışıyorsa ve bir hata oluşursa, yeniden başlatılabilir ve 10.001 konumundan çalışmasını alabilir. Kitaplık, Azure Cosmos DB'de bir Leases
kapsayıcıya kaydedilen bilgiler aracılığıyla işlenen işlemleri otomatik olarak izler.
Hizmet, Service Bus'a yeniden işlenmiş iletilerin bazılarını zaten göndermiş olabilir. Normalde, bu senaryo yinelenen ileti işlemeye yol açar. Daha önce belirtildiği gibi, Service Bus'ın bu senaryo için etkinleştirmeniz gereken yinelenen ileti algılama özelliği vardır. Hizmet, iletinin uygulama denetimli MessageId
özelliğine göre bir iletinin Service Bus konusuna (veya kuyruğuna) zaten eklenip eklenmediğini denetler. Bu özellik, olay belgesinin değerine ayarlanır ID
. Aynı ileti Service Bus'a yeniden gönderilirse, hizmet bu iletiyi yoksayar ve bırakır.
Bakım ve temizlik
Tipik bir İşlem Giden Kutusu uygulamasında hizmet, işlenen olayları güncelleştirir ve bir iletinin başarıyla yayımlandığını belirten bir Processed
özelliği olarak true
ayarlar. Bu davranış işleyici yönteminde el ile uygulanabilir. Geçerli senaryoda, böyle bir işleme gerek yoktur. Azure Cosmos DB, değişiklik akışı kullanılarak işlenen olayları izler (kapsayıcıyla birlikte Leases
).
Son adım olarak, yalnızca en son kayıtları/belgeleri tutmak için zaman zaman olayları kapsayıcıdan silmeniz gerekir. Düzenli aralıklarla temizleme yapmak için uygulama, Azure Cosmos DB'nin başka bir özelliğini uygular: Belgelere Yaşam Süresi (TTL
). Azure Cosmos DB, belgeye eklenebilen bir TTL
özelliğe göre belgeleri otomatik olarak silebilir: saniye olarak bir zaman aralığı. Hizmet, özelliği olan TTL
belgeler için kapsayıcıyı sürekli denetler. Belgenin süresi dolduğunda Azure Cosmos DB belgeyi veritabanından kaldırır.
Tüm bileşenler beklendiği gibi çalıştığında olaylar hızla işlenir ve yayımlanır: saniyeler içinde. Azure Cosmos DB'de bir hata varsa, hem iş nesnesi hem de ilgili olaylar veritabanına kaydedilemediğinden olaylar ileti veri yolu'na gönderilmez. Dikkate alınması gereken tek şey, arka plan çalışanı (değişiklik akışı işlemcisi) veya hizmet veri yolu kullanılabilir olmadığında belgelerde uygun TTL
bir değer DomainEvent
ayarlamaktır. Bir üretim ortamında, birden çok günlük bir zaman aralığı seçmek en iyisidir. Örneğin, 10 gün. Daha sonra ilgili tüm bileşenler uygulama içindeki değişiklikleri işlemek/yayımlamak için yeterli zamana sahip olur.
Özet
İşlem Giden Kutusu düzeni, etki alanı olaylarını dağıtılmış sistemlerde güvenilir bir şekilde yayımlama sorununu çözer. İş nesnesinin durumunu ve olaylarını aynı işlem toplu işleminde işleyerek ve ileti geçişi olarak bir arka plan işlemcisi kullanarak, iç veya dış diğer hizmetlerin sonunda bağımlı oldukları bilgileri aldığından emin olursunuz. Bu örnek, İşlem Giden Kutusu düzeninin geleneksel bir uygulaması değildir. Azure Cosmos DB değişiklik akışı ve Yaşam Süresi gibi işleri basit ve temiz tutan özellikleri kullanır.
Bu senaryoda kullanılan Azure bileşenlerinin özeti aşağıdadır:
Bu mimarinin bir Visio dosyasını indirin.
Bu çözümün avantajları şunlardır:
- Güvenilir mesajlaşma ve olayların garantili teslimi.
- Service Bus aracılığıyla olayların ve ileti yinelenenleri kaldırmanın korunan sırası.
- Olay belgesinin başarılı bir şekilde işlendiğini gösteren ek
Processed
bir özellik bulundurmanıza gerek yoktur. - Yaşam süresi (TTL) aracılığıyla Azure Cosmos DB'den olay silme. İşlem, kullanıcı/uygulama isteklerini işlemek için gereken istek birimlerini kullanmaz. Bunun yerine, bir arka plan görevinde "kalan" istek birimlerini kullanır.
- İletilerin (veya bir Azure işlevi) aracılığıyla
ChangeFeedProcessor
hataya dayanıklı işlenmesi. - İsteğe bağlı: Her biri değişiklik akışında kendi işaretçisini koruyan birden çok değişiklik akışı işlemcisi.
Dikkat edilmesi gereken noktalar
Bu makalede ele alınan örnek uygulama, Azure Cosmos DB ve Service Bus ile Azure'da İşlem Giden Kutusu düzenini nasıl uygulayabileceğinizi gösterir. NoSQL veritabanlarını kullanan başka yaklaşımlar da vardır. İş nesnesinin ve olayların veritabanına güvenilir bir şekilde kaydedileceğini garanti etmek için, olay listesini iş nesnesi belgesine ekleyebilirsiniz. Bu yaklaşımın dezavantajı, temizleme işleminin olay içeren her belgeyi güncelleştirmesi gerekeceğidir. Bu, TTL kullanımına kıyasla özellikle İstek Birimi maliyeti açısından ideal değildir.
Üretime hazır kodda sağlanan örnek kodu dikkate almamanız gerektiğini unutmayın. Çok iş parçacığı kullanımıyla ilgili bazı sınırlamaları vardır, özellikle de DomainEntity
olayların sınıfta işlenme şekli ve uygulamalarda nesnelerin nasıl izlendiği CosmosContainerContext
. Kendi uygulamalarınız için başlangıç noktası olarak kullanın. Alternatif olarak, NServiceBus veya MassTransit gibi bu işlevlere sahip mevcut kitaplıkları kullanmayı göz önünde bulundurun.
Bu senaryoyu dağıtın
Bu senaryoyu test etmek için kaynak kodunu, dağıtım dosyalarını ve yönergeleri GitHub'da bulabilirsiniz: https://github.com/mspnp/transactional-outbox-pattern.
Katkıda Bulunanlar
Bu makale Microsoft tarafından yönetilir. Başlangıçta aşağıdaki katkıda bulunanlar tarafından yazılmıştır.
Asıl yazar:
- Christian Dennig | Kıdemli Yazılım Mühendisi
Genel olmayan LinkedIn profillerini görmek için LinkedIn'de oturum açın.
Sonraki adımlar
Daha fazla bilgi edinmek için şu makaleleri gözden geçirin:
- Etki alanı temelli tasarım
- Azure Service Bus: İleti yinelenenleri kaldırma
- Akış işlemci kitaplığını değiştirme
- Jimmy Bogard: Daha iyi bir etki alanı olayları düzeni