Administración de directivas de resolución de conflictos en Azure Cosmos DB

SE APLICA A: NoSQL

Con las operaciones de escritura en varias regiones, cuando varios clientes escriben en el mismo elemento podrían producirse conflictos. Cuando se produce un conflicto de datos, se puede resolver mediante el uso de distintas directivas de resolución de conflictos. En este artículo se describe cómo administrar directivas de resolución de conflictos.

Sugerencia

La directiva de resolución de conflictos solo se puede especificar en el momento de la creación del contenedor, y no se puede modificar una vez creado.

Creación de una directiva de resolución de conflictos del tipo "el último en escribir gana"

En estos ejemplos se muestra cómo configurar un contenedor con una directiva de resolución de conflictos del tipo "el último en escribir gana". La ruta de acceso predeterminada para modo "el último en escribir gana" es el campo de marca de tiempo o la propiedad _ts. En la API para NoSQL, también se puede establecer en una ruta de acceso definida por el usuario con un tipo numérico. En un conflicto, gana el valor más alto. Si no se establece la ruta de acceso o la que se establece no es válida, se usará de forma predeterminada _ts. Los conflictos resueltos con esta directiva no se muestran en la fuente de conflictos. Todas las API pueden usar esta directiva.

.NET SDK

DocumentCollection lwwCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.lwwCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.LastWriterWins,
          ConflictResolutionPath = "/myCustomId",
      },
  });

SDK de Java V4

SDK para Java V4 (Maven com.azure::azure-cosmos) API asincrónica


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

SDK de Java V2

SDK de Java v2 asincrónico (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createLastWriterWinsPolicy("/myCustomId");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

SDK para Node.js/JavaScript/TypeScript

const database = client.database(this.databaseName);
const { container: lwwContainer } = await database.containers.createIfNotExists(
  {
    id: this.lwwContainerName,
    conflictResolutionPolicy: {
      mode: "LastWriterWins",
      conflictResolutionPath: "/myCustomId"
    }
  }
);

SDK de Python

database = client.get_database_client(database=database_id)
lww_conflict_resolution_policy = {'mode': 'LastWriterWins', 'conflictResolutionPath': '/regionId'}
lww_container = database.create_container(id=lww_container_id, partition_key=PartitionKey(path="/id"), 
    conflict_resolution_policy=lww_conflict_resolution_policy)

Creación de una directiva de resolución de conflictos personalizada con un procedimiento almacenado

En estos ejemplos se muestran cómo configurar un contenedor con una directiva de resolución de conflictos personalizada. Esta directiva usa la lógica de un procedimiento almacenado para resolver el conflicto. Si se designa un procedimiento almacenado para resolver conflictos, los conflictos no se mostrarán en la fuente de conflictos a menos que haya un error en el procedimiento almacenado designado.

Una vez que se crea una directiva en el contenedor, es necesario crear el procedimiento almacenado. La siguiente instancia de SDK de .NET muestra un ejemplo de este flujo de trabajo. Esta directiva solo se admite en la API para NoSQL.

Ejemplo de un procedimiento almacenado de resolución de conflictos personalizada

Los procedimientos almacenados de resolución de conflictos personalizada deben implementarse mediante la signatura de función que se muestra a continuación. No es necesario que el nombre de la función coincida con el nombre usado al registrar el procedimiento almacenado con el contenedor, pero simplifica la nomenclatura. A continuación, se muestra una descripción de los parámetros que se deben implementar para este procedimiento almacenado.

  • incomingItem: el elemento insertado o actualizado en la confirmación que está generando los conflictos. Es nulo para las operaciones de eliminación.
  • existingItem: el elemento confirmado actualmente. Este valor no es NULL en una actualización ye s NULL en una inserción o eliminaciones.
  • isTombstone: valor booleano que indica si incomingItem está en conflicto con un elemento eliminado anteriormente. Cuando es verdadero, existingItem también es nulo.
  • conflictingItems: Matriz de la versión confirmada de todos los elementos del contenedor que están en conflicto con incomingItem en el identificador o cualquier otra propiedad de índice único.

Importante

Al igual que con cualquier procedimiento almacenado, un procedimiento de resolución de conflictos personalizado puede acceder a los datos con la misma clave de partición y puede realizar cualquier operación de inserción, actualización o eliminación para resolver los conflictos.

Este procedimiento almacenado de ejemplo resuelve los conflictos mediante la selección del valor más bajo de la ruta de acceso /myCustomId.

function resolver(incomingItem, existingItem, isTombstone, conflictingItems) {
  var collection = getContext().getCollection();

  if (!incomingItem) {
      if (existingItem) {

          collection.deleteDocument(existingItem._self, {}, function (err, responseOptions) {
              if (err) throw err;
          });
      }
  } else if (isTombstone) {
      // delete always wins.
  } else {
      if (existingItem) {
          if (incomingItem.myCustomId > existingItem.myCustomId) {
              return; // existing item wins
          }
      }

      var i;
      for (i = 0; i < conflictingItems.length; i++) {
          if (incomingItem.myCustomId > conflictingItems[i].myCustomId) {
              return; // existing conflict item wins
          }
      }

      // incoming item wins - clear conflicts and replace existing with incoming.
      tryDelete(conflictingItems, incomingItem, existingItem);
  }

  function tryDelete(documents, incoming, existing) {
      if (documents.length > 0) {
          collection.deleteDocument(documents[0]._self, {}, function (err, responseOptions) {
              if (err) throw err;

              documents.shift();
              tryDelete(documents, incoming, existing);
          });
      } else if (existing) {
          collection.replaceDocument(existing._self, incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      } else {
          collection.createDocument(collection.getSelfLink(), incoming,
              function (err, documentCreated) {
                  if (err) throw err;
              });
      }
  }
}

.NET SDK

DocumentCollection udpCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.udpCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
          ConflictResolutionProcedure = string.Format("dbs/{0}/colls/{1}/sprocs/{2}", this.databaseName, this.udpCollectionName, "resolver"),
      },
  });

//Create the stored procedure
await clients[0].CreateStoredProcedureAsync(
UriFactory.CreateStoredProcedureUri(this.databaseName, this.udpCollectionName, "resolver"), new StoredProcedure
{
    Id = "resolver",
    Body = File.ReadAllText(@"resolver.js")
});

SDK de Java V4

SDK para Java V4 (Maven com.azure::azure-cosmos) API asincrónica


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

SDK de Java V2

SDK de Java v2 asincrónico (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy("resolver");
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

Una vez creado el contenedor, debe crear el procedimiento almacenado resolver.

SDK para Node.js/JavaScript/TypeScript

const database = client.database(this.databaseName);
const { container: udpContainer } = await database.containers.createIfNotExists(
  {
    id: this.udpContainerName,
    conflictResolutionPolicy: {
      mode: "Custom",
      conflictResolutionProcedure: `dbs/${this.databaseName}/colls/${
        this.udpContainerName
      }/sprocs/resolver`
    }
  }
);

Una vez creado el contenedor, debe crear el procedimiento almacenado resolver.

SDK de Python

database = client.get_database_client(database=database_id)
udp_custom_resolution_policy = {'mode': 'Custom' }
udp_container = database.create_container(id=udp_container_id, partition_key=PartitionKey(path="/id"),
    conflict_resolution_policy=udp_custom_resolution_policy)

Una vez creado el contenedor, debe crear el procedimiento almacenado resolver.

Creación de una directiva de resolución de conflictos personalizada

En estos ejemplos se muestran cómo configurar un contenedor con una directiva de resolución de conflictos personalizada. Con esta implementación, cada conflicto se mostrará en la fuente de conflictos. Es el usuario el que debe controlar los conflictos de forma individual desde la fuente de conflictos.

.NET SDK

DocumentCollection manualCollection = await createClient.CreateDocumentCollectionIfNotExistsAsync(
  UriFactory.CreateDatabaseUri(this.databaseName), new DocumentCollection
  {
      Id = this.manualCollectionName,
      ConflictResolutionPolicy = new ConflictResolutionPolicy
      {
          Mode = ConflictResolutionMode.Custom,
      },
  });

SDK de Java V4

SDK para Java V4 (Maven com.azure::azure-cosmos) API asincrónica


ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();

CosmosContainerProperties containerProperties = new CosmosContainerProperties(container_id, partition_key);
containerProperties.setConflictResolutionPolicy(policy);
/* ...other container config... */
database.createContainerIfNotExists(containerProperties).block();

SDK de Java V2

SDK de Java v2 asincrónico (Maven com.microsoft.azure::azure-cosmosdb)

DocumentCollection collection = new DocumentCollection();
collection.setId(id);
ConflictResolutionPolicy policy = ConflictResolutionPolicy.createCustomPolicy();
collection.setConflictResolutionPolicy(policy);
DocumentCollection createdCollection = client.createCollection(databaseUri, collection, null).toBlocking().value();

SDK para Node.js/JavaScript/TypeScript

const database = client.database(this.databaseName);
const {
  container: manualContainer
} = await database.containers.createIfNotExists({
  id: this.manualContainerName,
  conflictResolutionPolicy: {
    mode: "Custom"
  }
});

SDK de Python

database = client.get_database_client(database=database_id)
manual_resolution_policy = {'mode': 'Custom'}
manual_container = database.create_container(id=manual_container_id, partition_key=PartitionKey(path="/id"), 
    conflict_resolution_policy=manual_resolution_policy)

Lectura desde la fuente de conflictos

En estos ejemplos se muestra cómo leer desde la fuente de conflictos de un contenedor. Los conflictos pueden aparecer en la fuente de conflictos debido a un par de razones:

  • El conflicto no se resolvió automáticamente
  • El conflicto produjo un error con el procedimiento almacenado designado
  • La directiva de resolución de conflictos se establece en personalizada y no designa un procedimiento almacenado para controlar conflictos

.NET SDK

FeedResponse<Conflict> conflicts = await delClient.ReadConflictFeedAsync(this.collectionUri);

SDK de Java

SDK de Java V4 (Maven com.azure::azure-cosmos)

int requestPageSize = 3;
CosmosQueryRequestOptions options = new CosmosQueryRequestOptions();

CosmosPagedFlux<CosmosConflictProperties> conflictReadFeedFlux = container.readAllConflicts(options);

conflictReadFeedFlux.byPage(requestPageSize).toIterable().forEach(page -> {

    int expectedNumberOfConflicts = 0;
    int numberOfResults = 0;
    Iterator<CosmosConflictProperties> pageIt = page.getElements().iterator();

    while (pageIt.hasNext()) {
        CosmosConflictProperties conflictProperties = pageIt.next();

        // Read the conflict and committed item
        CosmosAsyncConflict conflict = container.getConflict(conflictProperties.getId());
        CosmosConflictResponse response = conflict.read(new CosmosConflictRequestOptions()).block();

        // response.
    }
});

SDK para Node.js/JavaScript/TypeScript

const container = client
  .database(this.databaseName)
  .container(this.lwwContainerName);

const { result: conflicts } = await container.conflicts.readAll().toArray();

Python

conflicts_iterator = iter(container.list_conflicts())
conflict = next(conflicts_iterator, None)
while conflict:
    # Do something with conflict
    conflict = next(conflicts_iterator, None)

Pasos siguientes

Obtenga información acerca de los siguientes conceptos de Azure Cosmos DB: