SE の雑記

SQL Server の情報をメインに Microsoft 製品の勉強内容を日々投稿

Azure Data Factory でテーブルの定義に合わせて動的にコピーアクティビティのマッピングを設定する

without comments

Azure Data Factory (ADF) で、コピーアクティビティを使用して、データの投入 (インポート) を行う場合、ポータルからシンプルに設定を行場合は、次のようにマッピングの情報を指定することになります。

image

上記のマッピングについては、テキストの区切りと、テーブルのスキーマが同一であり、複雑な可能が不要なものではあるのですが、このようなインポートを実行する場合に、マッピングを設定するのが面倒だったので、コピーアクティビティのマッピング情報を動的に設定してみたいと思います。

コピーアクティビティですが、上記の画像のように、インポートデータ (Source) のスキーマを読み込み、同期先 (Sink) 都のマッピングを指定することもできるのですが、他のアクティビティと同様に「Add dynamic content」を使用することで、動的に指定したマッピング情報 (Mapping) を使用することができます。

image

以下の JSON はコピーアクティビティの変換のルールを指定する「translator」の記述となります。
マッピング情報に動的なコンテンツを使用した場合は、変換ルールの「translator」の種類が「Expression」として設定され、value に変換ルールのオブジェクトを指定することができるようになります。

image

        "translator": {
            "value": "@json(activity('Lookup1').output.firstRow.result)",
            "type": "Expression"
        }

 

ポータルから GUI でマッピングを設定した場合、次のような形式で「type」「mappings」というキーが記述されている JSON が指定されます。

image

この JSON のオブジェクトを自分で作成することで、インポートのマッピング情報を動的に生成することができます。

それでは、実際に試してみましょう。

 

ストアドプロシージャの作成

今回は SQL Database に TPC-H のスキーマを作成してあり、NATION のテーブルのデータの取り込みを行います。

まずは、この NATION の列の構成をマッピング情報のフォーマットで取得可能なストアドプロシージャを SQL Database に作成します。

CREATE OR ALTER PROCEDURE usp_GetMapping
    @tableName varchar(255)
AS
    DECLARE @parameter varchar(8000)= '{"type": "TabularTranslator", "mappings": {0}}'

    DECLARE @json varchar(8000) = 
    (
    SELECT 
        ROW_NUMBER() OVER (ORDER BY column_id) AS 'source.ordinal', 
        name AS 'sink.name' 
    FROM 
        sys.columns 
    WHERE 
        object_id = object_id(@tableName) 
    FOR JSON PATH
    )

    SELECT REPLACE(@parameter,'{0}', @json) AS result
GO

 

このストアドプロシージャはテーブル名を受け取ることで、ADF の mappings に使用可能なフォーマットで JSON を返すというものになります。

(マッピングはシンプルなものにしているため、今回は、データ型の設定等は実施していません)

SQL Server 2016 で追加された JSON 関数を使用し、多少の加工を行いながら ADF で利用可能なフォーマットで文字列の生成を行っています。

実際に実行してみると、次のような文字列を取得することができます。

EXEC usp_GetMapping @tableName = 'NATION'

image

この結果を ADF で使用して、データのインポートを実施します。

 

Lookup アクティビティの実行

それでは、ADF でパイプラインを作成していきます。

全体像としてはシンプルなもので、二つのアクティビティで構成されています。

image

最初に Lookup アクティビティを実行します。

Lookup アクティビティでは、ストアドプロシージャを実行して、その結果を取得することができますので、先ほど、作成した「usp_GetMapping」を実行するように設定します。

実行時には、テーブル名を指定することができますので、テーブル名を変更することで他の処理でも再利用できます。

image

ストアドプロシージャの実行が完了すると、アクティビティの出力としては、ストアドプロシージャ内で取得されたマッピング情報で使用するための情報が「文字列」(String) として出力されます。

image

 

コピーアクティビティの実行

次に実際にデータの取り込みを行う、コピーアクティビティを実行します。

「Source」「Sink」の設定については通常のコピーアクティビティと同様です。

動的に生成したマッピング情報を使用する場合は「Mapping」で「Add dynamic content」をクリックし、

image

動的なコンテンツとして「@json(activity(‘Lookup1’).output.firstRow.result)」を指定します。

マッピングには、JSON のオブジェクトを渡す必要がありますので、json 関数でストアドの文字列をオブジェクト化します。

(今回の例は Lookup アクティビティの名称が「Lookup1」のため、関数内のアクティビティ名もその名称を使用しています)

image

設定はこれで完了です。

パイプラインを実行すると、ストアドプロシージャからテーブルの情報を元にして生成したマッピング情報が取得され、コピーアクティビティの Mapping として設定されます。

image

 

インポートファイルの列の順序とインポート先テーブルの列の順序が不一致の場合は、ストアドのロジックを少し検討する必要がありますが、シンプルなテキストでしたら、このような動的なマッピングを使用すると、再利用性のあるパイプライン / アクティビティを作成することができるのではないでしょうか。

Written by Masayuki.Ozawa

5月 17th, 2020 at 4:22 pm

Posted in Azure Data Factory

Tagged with

Leave a Reply