トランザクションレプリケーションの動作を理解する ? ログリーダー編 ? の続きです。
前回の投稿では、「ログリーダー」の動作を確認しました。
ここまでの操作で「ディストリビューターに配信用のデータが格納された状態」となります。
これを「サブスクライバー」に適用させるのが「ディストリビューション エージェント」の動作となり、今回の投稿の対象となる範囲です。
今回の投稿では、データの配信方法は「プッシュサブスクリプション」を対象としています。
ログリーダーによって、ディストリビューション DB に連携用のコマンドが格納されたら、それをサブスクライバーに反映させることでレプリケーションの基本的な動作が完了します。
それでは、このデータ反映のための「ディストリビューション エージェント」はどのような動作を行っているのでしょうか?
ディストリビューション エージェントは、ディストリビューターで実行されている、次の SQL Server Agent のジョブにより「distrib.exe」が実行されることで制御が行われています。
このプロセスがディストリビューターとサブスクライバーに接続をしながらデータの同期を行います。
ディストリビューション エージェントは、「ディストリビューター」のSQL Server 内では次のようなセッションの情報として確認ができます。
declare @host_process_id int = (select top 1 host_process_id from sys.dm_exec_sessions where program_name like '%Distribution%') select session_id, login_time,host_name,program_name, last_request_start_time,last_request_end_time from sys.dm_exec_sessions where host_process_id = @host_process_id order by last_request_start_time desc
一つがデータ配信用のセッションで、もう一つが配信の履歴を書き込むセッションとなっています。
上記の画像では、Session Id : 53 がデータ同期用のコマンドの取得などを実施しており、Session Id : 52 については、データの配信があった際に「sp_MSadd_distribution_history」を実行して、配信状況の出力を行うものとなります。
上記のクエリは「ディストリビューター」で実行するものですが、今回はデータの同期を行うため、「サブスクライバー」も関係します。
declare @host_process_id int = (select top 1 host_process_id from sys.dm_exec_sessions where program_name like '%Replication%') select session_id, host_process_id , login_time,host_name,program_name, last_request_start_time,last_request_end_time from sys.dm_exec_sessions where host_process_id = @host_process_id order by last_request_start_time desc
サブスクライバー側でも 2 つのセッションが動作しています。
host_process_id = 3620 はディストリビューターで動作している「distrib.exe」のプロセス ID であり、このセッションはディストリビューターから接続されているものとなります。
一つのセッションが、ディストリビューターがサブスクライバーの情報を取得するために使用しており、もう一つのセッションがコマンドを再生する際に使用される常時接続されたセッションとなります。
ディストリビューション エージェントの動作は、サブスクライバーにデータを反映するものとなりますので、このようにディストリビューターとサブスクライバーに接続されたセッション間で連携を行いながらデータの同期を行っています。
基本的な動作の流れは次のようになります。
- サブスクライバーから「どこまでデータの連携が行われているか?」の情報を取得する
- 取得した情報を元にディストリビューションデータベースからコマンドを取得
- サブスクライバーにコマンドを実行
- サブスクライバーのデータ連携状況の情報を更新
ディストリビューター エージェントの動作についても、High Level Transactional Replication Process Flow で解説が行われています。
キーとなるのがこちらの情報と 13 以降でしょうか。
使用される情報としては次のようになっています。
- MSreplication_subscriptions : サブスクライバーのテーブルの情報
- サブスクライバーから「どこまでデータの連携が行われているか?」の情報を取得する
- sp_MSget_repl_commands : ディストリビューターで実行
- 取得した情報を元にディストリビューションデータベースからコマンドを取得
- 配信用コマンド : サブスクライバーで実行
- サブスクライバーでコマンドを実行
- MSreplication_subscriptions : サブスクライバー で実行
- サブスクライバーのデータ連携状況の情報を更新
前回と同様に、手動でディストリビューション エージェントをエミューレーションしてみたいと思います。
Contents
1.サブスクライバーから「どこまでデータの連携が行われているか?」の情報を取得する
最初に、ディストリビューターはデータの連携状況をサブスクライバーから取得する必要があります。
この情報がないと「サブスクライバーがどこまで情報を受け取ったか」を把握することができません。
サブスクライバーがどこまで情報を受け取っているかについては、サブスクライバーのデータの同期が行われているデータベース内の「MSreplication_subscriptions」で確認することができます。
実際に実行されるクエリとしては次のようになります。
exec sp_executesql N' select hashid = case datalength(transaction_timestamp) when 16 then isnull(substring(transaction_timestamp, 16, 1), 0) else 0 end, transaction_timestamp, subscription_guid from MSreplication_subscriptions where UPPER(publisher) = UPPER(@P1) and publisher_db = @P2 and publication= @P3 and subscription_type = 0 ',N'@P1 nvarchar(8),@P2 nvarchar(6),@P3 nvarchar(18)', N'2012-PUB',N'ReplDB',N'Repldb_Replication'
「transaction_timestamp」が取得されるのですが、これがサブスクライバー側で「どこまで連携されているかの xact_seqno の値」となります。
ディストリビューターは、これ以降の xact_seq_no のコマンドを連携する必要があります。
2. 取得した情報を元にディストリビューションデータベースからコマンドを取得
サブスクライバー側でどこまで連携されているかの「transaction_timestamp」(xact_seqno) が確認出来たら、その値を使用して、ディストリビューター側でコマンドの情報を取得します。
ディストリビューターでは「sp_MSget_repl_commands」が実行され、コマンドが取得されます。
実際のコマンドは次のようになります。
(ストアドの最初の引数は agent_id となりますので、MSdistribution_agents からターゲットするエージェント ID を取得して下さい)
use distribution; DECLARE @xact_seqno varbinary(16) = 0x00000283000000D80003000000000000 exec sp_MSget_repl_commands 5,@xact_seqno,0,10000000
サブスクライバーで指定した transaction_timestamp を xact_seqno に設定してストアドを実行することで、配信対象のコマンドの情報を取得することができます。
(ベースとなる情報は MSrepl_commands / MSrepl_transactions の情報かと思いますが)
実際に実行されるコマンドは「command」の列のバイナリ値となり、これについては「sp_printstatement」という内部のストアドプロシージャーでテキスト化されているため、「sp_browsereplcmds」で変換をかけます。
DECLARE @xact_seqno_start nchar(22) = (N'0x' + CONVERT(nchar(20), 0x00000283000000D80003000000000000, 2)) exec sp_browsereplcmds @xact_seqno_start = @xact_seqno_start
これでサブスクライバーで実行する必要のあるコマンドを取得できました。
3. サブスクライバーでコマンドを実行
ディストリビューターでコマンドを生成できましたので、このコマンドをサブスクライバーで実行します。
取得されたコマンドは次のような形式ですが、
{CALL [sp_MSins_dboT1] (62,10,20,NULL,NULL,NULL)}
実際に実行する際には、ステートメントとパラメーターを分割した、次のようなクエリとして実行されます。
(今回はストアドを使用した反映にしていますので、他の形式の場合は異なるパラメーターとなっているかと思いますが)
exec [sp_MSins_dboT1] 62,10,20,NULL,NULL,NULL
4.サブスクライバーのデータ連携状況の情報を更新
サブスクライバーでコマンドの反映が終わったら、サブスクライバーで該当の情報を受け取ったというようにデータを変更する必要があります。
この変更は「MSreplication_subscriptions」を直接更新します。
exec sp_executesql N'update MSreplication_subscriptions set transaction_timestamp = cast(@P1 as binary(15)) + cast(case datalength(transaction_timestamp) when 16 then isnull(substring(transaction_timestamp, 16, 1), 0) else 0 end as binary(1)) , "time" = @P2 where UPPER(publisher) = UPPER(@P3) and publisher_db = @P4 and publication = @P5 and subscription_type = 0 ', N'@P1 varbinary(14),@P2 datetime,@P3 nvarchar(8),@P4 nvarchar(6),@P5 nvarchar(18)' ,0x00000283000001100003 ,'2020-03-07 13:23:16' ,N'2012-PUB' ,N'ReplDB', N'Repldb_Replication'
xact_seqno については、ディストリビューターで「sp_MSget_repl_commands」を実行したさいに、取得した xact_seqno を設定します。
ここれでサブスクライバー側で把握されている受信済みのトランザクションが進んだ状態になります。
このサイクルが繰り返されることでサブスクライバー側にデータの反映が行われることになります。
Qlick Replicate (以前の Attunity Replicate) は、SQL Server をデータソースとした場合、レプリケーションを有効にすることで、利用することが可能なデータ同期の製品ですが、前回 / 今回のようなレプリケーションの動作を活用して、SQL Server 側のテーブルの変更を検知していたはずです。
昨今は、レプリケーションではなく、AlwaysOn 可用性グループのような仕組みでデータベース全体を同期することが多いかもしれませんが、シナリオによってはレプリケーションを使用するケースもあるかと思いますので、どのようにデータが同期されているかは把握しておけると、トラブルシューティングの一助になるのではないでしょうか。