Репликация средствами Change Tracking. Те же и сервис-брокер.

Предыдущие посты по этой теме:

· Change Tracking

· Репликация таблиц средствами Change Tracking

· Краткое введение в сервис-брокер

· Репликация средствами Change Tracking. Небольшое упражнение на FOR XML PATH и XQuery.

В посте Репликация таблиц средствами Change Tracking мы рассмотрели вариант синхронизации таблиц tbl_1 и tbl_2 при помощи появившегося в SQL Server 2008 механизма отслеживания изменений Change Tracking. В данном посте мы разовьем этот сценарий на случай, когда таблицы находятся на разных серверах. В качестве транспорта будет использоваться появившийся в SQL Server 2005 механизм асинхронного взаимодействия сервис-брокер. Таблица tbl_1 будет находиться на сервере Маша. Change Tracking будет отслеживать происходящие над ней изменения (delete, insert, update). Эти изменения будут превращаться в XML-сообщение и доставляться сервис-брокером на сервер Дубровский, где XML превратится обратно в DML-команды, которые будут применены к таблице tbl_2 на этом сервере. Вместо сервис-брокера можно задействовать свой транспорт по доставке XML, тогда этот сценарий может применяться, когда Маша и Дубровский оба SQL Expressы. Сервис-брокер входит в состав SQL Express, однако два SQL Expressа через него общаться не могут. Необходимо, чтобы хотя бы одна сторона имела взрослую редакцию.

В упрощенном примере обе таблицы будут находиться на одном сервере в одной базе ChangeTracking_Test.

use tempdb

if exists(select 1 from sys.databases where name = 'ChangeTracking_Test') begin

alter database ChangeTracking_Test set single_user with rollback immediate

drop database ChangeTracking_Test

end

create database ChangeTracking_Test

use ChangeTracking_Test

Скрипт 1

В базе со стороны Маши будут работать два процесса: имитация пользовательской активности, вносящая в tbl_1 случайные изменения, и периодическая синхронизация. Чтобы заморозить tbl_1 на момент синхронизации, используется уровень изоляции snapshot. Его нужно включить на базе со стороны Маши:

alter database ChangeTracking_Test set single_user with rollback immediate

alter database ChangeTracking_Test set read_committed_snapshot on

alter database ChangeTracking_Test set multi_user

alter database ChangeTracking_Test set allow_snapshot_isolation on

Скрипт 2

На базе со стороны Маши должен быть поднят Change Tracking:

if not exists (select 1 from sys.change_tracking_databases where database_id = db_id('ChangeTracking_Test'))

alter database ChangeTracking_Test set change_tracking = on (change_retention = 10 minutes, auto_cleanup = on)

Скрипт 3

Со стороны Маши и со стороны Дубровского должен быть задействован сервис-брокер. Практикой хорошего тона при использовании сервис-брокера является иметь мастер-ключ на базе, чтобы потом не вылезла ошибка Краткое введение в сервис-брокер\Скрипт 3.

create master key encryption by password = 'AbraCadabra'

if (select is_broker_enabled from sys.databases where name = 'ChangeTracking_Test') = 0

alter database ChangeTracking_Test set enable_broker with rollback immediate

Скрипт 4

На стороне Маши имеется таблица tbl_1

if object_id('dbo.tbl_1', 'U') is not null drop table tbl_1

create table tbl_1 (

id1 int identity,

id2 int default (datepart(ns, sysdatetime()) / 100),

fld1 varchar(10),

fld2 sql_variant,

primary key(id1, id2)

)

Скрипт 5

с которой будет синхронизироваться таблица tbl_2 на стороне Дубровского:

if object_id('dbo.tbl_2', 'U') is not null drop table tbl_2

create table tbl_2 (

id1 int,

id2 int,

fld1 varchar(10),

fld2 sql_variant,

primary key(id1, id2)

)

Скрипт 6

Включаем отслеживание изменений по tbl_1:

if not exists (select 1 from sys.change_tracking_tables where object_id = object_id('tbl_1'))

alter table tbl_1 enable change_tracking

Скрипт 7

На стороне Маши создаем вспомогательную таблицу для хранения предыдущей версии синхронизации и журналирования результатов

use ChangeTracking_Test

if object_id('dbo.Sync_Log', 'U') is not null drop table dbo.Sync_Log

create table dbo.Sync_Log (dt datetime default sysdatetime(), version bigint default change_tracking_current_version(),

                           source sysname, destination sysname, status nvarchar(200),

                           deleted bigint, inserted bigint, updated bigint)

insert dbo.Sync_Log (version, source, destination)

values (change_tracking_min_valid_version(object_id('dbo.tbl_1')), 'tbl_1', 'tbl_2')

Скрипт 8

Работающий на периодической основе со стороны Маши скрипт синхронизации будет отлавливать при помощи функции ChangeTable изменения в tbl_1 и превращать их в XML вида Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 2. Я для него даже создал схему:

if exists (select 1 from sys.xml_schema_collections where name = 'CT_Changes_tbl_1_xsd') drop xml schema collection CT_Changes_tbl_1_xsd

create xml schema collection CT_Changes_tbl_1_xsd as

N'<?xml version="1.0" encoding="utf-16"?>

<xs:schema xmlns:xs="https://www.w3.org/2001/XMLSchema">

  <xs:element name="CT_Changes">

    <xs:complexType>

      <xs:sequence>

        <xs:element ref="Record" maxOccurs="unbounded"/>

      </xs:sequence>

      <xs:attribute name="table_name" type="xs:string" use="required" />

      <xs:attribute name="version_since" type="xs:long" use="required" />

      <xs:attribute name="version_upto" type="xs:long" use="required" />

    </xs:complexType>

  </xs:element>

  <xs:element name="Record">

    <xs:complexType>

      <xs:sequence>

        <xs:element ref="PK" minOccurs ="1" maxOccurs ="1"/>

        <xs:element name="fld1" type="xs:string" minOccurs="0" />

        <xs:element name="fld2" type="xs:string" minOccurs="0" />

      </xs:sequence>

      <xs:attribute name="operation" type="xs:string" use="required" />

      <xs:attribute name="change_no" type="xs:long" use="required" />

      <xs:attribute name="commit_time" type="xs:dateTime" use="required" />

    </xs:complexType>

  </xs:element>

  <xs:element name="PK">

    <xs:complexType>

      <xs:sequence>

        <xs:element name="id1" type="xs:int" />

        <xs:element name="id2" type="xs:int" />

      </xs:sequence>

    </xs:complexType>

  </xs:element>

 

</xs:schema>'

Скрипт 9

Конфигурируем сервис-брокер, создавая тип сообщения для передачи изменений (он будет валидироваться схемой CT_Changes_tbl_1_xsd), контракт, по которому будут передаваться сообщения этого типа (имена чувствительны к регистру невзирая на коллацию), очереди для сообщений, сервисы как конечные точки и открываем диалог, в рамках которого сервис Маша будет передавать сервису Дубровский сообщения по только что определенному контракту, короче, Краткое введение в сервис-брокер\Скрипты 4 - 9.

if exists(select 1 from sys.services where name = 'Masha') drop service Masha

if exists(select 1 from sys.services where name = 'Dubrovsky') drop service Dubrovsky

if exists(select 1 from sys.service_contracts where name = 'CT_Changes_tbl_1_Contract')

drop contract CT_Changes_tbl_1_Contract

if exists(select 1 from sys.service_message_types where name = 'CT_Changes_tbl_1_MessageType')

drop message type CT_Changes_tbl_1_MessageType

create message type CT_Changes_tbl_1_MessageType validation = valid_xml with schema collection CT_Changes_tbl_1_xsd

create contract CT_Changes_tbl_1_Contract (CT_Changes_tbl_1_MessageType sent by initiator)

if exists(select 1 from sys.service_queues where name = 'QueueMashi') drop queue QueueMashi

create queue QueueMashi

if exists(select 1 from sys.service_queues where name = 'QueueDubrovskogo') drop queue QueueDubrovskogo

create queue QueueDubrovskogo

create service Masha on queue QueueMashi (CT_Changes_tbl_1_Contract)

create service Dubrovsky on queue QueueDubrovskogo (CT_Changes_tbl_1_Contract)

declare @ch uniqueidentifier

begin dialog conversation @ch from service Masha to service 'Dubrovsky' on contract CT_Changes_tbl_1_Contract

Скрипт 10

С отдельного коннекта в SSMS на стороне Маши запускаем имитацию пользовательской активности Репликация таблиц средствами Change Tracking\Скрипт 5.

Модифицируем скрипт синхронизации Репликация таблиц средствами Change Tracking\Скрипт 7. Скрипт состоит из 3-х частей, разделенных комментарными линиями. Первая часть осталась без изменений. Вторая изменена с тем, чтобы он не напрямую применял изменения к tbl_2, а превращал их в XML (Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 3) и кидал в очередь брокеру. Третья, как и первая, осталась без изменений. Она опциональна. В ней я пользуюсь тем, что таблицы на самом деле находятся в одной базе, и сравниваю их, чтобы убедиться, что синхронизация работает.

while 1 = 1 begin

-----------------------------------------------------------------------------------------------------

waitfor delay '00:01:00'

set transaction isolation level snapshot

begin tran

declare @lastVersion bigint --здесь будет храниться последняя версия, которой синхронизирована tbl_2

declare @curVersion table (curVersion bigint); delete from @curVersion --здесь будет храниться текущая версия изменений

select @lastVersion = isnull(max(version), 0) from dbo.Sync_Log where source = 'tbl_1' and destination = 'tbl_2' --берем последнюю версию из нашего журнала

insert dbo.Sync_Log (source, destination) output inserted.Version into @curVersion values ('tbl_1', 'tbl_2') --отмечаем в журнале текущий факт синхронизации

--Если autocleanup успел почистить изменения tbl_1, которые еще не были доставлены на tbl_2, поднимаем аварийную ситуацию.

if @lastVersion < change_tracking_min_valid_version(object_id('dbo.tbl_1')) begin

 declare @msg nvarchar(200) = 'Часть изменений потеряна! Требуется ручная синхронизация!'

 update dbo.Sync_Log set status = @msg where version = (select curVersion from @curVersion) --фиксируем ее в журнале

 raiserror (@msg, 21, 1) with log --и вызываем строгую ошибку, которая прерывает выполнение скрипта

end

--Если за период с прошлой синхронизации ничего нового не произошло, можно не париться.

if @lastVersion = change_tracking_current_version() goto konec

-----------------------------------------------------------------------------------------------------

--Превращаем результат changetable в xml и передаем его в очередь.

declare @x xml = (

select 'tbl_1' as [@table_name], @lastVersion as [@version_since], change_tracking_current_version() as [@version_upto],

(

select ct.SYS_CHANGE_OPERATION as [@operation], ct.SYS_CHANGE_VERSION as [@change_no], sct.commit_time as [@commit_time],

ct.id1 as [PK/id1], ct.id2 as [PK/id2], t.fld1 as fld1, t.fld2 as fld2

from changetable(changes tbl_1, @lastVersion) ct

join sys.dm_tran_commit_table sct on ct.sys_change_version = sct.commit_ts

left join tbl_1 t on t.id1 = ct.id1 and t.id2 = ct.id2

for xml path('Record'), type

)

for xml path('CT_Changes')

)

declare @ch uniqueidentifier =

(

select top 1 ce.conversation_handle from sys.conversation_endpoints ce join

sys.services s on ce.service_id = s.service_id

join sys.service_queues sq on s.service_queue_id = sq.object_id

where s.name = 'Masha' and ce.far_service = 'Dubrovsky' and ce.is_initiator = 1 and ce.state <> 'ER'

) --всегда открыт только один диалог, инициатором которого является Маша

;send on conversation @ch message type CT_Changes_tbl_1_MessageType (@x) --в него и зафигачиваем этот XML

-----------------------------------------------------------------------------------------------------

konec:

--Сравнение копии с оригиналом.

declare @n1 bigint, @n2 bigint

select @n1 = count(1) from (select * from tbl_1 except select * from tbl_2) t --сколько записей в оригинале не хватает в копии

select @n2 = count(1) from (select * from tbl_2 except select * from tbl_1) t --и наоборот

update dbo.Sync_Log set status = case when @n1 <> 0 or @n2 <> 0 then 'Обнаружено ' + cast(@n1 as varchar(20)) + ' записей в tbl_1, не совпадающих с tbl_2, и ' + cast(@n2 as varchar(20)) + ' записей в tbl_2, не совпадающих с tbl_1.' else 'OK' end where version = (select curVersion from @curVersion) --отражаем несовпадения в журнале, а если их нет, то ОК

commit

set transaction isolation level read committed

end

Скрипт 11

Запускаем этот скрипт с нового коннекта в SSMS. Каждую минуту в очередь Дубровского будет капать сообщение от Маши. Можно их посмотреть select *, cast(message_body as xml) from QueueDubrovskogo и убедиться, что в message_body приходит нечто по образу Небольшое упражнение на FOR XML PATH и XQuery\Скрипт 2. На несовпадения между tbl_1 и tbl_2, о которых сообщается в таблице Sync_Log, пока не обращаем внимания. Мы убедились, что Change Tracking исправно отслеживает изменения в tbl_1, а сервис-брокер исправно доставляет их на сервер с tbl_2. Теперь на Дубровском напишем процедуру очереди, которая будет разгребать валящиеся сообщения, превращать XML обратно в операторы DML и применять их к tbl_2, чтобы синхронизировать ее с tbl_1. Джойним tbl_2 по полям РК c записями, полученными из XML. Те, у которых operation="D", удаляются, "I" - вставляются, "U" - обновляются. Апдейты dbo.Sync_Log, которые идут после каждой операции, предназначены для контрольных целей. По-хорошему, Sync_Log нужно было разделить, сделав журнал на стороне отправки и журнал на стороне приема. Я не стал этим заморачиваться, беззастенчиво воспользовавшись тем, что в данном примере стороны физически совпадают. Процедура очереди будет написана на основе Краткое введение в сервис-брокер\Скрипты 21-22 и Небольшое упражнение на FOR XML PATH и XQuery\Скрипты 4-5.

if object_id('ProcessSyncMessages', 'P') is not null drop proc ProcessSyncMessages

go

--Процедура производит синхронизацию tbl_2 с tbl_1

create proc ProcessSyncMessages as begin

 declare @ch uniqueidentifier, @msgtype sysname, @body varbinary(max)

 --Читаем из очереди сообщение с изменениями

 while 1 = 1 begin

  waitfor (receive top(1) @ch = conversation_handle, @msgtype = message_type_name, @body = message_body from QueueDubrovskogo)

  if @@rowcount = 0 return

  if @msgtype <> 'CT_Changes_tbl_1_MessageType' return

  select @@rowcount, @msgtype

  declare @x xml = @body

  declare @curVersion bigint = (select x.value('@version_upto[1]', 'bigint') from @x.nodes('CT_Changes') d(x))

  --Удаляем удаленные записи

  delete t from tbl_2 t join (select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2 from @x.nodes('CT_Changes/Record[@operation="D"]') d(x)) ct on t.id1 = ct.id1 and t.id2 = ct.id2

   update dbo.Sync_Log set deleted = @@rowcount where version = @curVersion --их количество вносим в журнал

  --Добавляем новые

  insert tbl_2 select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2, x.value('fld1[1]', 'nvarchar(10)') fld1, x.value('fld2[1]', 'nvarchar(10)') fld2 from @x.nodes('CT_Changes/Record[@operation="I"]') d(x)

   update dbo.Sync_Log set inserted = @@rowcount where version = @curVersion --их количество вносим в журнал

  --Обновляем модифицированные

  ;with cte as (select x.value('(PK/id1)[1]', 'int') id1, x.value('(PK/id2)[1]', 'int') id2, x.value('fld1[1]', 'nvarchar(10)') fld1, x.value('fld2[1]', 'nvarchar(10)') fld2 from @x.nodes('CT_Changes/Record[@operation="U"]') d(x))

   update t2 set t2.fld1 = cte.fld1, t2.fld2 = cte.fld2 from tbl_2 t2 join cte on t2.id1 = cte.id1 and t2.id2 = cte.id2

   update dbo.Sync_Log set updated = @@rowcount where version = @curVersion --их количество вносим в журнал

  end

end 

go

alter queue QueueDubrovskogo with activation

(

 status = on,

 procedure_name = ProcessSyncMessages,

 max_queue_readers = 1,

 execute as self

)

Скрипт 12

Пускаем по-новой, выжидаем, смотрим, что получилось.

image

Рис.1

Не, ну это просто праздник какой-то. Все ж работает. Change Tracking, Service Broker, все рулит. А что в журнале?

image

Рис.2

В журнале тоже все правильно. Черт, единственно, я промахнулся с полем status. Третью часть из Скрипта 11 (Сравнение копии с оригиналом) надо было перетащить в процедуру очереди (Скрипт 12). Не сообразил, осталось исторически с поста Репликация таблиц средствами Change Tracking, где все было синхронно. А сейчас получается, что она отдала изменения в очередь и сразу полезла проверять таблицы на совпадение, а изменения еще не успели дойти и примениться. Поэтому вместо ОК в поле status везде несовпадения. Принципиально это ни на что не влияет, но выглядит неаккуратно. Ладно, предоставляется читателям в качестве самостоятельного упражнения.

По окончании демонстрации стопятся Скрипты 10, 11. Чтобы погасить процедуру обработки очереди, нужно выполнить скрипт

alter queue QueueDubrovskogo with activation

(

 status = off

)

select * from sys.dm_broker_activated_tasks

kill 35

где вместо 35 нужно поставить спид, на котором она болтается в конкретном случае.