Azure Data Factory または Synapse Analytics を使用して、Azure Database for MySQL のデータをコピーして変換する
適用対象: Azure Data Factory Azure Synapse Analytics
ヒント
企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。
この記事では、Azure Data Factory または Synapse Analytics パイプラインで Copy アクティビティを使用して、Azure Database for MySQL との間でデータをコピーする方法、および Data Flow を使用して Azure Database for MySQL のデータを変換する方法について説明します。 詳細については、Azure Data Factory と Synapse Analytics の概要記事を参照してください。
このコネクタは、以下に特化しています。
オンプレミスまたはクラウドにある汎用 MySQL データベースからデータをコピーするには、MySQL コネクタを使用します。
前提条件
このクイック スタートでは、下に記載されているリソースと構成が出発点として必要です。
- パブリック アクセスまたはプライベート エンドポイントを持つ既存の Azure Database for MySQL 単一サーバーまたは MySQL フレキシブル サーバー。
- MySQL サーバーの [ネットワーク] ページで、[Azure 内の任意の Azure サービスにこのサーバーへのパブリック アクセスを許可する] を有効にします。 これにより、Data Factory Studio を使用できるようになります。
サポートされる機能
この Azure Database for MySQL コネクタは、次の機能でサポートされています。
サポートされる機能 | IR | マネージド プライベート エンドポイント |
---|---|---|
Copy アクティビティ (ソース/シンク) | ① ② | ✓ |
マッピング データ フロー (ソース/シンク) | ① | ✓ |
Lookup アクティビティ | ① ② | ✓ |
① Azure 統合ランタイム ② セルフホステッド統合ランタイム
作業の開始
パイプラインでコピー アクティビティを実行するには、次のいずれかのツールまたは SDK を使用します。
UI を使用して Azure Database for MySQL のリンク サービスを作成する
次の手順を使用して、Azure portal UI で Azure Database for MySQL のリンク サービスを作成します。
Azure Data Factory または Synapse ワークスペースの [管理] タブに移動し、[リンク サービス] を選択して、[新規] をクリックします。
MySQL を検索して、Azure Database for MySQL コネクタ選択します。
サービスの詳細を構成し、接続をテストして、新しいリンク サービスを作成します。
コネクタの構成の詳細
以下のセクションでは、Azure Database for MySQL コネクタに固有の Data Factory エンティティの定義に使用されるプロパティについて詳しく説明します。
リンクされたサービスのプロパティ
Azure Database for MySQL のリンクされたサービスでは、次のプロパティがサポートされます。
プロパティ | 内容 | 必須 |
---|---|---|
type | type プロパティは、次のように設定する必要があります:AzureMySql | はい |
connectionString | Azure Database for MySQL インスタンスに接続するために必要な情報を指定します。 パスワードを Azure Key Vault に格納して、接続文字列から password 構成をプルすることもできます。 詳細については、下記の例と、「Azure Key Vault への資格情報の格納」の記事を参照してください。 |
はい |
connectVia | データ ストアに接続するために使用される統合ランタイム。 Azure 統合ランタイムまたは自己ホスト型統合ランタイム (データ ストアがプライベート ネットワークにある場合) を使用できます。 指定されていない場合は、既定の Azure 統合ランタイムが使用されます。 | いいえ |
一般的な接続文字列は Server=<server>.mysql.database.azure.com;Port=<port>;Database=<database>;UID=<username>;PWD=<password>
です。 ケースごとにさらに多くのプロパティを設定できます。
プロパティ | 説明 | Options | 必須 |
---|---|---|---|
SSLMode | このオプションは、MySQL に接続するときにドライバーで TLS 暗号化および検証を使用するかどうかを指定します。 例: SSLMode=<0/1/2/3/4> |
DISABLED (0) / PREFERRED (1) (既定) / REQUIRED (2) / VERIFY_CA (3) / VERIFY_IDENTITY (4) | いいえ |
UseSystemTrustStore | このオプションは、システムの信頼ストアと指定した PEM ファイルのどちらの CA 証明書を使用するかを指定します。 例: UseSystemTrustStore=<0/1>; |
Enabled (1) / Disabled (0) (既定) | いいえ |
例:
{
"name": "AzureDatabaseForMySQLLinkedService",
"properties": {
"type": "AzureMySql",
"typeProperties": {
"connectionString": "Server=<server>.mysql.database.azure.com;Port=<port>;Database=<database>;UID=<username>;PWD=<password>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
例: パスワードを Azure Key Vault に格納する
{
"name": "AzureDatabaseForMySQLLinkedService",
"properties": {
"type": "AzureMySql",
"typeProperties": {
"connectionString": "Server=<server>.mysql.database.azure.com;Port=<port>;Database=<database>;UID=<username>;",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
データセットのプロパティ
データセットを定義するために使用できるセクションとプロパティの完全な一覧については、データセットに関する記事をご覧ください。 このセクションでは、Azure Database for MySQL データセットでサポートされるプロパティの一覧を示します。
Azure Database for MySQL からデータをコピーするには、データセットの type プロパティを AzureMySqlTable に設定します。 次のプロパティがサポートされています。
プロパティ | 内容 | 必須 |
---|---|---|
type | データセットの type プロパティは、次のように設定する必要があります:AzureMySqlTable | はい |
tableName | MySQL データベースのテーブルの名前。 | いいえ (アクティビティ ソースの "query" が指定されている場合) |
例
{
"name": "AzureMySQLDataset",
"properties": {
"type": "AzureMySqlTable",
"linkedServiceName": {
"referenceName": "<Azure MySQL linked service name>",
"type": "LinkedServiceReference"
},
"typeProperties": {
"tableName": "<table name>"
}
}
}
コピー アクティビティのプロパティ
アクティビティの定義に利用できるセクションとプロパティの完全な一覧については、パイプラインに関する記事を参照してください。 このセクションでは、Azure Database for MySQL のソースとシンクでサポートされるプロパティの一覧を示します。
ソースとしての Azure Database for MySQL
Azure Database for MySQL からデータをコピーするために、コピー アクティビティの source セクションでは次のプロパティがサポートされています。
プロパティ | 内容 | 必須 |
---|---|---|
type | コピー アクティビティのソースの type プロパティは、次のように設定する必要があります:AzureMySqlSource | はい |
query | カスタム SQL クエリを使用してデータを読み取ります。 (例: "SELECT * FROM MyTable" )。 |
いいえ (データセットの "tableName" が指定されている場合) |
queryCommandTimeout | クエリ要求がタイムアウトするまでの待機時間。規定では 120 分 (02:00:00) です | いいえ |
例:
"activities":[
{
"name": "CopyFromAzureDatabaseForMySQL",
"type": "Copy",
"inputs": [
{
"referenceName": "<Azure MySQL input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "AzureMySqlSource",
"query": "<custom query e.g. SELECT * FROM MyTable>"
},
"sink": {
"type": "<sink type>"
}
}
}
]
シンクとしての Azure Database for MySQL
データを Azure Database for MySQL にコピーするために、コピー アクティビティの sink セクションでは次のプロパティがサポートされています。
プロパティ | 内容 | 必須 |
---|---|---|
type | コピー アクティビティのシンクの type プロパティは、次のように設定する必要があります: AzureMySqlSink | はい |
preCopyScript | コピー アクティビティの毎回の実行で、データを Azure Database for MySQL に書き込む前に実行する SQL クエリを指定します。 このプロパティを使用して、事前に読み込まれたデータをクリーンアップできます。 | いいえ |
writeBatchSize | バッファー サイズが writeBatchSize に達したら、Azure Database for MySQL テーブルにデータを挿入します。 許可される値は行数を表す整数です。 |
いいえ (既定値は 10,000) |
writeBatchTimeout | タイムアウトする前に一括挿入操作の完了を待つ時間です。 使用可能な値: 期間。 たとえば "00:30:00" (30 分) を指定できます。 |
いいえ (既定値は 00:00:30) |
例:
"activities":[
{
"name": "CopyToAzureDatabaseForMySQL",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Azure MySQL output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "AzureMySqlSink",
"preCopyScript": "<custom SQL script>",
"writeBatchSize": 100000
}
}
}
]
Mapping Data Flow のプロパティ
マッピング データ フローでデータを変換する場合、Azure Database for MySQL からテーブルの読み取りと書き込みを実行できます。 詳細については、マッピング データ フローのソース変換とシンク変換に関する記事をご覧ください。 ソースとシンクの種類として、Azure Database for MySQL データセットまたはインライン データセットを使用することができます。
ソース変換
次の表に、Azure Database for MySQL ソースでサポートされるプロパティの一覧を示します。 これらのプロパティは、 [ソース オプション] タブで編集できます。
名前 | 説明 | 必須 | 使用できる値 | データ フロー スクリプトのプロパティ |
---|---|---|---|---|
テーブル | [テーブル] を入力として選択した場合、データセットで指定されたテーブルからすべてのデータがデータ フローによってフェッチされます。 | いいえ | - | (インライン データセットのみ) tableName |
クエリ | [クエリ] を入力として選択した場合は、ソースからデータをフェッチする SQL クエリを指定します。これにより、データセットで指定したテーブルがオーバーライドされます。 テストまたはルックアップ対象の行を減らすうえで、クエリの使用は有効な手段です。 Order By 句はサポートされていませんが、完全な SELECT FROM ステートメントを設定することができます。 ユーザー定義のテーブル関数を使用することもできます。 select * from udfGetData() は、データ フローで使用できるテーブルを返す SQL の UDF です。 クエリ例: select * from mytable where customerId > 1000 and customerId < 2000 または select * from "MyTable" 。 |
いいえ | String | query |
ストアド プロシージャ | 入力として [ストアド プロシージャ] を選択する場合、ソース テーブルからデータを読み込むストアド プロシージャの名前を指定するか、[更新] を選択し、プロシージャ名を検出するようにサービスに要求します。 | はい ([ストアド プロシージャ] を入力として選択した場合) | String | procedureName |
プロシージャのパラメーター | 入力として [ストアド プロシージャ] を選択した場合、プロシージャで設定された順序でストアド プロシージャの入力パラメーターを指定するか、[インポート] を選択し、フォーム @paraName を使用してすべてのプロシージャ パラメーターをインポートします。 |
いいえ | Array | inputs |
バッチ サイズ | 大量データをバッチにまとめるバッチ サイズを指定します。 | いいえ | Integer | batchSize |
Isolation Level | 次のいずれかの分離レベルを選択します。 - コミットされたものを読み取り - コミットされていないものを読み取り (既定値) - 反復可能読み取り - シリアル化可能 - なし (分離レベルを無視) |
いいえ | READ_COMMITTED READ_UNCOMMITTED REPEATABLE_READ SERIALIZABLE NONE |
isolationLevel |
Azure Database for MySQL ソース スクリプトの例
ソースの種類として Azure Database for MySQL を使用する場合、関連付けられているデータ フロー スクリプトは次のようになります。
source(allowSchemaDrift: true,
validateSchema: false,
isolationLevel: 'READ_UNCOMMITTED',
query: 'select * from mytable',
format: 'query') ~> AzureMySQLSource
シンク変換
次の表に、Azure Database for MySQL シンクでサポートされるプロパティの一覧を示します。 これらのプロパティは、 [シンク オプション] タブで編集できます。
名前 | 説明 | 必須 | 使用できる値 | データ フロー スクリプトのプロパティ |
---|---|---|---|---|
更新方法 | 対象となるデータベースに対して許可される操作を指定します。 既定では、挿入のみが許可されます。 行を更新、アップサート、または削除するには、それらのアクションに対して行をタグ付けするために行の変更変換が必要になります。 |
はい | true または false |
deletable insertable updateable upsertable |
[キー列] | 更新、upsert、削除の場合、キー列 (複数可) を設定して、変更する行を決定する必要があります。 キーとして選択する列の名前は、後続の更新、upsert、削除の一部として使用されます。 そのため、シンク マッピングに存在する列を選択する必要があります。 |
いいえ | Array | キー |
Skip writing key columns\(キー列の書き込みをスキップする) | キー列に値を書き込まない場合は、[Skip writing key columns](キー列の書き込みをスキップする) を選択します。 | いいえ | true または false |
skipKeyWrites |
テーブル アクション | 書き込み前に変換先テーブルのすべての行を再作成するか削除するかを指定します。 - なし: テーブルに対してアクションは実行されません。 - Recreate:テーブルが削除され、再作成されます。 新しいテーブルを動的に作成する場合に必要です。 - Truncate:ターゲット テーブルのすべての行が削除されます。 |
いいえ | true または false |
recreate truncate |
バッチ サイズ | 各バッチで書き込まれる行の数を指定します。 バッチ サイズを大きくすると、圧縮とメモリの最適化が向上しますが、データをキャッシュする際にメモリ不足の例外が発生するリスクがあります。 | いいえ | Integer | batchSize |
事前および事後の SQL スクリプト | データがシンク データベースに書き込まれる前 (前処理) と書き込まれた後 (後処理) に実行される複数行の SQL スクリプトを指定します。 | いいえ | String | preSQLs postSQLs |
ヒント
- 複数のコマンドを含む単一のバッチ スクリプトを複数のバッチに分割することをお勧めします。
- バッチの一部として実行できるのは、単純に更新数を返すデータ操作言語 (DML) ステートメントおよびデータ定義言語 (DDL) ステートメントだけです。 詳細については、「バッチ操作の実行」を参照してください。
増分抽出を有効にする: このオプションを使用して、パイプラインが最後に実行されてから変更された行のみを処理するように ADF に指示します。
増分日付列: 増分抽出機能を使う場合は、ソース テーブルのウォーターマークとして使う日時列を選ぶ必要があります。
最初から読み取りを開始する: 増分抽出でこのオプションを設定すると、増分抽出が有効になっているパイプラインの最初の実行時にすべての行を読み取るよう ADF に指示します。
Azure Database for MySQL シンク スクリプトの例
シンクの種類として Azure Database for MySQL を使用する場合、関連付けられているデータ フロー スクリプトは次のようになります。
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:false,
insertable:true,
updateable:true,
upsertable:true,
keys:['keyColumn'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> AzureMySQLSink
Lookup アクティビティのプロパティ
プロパティの詳細については、Lookup アクティビティに関するページを参照してください。
Azure Database for MySQL のデータ型のマッピング
Azure Database for MySQL からデータをコピーするとき、MySQL のデータ型からサービス内で内部的に使用される中間データ型への次のマッピングが使用されます。 コピー アクティビティでソースのスキーマとデータ型がシンクにマッピングされるしくみについては、スキーマとデータ型のマッピングに関する記事を参照してください。
Azure Database for MySQL データ型 | 中間サービス データ型 |
---|---|
bigint |
Int64 |
bigint unsigned |
Decimal |
bit |
Boolean |
bit(M), M>1 |
Byte[] |
blob |
Byte[] |
bool |
Int16 |
char |
String |
date |
Datetime |
datetime |
Datetime |
decimal |
Decimal, String |
double |
Double |
double precision |
Double |
enum |
String |
float |
Single |
int |
Int32 |
int unsigned |
Int64 |
integer |
Int32 |
integer unsigned |
Int64 |
long varbinary |
Byte[] |
long varchar |
String |
longblob |
Byte[] |
longtext |
String |
mediumblob |
Byte[] |
mediumint |
Int32 |
mediumint unsigned |
Int64 |
mediumtext |
String |
numeric |
Decimal |
real |
Double |
set |
String |
smallint |
Int16 |
smallint unsigned |
Int32 |
text |
String |
time |
TimeSpan |
timestamp |
Datetime |
tinyblob |
Byte[] |
tinyint |
Int16 |
tinyint unsigned |
Int16 |
tinytext |
String |
varchar |
String |
year |
Int32 |
関連するコンテンツ
Copy アクティビティでソースおよびシンクとしてサポートされるデータ ストアの一覧については、サポートされるデータ ストアに関するセクションを参照してください。