Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Delphi Новый топик    Ответить
Топик располагается на нескольких страницах: Ctrl  назад   1 2 [3] 4 5 6 7 8 9 10 11   вперед  Ctrl      все
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Для удобства в дальнейшей работе создадим пару функций, пересылающих и принимающих строки:
function s_recv(aZMQSocket: Pointer; aFlags: integer = 0): string;
function s_send(aZMQSocket: Pointer; const aSrcString: string;

function s_recv(aZMQSocket: Pointer; aFlags: integer = 0): string;
var
  fLen: Integer;
  fZMQMsg: zmq_msg_t;
begin

  Result := '';
  try
    zmq_msg_init(fZMQMsg);
    fLen := zmq_msg_recv(fZMQMsg, aZMQSocket, aFlags);
    if fLen <= 0 then
      Exit;
    SetLength(Result, fLen div SizeOf(Char));
    Move(zmq_msg_data(fZMQMsg)^, Result[1], fLen div SizeOf(Char));
  finally
    zmq_msg_close(fZMQMsg);
  end;
end;

function s_send(aZMQSocket: Pointer; const aSrcString: string;
  aFlags: integer = 0): integer;
var
  fLen: Integer;
  fZMQMsg: zmq_msg_t;
begin
  Result := 0;
  zmq_msg_init(fZMQMsg);
  if Length(aSrcString) > 0 then begin
    zmq_msg_init_size(fZMQMsg, Length(aSrcString) * SizeOf(Char));
    Move(PChar(aSrcString)^, zmq_msg_data(fZMQMsg)^, Length(aSrcString) *
      SizeOf(Char));
  end;
  Result := zmq_msg_send(fZMQMsg, aZMQSocket, aFlags);
end;
1 окт 14, 20:44    [16647068]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Обмен сигналами между потоками.

Пора рассмотреть сокеты типа PAIR.

Пример:

Приложение в "основном потоке" создает поток 2 и ждет сигнала о выполнении какой-то полезной работы.
Поток 2 создает поток 3 и ждет сигнала о выполнении какой-то полезной его работы, затем посылает сигнал "основному" потоку.
Поток 3 выполняет некоторую полезную работу и посылает сигнал потоку 2.

Отправка - прием сигналов будет выполняться с помощью сокетов типа PAIR по inproc протоколу.

К сообщению приложен файл. Размер - 5Kb
1 окт 14, 21:04    [16647139]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
"Сигнал" в данном случае - это просто сообщение, строка "READY". На приемной стороне содержамое строки игнорируется, важен сам факт передачи-приема.

... Что-то я с названиями напутал. :(

Step 3 - это "основной" поток, который создает Step 2. Последний создает Step 1.
Ну и Step 3 ждет сигнала от Step 2, а Step 2 ждет сигнала от Step 1.

Итак, исходник:

+

program SgnlBtwThrds;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ,
  ZMQ_Utils, // Тут вспомогательные вещи, вроде s_send() и s_recv()
  Windows;


function Thread_Step_1(aContext: Pointer): integer;
var
  fSocketXmitter: Pointer;
begin
  Result := 0;
 // Подключается к Thread_Step_2 и сообщает о готовности
  fSocketXmitter := zmq_socket(aContext, ZMQ_PAIR);
  zmq_connect(fSocketXmitter, 'inproc://step2');
  Writeln('Step 1 ready, signaling step 2');
  s_send(fSocketXmitter, 'READY');
  zmq_close(fSocketXmitter);
end;


function Thread_Step_2(aContext: Pointer): integer;
var
  fDummy: string;
  fSocketReceiver: Pointer;
  fSocketXmitter: Pointer;
  fThreadId: Cardinal;
begin
  Result := 0;
 // Связыывает inproc сокет перед запуском
  fSocketReceiver := zmq_socket(aContext, ZMQ_PAIR);
  zmq_bind(fSocketReceiver, 'inproc://step2');
  BeginThread(nil, 0, @Thread_Step_1, aContext, 0, fThreadId);

// Ожидание сигнала
  fDummy := s_recv(fSocketReceiver);
  zmq_close(fSocketReceiver);

// Коннект к step3 сообщение о готовности
  fSocketXmitter := zmq_socket(aContext, ZMQ_PAIR);
  zmq_connect(fSocketXmitter, 'inproc://step3');
  Writeln('Step 2 ready, signaling step 3');
  s_send(fSocketXmitter, 'READY');
  zmq_close(fSocketXmitter);
end;
var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fThreadId: Cardinal;
  fDummy: string;
begin
  fContext := zmq_ctx_new();

  fSocketReceiver := zmq_socket(fContext, ZMQ_PAIR);
    // Сокет для приема сигнала
  zmq_bind(fSocketReceiver, 'inproc://step3');
  BeginThread(nil, 0, @Thread_Step_2, fContext, 0, fThreadId);

  fDummy := s_recv(fSocketReceiver);
  zmq_close(fSocketReceiver);

  Writeln('Step 3 ready!');
  Writeln;
  Writeln('Test successful!');
  zmq_ctx_destroy(fContext);
  Readln;

end.



К сообщению приложен файл. Размер - 6Kb
1 окт 14, 21:45    [16647304]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Итак, это был образец классического многопоточного приложения ZeroMQ:
- Два потока взаимодействуют через InProc, используя общий контекст.
- Родительский поток создает один сокет, связывает его с конкретной конечной точкой по InProc, а затем запускает дочерний поток, передавая ему контекст.
- Дочерний поток создает второй сокет, соединяет его с той же конкретной конечной точкой по InProc и по готовности сигнализирует родительскому потоку.


Обращаем внимание, что многопоточный код, используемый в данной схеме, не масштабируется за пределы процесса: если используется протокол InProc и сокеты типа PAIR, значит, строится сильносвязная система, в которой есть взаимозависымые структуры. Такие вещи следует делать, когда нужна высокая скорость взаимодействия между компонентами системы.
Если использовать схему с протоколом TCP и использовать собственный контекст в каждом потоке, система будет менее связной и позволит в будущем легко масштабироваться методом вычленения узлов в отдельные процессы.
~~~~~~~~~~~~~

Почему были использованы сокеты типа PAIR? -Потому, что использование сокетов других типов имеет нежелательные побочные эффекты:
- Можно использовать рассмотренные ранее PUSH для отправителя и PULL для приемника. Это выглядит просто и будет работать, однако следует помнить, что PUSH будет распределять сообщения по всем доступным приемникам. Если вы случайно запустили два приемника (например, создали еще один поток с такой же процедурой потока), то вы будете "потеряете" половину ваших сигналов. Преимущество сокетов PAIR в том, что они не позволят создать больше одного соединения; пара сокетов типа PAIR - является эксклюзивной.
- Вы можете использовать DEALER для отправителя и ROUTER для приемника. ROUTER, однако, упаковывает сообщение в "конверт", т.обр. ваш сигнал нулевого размера превращается в составное сообщение. Это несущественно, если вы не заботитесь о самих данных, а посылаете только сигнал. Однако, если понадобится отправить реальные данные, то обнаружится, что ROUTER прислал вам "неправильные" сообщения. Кроме того, DEALER точно так же как и PUSH, распределяет исходящие сообщения между всеми приемниками , т.е. тут такой же риск потери сообщений, как и при использовании PUSH.
- Вы можете использовать PUB для отправителя и SUB для приемника. Эта схема будет правильно доставлять сообщения, и PUB не разбросает их по приемникам, как DEALER или PUSH. Тем неменее, вам придется все время настривать приемник на подписку, что утомительно.

По этим причинам, пара сокетов типа PAIR - лучший выбор для пересылки сигналов координации между парами потоков в приложении.
1 окт 14, 22:42    [16647476]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Согласование работы между узлами сети.

Если нужно согласовывать работы набора узлов в сети, то сокеты типа PAIR уже не так хороши.

Это как раз те области, где стратегии использования потоков и узлов различаются. В большинстве случаев узлы приходят и уходят "сами по себе", а потоки обычно статичны. Сокеты типа PAIR не выполнят автоматическое переподключение, если удаленный узел сети уйдет, а потом появится снова. Другим существенным различием в применении узлов и применением потоков является то, что обычно мы имеем фиксированное число потоков, в то время как число рабочих узлов сети меняется.
...
~~~~~~~~~~~~~
Рассмотрим рассмотреть предыдущий сценарий (с метостанцией-издателем и кучей клиентов-подписчиков) и попробуем координировать узлы так, чтобы быть уверенными в том, что при запуске подписчики-клиенты не потеряют данные.

Схема работы приложения:

- Издатель (сервис метеостанции) заранее знает, сколько будет подписчиков. То есть, это просто волшебное число, которое он откуда-то получает.

- Издатель запускается и ждет, пока подключатся все подписчики. Эта часть и есть процесс согласования. Каждый подписчик подписывается, а затем через другой сокет сообщает издателю, что он готов.

- Когда к издателю подключатся все подписчики, он начинает публиковать данные.

В данном случае для согласования действий между подписчиком и издателем мы будем использовать пары сокетов REQ-REP.

К сообщению приложен файл. Размер - 3Kb
2 окт 14, 00:04    [16647731]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Поехали. Кодируем проект "Метео" (исходная задача - 16583825). Сервис начинает публиковать данные только после коннекта 10 клиентов. Пакеты данных публикуются 10 000 раз, после чего публикуется сообщение 'END'.
Клиенты при запуске сообщают сервису о своем появлении и подписываются на данные сервиса.
Полученные по подписке данные выводятся в консоль клиента, число полученных пакетов подсчитывается, . Рабочий цикл клиента прерывается при получении сообщения 'END'.

Код сервиса:
+ Сервис "метео" с синхронизацией запуска.
program SynPS_Service;
{$APPTYPE CONSOLE}
// Меостанция
// Сервис - издатель с синхронизацией запуска

uses
  SysUtils, ZMQ, ZMQ_Utils;

const
  c_SUBSCRIBERS_EXPECTED = 10; // Ждем 10 подписчиков!
var
  fContext: Pointer;
  fSocketPublisher: Pointer;
  fSocketSyncService: Pointer;
  fDummy: string;
  fMsgStr: string;
  fSubscribers: Integer = 0;
  fSndhwm: Integer = 1100000;
  i: Integer;
begin
  fContext := zmq_ctx_new();
  // Сокет для общения с клиенатами
  fSocketPublisher := zmq_socket(fContext, ZMQ_PUB);
  zmq_setsockopt(fSocketPublisher, ZMQ_SNDHWM, @fSndhwm, SizeOf(fSndhwm));
  zmq_bind(fSocketPublisher, 'tcp://*:5561');

  // Сокет для приема сигналов
  fSocketSyncService := zmq_socket(fContext, ZMQ_REP);
  zmq_bind(fSocketSyncService, 'tcp://*:5562');

 // Получение синхросигналов от подписчиков
  Writeln('Waiting for subscribers...');
  while fSubscribers < c_SUBSCRIBERS_EXPECTED do begin
  // - ожидание синхрозапроса
    s_recv(fSocketSyncService);
  // - отправка синхроответа
    s_send(fSocketSyncService, '');
    Inc(fSubscribers);
  end;

  // Теперь раздача 10 000 оповещений, а затем - отправка 'END'

  Randomize;

  for i := 0 to 9999 do begin
    Sleep(1); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    s_send(fSocketPublisher, fMsgStr);

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    s_send(fSocketPublisher, fMsgStr);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    s_send(fSocketPublisher, fMsgStr);
  end;
  s_send(fSocketPublisher, 'END');
  Writeln('Publisher stopped...');
  zmq_close(fSocketPublisher);
  zmq_close(fSocketSyncService);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.

Код клиента:
+ Клиент "метео" с синхронизацией запуска.
program SynPS_Client;

{$APPTYPE CONSOLE}
// Меостанция
// Клиент - подписчик с синхронизацией запуска
uses
  SysUtils, ZMQ, ZMQ_Utils;

var
  fContext: Pointer;
  fSocketSubscriber: Pointer;
  fSocketSyncClient: Pointer;
  fMsgStr: string;
  fCnt: Integer = 0;

const
  cFilter1 = 'Temperature';
  cFilter2 = 'Pressure';
  cFilter3 = 'Wind';
begin
  fContext := zmq_ctx_new(); // Инициализация
  // Сначала подключаем сокет подписчика
  fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketSubscriber, 'tcp://localhost:5561');
  zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, nil, 0);  // Настройка сокета
  Sleep(1);// ZMQ настолько шустрый, что нужно подождать...

  // Теперь синхронизируемся с издателем
  fSocketSyncClient := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketSyncClient, 'tcp://localhost:5562');
  s_send(fSocketSyncClient, ''); // Отправка сообщения о готовности
  s_recv(fSocketSyncClient); // Ожидание подтверждения

  Writeln('Subscriber started...');

  while True do begin
    fMsgStr := s_recv(fSocketSubscriber); // Прием данных
    if fMsgStr = 'END' then
      Break;
    Writeln(fMsgStr);
    Inc(fCnt)
  end;
  Writeln('Received ', fCnt, ' updates');

  zmq_close(fSocketSubscriber);
  zmq_close(fSocketSyncClient);
  zmq_ctx_destroy(fContext);
  Readln;
end.


Мы не можем быть уверены, что коннект SUB будет завершен к тому времени, когда завершится диалог REQ/REP. Вообще нет никакой гарантии того, что исходящие соединения завершатся в том или ином порядке, если вы используете любой транспорт за исключением InProc.
Ну, в примере мы воткнули ожидание (sleep(1)) после подпиской и синхропосылками REQ/REP.
Что как бы работает, но тоже в общем случае не гарантирует.

Более надежная схема могла бы выглядеть так:

- Издатель открывает PUB - сокет и начинает передавать сообщения "Hello"(без данных).
- Подписчик подключает SUB - сокет и, когда тот принимает сообщение "Hello", то сообщает об этом издателю через пару сокетов REQ/REP.
- Когда издатель получит необходимое число подтверждений от коннекте от подписчиков, он начинает публиковать реальные данные.
2 окт 14, 01:57    [16647859]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Нуль - копия (Zero-Copy).

API ZeroMQ позволяет отправлять и принимать сообщения напрямую, используя буфера данных приложения, без копирования данных. Эта технология называется нуль - копия, и её применение позволяет увеличить производительность в некоторых приложениях.

О нуль-копии следует вспоминать в случае, когда вы отправляете большие блоки памяти (тысячи байт) с высокой частотой.
Для коротких сообщений, или для сообщений, отравляемых редко, использование нуль-копии сделает код грязнее и сложнее без заметной пользы. Как и все прочие оптимизации, использовать нуль-копии следует, когда вы точно знаете, что это это помогает, и были выполнены измерения производительности до и после применения.

Делаем нуль-копию. С помощью zmq_msg_init_data() создается сообщение, которое ссылается на уже существующий блок данных вашего приложения, которое затем передается в zmq_msg_send(). Когда создается сообщение, вы также передаете параметр - функцию, чтобы ZeroMQ смогла вызвать её для освобождения блока данных после завершения передачи сообщения.

Пример такой функции, предполагающий, что буфер представляет собой блок длиной, скажем, в 1000 байт, выделенный в куче:

procedure my_free (aData : Pointer; aHint : Pointer); cdecl;
begin
  Freemem(aData);
end;


PS: в модуле ZMQ.PAS

Тип zmq_free_fn определен как
type
...
  free_fn = procedure(data, hint: Pointer); 

без модификатор cdecl. Конечно, следует исправить:

 free_fn = procedure(data, hint: Pointer); cdecl;



Пример применения:
procedure my_free(aBuf, aHint : Pointer); cdecl;
begin
  FreeMem(aBuf);
end;


var
  fMsg : zmq_msg_t;
  fData : Pointer;

begin
...
    GetMem(fData, 1000);
    FillChar(fData^, 1000, 'z');
    zmq_msg_init_data(fMsg, fData, 1000, my_free, nil);
    zmq_msg_send(fMsg,fSocketSyncService, 0);


PPS: насчет параметра hint:
 free_fn = procedure(data, hint: Pointer); cdecl;

Значение его просто дублируется из параметра hint функции
function zmq_msg_init_data( var msg: zmq_msg_t; data: Pointer; size: size_t;
  ffn: free_fn; hint: Pointer ): Integer; cdecl; external libzmq;

Судя по всему, он введен для особых случаев - например, когда требуется передать блок дополнительных данных в процедуру ffn: free_fn.
2 окт 14, 03:58    [16647908]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
чччД
Нуль - копия (Zero-Copy).


Следует еще раз отметить, что вызывать zmq_msg_close() после отправки сообщения не нужно - libzmq выполнит вызов автоматически, когда сообщение будет отправлено.

Способа сделать нуль-копию на приеме - нет. ZeroMQ предоставит вам буфер, который вы можете использовать сколько угодно, но не записывает данные напрямую в буферы вашего приложения.

При записи составных сообщений ZeroMQ отлично работает с нуль-копией. Для обычных сообщений вам понадобилось бы слить несколько сообщений в один буфер, а только потом отправлять. То есть, понадобилось бы выполнить копирование данных. А с ZMQ можно отправить несколько разных буферов, пришедших от разных источников, как отдельные кадры сообщения. Каждое поле отправляется как кадр, отделенный значением длины (префиксом). В приложении это выглядит как последовательность вызовов отправлений или приема. Однако, внутри ядра ZMQ, составное сообщение отправляется и принимается одним системным вызовом, что очень эффективно.
2 окт 14, 04:12    [16647914]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Упаковка сообщений для схемы "Издатель-Подписчик"

Вернемся к схеме "Издатель - Подписчик". (Приложение "Метостанция").
Вспомним, что при подписке данные можно фильтровать: 16590914.
Однако, фильтрация по самим данным не всегда удобно. Куда удобнее фильтровать по значению ключевого поля, связанному с данными.
Например:

T - Температура
P - Давление
W - Скорость ветра

Это гораздо удобнее. Например, ключу W можно сопоставить не только скорость ветра, но и направление ветра.
Или данные о скорости ветра разбить на две подгруппы:
WS - ветер до 5 м/с;
WF - ветер 5 м/с и более.

Это реализуется очень просто: на сервере нужно формировать составное сообщение:

    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    s_send(fSocketPublisher, 'T', ZMQ_SNDMORE); // 1я часть - ключ
    s_send(fSocketPublisher, fMsgStr);// 2я часть - тело сообщения


А клиент, при оформлении подписки, должен указать фильтр:

const
  cFilter1: string = 'T';
  cFilter2: string = 'P';
  cFilter3: string = 'W';
begin
  fContext := zmq_ctx_new(); // Инициализация
  // Подключаем сокет подписчика
  fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketSubscriber, 'tcp://localhost:5561');
  zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar(cFilter1), Length(cFilter1) * SizeOf(Char)); // Настройка сокета

При этом сообщение будет фильтроваться по первому кадру, а приходить отфильтрованное сообщение будет полностью.

Сервис.
+ Код сервиса "Метео" с составным пакетом

program EnvPS_Service;
{$APPTYPE CONSOLE}
// Метеостанция

uses
  FastMM4, SysUtils, ZMQ, ZMQ_Utils;

var
  fContext: Pointer;
  fSocketPublisher: Pointer;
  fDummy: string;
  fMsgStr: string;
  i: Integer;
begin
  fContext := zmq_ctx_new();
  // Сокет для общения с клиенатами
  fSocketPublisher := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocketPublisher, 'tcp://*:5561');

  Writeln('Press Enter,  please, when all subscribers will be ready...');
  Readln;

  // Теперь раздача 100 собщений
  Randomize;

  for i := 0 to 99 do begin
    Sleep(1); // Типа измеряет что-то
    // Температура
    fMsgStr := Format('Temperature : %d C', [20 - Random(40)]);
    s_send(fSocketPublisher, 'T', ZMQ_SNDMORE); // 1я часть - ключ
    s_send(fSocketPublisher, fMsgStr);// 2я часть - тело сообщения

    // Атм. давление
    fMsgStr := Format('Pressure : %d Pa', [101375 - Random(100)]);
    s_send(fSocketPublisher, 'P', ZMQ_SNDMORE);
    s_send(fSocketPublisher, fMsgStr);

    // Скорость ветра
    fMsgStr := Format('Wind : %d m/s', [Random(10)]);
    s_send(fSocketPublisher, 'W', ZMQ_SNDMORE);
    s_send(fSocketPublisher, fMsgStr);
  end;
  Writeln('Publisher stopped...');
  zmq_close(fSocketPublisher);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);
end.



Клиент:

+ Клиент "Метео", составной пакет
program EnvPS_Client;

{$APPTYPE CONSOLE}
// Метеостанция
uses
  SysUtils, ZMQ, ZMQ_Utils;

var
  fContext: Pointer;
  fSocketSubscriber: Pointer;
  fPrefix: string;
  fBody: string;
  fCnt: Integer = 0;

const
  cFilter1: string = 'T';
  cFilter2: string = 'P';
  cFilter3: string = 'W';
begin
  fContext := zmq_ctx_new(); // Инициализация
  // Подключаем сокет подписчика
  fSocketSubscriber := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketSubscriber, 'tcp://localhost:5561');
  zmq_setsockopt(fSocketSubscriber, ZMQ_SUBSCRIBE, PChar(cFilter1), Length(cFilter1) * SizeOf(Char)); // Настройка сокета
  Writeln('Subscriber started...');

  while True do begin
    fPrefix := s_recv(fSocketSubscriber); // Прием данных, префикс
    fBody := s_recv(fSocketSubscriber); // Прием данных, тело
    Writeln(fPrefix, ' ', fBody);
    Inc(fCnt);
    if fCnt > 10 then
      break
  end;
  Writeln('Received ', fCnt, ' updates');

  zmq_close(fSocketSubscriber);
  zmq_ctx_destroy(fContext);
  Readln;
end.


Разбиение сообщения на части удобно, в том числе, например, для логического разделения составных данных.Например: Ключ - Адрес - Основное сообщение.

К сообщению приложен файл. Размер - 24Kb
2 окт 14, 05:16    [16647934]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
High-Water Marks (Высокая вода).

Когда вы наконец сможете отправлять сообщения с огромной скоростью от процесса к процессу, вы вскоре онаружите, что память имеет свойство заканчиваться. Несколько секунд задержки в каком-то процессе приводит к общему дикому ухудшению реакции системы. В общем, в проблему нужно вникнуть и принять меры предосторожности.

Проблема заключается в следующем: вообразите, что есть процесс A, с высокой частотой отправляющий сообщения процессу B, который их обрабатывает. Иногда процесс B оказывается недоступен (сборка мусора, перегрузка CPU, что угодно), и не может обработать сообщения за короткий период. Если такие задержки составляют несколько секунд или даже больше - это может стать серьезной проблемой. Что произойдет с сообщениями, которые процесс A все еще старается отправлять? Некоторые из них попадут в сетевые буферы процесса B. Некоторые будут все еще в процессе передачи по Ethernet. Некоторые будут в буферах сети процесса A. А остальные будут накапливаться в памяти процесса A с той скоростью, с которой процесс A их отправляет. Если не принять мер предосторожности, можно легко получить out of memory и крах.

Это - классическая проблема систем обмена сообщениями. Причем, чаще всего, процесс В - это приложение, написанное пользователем, и процесс А его никак не контролирует.

Что делать? Один из вариантов решения - управлять входным потоком. Процесс A получает сообщение откуда-то еще. Говорим ему "Stop!". И так далее - тормозим всех, кто меня торопит. Это называется управление потоком (flow control - обмен сигналами, при котором каждое устройство оповещает о готовности послать или принять данные). Такое решение выглядит правдоподобно, но что если вам пришло сообщение из Твиттера? Вы скажете всему миру подождать, пока вы в процессе B делается что-то важное?

Flow control работает в некоторых случаях, но не работает в других. Транспортный уровень не может сообщить уровню приложения "stop". Это как если метрополитен скажет большему бизнесу: "пожалуйста, держите работников еще полчаса, я слишком занят". Решением для системы обмена сообщениями является назначение пределов размеров буферов, а по достижению этих границ - выполнение некоторых разумных действий. В некоторых случаях (ОК, не для метрополитена), будет отказ в обслуживании (сообщения отбрасываются), в других лучшей стратегией будет ожидание.

ZeroMQ использует концепцию HWM (high-water mark) для определения емкости своих внутренних трубопроводов. Каждое соединение от сокета к сокету имеет собственный трубопровод, и значение HWM для отправки и/или для приема , в зависимости от типа сокета. Некоторые сокеты (PUB, PUSH) имеют только буферы для отправления сообщений. Некоторые (SUB, PULL, REQ, REP) - только для приема. Некоторые (DEALER, ROUTER, PAIR) имеют оба типа буферов.

В ZeroMQ v2.x значение HWM было бесконечным по умолчанию. Это было легко для использования, но и, как правило, оказывалось смертельным для сокетов - издателей с большим объемом сообщений. В ZeroMQ v3.x оно установлено в 1000 по умолчанию, которое является более разумным. Если вы все еще используете ZeroMQ v2.x, вы всегда должны установить HWM на ваших сокетах, например - 1000, чтобы соответствовать ZeroMQ v3.x или другое значение, которое посчитаете правильным.

Когда сокет достигает своего HWM, он либо блокирует данные, либо отбрасывает сообщения, в зависимости от типа скета. Сокеты типа PUB и ROUTER будут отбрасывать данные, в то время как другие будут блокировать. Для транспорта inproc передающий и принимающий сокеты используют общие буферы, поэтому реальное значение HWM будет суммой HWM, установленных для обоих сторон.

Наконец, последнее. Значения HWMs не являются точными величинами. Если вы установите 1,000 сообщений (по умолчанию), то реальная величина буфера будет меньше, чем половина, так как libzmq реализует еще и собственные очереди.
2 окт 14, 06:11    [16647969]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Потери сообщений.

Сообщения атомарны, это хорошо. То есть, если вы что-то получаете - то получаете это полностью.
Плохо то, что если что-то теряется, то вы не получаете вообще ничего.

Далее - универсальный решатель проблем потерь сообщений.

- Для сокетов SUB, всегда устанавливать подписку с помощью zmq_setsockopt() и ZMQ_SUBSCRIBE, или не получите ни одного сообщения. Так как вы подписываетесь на сообщения с заданным префиксом, для приема всех сообщений следует указать префикс "" (пустая строка).

- Если вы стартуете сокет SUB (то есть, соединяетесь с сокетом PUB) после того, как сокет PUB начал рассылать данные, вы потеряете все, что было разослано до вашего подключения. Если это является проблемой, поменяйте архитектуру так, чтобы сокет SUB стартовал первым, а лишь затем начинайте публикацию данных с сокетом PUB.

- Даже если сокеты SUB и PUB синхронизированы, сообщения могут все равно теряться. Это из-за того, что внутренние очереди могут быть еще не созданы до момента фактического коннекта. Если вы можете переключить направление операций bind/connect так, чтобы для сокета SUB выполнялся bind, а для сокета PUB - connect, возможно, вы получите более рабочий вариант.

- Если используются сокеты REP и REQ, но при этом не соблюдается синхронный порядок операций запрос/ответ/запрос/ответ, ZeroMQ будет сообщать об ошибках, которые вы, возможно, будете игнорировать. Это также будет выглядеть как потеря сообщений. При использовании сокетов REQ или REP следует строго соблюдать последовательность запросов и ответов на обеих сторонах соединения. И всегда в реальном коде следует проверять коды ошибок после вызовов ZeroMQ.

- Если используются сокеты PUSH, может получиться, что первые подключившиеся сокеты PULL получат несоразмерно большое количество сообщений. Равномерное распределение сообщений возможно лишь в случае, когда все сокеты PULL успешкно подключились, что может занять несколько миллисекунд. В качестве альтернативы PUSH/PULL, для если нагрузка невелика, следует рассмотреть пару ROUTER/DEALER и шаблон балансировки нагрузки.

- Разделение сокетов между потоками приведет к неустойчивому поведению и авариям.

- Если используется транспорт inproc, убедитесь, что оба сокета имеют общий контекст. Иначе будет отказ на стороне коннекта. Также, сначала делайте bind, а лишь затем - connect. Транаспорт inproc не отключаемый, в отличии от tcp.

- При использовании сокетов ROUTER очень легко потерять сообщения, если случайно отправлены неверные кадры идентификации(или если вообще забыть про кадры идентификации). Отличной идеей будет установка для таких сокетов ROUTER отции ZMQ_ROUTER_MANDATORY, но все равно не следует забывать проверять код завершения после каждого вызова ZMQ.

Последнее. Если вы все сделали правильно, но ничего не работает, сделайте крошечное тестовое приложение, воспроизводящее проблему и запросите помощь у сообщества ZeroMQ.

К сообщению приложен файл. Размер - 50Kb
2 окт 14, 20:11    [16652644]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
чччД
...
- Если используется транспорт inproc....Также, сначала делайте bind, а лишь затем - connect.
...


PS: В ZeroMQ версии 4.* эта проблема решена, bind и connect для inproc может быть выполнен в любой последовательности.
2 окт 14, 20:14    [16652653]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Шаблон "Запрос - Ответ" - подробности.

Ранее были немного упомянуты составные сообщения.
Так вот, использование составных сообщений дает возможность оформлять сообщения в форме "конвертов", когда адрес отделен от тела сообщения. Так вот, сообщения в SUB/PUB сокетах как раз пересылаются в таких конвертах. Наличие адресов позволяет легко организовать двусторонний обмен с помощью средств общего назначения - таких, как ZPI ZMQ и прокси, которые на лету создают, читают и удаляют адреса, не затрагивая "полезные" данные.

Так вот, в шаблоне "Запрос - Ответ" конверт содержит обратный адрес для ответа. Наличие обратного адреса позволяет получить ответ на запрос.

При использовании сокетов REQ и REP нет никакой нужды создавать конверты самостоятельно; это автоматически делают сами сокеты.

Для понимания интересно разобрать, как такие конверты используются с сокетом ROUTER.

Простая форма конверта для ответа REPLY.

Обмен данными "запрос-ответ" состоит из сообщения-запроса и, в конечном итоге, из сообщении-ответа.
В простой схеме "запрос-ответ" на один запроса приходится один ответ.
В более сложных схемах запросы и ответы могут пересылаться асинхронно. Однако, ответный "конверт" всегда одинаковый.

В общем виде, ответный конверт ZMQ всегда состоит из нуля или более обратных адресов, следующих после пустого кадра. (разделителя конвертов). За ними следует тело сообщения - ноль или более кадров.
Конверт создается множеством сокетов, работающих в одной цепочке.


Например, в сокет REQ отправляем запрос "Hello". Сокет REQ создает простейший конверт для ответа без адреса, просто пустой кадр и кадр сообщения, содержащий строку "Hello". Т.обр., сообщение из двух кадров:

№ кадраДлинаСодержание
10
25Hello


Сокет REP, получив конверт, "вскрывает" его: удаляет разделитель конвертов (первый кадр) и передает сообщение приложению.

Если перехватить сетевые данные приложения, мы увидим, что каждый запрос и каждый ответ состоит из двух кадров: пустой кадр, а затем кадр с полезными данными. Для простого случая "REQ-REP" это выглядит не очень полезным.

Однако, это важно для сокетов типа ROUTER и DEALER.


Расширенная форма конвертов для ответа

Рассмотрим, как работают пары сокетов REQ-REP через прокси (который использует сокеты ROUTER-DEALER), и как это влияет на форма конвертов для ответа. См. приложение из 16630307

В общем, совершенно никакой разницы, сколько прокси - ноль, один, два или больше:
[REQ] <-> [REP]
[REQ] <-[ROUTER|DEALER]-> [REP]
[REQ] <-[ROUTER|DEALER]<-[ROUTER|DEALER]-> [REP]


Вот псевдокод того, что делает прокси:

prepare context, frontend and backend sockets
while true:
    poll on both sockets
    if frontend had input:
        read all frames from frontend
        send to backend
    if backend had input:
        read all frames from backend
        send to frontend


+ Просто код (не всевдо).

  while true do begin
    zmq_poll(fZMQPoll[0], Length(fZMQPoll), -1);
      // Проверка состояния сокетов из пула
    if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then
      while True do
      begin // Трансляция сообщний от клиента к сервису
      // Обработка всх частей сообщения
        zmq_msg_init(fMsg);
        zmq_msg_recv(fMsg, fSocketFrontEnd, 0);
        fDoMore := zmq_msg_more(fMSG) <> 0;
        zmq_msg_send(fMsg, fSocketBackEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
      while True do
        // Трансляция сообщний от сервиса к клиенту
      begin // Обработка всх частей сообщения
        zmq_msg_init(fMsg);
        zmq_msg_recv(fMsg, fSocketBackEnd, 0);
        fDoMore := zmq_msg_more(fMSG) <> 0;
        zmq_msg_send(fMsg, fSocketFrontEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
  end;




Так вот. Сокет ROUTER, в отличии от всех остальных, отслеживает каждое входящее соединение и сообщает об этом вызывающей стороне. Вызывающая сторона получает уведомление, что она должна проверять идентификатор каждого входящего сообщения, который будет следовать перед сообщением. Идентификатор, который иногда называют адресом, представляет собой всего-навсего двоичную строку, несущую единственную нагрузку: "Это - уникальный дескриптор связи". После этого, когда приложение отправляет сообщение через сокет ROUTER, первым отправляется кадр идентификации.

Вот что сказано об этом в документации по zmq_socket():
...из документации по zmq_socket()
Перед передачей в приложение все сообщения, принимаемые сокетом ZMQ_ROUTER , должны предваряться частью собщения, идентифицирующей вызывающего корреспондента. Сообщения принимаются от всех подключенных корреспондентов в соответствии с алгоритмом справедливой очереди (fair-queued). При отправке сообщения, сокет ZMQ_ROUTER должен удалить первую часть сообщения и использовать её для идентификации корреспондента, которому должно быть доставлено сообщение.


В качестве идентификаторов ZMQ v2.* использовала UUID, а начиная с V3.0 используются короткие целые. Изменения улучшили производительность сети, правда, только в случае использования множества прокси - ретрансляторов (что бывает крайне редко).

Сокет ROUTER для каждого соединения генерирует случайное число. То есть, когда к сокету ROUTER в прокси подключается три клиента сокетами REQ, генерируется три случайных числа, по одному на каждый сокет REQ.


Итак, начнем разрабатывать поясняющий пример. Предположим, у сокета REQ трехбатовый идентификатор "ABC". Это подразумевает, что внутренние механизмы сокета ROUTER содержат хэш-таблицу, в которой ищется строка "ABC" и соответствующее данному сокету REQ соединение TCP.
При получении сообщения от сокета ROUTER socket, мы получаем три кадра:
Запрос с одним адресом
Номер кадра Длина Содержание Описание
13ABCИдентификатор соединения
20Пустой разделяющий кадр
35HelloКадр с данными



Ядро прокси в цикле просто читает данные из одного сокета и передает в другой, то есть буквально эти три кадра и попадают на вход сокета DEALER. Прослушка сетевого трафика показала бы, что эти три фрейма пролетают из сокета DEALER в REP. Сокет REP делает все так же, как было описано ранее: срезает с конверта все, включая новый обратный адрес и передает "Hello" абоненту - получателю.

Следует отметить, что сокет REP одновременно может работать только одним циклом запрос - ответ. Поэтому, если вы попытаетесь прочитать несколько запросов или оправить несколько ответов, строго не придерживаясь цикла "Запрос-Ответ", сокет вернет ошибку.

Теперь вполне понятен и обратный путь сообщения. Когда сервис возвращает ответ, сокет REP заворачивает его в "сохраненный" (при "вскрытии") конверт, и отправляет ответ из трех кадров через сокет DEALER.

Ответ с одним адресом
Номер кадра Длина Содержание Описание
13ABCИдентификатор соединения
20Пустой разделяющий кадр
35WorldКадр с данными


Далее сокет DEALER читает эти три фрейма и отправляет через сокет ROUTER. Сокет ROUTER берет первый кадр сообщения с идентификатором "ABC" и находит коннект, связанный с ним. Ели коннект найден, наружу перекачивается следующие два кадра - уже знакомый минимальный конверт

№ кадраДлинаСодержание
10
25World


Сокет REQ принимает сообщение, проверяет, что первым кадром идет пустой разделитель, отбрасывает кадр и передает в приложение "World"...

Все просто.

Ну и что?

Следует признать, что вот такие простые схемы "Запрос - Ответ" или даже "Запрос - Ответ с брокером" не так уж часто применяются. Кроме того, нет простого способа восстановления системы после таких вещей как падение сервера (например, из-за бажного кода).

Методы построения надежных схем Запрос -Ответ будут рассмотрены позже.

Сейчас разберемся, как отважная четверка сокетов (REQ-REP-ROUTER-DEALER) борется с конвертами. Это позволит делать всякие полезные вещи.

Итак, мы поняли, что сокет ROUTER использует конверты для обратной пересылки, чтобы определить, какому из клиентских сокетов REQ следует направить обратный ответ. Или, иными словами:

- Каждый раз сокет ROUTER принимает сообщение, оно с помощью идентификатора сообщает вам, какой корреспондент это сообщение прислал.
- Вы можете воспользоваться хэш- таблицей (когда идентификатор - ключевая строка), чтобы отследить вновь подключенного абонента.
- Сокет ROUTER будет асинхронно, циклически обрабатывать всех корреспондентов, подключившихся к нему, если префикс - идентификатор идентичен первому кадру сообщения.
Сокеты ROUTER не обрабатывают конверт полностью. Они ничего не знают о пустых разделителях. Все ,что они делают - это работа с кадром идентификации, который позволяет выяснить, в какой из коннектов следует дальше отправить сообщение.
5 окт 14, 02:40    [16662285]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Что мы знаем про сокеты из схемы "Запрос - Ответ"


- Сокеты REQ перед сообщением отправляют в сеть пустой фрагмент-разделитель. Сокеты REQ - синхронные. Сокеты REQ всегда посылают один запрос и всегда ждут одного ответа. Сокеты REQ одновременно общаются только с одним корреспондентом. Если вы подключаете сокет REQ к некольким абонентам, запросы будет отправлены, а ответы получены по одному корреспонденту в каждом цикле запрос - ответ.

- Сокет REP читает и сохраняет все кадры идентификации до пустого разделительного кадра включительно, затем передает следующий кадр или кадры абоненту. Сокеты REP - синхронные и одновременно общаются только с одним абонентом. Если вы подключаете сокет REP ко множеству абонентов, запросы читаются от корреспондентов последовательно по кругу (in fair fashion), а ответы всегда будут отправляться тому же самому корреспонденту, от которого был последний запрос.

- Сокет DEALER не обращает на конверт возврата никакого внимания и обрабатывает его как любое составное сообщение. Сокеты DEALER - асинхронные и похожи на комбинацию сокетов PUSH и PULL. Они распределяют отправляемые сообщения по всем соединениям, и принимают сообщения от всех соединений по алгоритму справедливой очереди.

- Сокет ROUTER не обращает внимания на конверт возврата, как и сокет DEALER. Он создает идентификаторы своих соединений, и передают эти идентификаторы корреспондентам в виде первого кадра каждого входящего соединения. И наоборот, когда корреспондент отправляет сообщение, он использует первый кадр как идентификатор, чтобы найти соединение для отправки. Сокет ROUTERS - асинхронный.


Допустимые комбинации сокетов

От (connect) Направление К(bind)
REQ-->REP
DEALER--> REP
REQ-->ROUTER
DEALER-->ROUTER
DEALER-->DEALER
ROUTER-->ROUTER


Недопустимые комбинации:

От (connect) Направление К(bind)
REQ-->REQ
REQ-->DEALER
REP-->REP
REP-->ROUTER
5 окт 14, 03:13    [16662307]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
"Запрос - Ответ", рабочие комбинации сокетов.
Некоторые особенности.

REQ->REP
Мы уже рассматривали, как клиент REQ общается с сервером REP. Одно замечание: клиент REQ должен быть инициатором потока сообщений. Сервер REP не может начать общение с клиентом REQ, пока тот не пришлет запрос.

DEALER->REP

Далее. Меняем в клиенте сокет REQ на сокет DEALER. Это дает асинхронного клиента, который может общаться с множеством серверов REP. Если перепишем клиента "Hello World", используя сокет DEALER, мы сможет отправлять любое число запросов "Hello" без ожидания ответов.

Когда мы используем сокет DEALER для общения с сокетом REP, мы должны тщательно эмулировать формирвоание конверта, который должен был посылать сокет REQ, или же сокет REP будет отбрасывать сообщение как неправильное. Итак, чтобы отправить сообщение:

- Отправляем пустой кадр с установленным флагом "MORE".
- Затем отправляем тело сообщения.

Ну, а когда принимаем сообщение, то:

- принимаем первый кадр и, если он не пуст - отбрасываем все сообщение;
- принимаем следующий кадр и передаем его в приложение.

REQ -> ROUTER

Можем заменить не только REQ на DEALER, но и REP на ROUTER. Это дает нам асинхронный сервер, который может общаться с множеством REQ клиентов одновременно. Если мы сервер перепишем "Hello World", используя сокет ROUTER, мы сможем обрабатывать параллельно любое число запросом "Hello". Мы это уже делали.

Есть два различных способа использования сокетов ROUTER::

- как прокси, который переключает сообщения между сокетами frontend и backend.
- как приложение, которое читает сообщения и реагирует на них.

В первом случае сокет ROUTER просто читает все кадры, включая искусственный кадр с идентификатором, а затем вслепую передает их
Во втором случае сокет ROUTER должен знать формат конверта возврата, который он отправляет... Так как второй сокет в паре типа REQ то сокет ROUTER получает кадр идентификации, пустой кадр-разделитель и затем - кадр данных.

DEALER -> ROUTER

Теперь заменим сразу оба сокета - и REQ, и REP на DEALER и ROUTER. Получам самую мощную комбинацию сокетов, в которой DEALER общается с ROUTER. Это дает нам асинхронного клиента, общающегося с асинхронным сервером, когда обе стороны имеют полный контроль над форматом сообщений.

Так как и DEALER и ROUTER могут работать с сообщениями любого фората, вам придется "немного" поработать в качестве проектировщика протокола. Как минимум, вы должны решить - будете ли вы эмулировать конверты возврата REQ/REP. Все зависит от того, нужно ли вам в действительности отправлять ответы или нет.

DEALER -> DEALER

Вы можете поменять не только REP на ROUTER, но и REP на DEALER, если DEALER общается с одним и только одним корреспондентом.

Когда вы заменяете REP на DEALER, ваше приложение становится полностью асинхронным и сможет посылать и принимать любое число запросов и ответов. За это придется заплатить необходимостью управлять формированием конвертами ответа самостоятельно, и получать их либо правильно - иначе вообще ничего работать не будет.
Пока отметим, что пара сокетов DEALER -> DEALER - одна из самых хитрых, и хорошо, что в действительности она бывает нужна нечасто.

ROUTER -> ROUTER

Название пары звучит так, как будто она идеальна для организации соединений N-to-N, но в действительности это наиболее сложная для использования комбинация. В руководстве рекомендуют избегать её, пока не станете докой в ZeroMQ.
5 окт 14, 04:28    [16662330]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Для дальнейшей работы понадобится инструмент для более детального просмотра сообщений, поступающих в сокет.

Создадим процедуру s_dump(), которая будет читать все части сообщения из сокета и выводить их в консоль:

procedure s_dump(aSocket: Pointer);


...и добавим её в наш вспомогательны модуль ZMQ_utils:

+ ZMQ_utils.pas
unit ZMQ_Utils;
interface
// Возвращает длину отправленного сообщения в байтах
function s_recv(aZMQSocket: Pointer; aFlags: integer = 0): string;

// Отправляет строку Delphi в сокет. Возвращает число отправленных байт.
function s_send(aZMQSocket: Pointer; const aSrcString: string;
  aFlags: integer = 0): integer;

// Читает из сокета сообщение и показывает его в консоли с разбивкой по кадрам
procedure s_dump(aSocket: Pointer);

implementation
uses SysUtils, ZMQ;

//  Читает строку ZMQ из сокета и преобразует её в строку Delphi
//  В сокете должна быть именно строка Delphi. Возвращает пустую строку
//  если контекст ZMQ был завершен.

function s_recv(aZMQSocket: Pointer; aFlags: integer = 0): string;
var
  fLen: Integer;
  fZMQMsg: zmq_msg_t;
begin

  Result := '';
  try
    zmq_msg_init(fZMQMsg);
    fLen := zmq_msg_recv(fZMQMsg, aZMQSocket, aFlags);
    if fLen <= 0 then
      Exit;
    SetLength(Result, fLen div SizeOf(Char));
    Move(zmq_msg_data(fZMQMsg)^, Result[1], fLen div SizeOf(Char));
  finally
    zmq_msg_close(fZMQMsg);
  end;
end;

function s_send(aZMQSocket: Pointer; const aSrcString: string;
  aFlags: integer = 0): integer;
    // Возвращает длину отправленного сообщения в байтах
var
  fZMQMsg: zmq_msg_t;
begin
  zmq_msg_init(fZMQMsg);
  if Length(aSrcString) > 0 then begin
    zmq_msg_init_size(fZMQMsg, Length(aSrcString) * SizeOf(Char));
    Move(PChar(aSrcString)^, zmq_msg_data(fZMQMsg)^, Length(aSrcString) *
      SizeOf(Char));
  end;
  Result := zmq_msg_send(fZMQMsg, aZMQSocket, aFlags);
end;

procedure s_dump(aSocket: Pointer);
var
  fZMQMsg: zmq_msg_t;
  fSize: Integer;
  fIsText: Boolean;
  i: Integer;
  fData: PChar;
  fMore: UInt64;
  fMoreSize: size_t;
begin
  Writeln('----------------------------------------');
  while true do begin
        //  Обработка всех частей сообщения fZMQMsg
    zmq_msg_init(fZMQMsg);
    fSize := zmq_msg_recv(fZMQMsg, aSocket, 0);

        //  Вывод сообщения fZMQMsg в виде текста или hex
    fData := zmq_msg_data(fZMQMsg);
    fIsText := True;
    for i := 0 to Pred(fSize) do
      if not (fData[i] in [#32..#127]) then begin
        fIsText := False;
        break
      end;
    Write(Format('[%3u] ', [fSize]));
    for i := 0 to Pred(fSize) do begin
      if (fIsText) then
        Write(fData[i])
      else
        Write(Format('%.2x ', [Integer(fData[i])]))
    end;
    Writeln;
    fMore := 0; //  Сообщение составное?
    fMoreSize := sizeof(fMore);
    zmq_getsockopt(aSocket, ZMQ_RCVMORE, @fMore, fMoreSize);
    zmq_msg_close(fZMQMsg);
    if fMore = 0 then
      break; //  Последняя часть сообщения
  end;
end;

end.


Пригодится, когда начнем разбирать "конверты".
6 окт 14, 01:03    [16664115]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Сокеты ROUTER, более подробно.
Концепция идентификаторов и адресов

- Идентификация сообщений в ZMQ касается только сокетов ROUTER.
- В широком смысле идентификатор представляет собой обратный адрес для ответа в составе конверта сообщений. В большинстве случаев, идентификаторы являются случайным значением, и они локальны в рамках одного сокета ROUTER. Идентификатор - ключевое значение для поиска в хэш-таблице.
- Независимо от значений идентификаторов, соединения могут иметь физические адреса ("tcp://192.168.55.117:5670"), или логические (UUID или email адрес или любое уникальное ключевое значение).

Приложение, которое общается с абонентами с помощью сокета ROUTER, может преобразовывать логические адреса в идентификаторы с помощью встроенных хэш-таблиц. Так как в момент передачи сообщения сокет ROUTER задает идентификацию соединения для конкретного корреспондента-отправителя, вы можете ответить лишь этому отправителю, а не произвольному абоненту сокета.

Пример крошечного приложения, в котором сообщения пересылаются по inproc протоколу от сокетов REQ к сокету ROUTER:

+ Пример идентификации
program IC;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ, ZMQ_Utils;
var
  fContext: Pointer;
  fSocketSink: Pointer;
  fSocketAnonymous: Pointer;
  fSocketIdentifier: Pointer;
begin
  fContext := zmq_ctx_new();
  fSocketSink := zmq_socket(fContext, ZMQ_ROUTER);
    // Настройка приемника
  zmq_bind(fSocketSink, 'inproc://example');

// 1. Разрешаем 0MQ установить идентификатор
  fSocketAnonymous := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketAnonymous, 'inproc://example');
  s_send(fSocketAnonymous, 'ROUTER uses a generated UUID');
  s_dump(fSocketSink); // Смотрим, что на выходе приемника

// 2. Устанавливаем идентификатор самостоятельно
  fSocketIdentifier := zmq_socket(fContext, ZMQ_REQ);
  zmq_setsockopt(fSocketIdentifier, ZMQ_IDENTITY, PChar('PEER2'), 5); //!
  zmq_connect(fSocketIdentifier, 'inproc://example');
  s_send(fSocketIdentifier, 'ROUTER socket uses REQ''s socket identity');
  s_dump(fSocketSink); // Смотрим, что на выходе приемника

  zmq_close(fSocketSink);
  zmq_close(fSocketAnonymous);
  zmq_close(fSocketIdentifier);
  zmq_ctx_destroy(fContext);
  Readln;
end.


Создается сокет - приемник ROUTER, к которому коннектятся два сокета REQ. Второму сокету перед коннектом назначется идентификатор 'PEER2'.
Смотрим, что же получилось на выходе приемника.

Видно, что каждое сообщение состоит из трех кадров: идентификатор, пустой кадр - разделитель, и кадр с данными:

К сообщению приложен файл. Размер - 37Kb
6 окт 14, 01:29    [16664174]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Обработка ошибок сокета ROUTER.

Ну, не то чтобы ошибок.

Если сокет ROUTER не может определить, куда отправить сообщения, он просто удаляет их. Для реальных приложений это, наверное, правильно (клиент отвалился - что тут поделаешь?), но это затрудняет отладку - особенно если обратный конверт для сокета ROUTER формируется "ручками".

Начиная с ZeroMQ v3.2, у сокетов появилась опция для перехвата ошибок: ZMQ_ROUTER_MANDATORY. Устанавливаем ей для сокета ROUTER получаем возможность отловить ситуацию, когда индентификации недостаточно для работы сокета ROUTER, сокет будет сигнализировать ошибкой EHOSTUNREACH.
6 окт 14, 02:34    [16664229]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Раз уж выяснилось (16666035), что Write/Writeln небезопасны в потоках, добавим безопасную функцию вывода строки в консоль. Снова расширим ZMQ_Utils.pas.

// "Потокобезопасно" выводит строку в консоль
procedure z_Log(const aStr : string);


Ну и для работы с текущей темой добавим еще пару функций:

// Формирует строку указанной длины со случайным заполнением
function s_random(aLen: Integer): string;

//  Устанавливает случайный текстовый идентификатор для сокета
procedure s_set_id(aSocket: Pointer);


+ ZMQ_Utils.pas

unit ZMQ_Utils;
interface
// Возвращает длину отправленного сообщения в байтах
function s_recv(aZMQSocket: Pointer; aFlags: integer = 0): string;

// Отправляет строку Delphi в сокет. Возвращает число отправленных байт.
function s_send(aZMQSocket: Pointer; const aSrcString: string;
  aFlags: integer = 0): integer;

// Читает из сокета сообщение и показывает его в консоли с разбивкой по кадрам
procedure s_dump(aSocket: Pointer);

// Формирует строку указанной длины со случайным заполнением
function s_random(aLen: Integer): string;

//  Устанавливает случайный текстовый идентификатор для сокета
procedure s_set_id(aSocket: Pointer);

// "Потокобезопасно" выводит строку в консоль
procedure z_Log(const aStr : string);

implementation
uses SysUtils, ZMQ, Windows;

var
  cs: TRTLCriticalSection;

procedure z_Log(const aStr : string);
begin
  EnterCriticalSection(cs);
    Writeln(aStr);
  LeaveCriticalSection(cs);
end;

procedure s_set_id(aSocket: Pointer);
//  Устанавливает случайный текстовый идентификатор для сокета
begin
  zmq_setsockopt( aSocket, ZMQ_IDENTITY,  PChar(s_random(10)), 10 * SizeOf(Char));
end;

function s_random(aLen: Integer): string;
// Формирует случайную текстовую строку
const
  Chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ';
var
  s: String;
  i: integer;
begin
  Randomize;
  result := '';
  for i := 1 to aLen do
    result := result + Chars[Random(Length(Chars)) + 1];
end;



function s_recv(aZMQSocket: Pointer; aFlags: integer = 0): string;
//  Читает строку ZMQ из сокета и преобразует её в строку Delphi
//  В сокете должна быть именно строка Delphi. Возвращает пустую строку
//  если контекст ZMQ был завершен.
var
  fLen: Integer;
  fZMQMsg: zmq_msg_t;
begin

  Result := '';
  try
    zmq_msg_init(fZMQMsg);
    fLen := zmq_msg_recv(fZMQMsg, aZMQSocket, aFlags);
    if fLen <= 0 then
      Exit;
    SetLength(Result, fLen div SizeOf(Char));
    Move(zmq_msg_data(fZMQMsg)^, Result[1], fLen div SizeOf(Char));
  finally
    zmq_msg_close(fZMQMsg);
  end;
end;

function s_send(aZMQSocket: Pointer; const aSrcString: string;
  aFlags: integer = 0): integer;
    // Возвращает длину отправленного сообщения в байтах
var
  fZMQMsg: zmq_msg_t;
begin
  zmq_msg_init(fZMQMsg);
  if Length(aSrcString) > 0 then begin
    zmq_msg_init_size(fZMQMsg, Length(aSrcString) * SizeOf(Char));
    Move(PChar(aSrcString)^, zmq_msg_data(fZMQMsg)^, Length(aSrcString) *
      SizeOf(Char));
  end;
  Result := zmq_msg_send(fZMQMsg, aZMQSocket, aFlags);
end;

procedure s_dump(aSocket: Pointer);
var
  fZMQMsg: zmq_msg_t;
  fSize: Integer;
  fIsText: Boolean;
  i: Integer;
  fData: PChar;
  fMore: UInt64;
  fMoreSize: size_t;
begin
  Writeln('----------------------------------------');
  while true do begin
        //  Обработка всех частей сообщения fZMQMsg
    zmq_msg_init(fZMQMsg);
    fSize := zmq_msg_recv(fZMQMsg, aSocket, 0);

        //  Вывод сообщения fZMQMsg в виде текста или hex
    fData := zmq_msg_data(fZMQMsg);
    fIsText := True;
    for i := 0 to Pred(fSize) do
      if not (fData[i] in [#32..#127]) then begin
        fIsText := False;
        break
      end;
    Write(Format('[%3u] ', [fSize]));
    for i := 0 to Pred(fSize) do begin
      if (fIsText) then
        Write(fData[i])
      else
        Write(Format('%.2x ', [Integer(fData[i])]))
    end;
    Writeln;
    fMore := 0; //  Сообщение составное?
    fMoreSize := sizeof(fMore);
    zmq_getsockopt(aSocket, ZMQ_RCVMORE, @fMore, fMoreSize);
    zmq_msg_close(fZMQMsg);
    if fMore = 0 then
      break; //  Последняя часть сообщения
  end;
end;

initialization
  InitializeCriticalSection( cs );

finalization
  DeleteCriticalSection( cs );

end.

6 окт 14, 17:07    [16667318]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Шаблон Балансировка Нагрузки

Рассмотрим простой код (см. ниже). Видно, как сокет ROUTER коннектится к сокету REQ, а затем к сокету DEALER. Эти два примера следуют одной и той же логике, соотвествующей шаблону "Балансировка Нагрузки". Этот шаблон был использован, когда мы впервые применили сокет ROUTER для маршрутизации сообщений, а не просто как средство для ответа на запрос.

Шаблон "Балансировка Нагрузки" чрезвычайно популярен. Он решает главную проблему, когда простые алгоритмы последовательной круговой(round robin) маршртизации (которые обеспечивают PUSH и DEALER) становятся неэффективными. Такое случается, когда для выполнения задач, решаемых рабочими процессами, требуется разное время.

Рассмотрим аналогию с почтовым офисом. Если у нас есть общая очередь клиентов перед прилавком, и некоторые покупают почтовые марки (быстрая, простая транзакция), а некоторые открывают новые счета (очень медленная, долгая транзакция), то становися очевидно, что покупатели почтовых марок застряли неоправданно долго. Если ваша архитектура обмена сообщениями окажется неудачной, ваши клиенты будут раздражены, точно так же, как в почтовом офисе.

Для почтового офиса решением будет использование одной очереди так, чтобы даже когда один или два служащих застрянут на медленном задании, прочие клиенты будут обслуживаться другими служащими по принципу "первый пришел - первый обслужен".

Одной из причин использования упрощенного подхода в сокетах PUSH и DEALER является производительность. Если вы прибываете в любой из главных аэропортов США, вы увидите несколько длинных очередей людей, ожидающих оформления процедуры иммиграции. Пограничники заранее отправляют людей к очереди конкретного служащего, а не заставляют всех стоять в одной общей очереди. Заставив людей заранее пройти полсотни метров, реально экономится 1-2 минут на обслуживание каждого пассажира. А так как проверка каждого паспорт занимает примерно одинаковое время, это обеспечивает что-то вроде справедливого обслуживания. Это и есть стратегия для сокетов PUSH и DEALER: распределить нагрузку заранее так, чтобы уменьшить расстояние для перемещения сообщения.

То есть, метод обслуживания в аэропорту отличается от метода в почтовом офисе.

Рассмотрим сценарий, когда рабочий процесс (сокетом DEALER или REQ) подключается к брокеру (к сокету ROUTER). Брокер знает, когда рабочий процесс готов к обслуживанию, и хранит список рабочих процессов. Поэтому он может всегда определить, какой из рабочих процессов наиболее редко использовался.

Решение задачи очень просто: рабочий посылает сообщение "готов" после того, как стартует, а также всякий раз после выполнения очередной задачи. Брокер читает сообщения последовательно, одно за другим. Понятно, что всякий раз, когда сообщение прочитано, оно принято от рабочего процесса, использованного последним . И так как используется сокет ROUTER, в начале конверта сообщения содержится идентификатор, который позволяет оправить задачу рабочему процессу обратно.

Цикл запрос - ответ необходим, так как задание отправляется с ответом, а любой ответ на задание отправляется как новый запрос.

Следующий пример понятно демонстрирует описанное:

Основной поток приложения создает "слушающий" сокет ROUTER. Затем создается 10 рабочих потоков, каждый поток создает сокет REQ и коннектится к сокету ROUTER основного потока. Каждый рабочий поток в цикле посылает сообщение "Я готов!", потом получает задание и выполняет его, пока не получит сообщение "Свободен!".
Основной поток в цикле читает конверт сообщения из сокета ROUTER. Первый кадр сообщения - идентификатор потока, который его прислал. Значит, этот поток готов к выполнению работы, формируется обратный конверт и сообщение отправляется обратно.
Если задача выполнялась больше 5 секунд, основной поток отправляет сообщение "Fire!".

+ Код приложения, поясняющего идею балансировки нагрузки

program RouterToReq;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ, ZMQ_Utils, Windows;

const
  C_NMBR_WORKERS = 10;

procedure worker_task(args: Pointer);
var
  fContext: Pointer;
  fSocketWorker: Pointer;
  fTotal: Integer;
  fWorkload: Utf8String;
begin
  fContext := zmq_ctx_new();
  fSocketWorker := zmq_socket(fContext, ZMQ_REQ);
  // Устанавливает случайное текстовое значение идентификатора для сокета
  s_set_id(fSocketWorker);
  zmq_connect(fSocketWorker, 'tcp://localhost:5671');

  fTotal := 0;
  while true do
  begin
// Сообщаем брокеру, что поток готов к работе
    s_send(fSocketWorker, 'Hi Boss');

// Получаем рабочее задание от брокера, пока не будет команды на прекращение
    fWorkload := s_recv(fSocketWorker);
    if fWorkload = 'Fired!' then
    begin
      z_Log(Format('Completed: %d tasks', [fTotal]));
      break;
    end;

    sleep(random(500) + 1); // Выполнение какой-то "полезной" работы
    Inc(fTotal);
  end;
  zmq_close(fSocketWorker);
  zmq_ctx_destroy(fContext);
end;

var
  fContext: Pointer;
  fSocketBroker: Pointer;
  i: Integer;
  fThrId: Cardinal;
  fFrequency: Int64;
  fStart: Int64;
  fStop: Int64;
  fDT: Int64;
  fWrkThreadsFired: Integer;
  fStrIdentity: string;
  fDummy: string;
begin
  fContext := zmq_ctx_new();
  fSocketBroker := zmq_socket(fContext, ZMQ_ROUTER);

  zmq_bind(fSocketBroker, 'tcp://*:5671');
  Randomize;

  // Запускаем пять рабочих потоков
  for i := 0 to Pred(C_NMBR_WORKERS) do
    BeginThread(nil, 0, @worker_task, nil, 0, fThrId);

 // Засекаем время
  QueryPerformanceFrequency(fFrequency);
  QueryPerformanceCounter(fStart);

// В течении пяти секунд шлем задания, а затем посылаем сообщение, чтобы остановились
  fWrkThreadsFired := 0;
  while true do
  begin

  // Следующее сообщение возвращает поток, первый выполнивший задание
    fStrIdentity := s_recv(fSocketBroker); // Кадр с идентификатором (фактически - обратный адрес)

    // Формируем конверт для ответа
    s_send(fSocketBroker, fStrIdentity, ZMQ_SNDMORE); // Первый кадр - обратный адрес

    fDummy :=  s_recv(fSocketBroker); // Пустой кадр - разделитель конверта
    s_send(fSocketBroker, '', ZMQ_SNDMORE); // Пустой кадр - разделитель конверта

    fDummy :=  s_recv(fSocketBroker); // Ответ от рабочего потока, тоже игнорируем


    QueryPerformanceCounter(fStop);
    fDT := (MSecsPerSec * (fStop - fStart)) div fFrequency;

    if fDT < 5000 then
      s_send(fSocketBroker, 'Work harder') // Шлем задание
    else begin
      s_send(fSocketBroker, 'Fired!'); // Команда на остановку
      Inc(fWrkThreadsFired);
      if fWrkThreadsFired = C_NMBR_WORKERS then
        break;
    end;
  end;
  zmq_close(fSocketBroker);
  zmq_ctx_destroy(fContext);
  readln;
end.




На выходе сокета ROUTER формируется конверт со следующей структурой:

№ кадраДлинаСодержаниеОписание
11002947ws5fw Идентификатор отправителя
20Пустой кадр - разделитель
37Hi Boss Сигнал готовности рабочего потока


Видно, что кадры ответного сообщения формируется и отправляются параллельно со считыванием кадров входящего сообщения.
Это сделано, чтобы лишний раз напомнить: сокет ROUTER - асинхронный.

ИМХО, сообщения лучше читать кадр за кадром, логически отделяя процесс приема от процесса передачи. Вот так, например:
  // Следующее сообщение возвращает поток, первый выполнивший задание
    fStrIdentity := s_recv(fSocketBroker); // Кадр с идентификатором (фактически - обратный адрес)
    fDummy :=  s_recv(fSocketBroker); // Пустой кадр - разделитель конверта
    fDummy :=  s_recv(fSocketBroker); // Ответ от рабочего потока, тоже игнорируем

    // Формируем конверт для ответа
    s_send(fSocketBroker, fStrIdentity, ZMQ_SNDMORE); // Первый кадр - обратный адрес
    s_send(fSocketBroker, '', ZMQ_SNDMORE); // Пустой кадр - разделитель конверта

    QueryPerformanceCounter(fStop);
    fDT := (MSecsPerSec * (fStop - fStart)) div fFrequency;

    if fDT < 5000 then
      s_send(fSocketBroker, 'Work harder') // Шлем задание
    else begin
      s_send(fSocketBroker, 'Fired!'); // Команда на остановку
      Inc(fWrkThreadsFired);
      if fWrkThreadsFired = C_NMBR_WORKERS then
        break;
    end;

...

После запуска приложение усиленно что-то делает 5 секунд, потом каждый поток отчитывается о количестве задач, которые успел выполнит за эти 5 секунд
Вывод программы:

К сообщению приложен файл. Размер - 42Kb
6 окт 14, 17:36    [16667531]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Брокер на сокете ROUTER, рабочие процессы - на сокете DEALER

Везде, где можно использовать сокет REQ, можно использовать DEALER. При этом следует учесть отличия:
- Сокет REQ всегда отправляет пустой кадр - разделитель конверта перед любыми кадрами с данными. Сокет DEALER так не делает.
- Сокет REQ всегда отправляет одно сообщение перед тем, как принимает ответ. Сокет DEALER - полностью асинхронный.

В нашем примере нет никакой разницы - синхронная работа или асинхронная, так как придерживаемся строгой последовательности "запрос - ответ". Это станет важно позднее, когда мы займемся вопросами восстановления после сбоев.

Рассмотрим такой же пример, как и предыдущий, но заменим сокет REQ на сокет DEALER:

+ Брокер на сокете ROUTER, рабочий процесс - на сокете Dealer

program RouterToDealer;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  ZMQ,
  ZMQ_Utils,
  Windows;

const
  C_NMBR_WORKERS = 10;

procedure worker_task(args: Pointer);
var
  fContext: Pointer;
  fDummy: string;
  fSocketWorker: Pointer;
  fTotal: Integer;
  fWorkload: Utf8String;
begin
  fContext := zmq_ctx_new();
  fSocketWorker := zmq_socket(fContext, ZMQ_DEALER);
  // Устанавливает случайное текстовое значение идентификатора для сокета
  s_set_id(fSocketWorker);
  zmq_connect(fSocketWorker, 'tcp://localhost:5671');

  fTotal := 0;
  while true do
  begin
// Сообщаем брокеру, что поток готов к работе
    s_send(fSocketWorker, '', ZMQ_SNDMORE); // Отправляем пустой кадр-разделитель конверта
    s_send(fSocketWorker, 'Hi Boss'); // Отправляем сообщение о готовности

// Получаем рабочее задание от брокера, пока не будет команды на прекращение
    fDummy := s_recv(fSocketWorker); // Пропускаем кадр - разделитель
    fWorkload := s_recv(fSocketWorker); // Получаем задание
    if fWorkload = 'Fired!' then
    begin
      z_Log(Format('Completed: %d tasks', [fTotal]));
      break;
    end;

    sleep(random(500) + 1); // Выполнение какой-то "полезной" работы
    Inc(fTotal);
  end;
  zmq_close(fSocketWorker);
  zmq_ctx_destroy(fContext);
end;

var
  fContext: Pointer;
  fSocketBroker: Pointer;
  i: Integer;
  fThrId: Cardinal;
  fFrequency: Int64;
  fStart: Int64;
  fStop: Int64;
  fDT: Int64;
  fWrkThreadsFired: Integer;
  fStrIdentity: string;
  fDummy: string;
begin
  fContext := zmq_ctx_new();
  fSocketBroker := zmq_socket(fContext, ZMQ_ROUTER);

  zmq_bind(fSocketBroker, 'tcp://*:5671');
  Randomize;

  // Запускаем десять рабочих потоков
  for i := 0 to Pred(C_NMBR_WORKERS) do
    BeginThread(nil, 0, @worker_task, nil, 0, fThrId);

 // Засекаем время
  QueryPerformanceFrequency(fFrequency);
  QueryPerformanceCounter(fStart);

// В течении пяти секунд шлем задания, а затем посылаем сообщение, чтобы остановились
  fWrkThreadsFired := 0;
  while true do
  begin

  // Следующее сообщение возвращает поток, первый выполнивший задание
    fStrIdentity := s_recv(fSocketBroker); // Кадр с идентификатором (фактически - обратный адрес)
    // Формируем конверт для ответа
    s_send(fSocketBroker, fStrIdentity, ZMQ_SNDMORE); // Первый кадр - обратный адрес

    fDummy :=  s_recv(fSocketBroker); // Пустой кадр - разделитель конверта
    fDummy :=  s_recv(fSocketBroker); // Ответ от рабочего потока, тоже игнорируем

    s_send(fSocketBroker, '', ZMQ_SNDMORE); // Пустой кадр - разделитель конверта

    QueryPerformanceCounter(fStop);
    fDT := (MSecsPerSec * (fStop - fStart)) div fFrequency;

    if fDT < 5000 then
      s_send(fSocketBroker, 'Work harder') // Шлем задание
    else begin
      s_send(fSocketBroker, 'Fired!'); // Команда на остановку
      Inc(fWrkThreadsFired);
      if fWrkThreadsFired = C_NMBR_WORKERS then
        break;
    end;
  end;
  zmq_close(fSocketBroker);
  zmq_ctx_destroy(fContext);
  readln;
end.



Этот код почти такой же, как предыдущий, за исключением того, что рабочие процессы используют сокет DEALER и читают и пишут пустой кадр перед кадром данных. Таким образом, сохранена совместимость с вариантом рабочих процессов на сокетах REQ.

Следует помнить, что пустой кадр - разделитель конверта необходим для того, чтобы позволить в последовательной цепочке запросов использовать завершающий сокет REQ, который использует данный разделитель для отсечения обратного адреса конверта так, чтобы можно было в приложении обрабатывать кадры данных.

Если сообщение никогда не будет передано к сокету REP, то для простоты мы можем с обоих сторон просто отбросить пустой кадр - разделитель. Что, в общем, часто и делается.
6 окт 14, 18:44    [16667823]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Брокер сообщений с балансировкой нагрузки

Предыдущий пример был завершен лишь наполовину. Брокер может управлять множеством рабочих процессов с фиктивными запросами и ответами, но не может общаться с клиентами. Если мы добавим еще один frontend ROUTER сокет, который принимает запросы клиентов, и превратим наш пример в настоящий прокси, который может передавать сообщения от frontend к backend, мы получим полезный и готовый для практического многократного использования крошечный брокер сообщений с балансировкой нагрузки.

Этот брокер делает следующее:
- принимает соединения от множества клиентов;
- принимает соединения от множества рабочих процессов;
- принимает запросы от клиентов и хранит их в общей очереди;
- отсылает эти запросы рабочим процессам в соответствии со схемой «Балансировка нагрузки»;
- обратно принимает ответы от рабочих процессов;

Исходник получается длинноват, но в нем стоит разобраться.
6 окт 14, 21:22    [16668293]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Вот он:
+ Брокер с балансировкой нагрузки
program LoadBalancingBroker;

{$APPTYPE CONSOLE}
// Брокер с балансировкой нагрузки.
//=================================
// Для упрощения запуска пример реализована как многопоточное приложение,
// с использованием протокола inproc
//=================================
// Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт).
// Каждый рабочий при запуске сообщает брокеру о своей готовности.
// Рабочий ставится в очередь.
//
// Клиент при запуске обращается к брокеру с заданием. Брокер отправляет
// задание первому свободному рабочему. Рабочий, выполнив задание, возвращает
// результат брокеру, брокер пересылает задание исходному клиенту



uses
  SysUtils, ZMQ, ZMQ_Utils, Windows;
const
  c_NBR_CLIENTS = 5; // Число клиентов
  c_NBR_WORKERS = 3; // // Число рабочих
  c_NMBR_REQ = 2; // // Число запросов от каждого клиента

  // Конечные точки подключения
  c_url_clients = 'inproc://clients';
  c_url_workers = 'inproc://workers';

procedure client_thread_proc(aContext: Pointer);
// Процедура потока клиента
var
  fSocketClient: Pointer;
  fReply: string;
  i: integer;
begin
  fSocketClient := zmq_socket(aContext, ZMQ_REQ);
  // Назначение идентификатора соединения (строка случайных символов)
  s_set_id(fSocketClient);
  zmq_connect(fSocketClient, c_url_clients); // Коннект к брокеру

  for i := 0 to Pred(c_NMBR_REQ) do begin
  // Отправка запроса, получение ответа
    s_send(fSocketClient, 'HELLO');
    fReply := s_recv(fSocketClient);
    z_Log(Format('Client : %s', [fReply]));
  end;
  zmq_close(fSocketClient);
end;


procedure worker_thread_proc(aContext: Pointer);
// Процедура рабочего потока
var
  fSocketWorker: Pointer;
  fIdentity: string;
  fEmpty: string;
  fRequest: string;
begin
  fSocketWorker := zmq_socket(aContext, ZMQ_REQ);
   // Назначение идентификатора соединения
  s_set_id(fSocketWorker);
  zmq_connect(fSocketWorker, c_url_workers); // Коннект к брокеру

  // Соощаем брокеру, что рабочий поток запущен и готов к работе
  s_send(fSocketWorker, 'READY');

  while true do // Рабочий цикл
  begin
    // Читаем и запоминаем все кадры вплоть пустого (fEmpty)
    // В данном примере кадров всего один, но реально их может быть больше
    fIdentity := s_recv(fSocketWorker); // Идентификатор клиента
    if zmq_errno() = ETERM then
      Break; // Уходим, если контекст в процессе завершения

    fEmpty := s_recv(fSocketWorker); // Кадр - разделитель
    Assert(fEmpty = '');
    // Получение запроса, отправка ответа
    fRequest := s_recv(fSocketWorker);
    z_Log(Format('Worker : %s', [fRequest]));

    Sleep(50); // Имитируем выполнение полезной работы

    // Формирование конверта составного сообщения:
    s_send(fSocketWorker, fIdentity, ZMQ_SNDMORE);
      // Идентификатор клиента
    s_send(fSocketWorker, '', ZMQ_SNDMORE); // Разделитель
    s_send(fSocketWorker, 'OK'); // Результат работы
  end;
  zmq_close(fSocketWorker);
end;


var
  fContext: Pointer;
  fSocketClients: Pointer;
  fSocketWorkers: Pointer;
  i: Integer;
  fThrId: Cardinal;
  fAvailableWorkers: Integer;
  fZMQPoll: array[0..1] of pollitem_t;
  fWrkrs_Que: array[0..Pred(c_NBR_WORKERS)] of string; // Очередь рабочих
  fRC: Integer;
  fWorker_id: string;
  fClient_id: string;
  fEmpty: string;
  fReplay: string;
  fRequest: string;
  fCliReqNmbr: Integer; // Номер клиентского запроса
  fThrWIds: array[0..Pred(c_NBR_WORKERS)] of Cardinal;
  fThrCIds: array[0..Pred(c_NBR_CLIENTS)] of Cardinal;
begin
 // Подготовка контекста и сокетов
  fContext := zmq_ctx_new();
  fSocketClients := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketWorkers := zmq_socket(fContext, ZMQ_ROUTER);
  zmq_bind(fSocketClients, c_url_clients);
  zmq_bind(fSocketWorkers, c_url_workers);

  for i := 0 to Pred(c_NBR_WORKERS) do // Создаем рабочих
    fThrWIds[i] := BeginThread(nil, 0, @worker_thread_proc, fContext, 0, fThrId);

  for i := 0 to Pred(c_NBR_CLIENTS) do // Создаем клиентов
    fThrCIds[i] := BeginThread(nil, 0, @client_thread_proc, fContext, 0, fThrId);


// Главный цикл для LRU очереди. Используется два сокета: fSocketClients для
// клиентов и fSocketWorkers для рабочих. Опрос fSocketWorkers
// выполняется всегда, а fSocketClients - только тогда, когда есть один или
// больше готовых рабочих.
// Сообщения, которые еще не готовы к обработке, в ZMQ хранятся
// во встроенной очередей сообщений.
// Когда мы получаем запрос клиента, мы берем рабочего из начала
// очереди (fWrkrs_Que[0]) и посылаем ему запрос, которых включаеи исходный
// идентификатор клиента.
// Когда же приходит запрос от рабочего, этого рабочего ставим в конец очереди,
// а ответ переправляем исходному клиенту (используя идинтификатор в конверте).

  fAvailableWorkers := 0; // Число доступных рабочих

  fCliReqNmbr := 0;

  // Подготовка пула сокетов
  fZMQPoll[0].socket := fSocketWorkers;
  fZMQPoll[0].fd := 0;
  fZMQPoll[0].events := ZMQ_POLLIN;
  fZMQPoll[1].socket := fSocketClients;
  fZMQPoll[1].fd := 0;
  fZMQPoll[1].events := ZMQ_POLLIN;

  while fCliReqNmbr < c_NBR_CLIENTS * c_NMBR_REQ do begin

    fZMQPoll[0].revents := 0; // Сброс результатов опроса
    fZMQPoll[1].revents := 0;


    // Читать из fSocketClients только тогда, когда есть свободные рабочие
    // Если нет свободных - читать только из fSocketWorkers
    if fAvailableWorkers = 0 then
      fRC := zmq_poll(fZMQPoll[0], 1, 11)
        // Только ждем готовности от рабочих
    else
      fRC := zmq_poll(fZMQPoll[0], 2, 11); // Читаем оба сокета

    if fRC = -1 then
      Break; // Цикл прерван

    // Обрабока действий рабочих на fSocketWorkers
    if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then begin
      fWorker_id := s_recv(fSocketWorkers);
      Assert(fAvailableWorkers < c_NBR_WORKERS);
      // Помещаем рабочего в конец очереди
      fWrkrs_Que[fAvailableWorkers] := fWorker_id;
      Inc(fAvailableWorkers);

      // Второй кадр - пустой
      fEmpty := s_recv(fSocketWorkers);
      assert(fEmpty = '');

      // Третий кадр - готовность ("READY"), иначе это Id клиента в ответе
      fClient_id := s_recv(fSocketWorkers);

      // Если это ответ клиенту, отправить ответ в fSocketClients
      if fClient_id <> 'READY' then begin
        fEmpty := s_recv(fSocketWorkers);
        Assert(fEmpty = '');
        fReplay := s_recv(fSocketWorkers);
        s_send(fSocketClients, fClient_id, ZMQ_SNDMORE);
        s_send(fSocketClients, '', ZMQ_SNDMORE);
        s_send(fSocketClients, fReplay);
        Inc(fCliReqNmbr); // Число обслуженных запросов

      end;
    end;

    // Обработка запросов клиентов:
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
    begin
      // Получаем очередной клиентский запрос, отправляем его рабочему из
      // начала очереди
      // Конверт запроса клиента: [identity][empty][request]
      fClient_id := s_recv(fSocketClients);
      fEmpty := s_recv(fSocketClients);
      Assert(fEmpty = '');
      fRequest := s_recv(fSocketClients);

      s_send(fSocketWorkers, fWrkrs_Que[0], ZMQ_SNDMORE);
      s_send(fSocketWorkers, '', ZMQ_SNDMORE);
      s_send(fSocketWorkers, fClient_id, ZMQ_SNDMORE);
      s_send(fSocketWorkers, '', ZMQ_SNDMORE);
      s_send(fSocketWorkers, fRequest);

      // Извлечение из очереди
      Dec(fAvailableWorkers);
      for i := 0 to Pred(fAvailableWorkers) do
        fWrkrs_Que[i] := fWrkrs_Que[i + 1];
    end;
  end;

  for I := 0 to High(fThrCIds) do // Ждем завершения клиентов
    WaitForSingleObject(fThrCIds[i], INFINITE);

  zmq_close(fSocketClients);
  zmq_close(fSocketWorkers);
  zmq_ctx_destroy(fContext);

//  for I := 0 to High(fThrWIds) do
//    WaitForSingleObject(fThrWIds[i], INFINITE);

  Readln;

end.


Исходник избыточно комментирован, поэтому добавлю только пару замечаний.

1. Обратить внимание на цикл рабочего потока worker_thread_proc():

  while true do // Рабочий цикл
  begin
    // Читаем и запоминаем все кадры вплоть пустого (fEmpty)
    // В данном примере кадров всего один, но реально их может быть больше
    fIdentity := s_recv(fSocketWorker); // Идентификатор клиента
    if zmq_errno() = ETERM then
      Break; // Уходим, если контекст в процессе завершения

Так как для связи между потоками используется протокол inproc, необходимо использовать общий контекст, который передается как параметр в процедуру потока.
Так вот, когда основной поток завершается, выполняется закрытие контекста ZMQ:
  zmq_ctx_destroy(fContext);

Следовательно, обращение к сокетам контекста вызовет ошибку ETERM, что в данном случае считается признаком необходимости завершения основного цикла процедуры потока.

2. Основным источником задач в примере является процедура потока клиентов client_thread_proc(). Следовательно, при завершении приложения есть смысл дождаться завершения потоков клиентского слоя. Для этого при создании потоков слоя клиентов запоминаются дескрипторы потоков, а при завершении процедуры выполняется ожидание завершения всех потоков клиентов:
var
...
  fThrCIds: array[0..Pred(c_NBR_CLIENTS)] of Cardinal;
...
begin
...
  for i := 0 to Pred(c_NBR_CLIENTS) do // Создаем клиентов
    fThrCIds[i] := BeginThread(nil, 0, @client_thread_proc, fContext, 0, fThrId);
...
...
...
  for I := 0 to High(fThrCIds) do // Ждем завершения клиентов
    WaitForSingleObject(fThrCIds[i], INFINITE);

  zmq_close(fSocketClients);
  zmq_close(fSocketWorkers);
  zmq_ctx_destroy(fContext);
9 окт 14, 13:01    [16681363]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Немного поясню.

В алгоритме две сложные вещи:
а) упаковка сообщений в конверты при каждом чтении и записи;
б) сам алгоритм балансировки нагрузки


Рассмотрим весь путь сообщения схемы "Запрос - от клиента до рабочего процесса и обратно. В коде есть вызовы
  
s_set_id(fSocketClient);
и
s_set_id(fSocketWorker);

- это установка идентичности сокетов в форме набора случайных символов в строках. Такая форма используется исключительно для облегчения процесса отслеживания сообщений. В реальности, можно не задавать идентификацию самому, а позволить это сделать сокету ROUTER.

Предположим, что идентификатором клиента будет строка "CLIENT", а рабочего - "WORKER". Предположим, что клиентское приложение посылает один кадр, содержащий "Hello".

Сообщение, отсылаемое клиентом:
№ кадраДлинаСодержаниеОписание
15Hello Кадр с данными


Сокет REQ сам добавляет в начало пустой кадр - разделитель. Далее, сокет ROUTER добавляет идентификатор соединения. В итоге прокси читает уже адрес (идентификатор) клиента, пустой кадр и данные:

Сообщение, которое читает прокси с фронтэнд сокета ROUTER:
№ кадраДлинаСодержаниеОписание
15ClientИдентификатор клиента
20Пустой кадр - разделитель
35Hello Кадр с данными

Брокер отправляет все это рабочему, предварив двумя кадрами: идентификаторо рабочего и разделителем.

Сообщение, которое брокер отправляет в бэкэнд сокета ROUTER:
№ кадраДлинаСодержаниеОписание
16WorkerИдентификатор рабочего
20Пустой кадр - разделитель
35ClientИдентификатор клиента
40Пустой кадр - разделитель
55Hello Кадр с данными


Этот конверт распаковывается сначала бэкенд - сокетом ROUTER, который удаляет первый кадр и отправляет все сокету REQ рабочего. Потом сокет REQ рабочего удаляет пустой кадр, а остаток передает в приложение рабочего потока:

Сообщение, которое получает рабочий:
№ кадраДлинаСодержаниеОписание
15ClientИдентификатор клиента
20Пустой кадр - разделитель
35Hello Кадр с данными


Рабочий сохраняет все части сообщения, что-то делает с данными и возвращает результат.
На обратном пути сообщение проходит те же самые этапы, то есть, например, на бэкэнд сокет брокеру передается сообщение из пяти частей, а брокер отправляет в фронтенд сокет сообщение из трех частей, в итоге клиент получает сообщение из одной части.

~~~~~~~~~~~~~~~~~

Теперь по поводу алгоритма балансировки нагрузки.

Со стороны и клиента, и рабочего используются сокеты REQ. Рабочий должен корректно получить, запомнить и правильно вернуть получаемые конверты . Алгоритм:

- создается пул сокетов. Пул всегда содержит бэкэнд сокет (сторона рабочего), и, если есть хоть один свободный рабочий - фронтэнд сокет (сторона клиента).

- пул активируется с бесконечным таймаутом;

- если есть активность со стороны бэкэнда (рабочего), мы читаем либо сообщение "READY", либо идентификатор клиента, для которого рабочий отправил сообщение. В обоих случаях мы помещаем адрес рабочего (первый кадр конверта) в конец очереди свободных рабочих, а оставшуюся часть (если это ответ клиенту) отправляем через фронтенд сокет клиенту.

- если есть активность со стороны фронтэнда (клиент), мы берем запрос клиента, извлекаем из очереди адрес очередного рабочего (который использовался последним), и отправляем запрос в бэкэнд (рабочему). В конверте теперь первый кадр - адрес рабочего, потом разделитель, потом все три части клиентского запроса.

Данная схема легко расширяется. Например, в процессе работы работники могли бы запускать счетчики производительности для оценки собственного быстродействия и отчитываться брокеру о результатах. А брокер мог бы выбирать самого быстрого из работников, использованных последними.
9 окт 14, 14:20    [16682003]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
ZeroMQ, переходим на последнюю версию: 4.0.4

Здесь: 16562018 было сказано, что биндинг реализован для версий 2.* и 3.*.

Хотя уже год как есть 4я версия. В которой реализованы такие интересные вещи, как новый протокол уровня передачи, могут использоваться криптографические библиотеки и аутентификация коннекта а также добавлен новый тип сокета - ZMQ_STREAM (для работы в качестве TCP клиента или сервера).

Вот файлик, реализующий интерфейсы библиотеки libzmq.dll (используем его в uses вместо старого zmq.pas) :

К сообщению приложен файл (zmq_h.7z - 3Kb) cкачать
13 окт 14, 23:45    [16699729]     Ответить | Цитировать Сообщить модератору
Топик располагается на нескольких страницах: Ctrl  назад   1 2 [3] 4 5 6 7 8 9 10 11   вперед  Ctrl      все
Все форумы / Delphi Ответить