Lesen von Daten in Azure Cosmos DB for Apache Cassandra-Tabellen mithilfe von Spark

GILT FÜR: Cassandra

In diesem Artikel wird beschrieben, wie Sie in Spark in Azure Cosmos DB for Apache Cassandra gespeicherte Daten lesen.

API für Cassandra-Konfiguration

Legen Sie in Ihrem Notebookcluster die folgende Spark-Konfiguration fest. Dieser Schritt muss nur einmal ausgeführt werden.

//Connection-related
 spark.cassandra.connection.host  YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com  
 spark.cassandra.connection.port  10350  
 spark.cassandra.connection.ssl.enabled  true  
 spark.cassandra.auth.username  YOUR_ACCOUNT_NAME  
 spark.cassandra.auth.password  YOUR_ACCOUNT_KEY  
// if using Spark 2.x
// spark.cassandra.connection.factory  com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory  

//Throughput-related...adjust as needed
 spark.cassandra.output.batch.size.rows  1  
// spark.cassandra.connection.connections_per_executor_max  10   // Spark 2.x
 spark.cassandra.connection.remoteConnectionsPerExecutor  10   // Spark 3.x
 spark.cassandra.output.concurrent.writes  1000  
 spark.cassandra.concurrent.reads  512  
 spark.cassandra.output.batch.grouping.buffer.size  1000  
 spark.cassandra.connection.keep_alive_ms  600000000  

Hinweis

Wenn Sie Spark 3 verwenden, müssen Sie die Hilfs- und Verbindungsfactory von Azure Cosmos DB nicht installieren. Sie sollten auch remoteConnectionsPerExecutor anstelle von connections_per_executor_max für den Spark 3-Connector verwenden (siehe oben).

Warnung

Die in diesem Artikel gezeigten Spark 3-Beispiele wurden mit Spark Version 3.2.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.

Datenrahmen-API

Lesen einer Tabelle mit dem Befehl „session.read.format“

import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra

val readBooksDF = sqlContext
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load

readBooksDF.explain
readBooksDF.show

Lesen einer Tabelle mithilfe von spark.read.cassandraFormat

val readBooksDF = spark.read.cassandraFormat("books", "books_ks", "").load()

Lesen von bestimmten Spalten in einer Tabelle

val readBooksDF = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load
  .select("book_name","book_author", "book_pub_year")

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Anwenden von Filtern

Sie können Prädikate per Pushdown in die Datenbank verlagern, um Spark-Abfragen mit besserer Optimierung zu ermöglichen. Ein Prädikat ist eine Bedingung für eine Abfrage, für die „true“ oder „false“ zurückgegeben wird. Normalerweise ist sie in der WHERE-Klausel enthalten. Bei einem Pushdown eines Prädikats werden die Daten in der Datenbankabfrage gefiltert, um die Anzahl von aus der Datenbank abgerufenen Einträgen zu reduzieren und die Abfrageleistung zu verbessern. Standardmäßig werden gültige WHERE-Klauseln von der Spark-Dataset-API automatisch per Pushdown in die Datenbank verlagert.

val df = spark.read.cassandraFormat("books", "books_ks").load
df.explain
val dfWithPushdown = df.filter(df("book_pub_year") > 1891)
dfWithPushdown.explain

readBooksDF.printSchema
readBooksDF.explain
readBooksDF.show

Der Abschnitt Cassandra Filters des physischen Plans enthält den Pushdownfilter.

Partitionen

RDD-API

Lesen einer Tabelle

val bookRDD = sc.cassandraTable("books_ks", "books")
bookRDD.take(5).foreach(println)

Lesen von bestimmten Spalten in einer Tabelle

val booksRDD = sc.cassandraTable("books_ks", "books").select("book_id","book_name").cache
booksRDD.take(5).foreach(println)

SQL-Ansichten

Erstellen einer temporären Ansicht eines Datenframes

spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "books", "keyspace" -> "books_ks"))
  .load.createOrReplaceTempView("books_vw")

Ausführen von Abfragen mit der Ansicht

select * from books_vw where book_pub_year > 1891

Nächste Schritte

Im Folgenden finden Sie weitere Artikel zur Arbeit mit Azure Cosmos DB for Apache Cassandra in Spark: