Analizowanie złożonych typów danych w usłudze Azure Synapse Analytics

Ten artykuł dotyczy plików i kontenerów Parquet w usłudze Azure Synapse Link dla usługi Azure Cosmos DB. Za pomocą platformy Spark lub SQL można odczytywać lub przekształcać dane ze złożonymi schematami, takimi jak tablice lub struktury zagnieżdżone. Poniższy przykład został ukończony za pomocą pojedynczego dokumentu, ale można go łatwo skalować do miliardów dokumentów za pomocą platformy Spark lub języka SQL. Kod zawarty w tym artykule używa narzędzia PySpark (Python).

Przypadek użycia

Złożone typy danych są coraz częściej spotykane i stanowią wyzwanie dla inżynierów danych. Analizowanie zagnieżdżonych schematów i tablic może obejmować czasochłonne i złożone zapytania SQL. Ponadto może być trudna zmiana nazwy lub rzutowanie zagnieżdżonych kolumn typu danych. Ponadto podczas pracy z głęboko zagnieżdżonym obiektami można napotkać problemy z wydajnością.

Inżynierowie danych muszą zrozumieć, jak efektywnie przetwarzać złożone typy danych i ułatwić im dostęp do wszystkich użytkowników. W poniższym przykładzie używasz platformy Spark w usłudze Azure Synapse Analytics do odczytywania i przekształcania obiektów w płaską strukturę za pomocą ramek danych. Model SQL bezserwerowy w usłudze Azure Synapse Analytics służy do wykonywania zapytań dotyczących takich obiektów bezpośrednio i zwracania tych wyników jako regularnej tabeli.

Co to są tablice i struktury zagnieżdżone?

Poniższy obiekt pochodzi z usługi Application Insights. W tym obiekcie istnieją zagnieżdżone struktury i tablice, które zawierają zagnieżdżone struktury.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Przykład schematu tablic i zagnieżdżonych struktur

Podczas drukowania schematu ramki danych obiektu (o nazwie df) za pomocą polecenia df.printschemazobaczysz następującą reprezentację:

  • Żółty reprezentuje zagnieżdżone struktury.
  • Zielony reprezentuje tablicę z dwoma elementami.

Kod z żółtym i zielonym wyróżnieniem z widocznym źródłem schematu

_rid, _tsi _etag zostały dodane do systemu, ponieważ dokument został pozyskany do magazynu transakcyjnego usługi Azure Cosmos DB.

Poprzednia ramka danych liczy tylko 5 kolumn i 1 wiersz. Po przekształceniu ramka danych wyselekcjonowanych będzie zawierać 13 kolumn i 2 wiersze w formacie tabelarycznym.

Spłaszczane struktury i eksplodują tablice

Dzięki platformie Spark w usłudze Azure Synapse Analytics łatwo przekształcać zagnieżdżone struktury w kolumny i elementy tablicy w wiele wierszy. Aby wykonać implementację, wykonaj następujące kroki.

Schemat blokowy przedstawiający kroki przekształceń platformy Spark

Definiowanie funkcji w celu spłaszczenia zagnieżdżonego schematu

Możesz użyć tej funkcji bez zmiany. Utwórz komórkę w notesie PySpark z następującą funkcją:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Używanie funkcji do spłaszczenia zagnieżdżonego schematu

W tym kroku spłaszczono zagnieżdżony schemat ramki danych (df) do nowej ramki danych (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

Funkcja wyświetlania powinna zwrócić 10 kolumn i 1 wiersz. Tablica i jej zagnieżdżone elementy są nadal dostępne.

Przekształcanie tablicy

W tym miejscu przekształcisz tablicę , context_custom_dimensionsw ramce df_flatdanych na nową ramkę df_flat_explodedanych . W poniższym kodzie zdefiniujesz również kolumnę do wybrania:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

Funkcja wyświetlania powinna zwracać 10 kolumn i 2 wiersze. Następnym krokiem jest spłaszczenie zagnieżdżonych schematów z funkcją zdefiniowaną w kroku 1.

Używanie funkcji do spłaszczenia zagnieżdżonego schematu

Na koniec używasz funkcji do spłaszczenia zagnieżdżonego schematu ramki df_flat_explodedanych w nowej ramce danych: df_flat_explode_flat

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

Funkcja wyświetlania powinna zawierać 13 kolumn i 2 wiersze.

Funkcja printSchema ramki df_flat_explode_flat danych zwraca następujący wynik:

Kod przedstawiający ostateczny schemat

Bezpośrednie odczytywanie tablic i zagnieżdżonych struktur

Za pomocą bezserwerowego modelu SQL można wykonywać zapytania i tworzyć widoki i tabele dla takich obiektów.

Najpierw, w zależności od sposobu przechowywania danych, użytkownicy powinni użyć następującej taksonomii. Wszystko pokazane w wielkie litery jest specyficzne dla twojego przypadku użycia:

Zbiorcze Format
'https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet' "Parquet" (ADLSg2)
N'endpoint=;account=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/ACCOUNTNAME;database=DATABASENAME;collection=COLLECTIONNAME;region=REGIONTOQUERY', SECRET='YOURSECRET' "CosmosDB" (link Azure Synapse)

Zastąp każde pole w następujący sposób:

  • Wartość "TWOJA LICZBA ZBIORCZA POWYŻEJ" to parametry połączenia źródła danych, z którym nawiązujesz połączenie.
  • "TWÓJ TYP POWYŻEJ" to format używany do nawiązywania połączenia ze źródłem.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Istnieją dwa różne typy operacji:

  • Pierwszy typ operacji jest wskazywany w poniższym wierszu kodu, który definiuje kolumnę o nazwie contextdataeventTime , która odwołuje się do zagnieżdżonego elementu . Context.Data.eventTime

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Ten wiersz definiuje kolumnę o nazwie contextdataeventTime , która odwołuje się do zagnieżdżonego elementu . Context>Data>eventTime

  • Drugi typ operacji używa do cross apply tworzenia nowych wierszy dla każdego elementu w tablicy. Następnie definiuje każdy zagnieżdżony obiekt.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Jeśli tablica ma 5 elementów z 4 zagnieżdżonych struktur, model bezserwerowy sql zwraca 5 wierszy i 4 kolumn. Model bezserwerowy sql może wykonywać zapytania, mapować tablicę w 2 wiersze i wyświetlać wszystkie zagnieżdżone struktury w kolumnach.

Następne kroki