Реализация инкрементной загрузки с использованием Change Data Capture в SQL Server.

добавлено: 20 июн 13
понравилось:0
просмотров: 3377
комментов: 0

теги:

Автор: Алексей Куренков

Эта статья будет интересна, прежде всего, тем, кому часто приходится сталкиваться с задачами интеграции данных.

Реализация инкрементной загрузки с использованием Change Data Capture в SQL Server.

О проблеме

Предположим, у вас есть база данных, в которой пользователи постоянно меняют (вставляют и удаляют) данные, возможно даже эта база данных используется крупным приложением, которое не позволяет модифицировать структуру таблиц (например, 1С). И перед вами стоит задача периодически данные этой БД заливать в другую БД на другой сервер. Если подойти к решению задачи «в лоб», то самый простой способ – это полная заливка заново данных из источника в приемник, с предварительным очищением приемника. И этим можно конечно пользоваться и нужно, но до тех пор, пока время загрузки данных приемлемо и не превышает некоторые заданные бизнесом сроки. А что, если время загрузки данных у вас длится несколько суток, да плюс ко всему неустойчивые каналы связи, что приводит к тому, что загрузка прерывается и начинается заново? Если встречаются такие препятствия, предлагаю рассмотреть один из алгоритмов «дозагрузки данных», т.е. данные загружаются не целиком, а только изменения, произошедшие с момента последней загрузки.

CDC

В SQL Server 2008 компанией Microsoft был представлен механизм отслеживания данных, который называется Change Data Capture (сокр. CDC). Если говорить в общих чертах, то суть этого механизма в том, что включив CDC на любую таблицу в БД, создастся в системная таблица в этой же БД с подобным названием, что и таблица-оригинал (в схеме «cdc» с приставкой старая схема + «_» и с окончанием «_CT». Например: оригинальная таблица – dbo.Example, системная таблица будет с названием – cdc.dbo_Example_CT), и будет хранить все данные, которые были подвержены изменению. Собственно, что бы глубже рассмотреть CDC, предлагаю продолжить рассмотрение примера. Перед тем, как начать, убедимся, что на тестовом экземпляре SQL Server работает SQL Agent, который использует CDC. И рассмотрим скрипт, который создает БД и тестовую таблицу, заполняет эту таблицу и включает CDC на этой таблице.
Здесь для понимания сути задачи и ее упрощения в примерах буду пользоваться одним экземпляром SQL Server, не разносить БД-источник и БД-приемник по разным серверам.


use master
go
-- создадим базу данных источник
if not exists (select * from sys.databases where name = 'db_src_cdc')
	create database db_src_cdc
go
use db_src_cdc
go
-- включим CDC если таковой не включен
if not exists (select * from sys.databases where name = db_name() and is_cdc_enabled=1)
	exec sys.sp_cdc_enable_db
go
-- создадим роль для таблиц с CDC
if not exists(select * from sys.sysusers where name = 'CDC_Reader' and issqlrole=1)
	create role CDC_Reader
go
-- создадим таблицу
if object_id('dbo.Example','U') is null
	create table dbo.Example
	(
		ID int identity constraint PK_Example primary key,
		Title varchar(200) not null
	)
go
-- Заполним таблицу
insert dbo.Example (Title) values
('Один'),('Два'),('Три'),('Четыре'),('Пять');
go
-- включаем CDC на таблице
if not exists (select * from sys.tables where is_tracked_by_cdc = 1 and name = 'Example')
	exec sys.sp_cdc_enable_table
		@source_schema = 'dbo',
		@source_name = 'Example',
		@role_name = 'CDC_Reader'
go
-- заполним таблицу некоторыми данными, что то поменяем, что то удалим
update dbo.Example
set Title = reverse(Title)
where ID in (2,3,4);

delete from dbo.Example where ID in (1,2);

set identity_insert dbo.Example on;
insert dbo.Example (ID, Title) values
(1,'Один'),(6,'Шесть');
set identity_insert dbo.Example off;
go


Теперь рассмотрим, что у нас находится после отработки этого скрипта в таблицах dbo.Example и cdc.dbo_Example_CT (следует отметить, что CDC по своей природе асинхронна, и в таблицах, где хранятся отслеживания изменений, данные попадают не сразу, а спустя некоторый промежуток времени).

select * from dbo.Example;


ID Title
---- ----------------------
1 Один
3 ирТ
4 ерытеЧ
5 Пять
6 Шесть


select
	row_number() over
		(
			partition by ID
			order by
				__$start_lsn desc,
				__$seqval desc
		) as __$rn,
	*
from cdc.dbo_Example_CT;



__$rn __$start_lsn __$end_lsn __$seqval __$operation __$update_mask ID Title
------ ---------------------- ----------- ---------------------- ------------ ---------------- --- -----------
1 0x0000003A000000580005 NULL 0x0000003A000000580003 2 0x03 1 Один
2 0x0000003A000000560006 NULL 0x0000003A000000560002 1 0x03 1 Один
1 0x0000003A000000560006 NULL 0x0000003A000000560005 1 0x03 2 авД
2 0x0000003A000000540005 NULL 0x0000003A000000540002 3 0x02 2 Два
3 0x0000003A000000540005 NULL 0x0000003A000000540002 4 0x02 2 авД
1 0x0000003A000000540005 NULL 0x0000003A000000540003 3 0x02 3 Три
2 0x0000003A000000540005 NULL 0x0000003A000000540003 4 0x02 3 ирТ
1 0x0000003A000000540005 NULL 0x0000003A000000540004 3 0x02 4 Четыре
2 0x0000003A000000540005 NULL 0x0000003A000000540004 4 0x02 4 ерытеЧ
1 0x0000003A000000580005 NULL 0x0000003A000000580004 2 0x03 6 Шесть

Рассмотрим более детально структуру таблицы, в которой хранится отслеживание изменений. Поля __$start_lsn и __$seqval это, соответственно, номер LSN (глобальный номер транзакции в БД) и номер операции внутри транзакции. У этих полей есть важное для нас свойство, а именно: мы можем быть точно уверенны, что та запись, у которой LSN больше других – исполнялась позже. Благодаря этому свойству мы можем легко в запросе получить последнее состояние каждой записи, отфильтровав нашу выборку по условию - where __$rn = 1.
Поле __$operation, как вы и сами видите, содержит код операции:
• 1 – запись удалена
• 2 – запись вставлена
• 3,4 – запись обновлена, старые данные до обновления – 3, новые данные – 4.
Ну и помимо служебных полей с приставкой «__$» полностью дублируются поля оригинальной таблицы. Этой информации нам достаточно для того, чтобы перейти к инкрементной загрузке.

Настраиваем БД для загрузки данных.

Создадим в нашей тестовой БД-приемнике таблицу, в которую будут загружаться данные, а также дополнительную таблицу для хранения данных об истории загрузок.

use master
go
-- создадим базу данных приемник
if not exists (select * from sys.databases where name = 'db_dst_cdc')
	create database db_dst_cdc
go
use db_dst_cdc
go
-- создадим таблицу
if object_id('dbo.Example','U') is null
	create table dbo.Example
	(
		ID int constraint PK_Example primary key,
		Title varchar(200) not null
	)
go
-- создадим таблицу для сохранения лога загрузок
if object_id('dbo.log_cdc','U') is null
	create table dbo.log_cdc
	(
		table_name nvarchar(512) not null,
		dt datetime not null default getdate(),
		lsn binary(10) not null default(0x0),
		constraint pk_log_cdc primary key (table_name,dt desc)
	)
go

Хочу заострить внимание на поля таблицы LOG_CDC
• TABLE_NAME – в нем хранится информация о том, какая таблица загружалась (возможно будет загружаться и несколько таблиц в перспективе, возможно, с разных баз данных, а то и с разных серверов), формат таблицы – ‘SERVER_NAME.DB_NAME.SCHEMA_NAME.TABLE_NAME’
• DT – дата и время загрузки, поле, необязательное для реализации инкрементной загрузки, но и не мешает, будет полезно для аудита загрузок.
• LSN – после того, как загрузим таблицу, нам необходимо где то хранить информацию о том, откуда начать следующую загрузку, когда это потребуется. Соответственно, после каждой загрузки в эту колонку помещаем последний (максимальный) __$start_lsn.

Алгоритм загрузки данных.

Как я уже писал, мы можем запросом вывести последние состояния таблицы, используя оконные функции, и если мы знаем LSN от последней загрузки, то при следующей загрузке мы можем фильтровать в выборке из источника все данные, изменения которых выше, чем сохраненный LSN, конечно же, если была хотя бы одна полная предыдущая загрузка:
with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
select * from incr_Example


Далее мы можем получить для полной загрузки все записи, если LSN загрузки не сохранен:
with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
, full_Example as
(
	select *
	from db_src_cdc.dbo.Example
	where @lsn is null
)
select
	ID, Title, __$operation
from incr_Example
where __$rn = 1
union all
select
	ID, Title, 2 as __$operation
from full_Example


Т.е. в зависимости от значения @LSN данный запрос будет нам выдавать либо все последние изменения (минуя промежуточные) со статусом Удалено или нет, либо полностью все данные из оригинальной таблицы, добавив статус 2 (новая запись) – данное поле исключительно для унификации двух выборок. И теперь, имея данный запрос, можно легко командой MERGE (начиная с версии SQL 2008) реализовать либо полную загрузку, либо дозагрузку.
Что бы избежать блокировок, которые могут создавать альтернативные процессы, и что бы загружать согласованные данные с разных таблиц (в перспективе мы будем загружать несколько таблиц и, возможно, между ними могут быть реляционные отношения), предлагаю воспользоваться на БД-источнике моментальным снимком БД (еще одна возможность SQL 2008).

Полный текст загрузки:
/*
	Алгоритм загрузки данных
*/
-- создаем моментальный снимок БД
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
	drop database db_src_cdc_ss;

declare
	@query nvarchar(max);

select
	@query = N'create database db_src_cdc_ss on ( name = N'''+name+
		''', filename = N'''+[filename]+'.ss'' ) as snapshot of db_src_cdc'
from db_src_cdc.sys.sysfiles where groupid = 1;

exec ( @query );

-- считываем LSN от предыдущей загрузки
declare @lsn binary(10) =
	(select max(lsn) from db_dst_cdc.dbo.log_cdc
	where table_name = 'localhost.db_src_cdc.dbo.Example');

-- Очищаем таблицу перед полной загрузкой
if @lsn is null truncate table db_dst_cdc.dbo.Example;

-- сама загрузка
with incr_Example as
(
	select
		row_number() over
			(
				partition by ID
				order by
					__$start_lsn desc,
					__$seqval desc
			) as __$rn,
		*
	from db_src_cdc_ss.cdc.dbo_Example_CT
	where
		__$operation <> 3 and
		__$start_lsn > @lsn
)
, full_Example as
(
	select *
	from db_src_cdc_ss.dbo.Example
	where @lsn is null
)
, cte_Example as
(
	select
		ID, Title, __$operation
	from incr_Example
	where __$rn = 1
	union all
	select
		ID, Title, 2 as __$operation
	from full_Example
)
merge db_dst_cdc.dbo.Example as trg using cte_Example as src on trg.ID=src.ID
when matched and __$operation = 1 then delete
when matched and __$operation <> 1 then update set trg.Title = src.Title
when not matched by target and __$operation <> 1 then insert (ID, Title) values (src.ID, src.Title);

-- отмечаем окончание загрузки и последний LSN
insert db_dst_cdc.dbo.log_cdc (table_name, lsn)
values ('localhost.db_src_cdc.dbo.Example', isnull((select max(__$start_lsn) from db_src_cdc_ss.cdc.dbo_Example_CT),0))

-- Удаляем моментальный снимок БД
if exists (select * from sys.databases where name = 'db_src_cdc_ss' )
	drop database db_src_cdc_ss


Материал взят с сайта автора.

Комментарии




Необходимо войти на сайт, чтобы оставлять комментарии