Interakcja z usługą Azure Cosmos DB przy użyciu platformy Apache Spark 2 w usłudze Azure Synapse Link

Uwaga

W przypadku usługi Azure Synapse Link dla usługi Azure Cosmos DB przy użyciu platformy Spark 3 zapoznaj się z tym artykułem Usługa Azure Synapse Link dla usługi Azure Cosmos DB na platformie Spark 3

W tym artykule dowiesz się, jak korzystać z usługi Azure Cosmos DB przy użyciu usługi Synapse Apache Spark 2. Dzięki pełnej obsłudze języków Scala, Python, SparkSQL i C#usługa Synapse Apache Spark jest centralna w scenariuszach analizy, inżynierii danych, nauki o danych i eksploracji danych w usłudze Azure Synapse Link dla usługi Azure Cosmos DB.

Podczas interakcji z usługą Azure Cosmos DB obsługiwane są następujące możliwości:

  • Usługa Synapse Apache Spark umożliwia analizowanie danych w kontenerach usługi Azure Cosmos DB, które są włączone za pomocą usługi Azure Synapse Link w czasie niemal rzeczywistym bez wpływu na wydajność obciążeń transakcyjnych. Następujące dwie opcje są dostępne do wykonywania zapytań względem magazynu analitycznego usługi Azure Cosmos DB z platformy Spark:
    • Ładowanie do ramki danych platformy Spark
    • Tworzenie tabeli platformy Spark
  • Usługa Synapse Apache Spark umożliwia również pozyskiwanie danych do usługi Azure Cosmos DB. Należy pamiętać, że dane są zawsze pozyskiwane do kontenerów usługi Azure Cosmos DB za pośrednictwem magazynu transakcyjnego. Po włączeniu usługi Synapse Link wszystkie nowe wstawki, aktualizacje i usunięcia są automatycznie synchronizowane z magazynem analitycznym.
  • Usługa Synapse Apache Spark obsługuje również przesyłanie strumieniowe ze strukturą platformy Spark z usługą Azure Cosmos DB jako źródłem, a także ujściem.

W poniższych sekcjach przedstawiono składnię powyższych możliwości. Możesz również wyewidencjonować moduł Learn dotyczący wykonywania zapytań dotyczących usługi Azure Cosmos DB przy użyciu platformy Apache Spark dla usługi Azure Synapse Analytics. Gesty w obszarze roboczym usługi Azure Synapse Analytics zostały zaprojektowane w celu zapewnienia łatwego gotowego środowiska do rozpoczęcia pracy. Gesty są widoczne po kliknięciu prawym przyciskiem myszy kontenera usługi Azure Cosmos DB na karcie Dane obszaru roboczego usługi Synapse. Za pomocą gestów można szybko wygenerować kod i dostosować go do własnych potrzeb. Gesty są również idealne do odnajdywania danych jednym kliknięciem.

Ważne

Należy pamiętać o pewnych ograniczeniach w schemacie analitycznym, które mogą prowadzić do nieoczekiwanego zachowania operacji ładowania danych. Na przykład tylko pierwsze 1000 właściwości ze schematu transakcyjnego są dostępne w schemacie analitycznym, właściwości z spacjami są niedostępne itp. Jeśli występują nieoczekiwane wyniki, sprawdź ograniczenia schematu magazynu analitycznego, aby uzyskać więcej szczegółów.

Wykonywanie zapytań względem magazynu analitycznego usługi Azure Cosmos DB

Przed zapoznaniem się z dwiema możliwymi opcjami wykonywania zapytań dotyczących magazynu analitycznego usługi Azure Cosmos DB, ładowanie do ramki danych platformy Spark i tworzenie tabeli Platformy Spark warto zapoznać się z różnicami w środowisku, aby wybrać opcję, która będzie odpowiednia dla Twoich potrzeb.

Różnica w środowisku polega na tym, czy zmiany danych bazowych w kontenerze usługi Azure Cosmos DB powinny zostać automatycznie odzwierciedlone w analizie wykonywanej na platformie Spark. Po zarejestrowaniu ramki danych platformy Spark lub utworzeniu tabeli Spark względem magazynu analitycznego kontenera metadane wokół bieżącej migawki danych w magazynie analitycznym są pobierane do platformy Spark w celu wydajnego wypychania kolejnych analiz. Należy pamiętać, że ponieważ platforma Spark jest zgodna z leniwymi zasadami oceny, chyba że akcja jest wywoływana na ramce danych Platformy Spark lub w zapytaniu SparkSQL jest wykonywana względem tabeli Spark, rzeczywiste dane nie są pobierane z magazynu analitycznego kontenera bazowego.

W przypadku ładowania do ramki danych Spark pobrane metadane są buforowane przez okres istnienia sesji Spark. Tym samym kolejne akcje wywoływane dla ramki danych są oceniane względem migawki magazynu analitycznego z czasu utworzenia ramki danych.

Z drugiej strony w przypadku utworzenia tabeli Spark metadane stanu magazynu analitycznego nie są buforowane na platformie Spark i są ponownie ładowane przy każdym wykonaniu zapytania SparkSQL względem tabeli Spark.

W związku z tym możesz wybrać między ładowaniem do ramki danych Spark i utworzeniem tabeli Spark w zależności od tego, czy analiza platformy Spark ma być oceniana względem, odpowiednio, stałej migawki magazynu analitycznego, czy najnowszej migawki magazynu analitycznego.

Jeśli zapytania analityczne często używały filtrów, możesz podzielić je na partycje na podstawie tych pól w celu uzyskania lepszej wydajności zapytań. Aby wyzwolić partycjonowanie w magazynie analitycznym, można okresowo wykonywać zadanie partycjonowania z poziomu notesu platformy Azure Synapse Spark. Ten podzielony na partycje magazyn wskazuje podstawowe konto magazynu usługi ADLS Gen2 połączone z obszarem roboczym usługi Azure Synapse. Aby dowiedzieć się więcej, zobacz wprowadzenie do partycjonowania niestandardowego i sposób konfigurowania niestandardowych artykułów dotyczących partycjonowania .

Uwaga

Aby wykonywać zapytania dotyczące kont usługi Azure Cosmos DB dla bazy danych MongoDB, dowiedz się więcej o pełnej wierności reprezentacji schematu w magazynie analitycznym i rozszerzonych nazwach właściwości, które mają być używane.

Uwaga

Pamiętaj, że w options poniższych poleceniach uwzględniana jest wielkość liter. Na przykład należy użyć Gateway funkcji while, aby gateway zwrócić błąd.

Ładowanie do ramki danych platformy Spark

W tym przykładzie utworzysz ramkę danych platformy Spark, która wskazuje magazyn analityczny usługi Azure Cosmos DB. Następnie możesz wykonać dodatkową analizę, wywołując akcje platformy Spark względem ramki danych. Ta operacja nie ma wpływu na magazyn transakcyjny.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Równoważna składnia w języku Scala byłaby następująca :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Tworzenie tabeli platformy Spark

W tym przykładzie utworzysz tabelę Platformy Spark, która wskazuje magazyn analityczny usługi Azure Cosmos DB. Następnie możesz przeprowadzić dodatkową analizę, wywołując zapytania SparkSQL względem tabeli. Ta operacja nie wpływa ani na magazyn transakcyjny, ani nie powoduje żadnego przenoszenia danych. Jeśli zdecydujesz się usunąć tę tabelę Spark, podstawowy kontener usługi Azure Cosmos DB i odpowiedni magazyn analityczny nie będą miały wpływu.

Ten scenariusz jest wygodny do ponownego użycia tabel platformy Spark za pomocą narzędzi innych firm i zapewnia dostępność bazowych danych w czasie wykonywania.

Składnia do utworzenia tabeli Spark jest następująca:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Uwaga

Jeśli masz scenariusze, w których schemat bazowego kontenera usługi Azure Cosmos DB zmienia się w czasie; a jeśli chcesz, aby zaktualizowany schemat automatycznie odzwierciedlał zapytania względem tabeli Spark, możesz to osiągnąć, ustawiając spark.cosmos.autoSchemaMerge opcję w true opcjach tabeli Spark.

Zapisywanie ramki danych Platformy Spark w kontenerze usługi Azure Cosmos DB

W tym przykładzie napiszesz ramkę danych platformy Spark do kontenera usługi Azure Cosmos DB. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowania w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

Równoważna składnia w języku Scala byłaby następująca :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

Ładowanie przesyłania strumieniowego ramki danych z kontenera

W tym gestzie użyjesz możliwości przesyłania strumieniowego platformy Spark, aby załadować dane z kontenera do ramki danych. Dane będą przechowywane na podstawowym koncie usługi Data Lake (i systemie plików) połączonym z obszarem roboczym.

Uwaga

Jeśli chcesz odwoływać się do bibliotek zewnętrznych w usłudze Synapse Apache Spark, dowiedz się więcej tutaj. Jeśli na przykład chcesz pozyskać ramkę danych Platformy Spark do kontenera usługi Azure Cosmos DB dla bazy danych MongoDB, możesz użyć łącznika bazy danych MongoDB dla platformy Spark.

Ładowanie przesyłania strumieniowego ramki danych z kontenera usługi Azure Cosmos DB

W tym przykładzie użyjesz funkcji przesyłania strumieniowego ze strukturą platformy Spark, aby załadować dane z kontenera usługi Azure Cosmos DB do przesyłania strumieniowego platformy Spark przy użyciu funkcji zestawienia zmian w usłudze Azure Cosmos DB. Dane punktu kontrolnego używane przez platformę Spark będą przechowywane na podstawowym koncie usługi Data Lake (i systemie plików), które zostało połączone z obszarem roboczym.

Jeśli folder /localReadCheckpointFolder nie zostanie utworzony (w poniższym przykładzie), zostanie utworzony automatycznie. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowania w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

Równoważna składnia w języku Scala byłaby następująca :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

Zapisywanie przesyłania strumieniowego ramki danych do kontenera usługi Azure Cosmos DB

W tym przykładzie napiszesz ramkę danych przesyłania strumieniowego do kontenera usługi Azure Cosmos DB. Ta operacja wpłynie na wydajność obciążeń transakcyjnych i będzie korzystać z jednostek żądań aprowizowania w kontenerze usługi Azure Cosmos DB lub udostępnionej bazie danych. Jeśli folder /localWriteCheckpointFolder nie zostanie utworzony (w poniższym przykładzie), zostanie utworzony automatycznie.

Składnia w języku Python będzie następująca:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

Równoważna składnia w języku Scala byłaby następująca :

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

Następne kroki