Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Microsoft SQL Server Новый топик    Ответить
 Service Broker прочитать все мессаджи  [new]
kapelan
Member

Откуда: хутор БольшойБугор
Сообщений: 726
хочу построить примерно такую модель:
есть reader и writer
writer может быть много
reader один
Идея: мого writer бросают мессаджи
reader читает их всех за один заход.
кручу Service Broker - читает только одно conversation, не получается одним махом прочитать все
14 фев 12, 23:27    [12092415]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
Mnior
Member

Откуда: Кишинёв
Сообщений: 6724
Конечно, форум чиста поболтать, о том о сём. Хотя в правилах чётко написано - выкладывать скрипты обязательно.
Ах нуда, читалка мыслей вошла в стадию бэта тестирования.

BOL: RECEIVE http://msdn.microsoft.com/ru-ru/library/ms186963.aspx
Извлекает из очереди одно или несколько сообщений. В зависимости от настройки хранения для очереди удаляет сообщение из очереди или обновляет состояние сообщения в очереди.

[ WAITFOR ( ]
    RECEIVE [ TOP ( n ) ]
        <column_specifier> [ ,...n ]
        FROM <queue>
        [ INTO table_variable ]
        [ WHERE {  conversation_handle = conversation_handle
                 | conversation_group_id = conversation_group_id } ]
[ ) ] [ , TIMEOUT timeout ]
[ ; ]


А может надо объяснять смысл операторов TOP и WHERE.
14 фев 12, 23:37    [12092436]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
kapelan
Member

Откуда: хутор БольшойБугор
Сообщений: 726
Mnior,

спасибо не нада,
я должен был сразу вот ето написать:
RELATED_CONVERSATION =related_conversation_handle

Specifies the existing conversation group that the new dialog is added to. When this clause is present, the new dialog belongs to the same conversation group as the dialog specified by related_conversation_handle. The related_conversation_handlemust be of a type implicitly convertible to type uniqueidentifier. The statement fails if the related_conversation_handle does not reference an existing dialog.
RELATED_CONVERSATION_GROUP =related_conversation_group_id

Specifies the existing conversation group that the new dialog is added to. When this clause is present, the new dialog will be added to the conversation group specified by related_conversation_group_id. The related_conversation_group_idmust be of a type implicitly convertible to type uniqueidentifier. If related_conversation_group_iddoes not reference an existing conversation group, the service broker creates a new conversation group with the specified related_conversation_group_id and relates the new dialog to that conversation group.


то есть когра группа ппрочитается вся то ее невозможно заново создать
14 фев 12, 23:45    [12092457]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
kapelan
Member

Откуда: хутор БольшойБугор
Сообщений: 726
use writer_DB
go
DECLARE @InitDlgHandle UNIQUEIDENTIFIER;
DECLARE @RequestMsg NVARCHAR(100);
set @InitDlgHandle ='ECF3BE27-2D57-E111-840A-0015C5F0CFB0'
BEGIN TRANSACTION;

BEGIN DIALOG @InitDlgHandle
     FROM SERVICE [//InitDB/2DBSample/InitiatorService]
     TO SERVICE N'//TgtDB/2DBSample/TargetService'
     ON CONTRACT [//BothDB/2DBSample/SimpleContract]
     WITH
--RELATED_CONVERSATION = @InitDlgHandle --'ECF3BE27-2D57-E111-840A-0015C5F0CFB0'
RELATED_CONVERSATION_GROUP = 'E60084D0-3957-E111-840A-0015C5F0CFBE' 
        , ENCRYPTION = OFF ;

SELECT @RequestMsg =
   N'<RequestMsg>Message for Target service 1.</RequestMsg>';

SEND ON CONVERSATION @InitDlgHandle
     MESSAGE TYPE [//BothDB/2DBSample/RequestMessage]
      (@RequestMsg);

SELECT @RequestMsg AS SentRequestMsg;

COMMIT TRANSACTION;
END CONVERSATION @InitDlgHandle;



use reader_db
go
DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg NVARCHAR(100);
DECLARE @RecvReqMsgName sysname;


DECLARE @MsgsQueue TABLE( RecvReqDlgHandle UNIQUEIDENTIFIER
            , RecvReqMsg NVARCHAR(100)
            , RecvReqMsgName sysname);


BEGIN TRANSACTION;


RECEIVE TOP(6)
     conversation_handle,
conversation_group_id,
     message_body,
     message_type_name
,*

  FROM TargetQueue2DB


SELECT @RecvReqMsg AS ReceivedRequestMsg;
SELECT * FROM @MsgsQueue

COMMIT TRANSACTION;

GO
15 фев 12, 00:03    [12092539]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
kapelan
Member

Откуда: хутор БольшойБугор
Сообщений: 726
неужели так делать низзя?
15 фев 12, 17:38    [12097808]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
mike909
Member

Откуда:
Сообщений: 662
kapelan
Mnior,

спасибо не нада,
я должен был сразу вот ето написать:
RELATED_CONVERSATION =related_conversation_handle

Specifies the existing conversation group that the new dialog is added to. When this clause is present, the new dialog belongs to the same conversation group as the dialog specified by related_conversation_handle. The related_conversation_handlemust be of a type implicitly convertible to type uniqueidentifier. The statement fails if the related_conversation_handle does not reference an existing dialog.
RELATED_CONVERSATION_GROUP =related_conversation_group_id

Specifies the existing conversation group that the new dialog is added to. When this clause is present, the new dialog will be added to the conversation group specified by related_conversation_group_id. The related_conversation_group_idmust be of a type implicitly convertible to type uniqueidentifier. If related_conversation_group_iddoes not reference an existing conversation group, the service broker creates a new conversation group with the specified related_conversation_group_id and relates the new dialog to that conversation group.


то есть когра группа ппрочитается вся то ее невозможно заново создать

Как бы это по мягче - Вы глубоко заблуждаетесь
Во первых:
При создании диалога RELATED_CONVERSATION и RELATED_CONVERSATION_GROUP служит только одной цели - включить новый диалог в "существующую" группу. При этом если задать RELATED_CONVERSATION, которого не существует то, естественно, операция завершится с ошибкой, т.к. вычислить RELATED_CONVERSATION_GROUP не представляется возможным.
А если задать RELATED_CONVERSATION_GROUP, которого не существует то, будет создана новая группа, т.к. в этом случае SQL-сервер справедливо полагает, что Вы хотите создать новую группу диалогов.
Во вторых:
Связка нескольких диалогов в одну группу нужна только для того, чтобы обрабатывать сообщения в разных диалогах одной группы строго последовательно, в независимости от кол-ва Reader_ов, которые одновременно пытаются читать одну очередь.
В третьих:
Вычитывание всей группы никак не влияет на возможность привязки нового диалога к этой группе. Да и само понятие "вычитывание всей групп" - смысла не имеет.
В четвертых:
Создав диалог нет возможности узнать характеристики диалога (conversation_handle,conversation_group_id,...) удаленной стороны, если только эта "удаленная" сторона не в той же БД. Связывать диалоги в одну группу можно только на каждой стороне по отдельности.

Проведем пару экспериментов. Лучше всего для понимания использовать разные SQL-сервера, но возиться с сертификатами,маршрутизацией и т.д. не охота. Ограничусь двумя базами на одном сервере. Но, если кто хочет помучится, то см. скрипты здесь
+ Создаем тестовые БД
USE master
GO
CREATE DATABASE TEST_WRITER
GO
CREATE DATABASE TEST_READER
GO
ALTER DATABASE TEST_WRITER SET ENABLE_BROKER
ALTER DATABASE TEST_WRITER SET TRUSTWORTHY ON
GO
ALTER DATABASE TEST_READER SET ENABLE_BROKER
ALTER DATABASE TEST_READER SET TRUSTWORTHY ON
GO
--------------------
USE TEST_WRITER
GO

CREATE QUEUE [dbo].[WRITE_Queue] WITH STATUS = ON , RETENTION = OFF
GO
CREATE SERVICE [SRV/WRITER] AUTHORIZATION [dbo] ON QUEUE [dbo].[WRITE_Queue] ([DEFAULT])
GO
--------------------
USE TEST_READER
GO

CREATE QUEUE [dbo].[READER_Queue] WITH STATUS = ON , RETENTION = OFF
GO
CREATE SERVICE [SRV/READER] AUTHORIZATION [dbo] ON QUEUE [dbo].[READER_Queue] ([DEFAULT])
GO

Таперь в двух разных сессиях запускаем
+ двух писателей
USE [TEST_WRITER]
GO
DECLARE @msg xml,
        @WriterDgHandle UNIQUEIDENTIFIER,
        @WriterDgGroup  UNIQUEIDENTIFIER

set @WriterDgGroup = 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA'

BEGIN DIALOG CONVERSATION @WriterDgHandle
FROM SERVICE [SRV/WRITER]
TO SERVICE 'SRV/READER'
ON CONTRACT [DEFAULT]
WITH
   RELATED_CONVERSATION_GROUP = @WriterDgGroup  -- Привяжем диалог к нашей группе 
  ,LIFETIME = 600                               -- 10 минут на эксперименты должно хватить
  ,ENCRYPTION = OFF                             -- т.к. master key мы не создавали ...

--Для проверки можно посмотреть что у нас создалось
SELECT 
  ce.[conversation_handle], ce.[conversation_group_id], ce.[state_desc], ce.[far_service]
  ,s.name as [local_service]
FROM sys.conversation_endpoints ce 
LEFT JOIN sys.services s ON ce.service_id = s.service_id

set @msg = N'<W><handle>' + cast(@WriterDgHandle as nvarchar(max)) + N'</handle>' +
               '<group>' + cast(@WriterDgGroup as nvarchar(max)) + N'</group></W>'

;SEND ON CONVERSATION @WriterDgHandle ( @msg )

waitfor delay '00:00:01' -- подождем чтоб все убежало
--Ничего не зависло ???
SELECT *
FROM sys.transmission_queue with(nolock)
order by enqueue_time

-----------

BEGIN TRAN

WAITFOR (
  RECEIVE TOP(100500)
     [conversation_handle]
    ,[conversation_group_id]
    ,[message_type_name]
    ,cast([message_body] as xml)
  FROM dbo.WRITE_Queue
  WHERE conversation_handle = @WriterDgHandle  
), TIMEOUT 600000;

--COMMIT TRAN
--END CONVERSATION @WriterDgHandle

Далее в новой сесии запускаем
+ одного читателя
USE TEST_READER
GO
DECLARE @msg  xml,
        @msgTypeName  sysname,
        @ReaderDgHandle UNIQUEIDENTIFIER,
        @ReaderDgGroup  UNIQUEIDENTIFIER

DECLARE @t table( [Handle] UNIQUEIDENTIFIER, [Group] UNIQUEIDENTIFIER, [TypeName] sysname, [msg] xml)

WHILE 1=1
BEGIN
  DELETE FROM @t
  
  WAITFOR (
    RECEIVE TOP(100500) 
       [conversation_handle]
      ,[conversation_group_id]
      ,[message_type_name]
      ,cast([message_body] as xml)
    FROM dbo.READER_Queue
    INTO @t
  ), TIMEOUT 600;

  if @@ROWCOUNT = 0
    BREAK;

  SELECT * FROM @t

  DECLARE C CURSOR LOCAL FAST_FORWARD
  FOR SELECT  [Handle], [Group], [TypeName] 
      FROM @t
  
  OPEN C
  FETCH NEXT FROM C INTO @ReaderDgHandle, @ReaderDgGroup, @msgTypeName
  WHILE @@FETCH_STATUS = 0
  BEGIN
    if @msgTypeName = 'DEFAULT'
    BEGIN
      set @msg = '<R>' + '<handle>' + cast(@ReaderDgHandle as nvarchar(max)) + '</handle>'+
                         '<group>' + cast(@ReaderDgGroup as nvarchar(max)) + '</group></R>'
      
      ;SEND ON CONVERSATION @ReaderDgHandle ( @msg )
      END CONVERSATION @ReaderDgHandle;
    END
    ELSE
      END CONVERSATION @ReaderDgHandle WITH CLEANUP;
  
    FETCH NEXT FROM C INTO @ReaderDgHandle, @ReaderDgGroup, @msgTypeName    
  END;
  CLOSE C
  DEALLOCATE C
END;


В результате этого эксперимента видим, что как мы не пытались в Reader_е прочесть 100500 сообщений, прочли за однин раза только одно сообщение (подтверждение - две выборки). Т.е. Reader читает 100500 сообщений только по одной группе, а т.к. на Reader_е оба диалога находятся в разных группах, несмотря на то, что в Writer_ах мы эти диалоги объеденили в одну группу.
Далее видим, чсто только один Writer закончил работу, а второй продолжает висеть.
В скрипте Writer_а закоментарен --COMMIT TRAN.
Выполним эту строчку в сесси, которая выполнилась, и сразу же второй Writer закончит работу.
Т.к. оба диалога Writer_а сидят в одной группе, "зависший" Writer не смог ничего прочесть по своему собственному диалогу, пока мы не выполнили COMMIT.
Этим экспериментом продемонстрировали истинное назначение RELATED_CONVERSATION и RELATED_CONVERSATION_GROUP.

Еще поэкспериментируем ?

Тогда подчистим за собой - закомитим все транзакции
+ и почистим диалоги
USE master
GO
declare @stmt nvarchar(1000)
declare c cursor 
for 
  SELECT 
    'USE TEST_READER; END conversation ''' + 
    CAST([conversation_handle] as nvarchar(100)) + ''' WITH CLEANUP;'
  from [TEST_READER].sys.conversation_endpoints
  union all 
  SELECT 
    'USE TEST_WRITER; END conversation ''' + 
    CAST([conversation_handle] as nvarchar(100)) + ''' WITH CLEANUP;'
  from [TEST_WRITER].sys.conversation_endpoints

open c
fetch next from c into @stmt
while @@fetch_status = 0
begin
  --print @stmt
  EXEC( @stmt )

  fetch next from c into @stmt
end

close c
deallocate c

Изменим слегка Writer_ов на чтение 100500 сообщений по группе
+ заменить вконце скриптов обоих писателей на:
BEGIN TRAN

WAITFOR (
  RECEIVE TOP(100500)
     [conversation_handle]
    ,[conversation_group_id]
    ,[message_type_name]
    ,cast([message_body] as xml)
  FROM dbo.WRITE_Queue
  WHERE conversation_group_id = @WriterDgGroup
), TIMEOUT 600000;

--COMMIT TRAN
--END CONVERSATION @WriterDgHandle

И переделаем читателя
+ полностью
USE TEST_READER
GO
DECLARE @msg  xml,
        @msgTypeName  sysname,
        @ReaderDgHandle UNIQUEIDENTIFIER,
        @ReaderDgGroup  UNIQUEIDENTIFIER

DECLARE @t table( [Handle] UNIQUEIDENTIFIER, [Group] UNIQUEIDENTIFIER, [TypeName] sysname, [msg] xml)
WHILE 1=1
BEGIN
  WAITFOR (
    RECEIVE TOP(100500) 
       [conversation_handle]
      ,[conversation_group_id]
      ,[message_type_name]
      ,cast([message_body] as xml)
    FROM dbo.READER_Queue
    INTO @t
  ), TIMEOUT 600;
  
  if @@ROWCOUNT = 0
    BREAK;
END

select * from @t

BEGIN TRAN

  DECLARE C CURSOR LOCAL FAST_FORWARD
  FOR SELECT  [Handle], [Group], [TypeName] 
      FROM @t
  
  OPEN C
  FETCH NEXT FROM C INTO @ReaderDgHandle, @ReaderDgGroup, @msgTypeName
  WHILE @@FETCH_STATUS = 0
  BEGIN
    if @msgTypeName = 'DEFAULT'
    BEGIN
      set @msg = '<R>' + '<handle>' + cast(@ReaderDgHandle as nvarchar(max)) + '</handle>'+
                         '<group>' + cast(@ReaderDgGroup as nvarchar(max)) + '</group></R>'
      
      ;SEND ON CONVERSATION @ReaderDgHandle ( @msg )
      END CONVERSATION @ReaderDgHandle;
    END
    ELSE
      END CONVERSATION @ReaderDgHandle WITH CLEANUP;
  
    FETCH NEXT FROM C INTO @ReaderDgHandle, @ReaderDgGroup, @msgTypeName    
  END;
  CLOSE C
  DEALLOCATE C

COMMIT TRAN


Запускам обоих писателей, а затем читателя - смотрим что получилось.
Все ответные сообщения по всем обоим диалогам прочел только один писатель, а второй продолжаеит висеть.
Выполним COMMIT TRAN - второй писатель продолжает висеть еще 10 мин. (погорячился с TIMEOUT 600000).
Как отвиснет, мы увидим, что он так и ничего не прочел.
И этот эксперимент еще раз подтверждает сказанное выше.

P.S. Надеюсь стало понятней
+ не забудте грохнуть базы
DROP DATABASE TEST_WRITER
DROP DATABASE TEST_READER
15 фев 12, 18:41    [12098431]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
mike909
Member

Откуда:
Сообщений: 662
kapelan
неужели так делать низзя?

Можно на Reader_е
+ BOL

[ WAITFOR ( ]
GET CONVERSATION GROUP @conversation_group_id
FROM <queue>
[ ) ] [ , TIMEOUT timeout ]
[ ; ]
---- +
MOVE CONVERSATION conversation_handle
TO conversation_group_id
[ ; ]
15 фев 12, 19:11    [12098551]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
kapelan
Member

Откуда: хутор БольшойБугор
Сообщений: 726
mike909,
спасибо, хороший пример.
ето немножко не то что нужно.
Попробую еще:
1. есть мусорка, прохожие (разные сессии) дросают туда бумажки
2. Есть читатель, который хочет ето все обработать, но брать по одной бумажке не целесообразно.
3. Обратно писателям ничего из мусорки не вылетает.
Нашел такое решение:
1. Создается диалог отдельной процедуре, @InitDlgHandle записываем куда-нить в табличку с одной строкой
диалог не закрываем - пусть живет вечно.
2. Писатели из разных сессий читают значение @InitDlgHandle и используют его в
SEND ON CONVERSATION @InitDlgHandle
     MESSAGE TYPE [//BothDB/2DBSample/RequestMessage]
      (@RequestMsg);

Диалоги не открываются!
3. Читатель прочитает все за один раз т.к. @InitDlgHandle один
--
Проблемма возникает когда кто-нить убьет (или сам упадет) етот CONVERSATION , тут надо предусмотреть механизм пересоздания - выполнить шаг ном.1
15 фев 12, 20:44    [12099119]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
mike909
Member

Откуда:
Сообщений: 662
kapelan
mike909,
спасибо, хороший пример.
ето немножко не то что нужно.

Мой пример прежде всего о назначении RELATED_CONVERSATION и RELATED_CONVERSATION_GROUP.
kapelan
Попробую еще:
1. есть мусорка, прохожие (разные сессии) дросают туда бумажки
2. Есть читатель, который хочет ето все обработать, но брать по одной бумажке не целесообразно.
3. Обратно писателям ничего из мусорки не вылетает.

С чего это ?
Включаем "читалку мыслей" (С) Mnior.
Ага, мусорка - Скорее всего Writer_ы кидают односторонние сообщения не связанные друг с другом в некую строгую последовательность.
Нечто вроде лога ?
При этом высоко нагруженного, т.к. в наличии острое желание обработать все это за один присест.
Одного Reader_а мало ?
Или есть подозрение, что несколько Reader_ов будут мешать друг другу пытаясь записать это в одну Log_Table ?
kapelan
Нашел такое решение:
1. Создается диалог отдельной процедуре, @InitDlgHandle записываем куда-нить в табличку с одной строкой
диалог не закрываем - пусть живет вечно.
2. Писатели из разных сессий читают значение @InitDlgHandle и используют его в
SEND ON CONVERSATION @InitDlgHandle
     MESSAGE TYPE [//BothDB/2DBSample/RequestMessage]
      (@RequestMsg);

Диалоги не открываются!
3. Читатель прочитает все за один раз т.к. @InitDlgHandle один
--
Проблемма возникает когда кто-нить убьет (или сам упадет) етот CONVERSATION , тут надо предусмотреть механизм пересоздания - выполнить шаг ном.1

Если речь идет всетаки о логировании, то вот Вам еще для размышлений
15 фев 12, 23:35    [12099853]     Ответить | Цитировать Сообщить модератору
 Re: Service Broker прочитать все мессаджи  [new]
kapelan
Member

Откуда: хутор БольшойБугор
Сообщений: 726
mike909,

ето что-то типа трансформации и репликации - процесс тяжелый поетому есть смысл делать его ассинхронно от основного процесса. Т.е. идея примерно такая - процессы бросают ИД в очередь ну а читатель делает всю черную работу по ИД.
по вашей ссылке есть хорошая идея читать хандлер from sys.conversation_endpoints
одного читателя конечно достаточно когда речь идет о вставке в одну таблицу.
спасибо
16 фев 12, 01:14    [12100104]     Ответить | Цитировать Сообщить модератору
Все форумы / Microsoft SQL Server Ответить