Azure Data Factory の Web Activity を使用すると、パイプライン内で REST API をコールすることができます。
Azure のリソースにアクセスするのであれば、Data Factory の MSI を使用するアクティビティを作成すればよいのですが、MSI を使わない場合、どうやるのかを忘れて最初から調べなおすことがあるので、まとめておこうかと。
Azure Data Factory の MSI を使用するケースについては、次の情報を参照してください。
-
マネージド ID (Web Activity で MSI を使用する)
今回は Data Factory の実行状況を確認するための REST API をコールしてみます。
Contents
PowerShell でモックを作る
REST をコールした際の挙動を確認するため、適当に PowerShell でモックを作ってみます。
Azure の REST API の呼び出しについては Azure REST API Reference に記載されていますので、使用方法を忘れるたびにこの情報を思い出しています。
client_credentials で、クライアント ID / クライアントシークレットを使用して、アクセストークンを取得するための方法については、トークンを取得する に記載されていますので、トークンの生成方法については、こちらの情報を確認します。
それでは、実際に PowerShell からコールしてみます。
実際にどのような要求を送ればよいのかについては、次の情報をみて、HTTP のリクエストを思い出しています。
Azure AD に登録したアプリケーションで「https://management.core.windows.net」のリソースにアクセスするためのアクセストークンを取得する必要がありますので、サクッと取得します。
$tenantId = "<Tenant ID>" $clientId = "<Azure AD Application Client ID>" $clientSecret = "<Azure AD Application Client Secret>" $uri = ("https://login.microsoftonline.com/{0}/oauth2/token" -f $tenantId) $method = "POST" $header = @{"Content-Type" = "application/x-www-form-urlencoded"} $body = @{ grant_type = "client_credentials" client_id = $clientId client_secret = $clientSecret resource = "https://management.core.windows.net" } $result = Invoke-WebRequest -Uri $uri -Method $method -Headers $header -Body $body $accessToken = ($result.Content | ConvertFrom-Json).access_token ($result.Content | ConvertFrom-Json) | Select * ([datetime]"1970/1/1").AddSeconds(($result.Content | ConvertFrom-Json).expires_on)
これでアクセストークンが取得できましたので、トークンを使って Data Factory の REST API をコールしてみます。
Data Factory の REST API については、Azure Data Factory にドキュメントがあります。
今回はパイプラインの実行状況を取得したいので、Pipeline Runs – Query By Factory を使用して、クエリを実行します。
$subscriptionId = "<Subscription ID>" $resourceGroup = "<Resource Group>" $factoryName = "<ADF Name>" $pipelineName = "<ADF Pipeline Name>" $restUri = ("https://management.azure.com/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.DataFactory/factories/{2}/queryPipelineRuns?api-version=2018-06-01" -f $subscriptionId, $resourceGroup, $factoryName) $header = @{ "Content-Type" = "application/json" Authorization = ("Bearer {0}" -f $accessToken) } $lastUpdateAfter = ([System.DateTime]::UtcNow).AddDays(-1).GetDateTimeFormats("o") $lastUpdateBefore = ([System.DateTime]::UtcNow).AddDays(0).GetDateTimeFormats("o") $body = @{ lastUpdatedAfter= $lastUpdateAfter[0] lastUpdatedBefore= $lastUpdateBefore[0] filters = @( @{ operand = "PipelineName" operator = "Equals" values = ,($pipelineName) } ) } | ConvertTo-Json -Depth 3 $result = Invoke-RestMethod -Uri $restUri -Headers $header -Method "POST" -Body $body $result.value
Pipeline の実行結果が取得できるのが確認できましたので、この情報の取得を Web Activity に移行してみます。
Web Activity で MSI を使用せず REST API を呼び出し
PoweShell で要求と応答が確認できましたので、Web Activity に移行を行ってみます。
作成するパイプラインは次のような構成です。
パラメーターは手抜きですが、次のような値を設定できるようにしています。
"parameters": { "tenantId": { "type": "string", "defaultValue": "" }, "clientid": { "type": "string", "defaultValue": "" }, "clientSecret": { "type": "string", "defaultValue": "" }, "subscriptionId": { "type": "string", "defaultValue": "" }, "resourceGroup": { "type": "string", "defaultValue": "" }, "factoryName": { "type": "string", "defaultValue": "" } },
Access Token の取得
最初に Acceess Token の取得を行います。
Azure の API であれば、MSI を使ってしまえばいいのですが、今回は Access Token を使用するので認証は None でいきます。
最初のアクティビティでは次のような設定を行います。
{ "name": "Get Access Token", "type": "WebActivity", "dependsOn": [], "policy": { "timeout": "7.00:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "url": { "value": "@concat('https://login.microsoftonline.com/', pipeline().parameters.tenantId, '/oauth2/token')", "type": "Expression" }, "method": "POST", "headers": { "Content-Type": "application/x-www-form-urlencoded" }, "body": { "value": "@concat('grant_type=client_credentials&client_id=', pipeline().parameters.clientid, '&client_secret=', pipeline().parameters.clientSecret, '&resource=https://management.core.windows.net')", "type": "Expression" } } }
Body に Parameters に設定を行った、grant_type=client_credentials を指定した場合に必要となる情報を文字列連結して、API を呼び出します。
正常に呼び出しができれば、Activity の Output として、access_token が取得できていますので、後続では今情報を使用します。
Access Token を変数に格納
格納しなくても後続で使えますが、何となく変数に入れておきます。
{ "name": "Set Access Token", "type": "SetVariable", "dependsOn": [ { "activity": "Get Access Token", "dependencyConditions": [ "Succeeded" ] } ], "userProperties": [], "typeProperties": { "variableName": "accesstoken", "value": { "value": "@concat('Bearer ', activity('Get Access Token').output.access_token)", "type": "Expression" } } }
これで accesstoken という変数に Bearer Token が設定されていますので、REST API をコールする際にはこの変数を使用します。
REST API のコール
最後に REST API をコールします。
{ "name": "Call Azure REST API", "type": "WebActivity", "dependsOn": [ { "activity": "Set Access Token", "dependencyConditions": [ "Succeeded" ] } ], "policy": { "timeout": "7.00:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "url": { "value": "@concat('https://management.azure.com/subscriptions/', pipeline().parameters.subscriptionId, '/resourceGroups/', pipeline().parameters.resourceGroup, '/providers/Microsoft.DataFactory/factories/', pipeline().parameters.factoryName, '/queryPipelineRuns?api-version=2018-06-01')\n\n\n", "type": "Expression" }, "method": "POST", "headers": { "Authorization": { "value": "@variables('accesstoken')", "type": "Expression" }, "Content-Type": "application/json" }, "body": { "value": "@json(concat('{',\n'\"lastUpdatedBefore\":\"', utcNow(),'\",',\n'\"lastUpdatedAfter\":\"', addDays(utcNow(), -1),'\"',\n'}'))", "type": "Expression" } } }
Body の JSON の文字列を生成するのが面倒だったので、時間の指定のみを行っています。
正常に実行できていれば、直近 1 日に実行されたパイプラインの実行状態が取得できた状態となります。
パイプラインの実行単位ごとに処理をする必要があるのであれば、この出力を ForEach Activity で処理をすれば、実行結果ごとに処理をすることができますので、Slack 等に連携も可能かと。
Azure の API をコールするのであれば、ADF の MSI を使用するのが最適かと思いますが、どうやって Access Token を取得するかを思い出すためのメモとして投稿しておきました。