Transaktionellt utkorgsmönster med Azure Cosmos DB

Azure Cosmos DB
Azure Service Bus
Azure Functions

Det kan vara svårt att implementera tillförlitliga meddelanden i distribuerade system. Den här artikeln beskriver hur du använder mönstret Transaktionell utkorg för tillförlitliga meddelanden och garanterad leverans av händelser, en viktig del av stöd för idempotent meddelandebearbetning. För att åstadkomma detta använder du Azure Cosmos DB-transaktionsbatcherna och ändringsflödet i kombination med Azure Service Bus.

Översikt

Mikrotjänstarkitekturer blir allt populärare och visar löften när det gäller att lösa problem som skalbarhet, underhåll och flexibilitet, särskilt i stora program. Men det här arkitekturmönstret medför också utmaningar när det gäller datahantering. I distribuerade program underhåller varje tjänst oberoende de data som behövs för att fungera i ett dedikerat tjänstägt datalager. För att stödja ett sådant scenario använder du vanligtvis en meddelandelösning som RabbitMQ, Kafka eller Azure Service Bus som distribuerar data (händelser) från en tjänst via en meddelandebuss till andra tjänster i programmet. Interna eller externa konsumenter kan sedan prenumerera på dessa meddelanden och få meddelanden om ändringar så snart data manipuleras.

Ett välkänt exempel i det området är ett beställningssystem: när en användare vill skapa en beställning tar en Ordering tjänst emot data från ett klientprogram via en REST-slutpunkt. Nyttolasten mappas till en intern representation av ett Order objekt för att verifiera data. Efter en lyckad incheckning till databasen publicerar den en OrderCreated händelse till en meddelandebuss. Alla andra tjänster som är intresserade av nya beställningar (till exempel en eller Invoicing tjänstInventory), prenumererar på OrderCreated meddelanden, bearbetar dem och lagrar dem i sin egen databas.

Följande pseudokod visar hur den här processen vanligtvis ser ut ur tjänstperspektivet Ordering :

CreateNewOrder(CreateOrderDto order){
  // Validate the incoming data.
  ...
  // Apply business logic.
  ...
  // Save the object to the database.
  var result = _orderRespository.Create(order);

  // Publish the respective event.
  _messagingService.Publish(new OrderCreatedEvent(result));

  return Ok();
}

Den här metoden fungerar bra tills ett fel inträffar mellan att spara orderobjektet och publicera motsvarande händelse. Att skicka en händelse kan misslyckas just nu av många orsaker:

  • Nätverksfel
  • Avbrott i meddelandetjänsten
  • Värdfel

Vad felet än är är resultatet att OrderCreated händelsen inte kan publiceras till meddelandebussen. Andra tjänster meddelas inte om att en beställning har skapats. Tjänsten Ordering måste nu ta hand om olika saker som inte är relaterade till den faktiska affärsprocessen. Den måste hålla reda på händelser som fortfarande måste sättas på meddelandebussen så snart den är online igen. Även det värsta fallet kan inträffa: datainkonsekvenser i programmet på grund av förlorade händelser.

Diagram som visar händelsehantering utan mönstret Transaktionell utkorg.

Lösning

Det finns ett välkänt mönster med namnet Transactional Outbox som kan hjälpa dig att undvika dessa situationer. Det säkerställer att händelser sparas i ett datalager (vanligtvis i en utkorgstabell i databasen) innan de slutligen skickas till en meddelandekö. Om affärsobjektet och motsvarande händelser sparas i samma databastransaktion är det garanterat att inga data går förlorade. Allt kommer att checkas in, annars återställs allt om det uppstår ett fel. För att så småningom publicera händelsen frågar en annan tjänst eller arbetsprocess utbox-tabellen efter ohanterade poster, publicerar händelserna och markerar dem som bearbetade. Det här mönstret säkerställer att händelser inte går förlorade när ett affärsobjekt har skapats eller ändrats.

Diagram som visar händelsehantering med mönstret Transaktionell utkorg och en relätjänst för publicering av händelser till meddelandekoordinatorn.

Ladda ned en Visio-fil med den här arkitekturen.

I en relationsdatabas är implementeringen av mönstret enkel. Om tjänsten till exempel använder Entity Framework Core använder den en Entity Framework-kontext för att skapa en databastransaktion, spara affärsobjektet och händelsen och checka in transaktionen eller göra en återställning. Dessutom är arbetstjänsten som bearbetar händelser lätt att implementera: den frågar regelbundet utbox-tabellen efter nya poster, publicerar nyligen infogade händelser i meddelandebussen och markerar slutligen dessa poster som bearbetade.

I praktiken är det inte så enkelt som det kan se ut först. Viktigast av allt är att du måste se till att ordningen på händelserna bevaras så att en OrderUpdated händelse inte publiceras före en OrderCreated händelse.

Implementering i Azure Cosmos DB

Det här avsnittet visar hur du implementerar mönstret Transaktionell utkorg i Azure Cosmos DB för att uppnå tillförlitliga meddelanden i ordning mellan olika tjänster med hjälp av Azure Cosmos DB-ändringsflödet och Service Bus. Den visar en exempeltjänst som hanterar Contact objekt (FirstName, , LastNameEmail, Company information och så vidare). Den använder CQRS-mönstret (Command and Query Responsibility Segregation) och följer grundläggande begrepp för domändriven design (DDD). Du hittar exempelkoden för implementeringen på GitHub.

Ett Contact objekt i exempeltjänsten har följande struktur:

{
    "name": {
        "firstName": "John",
        "lastName": "Doe"
    },
    "description": "This is a contact",
    "email": "johndoe@contoso.com",
    "company": {
        "companyName": "Contoso",
        "street": "Street",
        "houseNumber": "1a",
        "postalCode": "092821",
        "city": "Palo Alto",
        "country": "US"
    },
    "createdAt": "2021-09-22T11:07:37.3022907+02:00",
    "deleted": false
}

Så snart en Contact skapas eller uppdateras genererar den händelser som innehåller information om den aktuella ändringen. Domänhändelser kan bland annat vara:

  • ContactCreated. Utlöses när en kontakt läggs till.
  • ContactNameUpdated. Upphöjt när FirstName eller LastName ändras.
  • ContactEmailUpdated. Utlöses när e-postadressen uppdateras.
  • ContactCompanyUpdated. Utlöses när någon av företagets egenskaper ändras.

Transaktionsbatch

För att implementera det här mönstret måste du se Contact till att affärsobjektet och motsvarande händelser sparas i samma databastransaktion. I Azure Cosmos DB fungerar transaktioner annorlunda än i relationsdatabassystem. Azure Cosmos DB-transaktioner, som kallas transaktionsbatch, fungerar på en enda logisk partition, så de garanterar egenskaperna atomicitet, konsekvens, isolering och hållbarhet (ACID). Du kan inte spara två dokument i en transaktionell batchåtgärd i olika containrar eller logiska partitioner. För exempeltjänsten innebär det att både affärsobjektet och händelsen eller händelserna placeras i samma container och logiska partition.

Kontext, lagringsplatser och UnitOfWork

Kärnan i exempelimplementeringen är en containerkontext som håller reda på objekt som sparas i samma transaktionsbatch. Den upprätthåller en lista över skapade och ändrade objekt och fungerar på en enda Azure Cosmos DB-container. Gränssnittet för det ser ut så här:

public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}

Listan i containerkontextkomponenten spårar Contact och DomainEvent objekt. Båda placeras i samma container. Det innebär att flera typer av objekt lagras i samma Azure Cosmos DB-container och använder en Type egenskap för att skilja mellan ett affärsobjekt och en händelse.

För varje typ finns det en dedikerad lagringsplats som definierar och implementerar dataåtkomsten. Lagringsplatsens Contact gränssnitt innehåller följande metoder:

public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}

Lagringsplatsen Event ser liknande ut, förutom att det bara finns en metod som skapar nya händelser i arkivet:

public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}

Implementeringarna av båda lagringsplatsens gränssnitt får en referens via beroendeinmatning till en enda IContainerContext instans för att säkerställa att båda fungerar i samma Azure Cosmos DB-kontext.

Den sista komponenten är UnitOfWork, som checkar in ändringarna i instansen IContainerContext till Azure Cosmos DB:

public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}

Händelsehantering: Skapa och publicera

Varje gång ett Contact objekt skapas, ändras eller (mjuk-) tas bort genererar tjänsten en motsvarande händelse. Kärnan i lösningen som tillhandahålls är en kombination av domändriven design (DDD) och medlarmönstret som föreslås av Jimmy Bogard. Han föreslår att du behåller en lista över händelser som har inträffat på grund av ändringar av domänobjektet och publicerar dessa händelser innan du sparar det faktiska objektet i databasen.

Listan över ändringar sparas i själva domänobjektet så att ingen annan komponent kan ändra händelsekedjan. Beteendet att underhålla händelser (IEvent instanser) i domänobjektet definieras via ett gränssnitt IEventEmitter<IEvent> och implementeras i en abstrakt DomainEntity klass:

public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
[...]
[...]
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => _events.AsReadOnly();

    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events.RemoveAt(i);
            _events.Insert(i, domainEvent);
        }
    }
[...]
[...]
}

Objektet Contact genererar domänhändelser. Entiteten Contact följer grundläggande DDD-begrepp och konfigurerar domänegenskapernas setters som privata. Det finns inga offentliga setters i klassen. I stället erbjuder den metoder för att manipulera det interna tillståndet. I dessa metoder kan lämpliga händelser för en viss ändring (till exempel ContactNameUpdated eller ContactEmailUpdated) aktiveras.

Här är ett exempel på uppdateringar av namnet på en kontakt. (Händelsen utlöses i slutet av metoden.)

public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) ||
        string.IsNullOrWhiteSpace(lastName))
    {
        throw new ArgumentException("FirstName or LastName cannot be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.UtcNow;
}

Motsvarande ContactNameUpdatedEvent, som spårar ändringarna, ser ut så här:

public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}

Hittills loggas händelser bara i domänobjektet och ingenting sparas i databasen eller publiceras till och med till en meddelandekö. Efter rekommendationen bearbetas listan över händelser precis innan affärsobjektet sparas i datalagret. I det här fallet sker det i SaveChangesAsync metoden för instansen IContainerContext , som implementeras i en privat RaiseDomainEvents metod. (dObjs är listan över spårade entiteter i containerkontexten.)

private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters.
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise events.
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters.SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}

På den sista raden används MediatR-paketet , en implementering av medlarmönstret i C#, för att publicera en händelse i programmet. Det är möjligt eftersom alla händelser som ContactNameUpdatedEvent implementerar gränssnittet för INotification MediatR-paketet.

Dessa händelser måste bearbetas av en motsvarande hanterare. Här spelar implementeringen IEventsRepository in. Här är exemplet NameUpdated på händelsehanteraren:

public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification,
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}

En IEventRepository instans matas in i hanteringsklassen via konstruktorn. Så snart en ContactNameUpdatedEvent publiceras i tjänsten Handle anropas metoden och använder händelselagringsplatsens instans för att skapa ett meddelandeobjekt. Det meddelandeobjektet infogas i sin tur i listan över spårade objekt i IContainerContext objektet och kopplar objekten som sparas i samma transaktionsbatch till Azure Cosmos DB.

Hittills vet containerkontexten vilka objekt som ska bearbetas. Om du så småningom vill spara de spårade objekten IContainerContext i Azure Cosmos DB skapar implementeringen den transaktionella batchen, lägger till alla relevanta objekt och kör åtgärden mot databasen. Den process som beskrivs hanteras i SaveInTransactionalBatchAsync metoden, som anropas av SaveChangesAsync metoden.

Här är de viktiga delarna i implementeringen som du behöver för att skapa och köra transaktionsbatchen:

private async Task<List<IDataObject<Entity>>> SaveInTransactionalBatchAsync(
    CancellationToken cancellationToken)
{
    if (DataObjects.Count > 0)
    {
        var pk = new PartitionKey(DataObjects[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        DataObjects.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions { IfMatchEtag = o.Etag };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
...
[Check for return codes, etc.]
...
    }

    // Return copy of current list as result.
    var result = new List<IDataObject<Entity>>(DataObjects);

    // Work has been successfully done. Reset DataObjects list.
    DataObjects.Clear();
    return result;
}

Här är en översikt över hur processen fungerar hittills (för att uppdatera namnet på ett kontaktobjekt):

  1. En klient vill uppdatera namnet på en kontakt. Metoden SetName anropas på kontaktobjektet och egenskaperna uppdateras.
  2. Händelsen ContactNameUpdated läggs till i listan över händelser i domänobjektet.
  3. Kontaktlagringsplatsens metod anropas Update , vilket lägger till domänobjektet i containerkontexten. Objektet spåras nu.
  4. CommitAsync anropas på instansen UnitOfWork , som i sin tur anropar SaveChangesAsync containerkontexten.
  5. I SaveChangesAsyncpubliceras alla händelser i listan över domänobjektet av en MediatR instans och läggs till via händelselagringsplatsen i samma containerkontext.
  6. I SaveChangesAsyncskapas en TransactionalBatch . Den innehåller både kontaktobjektet och händelsen.
  7. Körningarna TransactionalBatch och data checkas in i Azure Cosmos DB.
  8. SaveChangesAsync och CommitAsync returneras.

Bevarande

Som du ser i föregående kodfragment omsluts alla objekt som sparats i Azure Cosmos DB i en DataObject instans. Det här objektet innehåller vanliga egenskaper:

  • ID.
  • PartitionKey.
  • Type.
  • State. Precis som Createdkommer Updated inte att sparas i Azure Cosmos DB.
  • Etag. För optimistisk låsning.
  • TTL. Time To Live-egenskapen för automatisk rensning av gamla dokument.
  • Data. Generiskt dataobjekt.

Dessa egenskaper definieras i ett allmänt gränssnitt som anropas IDataObject och används av lagringsplatserna och containerkontexten:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}

Objekt som omsluts i en DataObject instans och sparas i databasen ser sedan ut som det här exemplet (Contact och ContactNameUpdatedEvent):

// Contact document/object. After creation.
{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "John",
            "lastName": "Doe"
        },
        "description": "This is a contact",
        "email": "johndoe@contoso.com",
        "company": {
            "companyName": "Contoso",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1632301657
}

// After setting a new name, this is how an event document looks.
{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Jane",
            "lastName": "Doe"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-22T11:37:37.3022907+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1632303457
}

Du kan se att Contact dokumenten och ContactNameUpdatedEvent (typen domainEvent) har samma partitionsnyckel och att båda dokumenten sparas i samma logiska partition.

Ändringsflödesbearbetning

För att läsa händelseströmmen och skicka dem till en meddelandekö använder tjänsten Azure Cosmos DB-ändringsflödet.

Ändringsflödet är en beständig logg över ändringar i containern. Den fungerar i bakgrunden och spårar ändringar. Inom en logisk partition garanteras ordningen för ändringarna. Det enklaste sättet att läsa ändringsflödet är att använda en Azure-funktion med en Azure Cosmos DB-utlösare. Ett annat alternativ är att använda ändringsflödesprocessorbiblioteket. Med den kan du integrera bearbetning av ändringsflöde i webb-API:et IHostedService som en bakgrundstjänst (via gränssnittet). Exemplet här använder ett enkelt konsolprogram som implementerar den abstrakta klassen BackgroundService som värd för långvariga bakgrundsuppgifter i .NET Core-program.

Om du vill ta emot ändringarna från Azure Cosmos DB-ändringsflödet måste du instansiera ett ChangeFeedProcessor objekt, registrera en hanteringsmetod för meddelandebearbetning och börja lyssna efter ändringar:

private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation("Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}

En hanteringsmetod (HandleChangesAsync här) bearbetar sedan meddelandena. I det här exemplet publiceras händelser till ett Service Bus-ämne som är partitionerat för skalbarhet och där dedupliceringsfunktionen är aktiverad. Alla tjänster som är intresserade av ändringar Contact av objekt kan sedan prenumerera på det Service Bus-ämnet och ta emot och bearbeta ändringarna för sin egen kontext.

Service Bus-meddelandena som skapas har en SessionId egenskap. När du använder sessioner i Service Bus garanterar du att ordningen på meddelandena bevaras (först in, först ut (FIFO)). Det är nödvändigt att bevara ordningen för det här användningsfallet.

Här är kodfragmentet som hanterar meddelanden från ändringsflödet:

private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // Unknown document type.

        if (document.type == EVENT_TYPE) // domainEvent.
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey.
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition.
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition.
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new Exception();

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e.Message);
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}

Felhantering

Om det uppstår ett fel när ändringarna bearbetas startar ändringsflödesbiblioteket om läsningsmeddelanden på den plats där den bearbetades den senaste batchen. Om programmet till exempel har bearbetat 10 000 meddelanden, nu arbetar på batch 10 001 till 10 025, och ett fel inträffar, kan det starta om och hämta sitt arbete på plats 10 001. Biblioteket spårar automatiskt vad som har bearbetats via information som sparats i en Leases container i Azure Cosmos DB.

Det är möjligt att tjänsten redan har skickat några av de meddelanden som bearbetas på nytt till Service Bus. Normalt skulle det scenariot leda till duplicerad meddelandebearbetning. Som tidigare nämnts har Service Bus en funktion för duplicerad meddelandeidentifiering som du behöver aktivera för det här scenariot. Tjänsten kontrollerar om ett meddelande redan har lagts till i ett Service Bus-ämne (eller en kö) baserat på den programkontrollerade MessageId egenskapen för meddelandet. Den egenskapen är inställd på ID händelsedokumentets. Om samma meddelande skickas igen till Service Bus ignorerar och släpper tjänsten det.

Städning

I en typisk transaktionell utkorgsimplementering uppdaterar tjänsten hanterade händelser och anger en Processed egenskap till true, vilket indikerar att ett meddelande har publicerats. Det här beteendet kan implementeras manuellt i hanteringsmetoden. I det aktuella scenariot finns det inget behov av en sådan process. Azure Cosmos DB spårar händelser som bearbetats med hjälp av ändringsflödet (i kombination med containern Leases ).

Som ett sista steg måste du ibland ta bort händelserna från containern så att du bara behåller de senaste posterna/dokumenten. För att regelbundet göra en rensning tillämpar implementeringen en annan funktion i Azure Cosmos DB: Time To Live (TTL) på dokument. Azure Cosmos DB kan automatiskt ta bort dokument baserat på en TTL egenskap som kan läggas till i ett dokument: ett tidsintervall i sekunder. Tjänsten kontrollerar hela tiden containern efter dokument som har en TTL egenskap. Så snart ett dokument upphör att gälla tar Azure Cosmos DB bort det från databasen.

När alla komponenter fungerar som förväntat bearbetas och publiceras händelser snabbt: inom några sekunder. Om det uppstår ett fel i Azure Cosmos DB skickas inte händelser till meddelandebussen eftersom både affärsobjektet och motsvarande händelser inte kan sparas i databasen. Det enda du bör tänka på är att ange ett lämpligt TTL värde för DomainEvent dokumenten när bakgrundsarbetaren (ändringsflödesprocessorn) eller servicebussen inte är tillgänglig. I en produktionsmiljö är det bäst att välja ett tidsintervall på flera dagar. Till exempel 10 dagar. Alla inblandade komponenter har sedan tillräckligt med tid för att bearbeta/publicera ändringar i programmet.

Sammanfattning

Mönstret Transaktionell utkorg löser problemet med att publicera domänhändelser på ett tillförlitligt sätt i distribuerade system. Genom att checka in affärsobjektets tillstånd och dess händelser i samma transaktionsbatch och använda en bakgrundsprocessor som ett meddelanderelä ser du till att andra tjänster, interna eller externa, så småningom får den information de är beroende av. Det här exemplet är inte en traditionell implementering av mönstret Transaktionell utkorg. Den använder funktioner som Azure Cosmos DB-ändringsflödet och Time To Live som håller saker enkla och rena.

Här är en sammanfattning av de Azure-komponenter som används i det här scenariot:

Diagram som visar Azure-komponenterna för att implementera transaktionell utkorg med Azure Cosmos DB och Azure Service Bus.

Ladda ned en Visio-fil med den här arkitekturen.

Fördelarna med den här lösningen är:

  • Tillförlitliga meddelanden och garanterad leverans av händelser.
  • Bevarad ordning på händelser och meddelandeduplicering via Service Bus.
  • Du behöver inte underhålla en extra Processed egenskap som indikerar att ett händelsedokument bearbetas.
  • Borttagning av händelser från Azure Cosmos DB via time to live (TTL). Processen förbrukar inte enheter för begäranden som behövs för hantering av användar-/programbegäranden. I stället används enheter för "överblivna" begäranden i en bakgrundsaktivitet.
  • Felsäker bearbetning av meddelanden via ChangeFeedProcessor (eller en Azure-funktion).
  • Valfritt: Flera ändringsflödesprocessorer som var och en behåller sin egen pekare i ändringsflödet.

Att tänka på

Exempelprogrammet som beskrivs i den här artikeln visar hur du kan implementera mönstret Transaktionell utkorg i Azure med Azure Cosmos DB och Service Bus. Det finns också andra metoder som använder NoSQL-databaser. För att garantera att affärsobjektet och händelserna sparas på ett tillförlitligt sätt i databasen kan du bädda in listan över händelser i dokumentet för affärsobjektet. Nackdelen med den här metoden är att rensningsprocessen måste uppdatera varje dokument som innehåller händelser. Det är inte idealiskt, särskilt när det gäller kostnaden för begärandeenhet, jämfört med att använda TTL.

Tänk på att du inte bör överväga exempelkoden som anges här produktionsklar kod. Den har vissa begränsningar när det gäller multitrådning, särskilt hur händelser hanteras i DomainEntity klassen och hur objekt spåras i CosmosContainerContext implementeringarna. Använd den som utgångspunkt för dina egna implementeringar. Du kan också överväga att använda befintliga bibliotek som redan har den här funktionen inbyggd i dem som NServiceBus eller MassTransit.

Distribuera det här scenariot

Du hittar källkoden, distributionsfilerna och instruktionerna för att testa det här scenariot på GitHub: https://github.com/mspnp/transactional-outbox-pattern.

Deltagare

Den här artikeln underhålls av Microsoft. Det har ursprungligen skrivits av följande medarbetare.

Huvudförfattare:

Om du vill se icke-offentliga LinkedIn-profiler loggar du in på LinkedIn.

Nästa steg

Läs de här artiklarna om du vill veta mer: