演習 - Azure Synapse パイプライン内で Notebook を統合する

完了

このユニットでは、Azure Synapse Spark ノートブックを作成して、マッピング データ フローによって読み込まれたデータを分析および変換し、そのデータをデータ レイクに格納します。 ノートブックでデータ レイクに書き込まれたデータのフォルダー名を定義する文字列パラメーターを受け取るパラメーター セルを作成します。

その後、このノートブックを Synapse パイプラインに追加し、一意のパイプライン実行 ID をノートブック パラメーターに渡します。これにより、パイプライン実行と、ノートブック アクティビティによって保存されたデータを関連付けることができます。

最後に、Synapse Studio の [監視] ハブを使用して、パイプライン実行を監視し、実行 ID を取得してから、データ レイクに格納されている対応するファイルを見つけます。

Apache Spark とノートブックについて

Apache Spark は、ビッグデータ分析アプリケーションのパフォーマンスを向上させるメモリ内処理をサポートする並列処理フレームワークです。 Azure Synapse Analytics の Apache Spark は、Apache Spark を Microsoft がクラウドに実装したものです。

Azure Synapse Studio の Apache Spark ノートブックは、ライブ コード、視覚化、説明テキストを含むファイルを作成するための Web インターフェイスです。 ノートブックは、アイデアを確認し、簡単な実験を使用してデータから分析情報を得るのに最適な場所です。 また、ノートブックは、データの準備、データの視覚化、機械学習、およびその他のビッグ データのシナリオで広く使用されています。

Synapse Spark ノートブックを作成する

Synapse Analytics で、ユーザー プロファイル データを処理、結合、インポートするためのマッピング データ フローが既に作成されているとします。 ここで、ユーザーに好まれ、かつ上位に選ばれており、過去 12 か月で最も購入数が多い製品という条件に基づいて、ユーザーごとに上位 5 つの製品を検索したいと考えています。 その後、全体で上位 5 つの製品を計算したいと考えています。

この演習では、これらの計算を行うために Synapse Spark ノートブックを作成します。

  1. Synapse Analytics Studio (https://web.azuresynapse.net/) を開き、[データ] ハブに移動します。

    [データ] メニュー項目が強調表示されています。

  2. [リンク] タブ (1) を選択し、[Azure Data Lake Storage Gen2] の下の Data Lake Storage のプライマリ アカウント (2) を展開します。 [wwi-02] コンテナー (3) を選択し、top-products フォルダー (4) を開きます。 任意の Parquet ファイル (5) を右クリックし、[新しいノートブック] メニュー項目 (6)[Load to DataFrame](データフレームに読み込む) (7) の順に選択します。 フォルダーが表示されない場合は、[更新] を選択します。Refresh

    Parquet ファイルと [新しいノートブック] オプションが強調表示されています。

  3. ノートブックが Spark プールにアタッチされていることを確認します。

    [アタッチ先: Spark プール] メニュー項目が強調表示されています。

  4. Parquet ファイル名を *.parquet (1) に置き換えて、top-products フォルダー内のすべての Parquet ファイルを選択します。 たとえば、パスは abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet のようになります。

    ファイル名が強調表示されています。

  5. ノートブックを実行するには、ノートブック ツール バーの [すべて実行] を選択します。

    セル結果が表示されています。

    注意

    Spark プールでノートブックを初めて実行すると、Synapse によって新しいセッションが作成されます。 これには、3 分から 5 分ほどかかる可能性があります。

    注意

    セルだけを実行するには、セルの上にポインターを合わせ、セルの左側にある [セルの実行] アイコンを選択するか、セルを選んでから Ctrl + Enter キーを押します。

  6. [+] ボタンを選び、[コード セル] 項目を選んで、下に新しいセルを作成します。 [+] ボタンは、左側のノートブック セルの下にあります。 また、ノートブック ツールバーの [+ セル] メニューを展開して、[コード セル] 項目を選択することもできます。

    [コードの追加] メニュー オプションが強調表示されています。

  7. 新しいセルで次のコマンドを実行して topPurchases という新しいデータフレームを設定し、top_purchases という名前の新しい一時ビューを作成して、最初の 100 行を表示します。

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    出力は次のようになります。

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. SQL を使用して新しい一時ビューを作成するには、新しいセルで次のコマンドを実行します。

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    注意

    このクエリの出力はありません。

    このクエリでは、top_purchases 一時ビューをソースとして使用し、row_number() over メソッドを適用して、ItemsPurchasedLast12Months が最大である各ユーザーのレコードに行番号を適用します。 where 句は結果をフィルター処理するため、IsTopProductIsPreferredProduct の両方が true に設定されている最大 5 つの製品のみを取得することになります。 これにより、Azure Cosmos DB に格納されているユーザー プロファイルに従って、ユーザーごとに購入数の最も多い上位 5 つの製品が得られ、それらの製品は "さらに" お気に入りの製品としても識別されます。

  9. 新しいセルで次のコマンドを実行し、前のセルで作成した top_5_products 一時ビューの結果を格納する新しいデータフレームを作成して表示します。

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    好まれている上位 5 つの製品をユーザーごとに表示した次のような出力が表示されます。

    好まれている上位 5 つの製品がユーザーごとに表示されています。

  10. 顧客に好まれ、購入数が最も多い製品という条件に基づいた全体で上位 5 つの製品を計算します。 これを行うには、新しいセルで次のコマンドを実行します。

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    このセルでは、上位 5 つの好まれている商品を商品 ID 別にグループ化し、その製品が過去 12 か月に購入された合計数を計算し、その値を降順で並べ替えて、上位 5 つの結果を返しました。 出力は次のようになります。

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

パラメーター セルを作成する

Azure Synapse パイプラインでは、パラメーター セルを検索し、このセルを、実行時に渡されるパラメーターの既定値として扱います。 実行エンジンにより、既定値を上書きするために、入力パラメーターを含んだ新しいセルがパラメーター セルの下に追加されます。 パラメーター セルが指定されていない場合、挿入されたセルがノートブックの上部に挿入されます。

  1. ここでは、このノートブックをパイプラインから実行します。 Parquet ファイルに名前を付けるために使用される runId 変数の値を設定するパラメーターを渡す必要があります。 新しいセルで次のコマンドを実行します。

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Spark に付属されている uuid ライブラリを使用して、ランダムな GUID を生成します。 パイプラインによって渡されたパラメーターを使用して、runId 変数をオーバーライドする必要があります。 このためには、これをパラメーター セルとして切り替える必要があります。

  2. セル (1) の右上隅にあるアクションの省略記号 (...) を選択してから、[Toggle parameter cell](パラメーター セルを切り替える) (2) を選びます。

    メニュー項目が強調表示されています。

    このオプションに切り替えると、セルに [パラメーター] タグが表示されます。

    セルは、パラメーターを受け入れるように構成されています。

  3. 新しいセルに次のコードを貼り付けて、Data Lake のプライマリ アカウントの /top5-products/ パスで、Parquet ファイル名として runId 変数を使用します。 Data Lake のプライマリ アカウントで、パスの YOUR_DATALAKE_NAME を置き換えます。 これを見つけるには、ページの上部にあるセル 1(1) までスクロールします。 パス (2) から Data Lake Storage アカウントをコピーします。 この値を YOUR_DATALAKE_NAME の代わりとして、新しいセル内のパス (3) に貼り付けてから、そのセルでコマンドを実行します。

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    パスが、Data Lake のプライマリ アカウントの名前で更新されています。

  4. ファイルがデータ レイクに書き込まれたことを確認します。 [データ] ハブに移動して、[リンク] タブ (1) を選択します。 Data Lake Storage のプライマリ アカウントを展開してから、[wwi-02] コンテナー (2) を選択します。 top5-products フォルダー (3) に移動します。 ディレクトリに、ファイル名 (4) として GUID を含む Parquet ファイルのフォルダーが表示されるはずです。

    Parquet ファイルが強調表示されています。

    このディレクトリは、以前は存在していなかったため、ノートブック セル内のデータフレームに対する Parquet の書き込みメソッドによって作成されました。

ノートブックを Synapse パイプラインに追加する

この演習の冒頭で説明したマッピング データ フローに戻り、データ フローがオーケストレーション プロセスの一部として実行された後にこのノートブックを実行するとします。 これを行うには、このノートブックを新しいノートブック アクティビティとしてパイプラインに追加します。

  1. ノートブックに戻ります。 ノートブックの右上隅にある [プロパティ] (1) を選択してから、[名前] (2) に「Calculate Top 5 Products」と入力します。

    [プロパティ] ブレードが表示されています。

  2. ノートブックの右上隅にある [パイプラインに追加] (1) を選択してから、[既存のパイプライン] (2) を選びます。

    [パイプラインに追加] ボタンが強調表示されています。

  3. [Write User Profile Data to ASA](ユーザー プロファイル データを ASA に書き込む) パイプライン (1) を選んでから、[追加] *(2) を選択します。

    パイプラインが選ばれています。

  4. Synapse Studio により、ノートブック アクティビティがパイプラインに追加されます。 [ノートブック] アクティビティ[データ フロー] アクティビティの右側に配置されるように再配置します。 [データ フロー] アクティビティを選択し、パイプライン接続の成功アクティビティを示す緑色のボックス[ノートブック] アクティビティにドラッグします。

    緑色の矢印が強調表示されています。

    成功アクティビティの矢印により、[データ フロー] アクティビティが正常に実行された後、[ノートブック] アクティビティを実行するようにパイプラインに示されます。

  5. [ノートブック] アクティビティ (1)[設定] タブ (2) の順に選択し、[基本パラメーター] (3) を展開してから、[+ 新規] (4) を選択します。 [名前] フィールド (5) に「runId」と入力します。 [型] (6) として [文字列] を選択します。 [値] として、[動的なコンテンツの追加] (7) を選択します。

    設定が表示されています。

  6. [システム変数] (1) で、[Pipeline run ID](パイプラインの実行 ID) を選択します。 これにより、動的コンテンツ ボックス (2)@pipeline().RunId が追加されます。 [完了] (3) を選択してダイアログ ボックスを閉じます。

    動的コンテンツのフォームが表示されています。

    パイプラインの実行 ID の値は、各パイプライン実行に割り当てられた一意の GUID です。 この値を runId ノートブック パラメーターとして渡して、Parquet ファイルの名前に使用します。 パイプライン実行の履歴を調べて、各パイプライン実行用に作成されている特定の Parquet ファイルを見つけることができます。

  7. [すべて発行][発行] の順に選択して、変更を保存します。

    [すべて発行] が強調されています。

  8. 発行が完了したら、[トリガーの追加] (1)[Trigger now](今すぐトリガー) (2) の順に選択して、更新されたパイプラインを実行します。

    トリガーのメニュー項目が強調表示されています。

  9. [OK] を選択してトリガーを実行します。

    [OK] ボタンが強調されています。

パイプラインの実行を監視します

[監視] ハブを使用すると、SQL、Apache Spark、パイプラインの現在および過去のアクティビティを監視できます。

  1. [監視] ハブに移動します。

    [監視] ハブ メニュー項目が選択されています。

  2. [パイプラインの実行] (1) を選択し、パイプライン実行が正常に完了する (2) まで待ちます。 場合によっては、ビューを更新する (3) 必要があります。

    パイプラインの実行が正常に完了しています。

  3. パイプラインのアクティビティの実行を表示するには、そのパイプラインの名前を選択します。

    パイプライン名が選ばれています。

  4. [データ フロー] アクティビティと新しい[ノートブック] アクティビティ (1) の両方に注目してください。 [Pipeline run ID](パイプラインの実行 ID) の値 (2) をメモします。 これを、ノートブックによって生成された Parquet ファイルの名前と比較します。 [Calculate Top 5 Products] ノートブック名を選択して、その詳細を表示します (3)

    パイプライン実行の詳細が表示されています。

  5. ここで、ノートブック実行の詳細を確認します。 [再生] (1) を選択して、ジョブ (2) の進行状況の再生を監視することができます。 下部では、さまざまなフィルター オプション (3) を使用して [診断][ログ] を表示できます。 右側には、実行時間、Livy ID、Spark プールの詳細など、実行の詳細が表示されます。 ジョブ[詳細の表示] リンクを選択すると、その詳細 (5) が表示されます。

    実行の詳細が表示されています。

  6. 新しいタブで、ステージの詳細を表示できる Spark アプリケーション UI が開きます。 [DAG Visualization](DAG の視覚化) を展開すると、ステージの詳細が表示されます。

    Spark ステージの詳細が表示されています。

  7. [データ] ハブに戻ります。

    [データ] ハブ。

  8. [リンク] タブ (1) を選択してから、Data Lake Storage のプライマリ アカウントの [wwi-02] コンテナー (2) を選び、top5-products フォルダー (3) に移動して、名前がパイプラインの実行 ID と一致する Parquet ファイルのフォルダーが存在していることを確認します。

    ファイルが強調表示されています。

    ご覧のとおり、前述のパイプラインの実行 ID と名前が一致するファイルがあります。

    パイプラインの実行 ID が強調表示されています。

    これらの値が一致するのは、[ノートブック] アクティビティの runId パラメーターにパイプラインの実行 ID を渡したためです。