モジュール 2: Data Factory でデータフローを使用してデータを変換する
このモジュールでは、約 25 分でデータフローの作成、変換の適用、Bronze テーブルから Gold レイクハウス テーブルへの生データの移動を行います。
最後のモジュールから Bronze レイクハウス テーブルに読み込まれた生データを使用して、特定の日の各ベンダーとその乗車の割引を含む別のテーブルと組み合わせることで、そのデータを準備および強化できるようになりました。 この最後の Gold レイクハウス テーブルが読み込まれ、使用できる状態になります。
データフローの大まかな手順は次のとおりです。
- 「モジュール 1: Data Factory を使用してパイプラインを作成する」の Copy アクティビティによって作成されたレイクハウス テーブルから生データを取得します。
- レイクハウス テーブルからインポートされたデータを変換します。
- 割引データを含む CSV ファイルに接続します。
- 割引データを変換します。
- 乗車と割引のデータを組み合わせます。
- Gold レイクハウス テーブルに出力クエリを読み込みます。
レイクハウス テーブルからデータを取得する
サイドバーから [作成] を選択し、[Dataflow Gen2] を選択して新しい Dataflow Gen2 を作成します。
新しいデータフロー メニューで、[データの取得] を選択し、[その他...] を選択します。
[Lakehouse](レイクハウス) コネクタを検索して選択します。
[Connect to data source](データ ソースへの接続) ダイアログが表示され、現在サインインしているユーザーに基づいて新しい接続が自動的に作成されます。 [次へ] を選択します。
[Choose data](データの選択) ダイアログが表示されます。 ナビゲーション ウィンドウを使用して、前のモジュールで変換先として作成したレイクハウスを見つけ、Tutorial_Lakehouse データ テーブルを選択します。
"(省略可能)" キャンバスにデータが設定されたら、列プロファイル情報を設定できます。これは、データ プロファイルの設定に役立ちます。 適切な変換を適用し、それに基づいて適切なデータ値をターゲットにすることができます。
これを行うには、リボン ウィンドウで [オプション] を選択し、[列プロファイル] の最初の 3 つのオプションを選択し、[OK] を選択します。
レイクハウスからインポートされたデータを変換する
2 番目の列 IpepPickupDatetime の列ヘッダーにあるデータ型アイコンを選択してドロップダウン メニューを表示し、メニューからデータ型を選択して、列の型を [日付と時刻] から [日付] に変換します。
"(省略可能)" リボンの [ホーム] タブで、[Manage columns](列の管理) グループから [列の選択] オプションを選択します。
"(省略可能)" [列の選択] ダイアログで、ここにリストされているいくつかの列の選択を解除し、[OK] を選択します。
- lpepDropoffDatetime
- puLocationId
- doLocationId
- pickupLatitude
- dropoffLongitude
- rateCodeID
storeAndFwdFlag 列のフィルターを選択し、ドロップダウン メニューを並べ替えます。 (警告 "リストが不完全である可能性があります" が表示される場合は、[さらに読み込む] を選択してすべてのデータを表示します)。
[Y] を選択して、割引が適用された行のみを表示し、[OK] を選択します。
[IpepPickupDatetime 列の並べ替えを選択してドロップダウン メニューをフィルター処理し、[日付フィルター] を選択し、[日付] および [日付と時刻] 型に対して提供されている [Between...](間...) フィルターを選択します。
[行をフィルター] ダイアログで、2015 年 1 月 1 日から 2015 年 1 月 31 日までの日付を選択し、[OK] を選択します。
割引データを含む CSV ファイルに接続する
次に、乗車のデータを所定の場所に配置して、各日と VendorID のそれぞれの割引が含まれるデータを読み込んで準備した後、それを乗車データと組み合わせます。
データフロー エディター メニューの [ホーム] タブで、[データの取得] オプションを選択し、[Text/CSV](テキスト/CSV) を選択します。
[Connect to data source](データ ソースへの接続) ダイアログで、次の詳細を指定します。
- ファイル パスまたは URL -
https://raw.githubusercontent.com/ekote/azure-architect/master/Generated-NYC-Taxi-Green-Discounts.csv
- 認証の種類 - 匿名
次へ を選択します。
- ファイル パスまたは URL -
[Preview file data](ファイル データのプレビュー) ダイアログで、[作成] を選択します。
割引データを変換する
データを確認すると、ヘッダーが最初の行に表示されます。 プレビュー グリッド領域の左上にあるテーブルのコンテキスト メニューを選択し、[Use first row as headers](最初の行をヘッダーとして使用) を選択して、ヘッダーに昇格させます。
Note
ヘッダーを昇格させると、データフロー エディターの上部にある [Applied steps](適用される手順) ウィンドウに新しい手順が追加されたことを確認できます。
VendorID 列を右クリックし、表示されたコンテキスト メニューから [Unpivot other columns](他の列のピボット解除) オプションを選択します。 これにより、列を属性と値のペアに変換して、列を行にすることができます。
テーブルのピボットを解除して、Attribute および Value 列の名前を変更します。これを行うには、それぞれをダブルクリックし、Attribute を Date に、Value を Discount に変更します。
Date 列のデータ型を変更するには、列名の左側にあるデータ型メニューを選択し、[日付] を選択します。
Discount 列を選択し、メニューの [変換] タブを選択します。 [Number column](数値列) を選択し、サブメニューから [Standard](標準) 数値変換 を選択し、[Divide](除算) を選択します。
[Divide](除算) ダイアログで、値 100 を入力します。
乗車と割引データを組み合わせる
次の手順では、両方のテーブルを結合して、乗車に適用される割引と調整された合計が含まれる 1 つのテーブルにします。
最初に、[Diagram view](ダイアグラム ビュー) ボタンを切り替えて、両方のクエリを表示できるようにします。
nyc_taxi クエリを選択し、[ホーム] タブで [Combine](結合) メニューを選択し、[Merge queries](クエリのマージ) を選択し、[Merge queries as new](新規としてクエリをマージ) を選択します。
[Merge](マージ) ダイアログで、[Right table for merge](マージ用の右テーブル) ドロップダウンから Generated-NYC-Taxi-Green-Discounts を選択し、ダイアログの右上にある "電球" アイコンを選択して、2 つのテーブル間の推奨される列マッピングを表示します。
推奨された 2 つの列マッピング (両方のテーブルの VendorID 列と Date 列のマッピング) を一度に 1 つずつ選択します。 両方のマッピングが追加されると、一致する列ヘッダーが各テーブルで強調表示されます。
複数のデータ ソースからのデータを結合して結果を表示することを許可するように求めるメッセージが表示されます。 [Merge](マージ) ダイアログで [OK] を選択します。
テーブル領域には、最初に次の警告が表示されます。"複数のソースからのデータを結合すると、あるソースから別のソースへのデータが表示される可能性があるため、評価が取り消されました。 データを表示する可能性が問題ない場合は、[続行] を選択してください"。[続行] を選択して、結合されたデータを表示します。
ダイアグラム ビューで新しいクエリが作成され、新しいマージ クエリと以前に作成した 2 つのクエリとの関係が示されていることに注目してください。 エディターのテーブル ウィンドウを見て、マージ クエリ列リストの右側までスクロールして、テーブル値を持つ新しい列が存在することを確認します。 これは "Generated-NYC-Taxi-Green-Discounts" 列で、その種類は [テーブル] です。 列ヘッダーには、反対方向を向いた 2 つの矢印が付いたアイコンがあり、テーブルから列を選択できます。 Discount を除くすべての列の選択を解除し、[OK] を選択します。
割引の値が行レベルになったので、新しい列を作成して割引後の合計金額を計算できます。 これを行うには、エディターの上部にある [列を追加] タブを選択し、[標準] グループから [Custom column](カスタム列) を選択します。
[Custom column](カスタム列) ダイアログで、Power Query 数式言語 (M とも呼ばれます) を使用して、新しい列の計算方法を定義できます。 [New column name](新しい列名) に「TotalAfterDiscount」を入力し、[データ型] に [通貨] を選択し、[Custom column formula](カスタム列の式) に次の M 式を指定します。
if [totalAmount] > 0 then [totalAmount] * ( 1 -[Discount] ) else [totalAmount]
[OK] をクリックします。
新たに作成した TotalAfterDiscount 列を選択し、エディター ウィンドウの上部にある [変換] タブを選択します。 [Number column](数値列) グループで、[Rounding](丸め) ドロップダウンを選択し、[Round...](丸め...) を選択します。
[Rounding](丸め) ダイアログで、小数点以下の桁数として「2」と入力し、[OK] を選択します。
IpepPickupDatetime のデータ型を [日付] から [日付と時刻] に変更します。
最後に、エディターの右側から [クエリ設定] ウィンドウを展開し (まだ展開されていない場合)、クエリの名前を Merge から Output に変更します。
レイクハウスのテーブルに出力クエリを読み込む
出力クエリが完全に準備され、データを出力する準備ができたので、クエリの出力先を定義できます。
以前に作成した Output マージ クエリを選択します。 次に、エディターで [ホーム] タブを選択し、[クエリ] グループから [Add data destination](データの出力先を追加) を選択して、レイクハウスを出力先として選択します。
[Connect to data destination](データの出力先に接続) ダイアログで、接続が既に選択されているはずです。 [次へ] をクリックして続行します。
[Choose destination target](出力先ターゲットを選択) ダイアログで、データを読み込むレイクハウスを参照し、新しいテーブルに nyc_taxi_with_discounts という名前を付けてから、もう一度 [次へ] を選択します。
[Choose destination settings](出力先の設定を選択) ダイアログで、更新方法を既定の [置き換え] のままにし、列が正しくマップされていることを再確認して、[設定の保存] を選択します。
エディターのメイン ウィンドウに戻り、[クエリ設定] ウィンドウで Output テーブルの出力先が表示されていることを確認し、[パブリッシュ] を選択します。
重要
ワークスペースに最初の Dataflow Gen2 が作成されると、レイクハウス項目とウェアハウス項目が、関連する SQL 分析エンドポイントおよびセマンティック モデルと共にプロビジョニングされます。 これらの項目はワークスペース内のすべてのデータフローによって共有され、Dataflow Gen2 が動作するために必要であるため、削除しないでください。また、ユーザーが直接使用することを意図したものではありません。 これらの項目は、Dataflow Gen2 の実装の詳細です。 項目はワークスペースには表示されませんが、ノートブック、SQL エンドポイント、レイクハウス、ウェアハウスなどの他のエクスペリエンスでアクセスできる場合があります。 名前のプレフィックスで項目を認識できます。 これらの項目のプレフィックスは "DataflowsStaging" です。
"(省略可能)" ワークスペース ページでデータフローの名前を変更するには、行を選択した後に表示されるデータフロー名の右側にある省略記号を選択し、[プロパティ] を選択します。
データフローの行を選択した後、更新アイコンを選択します。完了すると、[Data destination](データの出力先) 設定で構成されたとおりに作成された新しいレイクハウス テーブルが表示されます。
レイクハウスを調べて、そこに読み込まれた新しいテーブルを確認します。
関連するコンテンツ
Microsoft Fabric の Data Factory を使用した最初のデータ統合に関するエンドツーエンドのチュートリアルのこの 2 つ目のモジュールでは、次の方法を学習しました。
- 新しいデータフロー Gen2 を作成する。
- サンプル データをインポートして変換する。
- テキスト/CSV データをインポートして変換する。
- 両方のデータ ソースのデータを新しいクエリにマージする。
- データを変換し、クエリで新しい列を生成する。
- クエリの出力先ソースを構成する。
- 新しいデータフローの名前を変更して更新する。
次のセクションに進み、データ パイプラインを統合します。