SQL Server のデータをエクスポートする場合、大規模データをエクスポートする場合はいくつかのデータに分割してエクスポートを行うことで、
- ファイル当たりのエクスポートデータのサイズ調整
- データに問題があった場合の再抽出
- データの並列ローディング
というようなメリットがあるのではないでしょうか?
PolyBase のような機能では、パーティションテーブルに対してのアクセスについては、パーティション単位にスレッドを分散させてデータアクセスを行っており、何らかの論理空間で分割が行われている場合は、「パーティション単位でデータをエクスポートする」というように、複数のエクスポートデータを容易に生成することができます。
それでは「非パーティションテーブル」ではどのような方法を使用することで、エクスポートデータを分割することができるでしょうか?
以前投稿した、Database Migration Assistant (DMA) で SQL Server から SQL Database へのデータ移行方法について にも関連する内容となるのですが、非パーティションテーブルを複数のエクスポートデータに分割したい場合、「統計情報を使用する」というアプローチをとることができますので、本投稿で紹介させていただきます。
タイトルには、「非パーティションテーブル」と書きましたが「パーティションテーブル」でも使用できます。(パーティションと統計情報のヒストグラムを組み合わせるとさらに細かな単位でデータを分割できるかと)
Contents
Data Migration Assistant (DMA) を参考にする
冒頭で紹介した Database Migration Assistant (DMA) で SQL Server から SQL Database へのデータ移行方法について の投稿では次のように記載していました。
Bulk Insert については、SqlBulkCopy.WriteToServerAsync メソッド が使用されています。
この時に投入されるデータについては、先ほどの DataTable で取得された内容となります。
移行先のデータ投入のための Bulk Insert については、SqlBulkCopy の WriteToServer によって実施されますので、4 並列でデータの取得が行われていた場合は、4 並列で Bulk Insert が実行された状態となります。
DMA は SQL Server ベースの環境間の移行に使用するツールとなり、SQL Database のデータ移行では、SqlBulkCopy を使用して移行先にデータの投入が行われています。
本投稿では触れませんが、実装が気になる方は次の内容を調査してみるとよいのではないでしょうか。
- Microsoft.SqlServer.Migration.Core.dll
- Microsoft.SqlServer.Migration.Core.DataMovement.DataMovementWriter
- Microsoft.SqlServer.Migration.Core.DataMovement.SqlServerStatisticsQuery
統計情報を使用したデータ分割
DMA のデータ分割方法は、パーティションテーブルかどうかによっても変わるのですが、「統計情報を使用してデータを分割させる」というような処理も実装されています。
具体的には次のようなクエリを実行して、キー情報と統計情報の取得を行い、この情報を元にしてデータの分割を行います。
DECLARE @TableName sysname = 'LINEITEM'
DECLARE @ObjectId INT, @IndexId INT, @Type INT, @IndexName sysname; -- table type is User
SET @ObjectId = OBJECT_ID(@TableName, N'U');
-- The first result contains details about the type of table, i.e. is it a heap or not
SELECT @IndexId = index_id, @IndexName = name, @Type = type FROM sys.indexes WITH (NOLOCK)
WHERE object_id = @ObjectId AND ((type = 1) OR ((name IS NULL) AND type = 0)); -- index type is Clustered or heap
-- A null @Type value means the table isn't a heap, and didn't have a clustered index
-- For example, it could have a clustered column store index
SELECT @IndexId as IndexId, @IndexName As IndexName, ISNULL(@Type,-1) as Type
-- Check if there is a clustered index
IF (@Type = 1) AND (@IndexId IS NOT NULL) BEGIN
-- The second result contains all the key columns in the clustered index
SELECT
c.name AS ColumnName,
dt.name AS ColumnType,
c.is_nullable AS IsNullable,
ic.is_descending_key AS IsDescending
FROM
sys.index_columns ic WITH (NOLOCK)
INNER JOIN sys.columns c WITH (NOLOCK) ON c.object_id = ic.object_id AND c.column_id = ic.column_id
LEFT JOIN sys.types dt WITH (NOLOCK) ON dt.user_type_id = c.user_type_id AND dt.is_user_defined = 0
WHERE
ic.object_id = @ObjectId AND ic.index_id = @IndexId
ORDER BY
ic.key_ordinal;
-- The third result contains the histogram for the first key column of the index
DBCC SHOW_STATISTICS(@TableName, @IndexName) WITH HISTOGRAM;
END
クエリの実行結果はこのようになります。
- テーブルの主キーの情報
- 主キーに含まれる列の情報
- 主キーのインデックスの統計情報のヒストグラム
これら 3 種類の情報の取得を行っています。
「主キーのインデックスの統計情報のヒストグラム」には、「キー項目の先頭列のヒストグラム」が含まれており、
- 該当のキー項目の分布の上限値
- 範囲に含まれる推定列数
の情報を確認することができます。
SQL Server では、統計情報のヒストグラムのステップ数の上限は「200」と決まっており、データのカーディナリティによって、作成されるヒストグラムのステップ数は変わってきますが、統計情報を使用することで、「最大 200分割」までは実施することができます。
このヒストグラムの「RANGE_HI_KEY」と、主キーの情報を使用して、次のようなクエリを生成することで、主キーを使用したエクスポートデータを生成するためのクエリを作成することができます。
SELECT * FROM LINEITEM WHERE L_ORDERKEY < 47136 ORDER BY L_ORDERKEY ASC, L_LINENUMBER ASC SELECT * FROM LINEITEM WHERE L_ORDERKEY >= 47136 AND L_ORDERKEY < 47138 ORDER BY L_ORDERKEY ASC, L_LINENUMBER ASC - 省略 - SELECT * FROM LINEITEM WHERE L_ORDERKEY >= 299995109 AND L_ORDERKEY < 299995110 ORDER BY L_ORDERKEY ASC, L_LINENUMBER ASC SELECT * FROM LINEITEM WHERE L_ORDERKEY >= 299995110 ORDER BY L_ORDERKEY ASC, L_LINENUMBER ASC
L_ORDERKEY は int 型の項目ですが、主キーが数値以外の場合でも、データ範囲を表す「RANGE_HI_KEY」は、格納されているデータをベースにして生成されますので、文字列型の場合は文字列データとして RANGE_HI_KEY を取得することができます。
上記のようにして生成したクエリを、
- bcp の queryout を使用したエクスポートデータの生成
- SqlBulkCopy の入力データの生成
に使用することで、適度にデータ分割が行われたエクスポート用データを生成することができます。
データの分布に偏りがある場合は「RANGE_ROWS」を考慮して、いくつかのデータ範囲をまとめることも考慮しなくてはいけないかもしれませんが、手軽にデータを分割する場合には、取得したヒストグラムの行単位でデータを分割させてもよいのではないでしょうか。
ソート済みデータを使用して SqlBulkCopy で効率の良いデータインポートを行う
統計情報を使用したデータエクスポートですが、一般的には主キーを使用したものとなるかと思います。
エクスポートデータを取得するクエリでは、「ソート済みデータ」としてデータのエクスポートを行うことでSqlBulkCopy によるデータインポートを行う場合、「SqlBulkCopyColumnOrderHint」を活用することができます。
SqlBulkCopyColumnOrderHint を使用するためには「Microsoft.Data.SqlClient」を使用する必要があります。PowerShell の場合、Microsoft.Data.SqlClient を使用できるための環境を整える必要がありますが、PowerShell (7.0.2) で Microsoft.Data.SqlClient 2.0 を使用する のような方法で、Windows PowerShell / PowerShell 7.0 の両方で、使用することができます。
ヒープのテーブルであれば、関係はないのですが、クラスター化インデックスを設定しているテーブルに対して、Bulk Insert を実行した場合、通常は次のような操作によりデータの投入が行われます。
SqlBulkCopyColumnOrderHint が指定されていない場合は、投入されるデータがどのようなルールに基づいてソートされているかが、Bulk Insert は判断することができないため、データ投入時には Sort の操作が含まれます。
SqlBulkCopyColumnOrderHint を使用した場合は、Bulk Insert はどのようなソートが行われたデータなのかを判断することができるようになりますので、データ投入時に Sort の操作が省略されます。
実行される Insert Bulk ステートメントも次のように変化します。
-- SqlBulkCopyColumnOrderHint なし insert bulk [LINEITEM] ([L_ORDERKEY] Int, [L_PARTKEY] Int - 省略 -) with (KEEP_NULLS, TABLOCK) -- SqlBulkCopyColumnOrderHint あり insert bulk [LINEITEM] ([L_ORDERKEY] Int, [L_PARTKEY] Int, - 省略 - ) with (KEEP_NULLS, TABLOCK, ORDER([L_ORDERKEY] ASC, [L_LINENUMBER] ASC))
Sort はコストの高い操作ですので、
- クラスター化インデックスが設定されているテーブルに SqlBulkCopy でデータ投入を行う
- Microsoft.Data.SqlClient が利用できる
場合には、SqlBulkCopyColumnOrderHint の利用を検討するとよいのではないでしょうか。
PowerShell のサンプル
TPC-H の LINEITEM のデータを移行する想定のサンプルも残しておきたいと思います。
本来はクエリの生成も動的に実施する必要がありますが、今回は、方式の確認を行うものですので、いろいろな情報をハードコードしたざっくりとしたサンプルです。
Windows PowerShell 向けとして、.NET Framework の Microsoft.Data.SqlClient を使用しています。
実行すると、次のように統計情報のヒストグラムを使用しながら分割してデータの取得を行い、移行先にデータの導入を行っていきます。![]()
$ErrorActionPreference = "Stop"
$sourceConnectionstring = "Server=localhost;Integrated security=true;Application Name=DataCopy;Database=tpch"
$destinationConnectionstring = "Server=localhost;Integrated security=true;Application Name=DataCopy;;Database=tpch2"
$tableName = "LINEITEM"
$targetTableStatsQuery = "DBCC SHOW_STATISTICS('LINEITEM', 'PK_LINEITEM') WITH HISTOGRAM;"
$query1 = "SELECT * FROM LINEITEM WHERE L_ORDERKEY < {0} ORDER BY L_ORDERKEY ASC, L_LINENUMBER ASC"
$query2 = "SELECT * FROM LINEITEM WHERE L_ORDERKEY >= {0} AND L_ORDERKEY < {1} ORDER BY L_ORDERKEY ASC, L_LINENUMBER ASC"
$query3 = "SELECT * FROM LINEITEM WHERE L_ORDERKEY >= {0} ORDER BY L_ORDERKEY ASC, L_LINENUMBER ASC"
<#
select * from sys.dm_exec_sessions where program_name = 'DataCopy' order by login_time asc
select
FORMAT(row_count, '#,##0') AS rows,
FORMAT(reserved_page_count * 8 / 1024, '#,##0') AS reserved_page_MB,
FORMAT(used_page_count * 8 / 1024, '#,##0') AS used_page_count
from sys.dm_db_partition_stats where object_id = OBJECT_ID('LINEITEM')
select * from sys.dm_exec_query_profiles AS p
outer apply sys.dm_exec_input_buffer(p.session_id, 1)
where p.session_id in(
select session_id from sys.dm_exec_sessions where program_name = 'DataCopy'
)
#>
Add-type -Path "C:\SqlClient\microsoft.identity.client.4.21.1.nupkg\lib\net461\Microsoft.Identity.Client.dll"
Add-Type -Path "C:\SqlClient\microsoft.data.sqlclient.2.1.2.nupkg\runtimes\win\lib\net46\Microsoft.Data.SqlClient.dll"
function CopyData($sourceConnection, $destinationConnection, $query){
while($true){
try{
if($sourceConnection.State -ne "Open"){
$sourceConnection.Open()
}
if($destinationConnection.State -ne "Open"){
$destinationConnection.Open()
}
$option = [System.Data.SqlClient.SqlBulkCopyOptions]::1 -bor [System.Data.SqlClient.SqlBulkCopyOptions]::KeepNulls -bor [System.Data.SqlClient.SqlBulkCopyOptions]::TableLock -bor [System.Data.SqlClient.SqlBulkCopyOptions]::UseInternalTransaction
$sqlBulk = New-Object Microsoft.Data.SqlClient.SqlBulkCopy($destinationConnection, $option, $null)
[Microsoft.Data.SqlClient.SqlCommand]$sourceDataCommand = $sourceConnection.CreateCommand()
$sourceDataCommand.CommandText = $query
$sourceDataCommand.CommandTimeOut = 0
$r = $sourceDataCommand.ExecuteReader()
$sqlBulk.DestinationTableName = $tableName
$sqlBulk.BulkCopyTimeout = 0
$sqlBulk.BatchSize = 0
[void]$sqlBulk.ColumnOrderHints.Add((New-Object Microsoft.Data.SqlClient.SqlBulkCopyColumnOrderHint("L_ORDERKEY", ([Microsoft.Data.SqlClient.SortOrder]::Ascending))))
[void]$sqlBulk.ColumnOrderHints.Add((New-Object Microsoft.Data.SqlClient.SqlBulkCopyColumnOrderHint("L_LINENUMBER", ([Microsoft.Data.SqlClient.SortOrder]::Ascending))))
$sqlBulk.WriteToServer($r)
break
}catch{
Write-Host ("Error (Loop Retry) : {0}" -f $Error[0].Exception.Message)
Start-Sleep -Seconds (1 * (Get-Random -Minimum 1 -Maximum 10))
}finally{
$r.Dispose()
}
}
}
$sourceConnection = New-Object Microsoft.Data.SqlClient.SqlConnection($sourceConnectionstring)
$destinationConnection = New-Object Microsoft.Data.SqlClient.SqlConnection($DestinationConnectionstring)
$sourceConnection.Open()
$destinationConnection.Open()
$sourceStatsCommand = $sourceConnection.CreateCommand()
$sourceStatsCommand.CommandText = $targetTableStatsQuery
$r = $sourceStatsCommand.ExecuteReader()
$dtStats = New-Object System.Data.DataTable
$dtStats.Load($r)
$r.Close()
$r.Dispose()
$dtStats | ft
$count = 1
foreach($statsRow in $dtStats.Rows){
if($count -eq 1){
$query = $query1 -f $statsRow["RANGE_HI_key"]
}else{
$query = $query2 -f $lowKey,$statsRow["RANGE_HI_key"]
}
Write-Host ("[{0:000}/{1:000}] : Start : {2}" -f $count, $dtStats.Rows.Count, $query)
$sw = [System.Diagnostics.Stopwatch]::StartNew()
CopyData $sourceConnection $destinationConnection $query
$sw.Stop()
Write-Host ("[{0:000}/{1:000}] : End : {2:#,##0} ms" -f $count, $dtStats.Rows.Count, $sw.ElapsedMilliseconds)
$lowKey = $statsRow["RANGE_HI_key"]
if($count -eq $dtStats.Rows.Count){
$query = $query3 -f $statsRow["RANGE_HI_key"]
Write-Host ("[{0:000}/{1:000}] : Start : {2}" -f $count, $dtStats.Rows.Count, $query)
$sw = [System.Diagnostics.Stopwatch]::StartNew()
CopyData $sourceConnection $destinationConnection $query
$sw.Stop()
Write-Host ("[{0:000}/{1:000}] : End : {2:#,##0} ms" -f $count, $dtStats.Rows.Count, $sw.ElapsedMilliseconds)
}
$count++
}
$sourceConnection.Close()
$sourceConnection.Dispose()
$destinationConnection.Close()
$destinationConnection.Dispose()
まとめ
identity の連番がクラスター化インデックスや主キーになっているテーブルや、パーティションテーブルであれば、エクスポート用データの分割は容易に行うことができます。
それ以外の構成のテーブルで、データを分割してエクスポート / インポートする必要がある場合に、どのようにエクスポートデータを分割するか悩ましい場合には、統計情報のヒストグラムを使用してみるという方法が、アイデアの一つとしてよいのではないでしょうか。