チュートリアル:Azure Data Lake Storage Gen2、Azure Databricks、および Spark

このチュートリアルでは、Azure Data Lake Storage Gen2 対応の Azure ストレージ アカウント内の格納データに Azure Databricks クラスターを接続する方法を説明します。 この接続を使用することで、必要なデータに関するクエリや分析をクラスターからネイティブに実行することができます。

このチュートリアルでは、次のことについて説明します。

  • 非構造化データをストレージ アカウントに取り込む
  • Blob Storage 内のデータに対して分析を実行する

Azure サブスクリプションをお持ちでない場合は、開始する前に 無料アカウント を作成してください。

前提条件

Azure Databricks ワークスペース、クラスター、ノートブックを作成する

  1. Azure Databricks ワークスペースを作成する。 「Azure Databricks ワークスペースを作成する」をご覧ください。

  2. クラスターを作成する。 クラスターの作成に関する記事を参照してください。

  3. Notebook を作成します。 「ノートブックを作成する」を参照してください。 ノートブックの既定の言語として Python を選択します。

ノートブックを開いたままにします。 これは、次のセクションで使用します。

フライト データのダウンロード

このチュートリアルでは、運輸統計局からの 2016 年 1 月の定刻実績フライト データを使用して ETL 操作を実行する方法を示します。 チュートリアルを完了するには、このデータをダウンロードする必要があります。

  1. On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip ファイルをダウンロードします。 このファイルには、フライト データが含まれています。

  2. ZIP ファイルの内容を解凍し、ファイル名とファイル パスをメモします。 この情報は後の手順で必要になります。

定刻レポート実績データでキャプチャされた情報について知りたい方は、運輸統計局の Web サイトでフィールドの説明を参照してください。

データを取り込む

このセクションでは、.csv フライト データを Azure Data Lake Storage Gen2 アカウントにアップロードし、そのストレージ アカウントを Databricks クラスターにマウントします。 最後に、Databricks を使用して、.csv フライト データを読み取り、Apache parquet 形式でストレージに書き戻します。

ストレージ アカウントにフライト データをアップロードする

AzCopy を使用して .csv ファイルを Data Lake Storage Gen2 アカウントにコピーします。 azcopy make コマンドを使用して、ストレージ アカウントでコンテナーを作成します。 次に、azcopy copy コマンドを使用して、ダウンロードしたばかりの csv データをそのコンテナーのディレクトリにコピーします。

次の手順では、作成するコンテナーの名前と、フライト データをアップロードする先のコンテナー内のディレクトリと BLOB を入力する必要があります。 各手順で推奨される名前を使用するか、コンテナー、ディレクトリ、BLOB の名前付け規則に従って独自の名前を指定できます。

  1. コマンド プロンプト ウィンドウを開き、次のコマンドを入力して Azure Active Directory にサインインし、ストレージ アカウントにアクセスします。

    azcopy login
    

    コマンド プロンプト ウィンドウに表示される指示に従って、ユーザー アカウントを認証します。

  2. フライト データを保存するコンテナーをストレージ アカウントに作成するには、次のコマンドを入力します。

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • <storage-account-name> プレースホルダーの値は、実際のストレージ アカウントの名前に置き換えます。

    • <container-name> プレースホルダーを、csv データの保存先コンテナーとして作成するコンテナーの名前、たとえば flight-data-container に置き換えます。

  3. csv データをストレージ アカウントにアップロード (コピー) するには、次のコマンドを入力します。

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • プレースホルダー <csv-folder-path> の値は、 .csv ファイルへのパスに置き換えます。

    • <storage-account-name> プレースホルダーの値は、実際のストレージ アカウントの名前に置き換えます。

    • <container-name> プレースホルダーは、お使いのストレージ アカウントのコンテナーの名前に置き換えます。

    • <directory-name> プレースホルダーは、コンテナー内のデータの保存先ディレクトリの名前、たとえば、jan2016 に置き換えます。

ストレージ アカウントを Databricks クラスターにマウントする

このセクションでは、Azure Data Lake Storage Gen2 クラウド オブジェクト ストレージを Databricks ファイル システム (DBFS) にマウントします。 ストレージ アカウントでの認証には、前に作成した Azure AD サービス プリンシパルを使用します。 詳細については、「Azure Databricks へのクラウド オブジェクト ストレージのマウント」を参照してください。

  1. クラスターにノートブックをアタッチします。

    1. 前に作成したノートブックで、ノートブック ツール バーの右上隅にある [接続] ボタン選択します。 このボタンをクリックすると、コンピューティング セレクターが開きます (既にノートブックがクラスターに接続されている場合は、[接続] ではなく、そのクラスターの名前のテキストがボタンに表示されます)。

    2. クラスターのドロップダウン メニューで、前に作成したクラスターを選択します。

    3. クラスター セレクターのテキストが、"開始" に変わることに注意してください。 クラスターの開始が完了し、クラスターの名前がボタンに表示されるまで待ってから、先に進みます。

  2. 次のコード ブロックをコピーして最初のセルに貼り付けます。ただし、このコードはまだ実行しないでください。

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. このコード ブロックでは次の操作を行います。

    • configs で、<appId><clientSecret><tenantId> の各プレースホルダーの値を、前提条件でサービス プリンシパルを作成したときにコピーしたアプリケーション ID、クライアント シークレット、テナント ID に置き換えます。

    • source URI で、<storage-account-name><container-name><directory-name> の各プレースホルダーの値を、Azure Data Lake Storage Gen2 ストレージ アカウントの名前、およびフライト データをストレージ アカウントにアップロードしたときに指定したコンテナーとディレクトリの名前に置き換えます。

      Note

      URI のスキーム識別子 (abfss) は、トランスポート層セキュリティ (TLS) で Azure Blob File System ドライバーを使用するように Databricks に指示します。 URI の詳細については、「Azure Data Lake Storage Gen2 の URI を使用する」を参照してください。

  4. クラスターの開始が完了していることを確認してから、次に進みます。

  5. Shift + Enter キーを押して、このブロック内のコードを実行します。

ストレージ アカウントでフライト データをアップロードしたコンテナーとディレクトリに、マウント ポイント (/mnt/flightdata) を介してノートブックでアクセスできるようになりました。

Databricks Notebook を使用して CSV を Parquet に変換する

csv フライト データに DBFS マウント ポイントを介してアクセスできるようになりました。これで、Apache Spark DataFrame を使用して、これをワークスペースに読み込み、Apache parquet 形式で Azure Data Lake Storage Gen2 オブジェクト ストレージに書き戻すことができます。

  • Spark DataFrame は、型が異なる可能性のある列を持つ 2 次元のラベル付きデータ構造です。 DataFrame を使用すると、サポートされているさまざまな形式のデータの読み取りと書き込みを容易に行うことができます。 DataFrame を使用すると、クラウド オブジェクト ストレージ内の基になるデータに影響を与えることなく、クラウド オブジェクト ストレージからデータを読み込み、コンピューティング クラスター内で、それに対して分析と変換を実行できます。 詳細については、「チュートリアル: Azure Databricks で PySpark DataFrames を操作する」を参照してください。

  • Apache parquet は、クエリを高速化する最適化を備えた列形式のファイル形式です。 これは CSV や JSON よりも効率的なファイル形式です。 詳細については、「Parquet ファイル」を参照してください。

ノートブックで新しいセルを追加し、次のコードを貼り付けます。

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Shift + Enter キーを押して、このブロック内のコードを実行します。

次のセクションに進む前に、すべての parquet データが書き込まれていること、および出力に "完了" と表示されていることを確認します。

データの探索

このセクションでは、前のセクションで作成した DBFS マウント ポイントを使用して、Databricks ファイル システム ユーティリティを使って Azure Data Lake Storage Gen2 オブジェクト ストレージを探索します。

新しいセルに次のコードを貼り付けて、マウント ポイントにあるファイルの一覧を取得します。 最初のコマンドによって、ファイルとディレクトリの一覧が出力されます。 2 番目のコマンドによって、出力が読みやすいように表形式で表示されます。

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Shift + Enter キーを押して、このブロック内のコードを実行します。

parquet ディレクトリが一覧に表示されていることに注意してください。 .csv フライト データは、前のセクションで parquet/flights ディレクトリに parquet 形式で保存しました。 parquet/flights ディレクトリ内のファイルを一覧表示するには、次のコードを新しいセルに貼り付けて実行します。

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

新しいファイルを作成して表示するには、次のコードを新しいセルに貼り付けて実行します。

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

このチュートリアルでは 1.txt ファイルは不要なので、次のコードをセルに貼り付けて実行し、mydirectory を再帰的に削除できます。 True パラメーターは再帰的な削除を示します。

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

必要に応じて、ヘルプ コマンドを使用して、他のコマンドの詳細を確認できます。

dbutils.fs.help("rm")

上記のコード サンプルでは、Azure Data Lake Storage Gen2 対応のストレージ アカウントに格納されたデータを使って HDFS の階層的な性質を調査しました。

データにクエリを実行する

これで、ストレージ アカウントにアップロードしたデータの照会を開始できます。 次のコード ブロックをそれぞれ新しいセルに入力し、Shift + Enter キーを押して Python スクリプトを実行します。

DataFrames には、一般的なデータ分析の問題を効率的に解決できるようにする豊富な機能セット (列の選択、フィルター、結合、集計) が用意されています。

前に保存した parquet フライト データから DataFrame を読み込み、サポートされている機能の一部を調べるには、このスクリプトを新しいセルに入力して実行します。

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

このスクリプトを新しいセルに入力して、データに対して基本的な分析クエリをいくつか実行します。 スクリプト全体を実行するか (Shift + Enter)、各クエリを強調表示して Ctrl + Shift + Enter で個別に実行するか、各クエリを個別のセルに入力し、そこで実行できます。

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

まとめ

このチュートリアルでは、次の作業を行いました。

  • Azure Data Lake Storage Gen2 ストレージ アカウントや Azure AD サービス プリンシパルなどの Azure リソースを作成し、ストレージ アカウントにアクセスするためのアクセス許可を割り当てました。

  • Azure Databricks ワークスペース、ノートブック、コンピューティング クラスターを作成しました。

  • AzCopy を使用して、非構造化 .csv フライト データを Azure Data Lake Storage Gen2 ストレージ アカウントにアップロードしました。

  • Databricks ファイル システム ユーティリティ関数を使用して、Azure Data Lake Storage Gen2 ストレージ アカウントをマウントし、その階層型ファイル システムを探索しました。

  • Apache Spark DataFrames を使用して、.csv フライト データを Apache parquet 形式に変換し、それを Azure Data Lake Storage Gen2 ストレージ アカウントに保存しました。

  • DataFrames を使用してフライト データを探索し、簡単なクエリを実行しました。

  • Apache Spark SQL を使用してフライト データに対してクエリを実行し、2016 年 1 月の各航空会社のフライトの合計数、テキサス州の空港、テキサスから運航している航空会社、全国の各航空会社の平均到着遅延時間 (分単位)、出発または到着が遅延した各航空会社のフライトの割合を照会しました。

リソースをクリーンアップする

ノートブックを保持し、後で使用する場合は、課金されないようにクラスターをシャットダウン (終了) することをお勧めします。 クラスターを終了するには、それをノートブック ツール バーの右上にあるコンピューティング セレクターで選択し、メニューから [終了] を選択して、選択内容を確認します (既定では、クラスターは非アクティブな状態が 120 分続いた後、自動的に終了します)。

ノートブックやクラスターなどの個々のワークスペース リソースを削除する必要がある場合は、ワークスペースの左側のサイドバーから削除できます。 詳細な手順については、「クラスターを削除する」または「ノートブックを削除する」を参照してください。

リソース グループおよび関連するすべてのリソースは、不要になったら削除します。 Azure portal でこれを行うには、ストレージ アカウントとワークスペースのリソース グループを選択し、[削除] を選択します。

次のステップ