Azure Data Factory (ADF) で、コピーアクティビティを使用して、データの投入 (インポート) を行う場合、ポータルからシンプルに設定を行場合は、次のようにマッピングの情報を指定することになります。
(同一の構成 (同一列名) のテーブル to テーブルであれば、明示的なマッピングを設定する必要が不要なケースもあると思いますが)
このようなインポートを実行する場合に、マッピングを設定するのが面倒だったので、コピーアクティビティのマッピング情報を動的に設定してみたいと思います。
コピーアクティビティですが、上記の画像のように、インポートデータ (Source) のスキーマを読み込み、同期先 (Sink) 都のマッピングを指定することもできるのですが、他のアクティビティと同様に「Add dynamic content」を使用することで、動的に指定したマッピング情報 (Mapping) を使用することができます。
以下の JSON はコピーアクティビティの変換のルールを指定する「translator」の記述となります。
マッピング情報に動的なコンテンツを使用した場合は、変換ルールの「translator」の種類が「Expression」として設定され、value に変換ルールのオブジェクトを指定することができるようになります。
"translator": { "value": "@json(activity('Lookup1').output.firstRow.result)", "type": "Expression" }
ポータルから GUI でマッピングを設定した場合、次のような形式で「type」「mappings」というキーが記述されている JSON が指定されます。
この JSON のオブジェクトを自分で作成することで、インポートのマッピング情報を動的に生成することができます。
それでは、実際に試してみましょう。
ストアドプロシージャの作成
今回は SQL Database に TPC-H のスキーマを作成してあり、NATION のテーブルのデータの取り込みを行います。
まずは、この NATION の列の構成をマッピング情報のフォーマットで取得可能なストアドプロシージャを SQL Database に作成します。
CREATE OR ALTER PROCEDURE usp_GetMapping @tableName varchar(255) AS DECLARE @parameter varchar(8000)= '{"type": "TabularTranslator", "mappings": {0}}' DECLARE @json varchar(8000) = ( SELECT ROW_NUMBER() OVER (ORDER BY column_id) AS 'source.ordinal', name AS 'sink.name' FROM sys.columns WHERE object_id = object_id(@tableName) FOR JSON PATH ) SELECT REPLACE(@parameter,'{0}', @json) AS result GO
このストアドプロシージャはテーブル名を受け取ることで、ADF の mappings に使用可能なフォーマットで JSON を返すというものになります。
(マッピングはシンプルなものにしているため、今回は、データ型の設定等は実施していません)
SQL Server 2016 で追加された JSON 関数を使用し、多少の加工を行いながら ADF で利用可能なフォーマットで文字列の生成を行っています。
実際に実行してみると、次のような文字列を取得することができます。
EXEC usp_GetMapping @tableName = 'NATION'
この結果を ADF で使用して、データのインポートを実施します。
Lookup アクティビティの実行
それでは、ADF でパイプラインを作成していきます。
全体像としてはシンプルなもので、二つのアクティビティで構成されています。
最初に Lookup アクティビティを実行します。
Lookup アクティビティでは、ストアドプロシージャを実行して、その結果を取得することができますので、先ほど、作成した「usp_GetMapping」を実行するように設定します。
実行時には、テーブル名を指定することができますので、テーブル名を変更することで他の処理でも再利用できます。
ストアドプロシージャの実行が完了すると、アクティビティの出力としては、ストアドプロシージャ内で取得されたマッピング情報で使用するための情報が「文字列」(String) として出力されます。
コピーアクティビティの実行
次に実際にデータの取り込みを行う、コピーアクティビティを実行します。
「Source」「Sink」の設定については通常のコピーアクティビティと同様です。
動的に生成したマッピング情報を使用する場合は「Mapping」で「Add dynamic content」をクリックし、
動的なコンテンツとして「@json(activity(‘Lookup1’).output.firstRow.result)」を指定します。
マッピングには、JSON のオブジェクトを渡す必要がありますので、json 関数でストアドの文字列をオブジェクト化します。
(今回の例は Lookup アクティビティの名称が「Lookup1」のため、関数内のアクティビティ名もその名称を使用しています)
設定はこれで完了です。
パイプラインを実行すると、ストアドプロシージャからテーブルの情報を元にして生成したマッピング情報が取得され、コピーアクティビティの Mapping として設定されます。
インポートファイルの列の順序とインポート先テーブルの列の順序が不一致の場合は、ストアドのロジックを少し検討する必要があります。
シンプルなテキストでしたら、このような動的なマッピングを使用すると、明示的にマッピングを使用する必要のあるアクティビティが含まれていても、再利用性のあるパイプラインを作成することができるのではないでしょうか。