ADF v2 も 一般提供が開始 され、ブラウザの UI ベースでのデータ取り込みの一連の処理 (パイプライン) を記載しやすくなってきました。
(v1 と v2 の違いはこちらから。Azure Data Factory と Data Factory バージョン 1 の比較)
今まで、簡単なデータコピーしか試したことがなかったのですが、複数のアクティビティを使用したパイプラインの作成方法をまとめてみたいと思います。
アクティビティの利用方法を理解することを目的としているため、効率的なデータ取り込みについては考慮していません。
(一度の取り込みで済むものを行単位でストアドを実行したりしていますが、これは、アクティビティの利用方法を理解するために意図的に組んでいます)
ADF については、Azure Data Factory のドキュメント が公式のドキュメントとなっており、何かあったら、このドキュメントを見ることになります。
書籍については日本語のものはほとんどないと思うのですが、Hands-On Data Warehousing with Azure Data Factory という書籍が、現時点で発刊されている唯一のものではないでしょうか。
まだ途中までしか読めていないのですが、様々なデータソースを ADF で取り込む (利用する) ための基本的な操作が、ハンズオン形式で学習ができるものとなっており、操作をしながら学習したいときには、こちらの書籍の購入を検討してもよいのではないでしょうか。
Contents
本投稿で構築するパイプライン
本投稿では次のパイプラインの作成方法を学習することができます。
- BLOB / SQL Database をパイプラインの操作対象とする → Connection / Datasets の理解
- 配置された BLOB のファイルの内容を行単位で処理する → For Each の理解
- BLOB の行をストアドプロシージャーで処理する → ストアドプロシージャの実行方法の理解
- ファイルの削除を実施するために Logic Apps を実行する → ADF の基本的なアクティビティ以外の利用の理解
- BLOB にファイルが配置されたことをトリガーとして起動する → イベントトリガーの理解
最初にトリガーを作成できると「何をもってパイプラインを起動するか」からスタートすることができるのですが、現状の ADF のトリガーについては「パイプラインに関連づいていないトリガー」は発行することができないので、最初にパイプラインを作成し、最後にトリガーを設定しています。
それでは、各作業についてみていきましょう。
BLOB / SQL Database をパイプラインの操作対象とする
ADF の操作は、ブラウザベースの UI から実施することになり、「Author」が基本画面となります。
ADF で利用するデータソースの接続情報については「Connections」として作成をし、その接続情報を使用したデータソースの定義を「Datasets」として、定義し利用することになります。
最初に「Connections」を作成します。
(Datasets から作成することもできますが、今回は作業を理解するため、別々に作成しています)
Connections の作成
画面の下の方にあるため、少しわかりずらいのですが、「Connections」をクリックすることで、接続情報を作成することができます。
今回は、SQL Database と BLOB Storage を使用するため、事前に Azure 上にリソースを作成しておきます。
BLOB ストレージについてですが、BLOB ストレージへの操作をトリガーとするため、作成時には必ず「StorageV2 (汎用 v2)」で作成をしてください。
BLOB ストレージの操作をトリガーとした設定については、v2 ストレージのみが許可されています。
https://docs.microsoft.com/ja-jp/azure/data-factory/how-to-create-event-trigger#data-factory-ui
それ以外の注意点はありませんので、検証で使用する v2 ストレージと SQL Database (Basic で問題ありません) の接続情報を登録しておきます。
SQL Database については、次のクエリを実行して、テストで使用するテーブルとストアドプロシージャを作成しておいてください。
CREATE TABLE [dbo].[T1]( [C1] [nvarchar](100) NULL, [C2] [nvarchar](100) NULL, [C3] [nvarchar](100) NULL ) ON [PRIMARY] GO CREATE PROCEDURE [dbo].[ADF_USP] ( @p1 nvarchar(100), @p2 nvarchar(100), @p3 nvarchar(100) ) AS BEGIN SET NOCOUNT ON INSERT INTO T1 VALUES(@p1, @p2, @p3) END GO
本投稿では、「T1 というテーブル」に、BLOB ストレージのファイルの内容を、「ADF_USP というストアドプロシージャ」を介して投入するシナリオとなっています。
接続情報の作成が完了したら、実際にその接続を利用し、ADF のパイプラインのアクティビティとして利用することができる「Datasets」を作成します。
Datasets の作成
Datasets は、ADF のパイプラインの中で使用することができる、データソースとなります。
作成した接続情報毎に Dataset の作成を実施することになります。
今回は、BLOB ストレージ用の Dataset と SQL Database 用の Dataset を作成します。
作成は「+」をクリックすることで実施できます。
BLOB ストレージの設定は次のようになります。
今回、ストレージに格納するファイルについては、3 列のヘッダーを含まないファイルとなりますので、「Schema」については、テキストファイルのレイアウトを設定しています。
ポイントとなるのは、「Connection」の「File path」と、「Parametes」の 2 点になります。
「BLOB ストレージに追加されたファイル」を処理対象としたいため、Dataset のファイルパスについては、可変的に取り扱えるようにする必要があります。
そのため、「Prameters」に、ファイル名を格納するためのパラメーター (SourceFile)を作成し、「Connection」の「Filepath」のファイル名については、作成したパラメーターを指定するため「@dataset().SourceFile」を設定しています。
このような指定は「動的コンテンツ」という指定となります。
この構文自体は、Azure Data Factory の式と関数 から確認することもできますが、動的コンテンツを使用できる項目にカーソルを合わせた際に表示されるブレードで、入力保管をすることもできますので、指定しやすい方でよいかと。
今回の項目であれば「Parameters」の「SourceFile」をクリックすることで、同様の内容となります。
SQL Database 側については、単純なテーブルとスキーマの指定のみとなっており、こちらについては固定的な設定となっています。
(パラメーターは指定していません)
Dataset は、ディレクトリで管理することもできますので、種別や用途ごとに、ディレクトリをまとめておいてもよいかもしれないですね。
これで、使用するデータの設定が完了しました。
次は、メインとなるパイプラインを構築していきます。
配置された BLOB のファイルの内容を行単位で処理する
データ取り込みのフローは「Pipeline」として作成を行います。
今回作成した、パイプラインは次のようなシンプルなものです。
- BLOB ストレージのファイルの内容を Lookup アクティビティで取得
- ファイルの内容を行単位で処理するため ForEach アクティビティに連携
- ファイル削除を実行するための Logic Apps を実行
というような流れになっています。
まずは、ForEach までの内容を見ていきましょう。
Pipeline のパラメーターの設定
最初に Pipeline 自体にパラメーターを作成します。
今回は次の画像のように 2 つのパラメーターを登録しています。
- SourceFile : 処理対象のファイル名
- SourceBlob : 処理対象のコンテナー名
これらは、パイプラインを実行する際に指定する必要があるパラメーターとして定義され、BLOB のイベントトリガーを使用した場合、処理対象となったファイルの情報をこれらのパラメーターにマッピングすることができるようになっています。
マッピングについては、トリガー作成時に指定をしますので、まずは、「空のパラメーター」を設定しておいてください。
Lookup アクティビティ
今回は BLOB のファイルの内容を処理したいため、「Lookup アクティビティ」を使用します。
Lookup アクティビティの設定は次のようになっています。
先ほど作成した BLOB ストレージの Dataset を Lookup の対象しています。
Dataset 作成時に「SourceFile」というパラメーターを設定していますので、この Dataset を使用する際には、パラメーターの指定が必要となります。
個のパラメーターに「@pipeline().parameters.SourceFile」を指定します。
これにより、「パイプラインのパラメーターとして指定された SourceFile を Detaset の処理対象のファイルとする」ことができました。
(Dataset 作成時に指定したパラメーターに、パイプライン実行時に指定されたパラメーターを渡す設定になります)
このアクティビティが実行されると、後続のアクティビティには、ファイルの内容を渡すことができます。
Lookkup アクティビティには、次のような制限がありますので、大量のデータが含まれているファイルを処理する場合には、Lookup ではなく、該当のファイルをコピーアクティビティで処理をするような方法を検討した方が良いかと思います。
Lookup アクティビティの最大行数は 5,000 行で、最大サイズは 2 MB です。
Foreach アクティビティ
先ほどの Lookup アクティビティを設定することで「指定された BLOB ストレージのファイルの内容」を取得することができました。
つぎに、取得した内容をループで処理させるため「Foreach アクティビティ」を使用します。
ForEach アクティビティの「Settings」には、次のように「@activity(‘BLOBLookup’).output. value」を設定します。
この設定により、ForEach アクティビティ内では、Lookup アクティビティで取得したファイルの内容を、「Item」として参照ができるようになります。
ForEach アクティビティは、「指定した Item の内容を使用してアクティビティを実行する」ものとなりますので、ForEach の中に、さらにアクティビティを設定する必要があります。
今回は、次の画像のように、ストアドプロシージャを実行するアクティビティが登録されています。
このストアドプロシージャのを実行するアクティビティには、次のような設定をしています。
SQL DB に作成してある、ストアドプロシージャのパラメーターとして、処理対象のファイルの内容を渡して、データの登録を行うというものです。
今回、BLOB の Dataset の設定としては CSV を指定してあり、ファイルの内容は次のようになっています。
BLOB ストレージの Output を Foreach に連携した場合、行単位でアクティビティが実行されることになります。
各行の列の情報をストアドプロシージャ実行時のパラメーターとして利用する設定が上述の内容となります。
これで、
- BLOB ストレージからファイルの内容を取得
- ファイルの各行に対して処理 (ストアドプロシージャを実行)
という処理を実装することができました。
次、取り込みを行ったファイルの削除を見ていきましょう。
ファイルの削除を実施するために Logic Apps を実行する
ファイルの内容の取り込みが終わりましたので、後処理としてのファイル削除の方法を考えてみます。
ADF の標準的なアクティビティとしてファイル削除が実装されていればよいのですが、残念ながら現時点ではファイル削除のアクティビティというものは存在していません。
これについては Delete Blob Activity Needed in Azure Data Factory UI に情報があります。
(Microsoft Azure Data Factory Samples にも v1 のサンプルがあったりします。)
ADF ですが、データの取り込み系についてはかなり簡単に実行することができるのですが、「データの加工」や「アクティビティにない処理」を実行するには、
- Azure Batch との連携によるカスタマイズされた処理の実行
- Web API をコールすることによる情報の加工
- SSIS 統合ランタイムを使用することによる、ETL の実行
を考える必要があるのではないでしょうか。
ADF をシンプルに使う場合「Extract →Transform → Load (ETL)」ではなく「Extract → Load (EL)」として使うようなことを期待しておいた方が良い気がするのですよね。
(Transform の実装のサンプルも、「列の加工」ではなく、「ファイルの変換」系の内容が多い気がします)
今回は標準 Activity に含まれていない BLOB からファイルの削除を実行する処理を Logc Apps で実装してみます。
(Logic Apps 以外の方法でも実装しやすいもので問題はありません)
Logic Apps の作成
Logic Apps の実装はシンプルなものです。
次のようなフローを作成します。
HTTP 要求時に指定された BLOB の内容を削除するというものになります。
単体で動くかどうかの確認は、次のような PowerShell で実行できるかと。
$uri = "<HTTP 要求の受信時の URL>" $body = @{ "blobname" = "/importdta/T2_02.csv" } $header = @{ "Content-Type" = "application/json" } Invoke-WebRequest -Uri $uri -Method POST -Body ($body | ConvertTo-Json) -ContentType "application/json"
BLOB の削除に削除対象のファイルを配置して「blobname」に、ファイルを指定して削除が行えるかを確認し、実行できれば、Logic Apps の準備は完了です。
次に、この Logic Apps を ADF から呼び出します。
Web アクティビティの作成
ADF の「Web アクティビティ」を使用することで、REST API を実行することができますので、ここから先ほど作成した Logic Apps を呼び出します。
次の画像のように Body まで設定すれば、実行できます。
URL は Logic Apps の HTTP 要求の URL を、ヘッダーについては「Content-Type : application/json」を指定しています。
Body については、もともとのデータの渡し方がいけていないことが悪いのですが、JSON フォーマットに加工した文字列を渡すようにしています。
concat でせっせと文字列加工をする「@concat(‘{“blobname”: “/’, pipeline().parameters.SourceBlob, ‘/’, pipeline().parameters.SourceFile, ‘”}’)」を設定しています。
これにより「{“blobname”: “/importdata/T2_02.csv”} 」が Body に設定されて、Logic Apps が呼び出されることになります。
これで、パイプラインの設定が一通り完了しましたので、最後の設定として「このパイプラインを起動するトリガー」の作成を行います。
BLOB にファイルが配置されたことをトリガーとして起動する
トリガーの作成は 2 箇所から実行することができますが、どちらからでも問題ありません。
今回は「パイプライン上の New/Edit」から実行する想定で。
パイプラインからトリガーを作成する場合は、トリガーの新規作成と、トリガー実行時のパラメーターの設定を実施することができます。
最初に「New」からトリガーを作成します。
今回は、「BLOB ストレージにファイルが追加されたことをトリガーとする」ため、「Event」を選択したトリガーを作成し、どの BLOB ストレージを監査対象とするかを指定して下さい。
最初に記載しましたがここで選択できる BLOB ストレージは「v2 ストレージのみ」となります。
今回は BLOB ストレージに対して次のような設定を実施しています。
設定の際に、「Blob path begins with」と「Blob path ends with」の両方を設定する場合は注意してください。
これについては、「Blob path ends with」プロパティの記載例について に記載されています。
例としては次の画像のような設定をしたとします。
「/importdata/ 配下の CSV の拡張子を持つファイルが格納された場合にトリガーを起動したい」というような意味で設定をしたものとなるのですが、上記の設定では、そのような設定にはなりません。
ADF の BLOB のトリガーについては、「Event Grid」として登録が行われます。
Event Grid の登録内容を確認するとわかるのですが、Event Grid の登録内容としては「/blobs/.csv」というような内容となってしまい、特定のコンテナーの特定の拡張子をターゲットとするという設定にはなっていません。
ということで、ADF から「Blob path begins with」と「Blob path ends with」を設定する場合は、どちらかを一方の指定した方が良いかと。
Even Grid の「Suffix Filter」を直接修正すれば「特定のディレクトリの特定の拡張子」というような設定になりそうですが、微妙な動作をしそうな気もするので、この辺は気を付けて検証して利用してみてください。
パイプラインからトリガーを設定する場合「そのトリガーを使用してパイプラインを実行する際に使用するパラメーター」を指定することができます。
今回はパイプラインのパラメーターとして次の内容を設定しています。
- SourceFile : 処理対象のファイル名
- SourceBlob : 処理対象のコンテナー名
そのため、トリガーの実行時にもこれらのパラメーターを指定する必要があります。
今回は次のような設定を行います。
-
- SourceFile : @triggerBody().fileName
- SourceBlob : @triggerBody().folderPath
BLOB のイベントをトリガーとした場合、上記の変数には、トリガーを起動させることになったコンテナーの名称とファイルの名称が自動的に指定されますので、それをパイプライン内で使用する変数にマッピングして実行を行っています。
これで、パイプラインの作成が完了しました。
パイプラインの実行
それでは、パイプラインを実行してみましょう。
BLOB ストレージにファイルをアップロードします。
そうすると、トリガーが発火し、自動的にパイプラインが実行されます。
今回は 4 ファイルアップロードしていますので、 各ファイルに対してパイプラインが実行されていますね。
Parameters の「@」をクリックすると、パイプライン実行時に自動的に設定されたパラメーターも確認できます。
Logic Apps についても実行され、ファイルが削除されたことが確認できますね。
エラー系の処理については実装できていないため、
- ファイルのレイアウトが異なっている場合
- Logic Apps の実行の結果エラーになった場合
等々の考慮は必要ですが、単純な「Data Copy」のアクティビティを利用するだけでなく、複数のアクティビティを利用した ADF のパイプライン作成はこのような流れになるのではないでしょうか。