Flux de modification dans Azure Cosmos DB for Apache Cassandra

S’APPLIQUE À : Cassandra

La prise en charge du flux de modification dans Azure Cosmos DB for Apache Cassandra est disponible par le biais des prédicats de requête en CQL (Cassandra Query Language). À l’aide de ces conditions de prédicat, vous pouvez interroger l’API de flux de modification. Les applications peuvent obtenir les modifications apportées à une table à l’aide de la clé primaire (également appelée clé de partition) comme le nécessite le langage CQL. Vous pouvez ensuite effectuer d’autres actions en fonction des résultats. Les modifications apportées aux lignes du tableau sont capturées dans l'ordre de leur modification et dans l'ordre de tri par clé de partition.

L’exemple suivant montre comment obtenir un flux de modification sur toutes les lignes d’une table d’espace de clés d’API pour Cassandra à l’aide de .NET. Le prédicat COSMOS_CHANGEFEED_START_TIME() est utilisé directement dans le langage CQL pour interroger les éléments du flux de modification à partir d’une heure de début spécifiée (dans ce cas, la date et l’heure actuelles). Vous pouvez télécharger l’exemple complet, pour C# ici et pour Java ici.

Dans chaque itération, la requête reprend au dernier point auquel les modifications ont été lues, à l’aide de l’état de pagination. Nous pouvons voir un flux continu de nouvelles modifications apportées à la table dans l’espace de clés. Nous allons voir les modifications apportées aux lignes insérées ou mises à jour. La surveillance des opérations de suppression à l’aide d’un flux de modification dans l’API pour Cassandra n’est pas prise en charge actuellement.

Notes

Si vous réutilisez un jeton après la suppression d’une collection, puis le recréez avec le même nom, cela entraînera une erreur. Nous vous conseillons de définir pageState sur Null lors de la création d’une collection et de la réutilisation du nom de la collection.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

Pour obtenir les modifications apportées à une seule ligne par clé primaire, vous pouvez ajouter la clé primaire dans la requête. L’exemple suivant montre comment effectuer le suivi des modifications pour la ligne où « user_id = 1 ».

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Limites actuelles

Les limitations suivantes s’appliquent quand un flux de modification est utilisé avec l’API pour Cassandra :

  • Les insertions et les mises à jour sont actuellement prises en charge. L’opération de suppression n’est pas encore prise en charge. Pour contourner ce problème, vous pouvez ajouter un marqueur réversible sur les lignes en cours de suppression. Par exemple, ajoutez un champ dans la ligne nommée « deleted » et affectez-lui la valeur « true ».
  • La dernière mise à jour est conservée comme dans l’API pour NoSQL de base et les mises à jour intermédiaires de l’entité ne sont pas disponibles.

Gestion des erreurs

Les codes d’erreur et messages suivants sont pris en charge quand un flux de modification est utilisé dans l’API pour Cassandra :

  • Code d’erreur HTTP 429 : quand la vitesse du flux de modification est limitée, une page vide est retournée.

Étapes suivantes