SE の雑記

SQL Server の情報をメインに Microsoft 製品の勉強内容を日々投稿

Azure Data Factory の Web Activity で REST API をコールする (MSI を使用しないケース)

without comments

Azure Data Factory の Web Activity を使用すると、パイプライン内で REST API をコールすることができます。

image

Azure のリソースにアクセスするのであれば、Data Factory の MSI を使用するアクティビティを作成すればよいのですが、MSI を使わない場合、どうやるのかを忘れて最初から調べなおすことがあるので、まとめておこうかと。

Azure Data Factory の MSI を使用するケースについては、次の情報を参照してください。

今回は Data Factory の実行状況を確認するための REST API をコールしてみます。

PowerShell でモックを作る

REST をコールした際の挙動を確認するため、適当に PowerShell でモックを作ってみます。

Azure の REST API の呼び出しについては Azure REST API Reference に記載されていますので、使用方法を忘れるたびにこの情報を思い出しています。

client_credentials で、クライアント ID / クライアントシークレットを使用して、アクセストークンを取得するための方法については、トークンを取得する に記載されていますので、トークンの生成方法については、こちらの情報を確認します。

それでは、実際に PowerShell からコールしてみます。

実際にどのような要求を送ればよいのかについては、次の情報をみて、HTTP のリクエストを思い出しています。

Azure AD に登録したアプリケーションで「https://management.core.windows.net」のリソースにアクセスするためのアクセストークンを取得する必要がありますので、サクッと取得します。

$tenantId = "<Tenant ID>"
$clientId = "<Azure AD Application Client ID>"
$clientSecret = "<Azure AD Application Client Secret>"

$uri = ("https://login.microsoftonline.com/{0}/oauth2/token" -f $tenantId)
$method = "POST"
$header = @{"Content-Type" = "application/x-www-form-urlencoded"}
$body = @{
    grant_type = "client_credentials"
    client_id = $clientId
    client_secret = $clientSecret
    resource = "https://management.core.windows.net"
}

$result = Invoke-WebRequest -Uri $uri -Method $method -Headers $header -Body $body

$accessToken = ($result.Content | ConvertFrom-Json).access_token

($result.Content | ConvertFrom-Json) | Select * 
([datetime]"1970/1/1").AddSeconds(($result.Content | ConvertFrom-Json).expires_on)

 

これでアクセストークンが取得できましたので、トークンを使って Data Factory の REST API をコールしてみます。

Data Factory の REST API については、Azure Data Factory にドキュメントがあります。

今回はパイプラインの実行状況を取得したいので、Pipeline Runs – Query By Factory を使用して、クエリを実行します。

$subscriptionId = "<Subscription ID>"
$resourceGroup = "<Resource Group>"
$factoryName = "<ADF Name>"
$pipelineName = "<ADF Pipeline Name>"

$restUri = ("https://management.azure.com/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.DataFactory/factories/{2}/queryPipelineRuns?api-version=2018-06-01" -f $subscriptionId, $resourceGroup, $factoryName)

$header = @{
    "Content-Type" = "application/json"
    Authorization = ("Bearer {0}" -f $accessToken) 
}

$lastUpdateAfter = ([System.DateTime]::UtcNow).AddDays(-1).GetDateTimeFormats("o")
$lastUpdateBefore = ([System.DateTime]::UtcNow).AddDays(0).GetDateTimeFormats("o")

$body = @{
    lastUpdatedAfter= $lastUpdateAfter[0]
    lastUpdatedBefore= $lastUpdateBefore[0]
    filters = @(
        @{
            operand = "PipelineName"
            operator = "Equals"
            values = ,($pipelineName)
        }
    )
} | ConvertTo-Json -Depth 3


$result = Invoke-RestMethod -Uri $restUri -Headers $header -Method "POST" -Body $body
$result.value

 

Pipeline の実行結果が取得できるのが確認できましたので、この情報の取得を Web Activity に移行してみます。

image

 

Web Activity で MSI を使用せず REST API を呼び出し

PoweShell で要求と応答が確認できましたので、Web Activity に移行を行ってみます。

作成するパイプラインは次のような構成です。

image

パラメーターは手抜きですが、次のような値を設定できるようにしています。

        "parameters": {
            "tenantId": {
                "type": "string",
                "defaultValue": ""
            },
            "clientid": {
                "type": "string",
                "defaultValue": ""
            },
            "clientSecret": {
                "type": "string",
                "defaultValue": ""
            },
            "subscriptionId": {
                "type": "string",
                "defaultValue": ""
            },
            "resourceGroup": {
                "type": "string",
                "defaultValue": ""
            },
            "factoryName": {
                "type": "string",
                "defaultValue": ""
            }
        },

 

Access Token の取得

最初に Acceess Token の取得を行います。

Azure の API であれば、MSI を使ってしまえばいいのですが、今回は Access Token を使用するので認証は None でいきます。

image

最初のアクティビティでは次のような設定を行います。

{
    "name": "Get Access Token",
    "type": "WebActivity",
    "dependsOn": [],
    "policy": {
        "timeout": "7.00:00:00",
        "retry": 0,
        "retryIntervalInSeconds": 30,
        "secureOutput": false,
        "secureInput": false
    },
    "userProperties": [],
    "typeProperties": {
        "url": {
            "value": "@concat('https://login.microsoftonline.com/', pipeline().parameters.tenantId, '/oauth2/token')",
            "type": "Expression"
        },
        "method": "POST",
        "headers": {
            "Content-Type": "application/x-www-form-urlencoded"
        },
        "body": {
            "value": "@concat('grant_type=client_credentials&client_id=', pipeline().parameters.clientid, '&client_secret=', pipeline().parameters.clientSecret, '&resource=https://management.core.windows.net')",
            "type": "Expression"
        }
    }
}

 

Body に Parameters に設定を行った、grant_type=client_credentials を指定した場合に必要となる情報を文字列連結して、API を呼び出します。

正常に呼び出しができれば、Activity の Output として、access_token が取得できていますので、後続では今情報を使用します。

image

 

Access Token を変数に格納

格納しなくても後続で使えますが、何となく変数に入れておきます。

{
    "name": "Set Access Token",
    "type": "SetVariable",
    "dependsOn": [
        {
            "activity": "Get Access Token",
            "dependencyConditions": [
                "Succeeded"
            ]
        }
    ],
    "userProperties": [],
    "typeProperties": {
        "variableName": "accesstoken",
        "value": {
            "value": "@concat('Bearer ', activity('Get Access Token').output.access_token)",
            "type": "Expression"
        }
    }
}

 

これで accesstoken という変数に Bearer Token が設定されていますので、REST API をコールする際にはこの変数を使用します。

 

REST API のコール

最後に REST API をコールします。

{
    "name": "Call Azure REST API",
    "type": "WebActivity",
    "dependsOn": [
        {
            "activity": "Set Access Token",
            "dependencyConditions": [
                "Succeeded"
            ]
        }
    ],
    "policy": {
        "timeout": "7.00:00:00",
        "retry": 0,
        "retryIntervalInSeconds": 30,
        "secureOutput": false,
        "secureInput": false
    },
    "userProperties": [],
    "typeProperties": {
        "url": {
            "value": "@concat('https://management.azure.com/subscriptions/', pipeline().parameters.subscriptionId, '/resourceGroups/', pipeline().parameters.resourceGroup, '/providers/Microsoft.DataFactory/factories/', pipeline().parameters.factoryName, '/queryPipelineRuns?api-version=2018-06-01')\n\n\n",
            "type": "Expression"
        },
        "method": "POST",
        "headers": {
            "Authorization": {
                "value": "@variables('accesstoken')",
                "type": "Expression"
            },
            "Content-Type": "application/json"
        },
        "body": {
            "value": "@json(concat('{',\n'\"lastUpdatedBefore\":\"', utcNow(),'\",',\n'\"lastUpdatedAfter\":\"', addDays(utcNow(), -1),'\"',\n'}'))",
            "type": "Expression"
        }
    }
}

 

Body の JSON の文字列を生成するのが面倒だったので、時間の指定のみを行っています。

正常に実行できていれば、直近 1 日に実行されたパイプラインの実行状態が取得できた状態となります。

image

 

 

パイプラインの実行単位ごとに処理をする必要があるのであれば、この出力を ForEach Activity で処理をすれば、実行結果ごとに処理をすることができますので、Slack 等に連携も可能かと。

image

 

Azure の API をコールするのであれば、ADF の MSI を使用するのが最適かと思いますが、どうやって Access Token を取得するかを思い出すためのメモとして投稿しておきました。

Written by Masayuki.Ozawa

9月 10th, 2020 at 8:37 pm

Posted in Azure Data Factory

Tagged with

Leave a Reply