Opérations DDL dans Azure Cosmos DB pour Apache Cassandra à partir de Spark
S’APPLIQUE À : Cassandra
Cet article décrit en détail les opérations DDL d’espace de clés et de table dans Azure Cosmos DB for Apache Cassandra à partir de Spark.
Contexte Spark
Le connecteur pour l’API pour Cassandra nécessite que les informations de la connexion Cassandra soient initialisées dans le cadre du contexte spark. Quand vous lancez un notebook, le contexte Spark est déjà initialisé. Il n’est pas recommandé de l’arrêter et de le réinitialiser. Une solution consiste à ajouter l’instance de l’API pour Cassandra à un niveau de cluster, dans la configuration du cluster spark. Cette opération s’effectue une seule fois par cluster. Ajoutez à la configuration Spark le code suivant, où chaque paire clé-valeur utilise un espace en guise de séparation :
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
API pour la configuration liée à Cassandra
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Notes
Si vous utilisez Spark 3.x, il n’est pas nécessaire d’installer l’assistance Cosmos DB ni la fabrique de connexion. Par ailleurs, utilisez remoteConnectionsPerExecutor
plutôt que connections_per_executor_max
pour le connecteur Spark 3 (cf. ci-dessus).
Avertissement
Les exemples Spark 3 présentés dans cet article ont été testés avec la version 3.2.1 de Spark et le connecteur Spark Cassandra correspondant, com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Les versions ultérieures de Spark et/ou du connecteur Cassandra peuvent ne pas fonctionner comme prévu.
Opérations DDL d’espace de clés
Créer un espace de clés
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Valider dans cqlsh
Exécutez la commande suivante dans cqlsh ; vous devriez voir l’espace de clés créé précédemment.
DESCRIBE keyspaces;
Supprimer un espace de clés
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Valider dans cqlsh
DESCRIBE keyspaces;
Opérations DDL de table
Considérations :
- Le débit peut être assigné au niveau de la table à l’aide de l’instruction create table.
- Une clé de partition peut stocker 20 Go de données.
- Un enregistrement peut stocker un maximum de 2 Mo de données.
- Une plage de clés de partition peut stocker plusieurs clés de partition.
Créer une table
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Valider dans cqlsh
Exécutez la commande suivante dans cqlsh ; vous devriez voir la table nommée «books» :
USE books_ks;
DESCRIBE books;
Les valeurs de durée de vie par défaut et de débit provisionnées ne figurent pas dans la sortie de la commande précédente ; vous pouvez les obtenir à partir du portail.
Modifier une table
Vous pouvez modifier les valeurs suivantes à l’aide de la commande alter table :
- Débit provisionné
- Durée de vie
Les modifications de colonne ne sont pas prises en charge.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Supprimer une table
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Valider dans cqlsh
Exécutez la commande suivante dans cqlsh ; vous devriez voir que la table « books » n’est plus disponible :
USE books_ks;
DESCRIBE tables;
Étapes suivantes
Après avoir créé l’espace de clés et la table, passez aux articles suivants, qui traitent, entre autres, des opérations CRUD :