Lesen von Daten in Azure Cosmos DB for Apache Cassandra-Tabellen mithilfe von Spark
GILT FÜR: Cassandra
In diesem Artikel wird beschrieben, wie Sie in Spark in Azure Cosmos DB for Apache Cassandra gespeicherte Daten lesen.
API für Cassandra-Konfiguration
Legen Sie in Ihrem Notebookcluster die folgende Spark-Konfiguration fest. Dieser Schritt muss nur einmal ausgeführt werden.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Hinweis
Wenn Sie Spark 3 verwenden, müssen Sie die Hilfs- und Verbindungsfactory von Azure Cosmos DB nicht installieren. Sie sollten auch remoteConnectionsPerExecutor
anstelle von connections_per_executor_max
für den Spark 3-Connector verwenden (siehe oben).
Warnung
Die in diesem Artikel gezeigten Spark 3-Beispiele wurden mit Spark Version 3.2.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.
Datenrahmen-API
Lesen einer Tabelle mit dem Befehl „session.read.format“
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
val readBooksDF = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
readBooksDF.explain
readBooksDF.show
Lesen einer Tabelle mithilfe von spark.read.cassandraFormat
val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()
Lesen von bestimmten Spalten in einer Tabelle
val readBooksDF = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_author", "book_pub_year")
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Anwenden von Filtern
Sie können Prädikate per Pushdown in die Datenbank verlagern, um Spark-Abfragen mit besserer Optimierung zu ermöglichen. Ein Prädikat ist eine Bedingung für eine Abfrage, für die „true“ oder „false“ zurückgegeben wird. Normalerweise ist sie in der WHERE-Klausel enthalten. Bei einem Pushdown eines Prädikats werden die Daten in der Datenbankabfrage gefiltert, um die Anzahl von aus der Datenbank abgerufenen Einträgen zu reduzieren und die Abfrageleistung zu verbessern. Standardmäßig werden gültige WHERE-Klauseln von der Spark-Dataset-API automatisch per Pushdown in die Datenbank verlagert.
val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain
readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show
Der Abschnitt Cassandra Filters
des physischen Plans enthält den Pushdownfilter.
RDD-API
Lesen einer Tabelle
val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)
Lesen von bestimmten Spalten in einer Tabelle
val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)
SQL-Ansichten
Erstellen einer temporären Ansicht eines Datenframes
spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load.createOrReplaceTempView("books_vw")
Ausführen von Abfragen mit der Ansicht
select * from books_vw where book_pub_year > 1891
Nächste Schritte
Im Folgenden finden Sie weitere Artikel zur Arbeit mit Azure Cosmos DB for Apache Cassandra in Spark: