以前から、イベントハブに送信された Azure の診断ログを SQL Databae に取り込むということができていたかと思います。
昨年末ぐらいに、Azure SQL Database Stream Analytics 統合という機能がプレビューとして提供され、SQL Database のブレードからも Stream Analytics によるデータ取り込みの設定を行うことができるようになりました。
ブログのログをイベントハブに送信しながら、Stream Analytics から SQL Database のテーブルにデータ取り込みを実施してみたのでその時の覚書を。
今回はブログのフロントに配置している Front Door の診断ログ (アクセスログ) をイベントハブに送信し、それをテーブルに受け取るような設定を使っています。
(1 か月回していると、それなりの金額になりそうですので、今は BLOB に出力されている JSON を Data Factory 経由で取り込みを行うようにしています。)
Stream Analytics ジョブについては、単体で作成しても、SQL Database のブレードから作成しても、最終的に作成されるものは、「Stream Analytics ジョブ」というリソースに変わりはありませんが、設定の面では少し違いがあります。
SQL Database のブレードから作成した場合は、次のような違いがあるようです。
- Stream Analytics ジョブの作成時にテーブルのスキーマを生成することができる。
- Stream Analytics ジョブに、SQL Database と関連付けるためのタグが設定され、SQL Database と統合されたような見え方をすることができる
Stream Analytics ジョブから作成しても、SQL Database ブレードから作成しても、ジョブの動作は変わらないようですので、作成方法はお好みで。
イベントハブに出力されている診断ログを Stream Analytics で取り込もうとした際には、Stream Analytics では、診断ログのレコードは次のように認識されます。
「records」というフィールドに JSON としてレコードが格納されていますので、SQL Database のテーブルに書き込むときには、この JSON のパース方法を検討する必要があります。
今の SQL Database では、データベースエンジンで JSON を取り扱うことができるようになっていますので、今回は、
- スキーマ オン ライト (Schema on Write)
- スキーマ オン リード (Schema on Read)
の 2 種類のパターンでログを取り込んでみたいと思います。
■スキーマ オン ライト (Schema on Write)でログの取り込みを行う
書き込み時に JSON をどのようにパースするかを決めておきます。
今回はログを格納するために次のようなテーブルを作成しておきます。
CREATE TABLE [accesslog]( [time] [datetime2](3) NULL, [httpMethod] [varchar](10) NULL, [httpVersion] [numeric](2, 1) NULL, [requestUri] [varchar](4000) NULL, [userAgent] [varchar](4000) NULL, [clientIp] [varchar](50) NULL, [securityProtocol] [varchar](20) NULL, [requestBytes] [int] NULL, [responseBytes] [int] NULL, [httpStatusCode] [varchar](10) NULL, [httpStatusDetails] [varchar](10) NULL, [cacheStatus] [varchar](100) NULL ) GO
SQL Database に取り込み先のテーブルが作成できたら、
- 入力 : イベントハブ
- 出力 : SQL Database の上記のテーブル
を設定した Strean Analytics のクエリを作成します。
こちらの画像では「FrontDoor-EventHub」が、Front Door の診断ログの送信先として使用しているイベントハブとなり、「SQLDB-AccessLog」が先ほど作成した SQL Database のテーブルとなります。
この入力と出力を使用したクエリとして次のような設定を行います。
SELECT record.ArrayValue.time, record.ArrayValue.properties.httpMethod, CAST(record.ArrayValue.properties.httpVersion AS float) AS httpVersion, record.ArrayValue.properties.requestUri, record.ArrayValue.properties.userAgent, record.ArrayValue.properties.clientIp, record.ArrayValue.properties.securityProtocol, CAST(record.ArrayValue.properties.requestBytes AS bigint) AS requestBytes, CAST(record.ArrayValue.properties.responseBytes AS bigint) AS responseBytes, record.ArrayValue.properties.httpStatusCode, record.ArrayValue.properties.httpStatusDetails, record.ArrayValue.properties.cacheStatus INTO [SQLDB-AccessLog] FROM [FrontDoor-EventHub] CROSS APPLY GetArrayElements(records) AS record WHERE record.ArrayValue.properties.requestUri IS NOT NULL AND record.ArrayValue.properties.requestUri LIKE '%:443/2%'
records の内容は JSON ですので、この情報を CROSS APPLY で展開をしながらデータの生成を行っています。
私のブログでは、ページについては「https://blog.engineer-memo.com:443/2009/0/page/20/」というような形式の情報となっていますので、この情報のみをテーブルに連携します。
(イメージのようなコンテンツは省いた形でログを取り込んでいます)
あとは、この Stream Analytics のジョブを実行すれば、イベントハブのデータがテーブルに取り込みが行われます。
■スキーマ オン リード (Schema on Read) でログの取り込みを行う
SQL Server 2016 以降は、JSON をパースすることができる関数が追加されており、この関数は SQL Database でも使用することができます。
SQL Server の JSON サポートについては、JSON データ型のようなものが提供されているのではなく、JSON フォーマットの文字列をパースすることができるという仕様となります。
イベントハブからのデータ連携時の records のフィールドについては、パースをせずに、そのまま取り込みを行いますので、次のようなテーブルを作成します。
CREATE TABLE [schemaonread]( [eventprocessedutctime] [datetime] NULL, [eventenqueuedutctime] [datetime] NULL, [partitionid] [float] NULL, [records] [nvarchar](max) NULL ) GO
records は nvarchar(max) として、JSON をそのまま格納するようにしておきます。
Stream Analytics で records のフィールドを取り込もうとしたとき、テーブルにそのまま格納しようとすると、Record や Array というような文字列が格納されてしまいますので、次のような UDF を作成し、JSON を文字列として展開するようにしておきます。
この関数を介してデータが投入されることで、テーブルには文字列として JSON が登録されることになります。
function JsonToString(json) { return JSON.stringify(json); }
Stream Analytics のクエリとしては次のようなクエリを設定すれば、データの取り込みは完了です。
(こちらも不要なログについてはフィルタしています)
SELECT frontdoorlog.EventProcessedUtcTime, frontdoorlog.EventEnqueuedUtcTime, frontdoorlog.PartitionId, UDF.JsonToString(frontdoorlog.records) as records INTO [schemaonread] FROM [frontdoorlog] CROSS APPLY GetArrayElements(records) AS record WHERE record.ArrayValue.properties.requestUri IS NOT NULL AND record.ArrayValue.properties.requestUri LIKE '%:443/2%'
テーブルに取り込みが行われると、次のようなレコードとして登録が行われます。
records には JSON の文字列が登録された状態ですので、これを列として直すためには、次のようなクエリを実行します。
SELECT eventprocessedutctime, eventenqueuedutctime, json.time, properties.* FROM [frontdoor].[schemaonread] cross apply OPENJSON(records) WITH( time datetime2(3), properties nvarchar(max) N'lax $.properties' AS JSON ) AS json cross apply OPENJSON(json.properties) WITH( [httpMethod] varchar(10), [httpVersion] numeric(2,1), [requestUri] varchar(4000), [userAgent] varchar(4000), [clientIp] varchar(50), [securityProtocol] varchar(20), [requestBytes] int, [responseBytes] int, [httpStatusCode] varchar(10), [httpStatusDetails] varchar(10), [cacheStatus] varchar(10) ) AS properties
records に格納されている文字列を OPENJSON で読み込んで、クエリ内で適切な列としてマッピングを行うことで、データの活用が可能です。
ブログのログレコードがテーブルに保存されていると、数万件のデータについてもすぐに生成ができるので便利ですね。