Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Delphi Новый топик    Ответить
Топик располагается на нескольких страницах: Ctrl  назад   1 2 3 [4] 5 6 7 8 9 10 11   вперед  Ctrl      все
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
libzmq.dll была откомпилирована с помощью MS VS 2012 Update 4, с указанием, что целевая платформа - Windows XP (v110_XP).
То есть, работать будет на Windows XP и более новых.

Приложенные к библиотеке тесты отработали без проблем, и все примеры, рассмотренные ранее, также работают.

К сообщению приложен файл. Размер - 55Kb
13 окт 14, 23:52    [16699737]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Сама библиотека. Распаковать и положить возле приложения. Больше для работы ничего не нужно.

К сообщению приложен файл (libzmq.7z - 72Kb) cкачать
13 окт 14, 23:54    [16699740]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Вернется к брокеру с балансировкой нагрузки (с учетом перехода на версию ZeroMQ 4.0.4): 16681363
+ Брокер с балансировкой нагрузки
program LoadBalancingBroker;

{$APPTYPE CONSOLE}
// Брокер с балансировкой нагрузки.
//=================================
// Для упрощения запуска пример реализована как многопоточное приложение,
// с использованием протокола inproc
//=================================
// Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт).
// Каждый рабочий при запуске сообщает брокеру о своей готовности.
// Рабочий ставится в очередь.
//
// Клиент при запуске обращается к брокеру с заданием. Брокер отправляет
// задание первому свободному рабочему. Рабочий, выполнив задание, возвращает
// результат брокеру, брокер пересылает задание исходному клиенту

uses
//  Fastmm4,
  Windows,
  SysUtils,
  zmq_h,
  ZMQ_Utils;

const
  c_NBR_CLIENTS = 5; // Число клиентов
  c_NBR_WORKERS = 3; // // Число рабочих
  c_NMBR_REQ = 2; // // Число запросов от каждого клиента

  // Конечные точки подключения
  c_url_clients = 'inproc://clients';
  c_url_workers = 'inproc://workers';

procedure client_thread_proc(aContext: Pointer);
// Процедура потока клиента
var
  fSocketClient: Pointer;
  fReply: string;
  i: integer;
begin
  fSocketClient := zmq_socket(aContext, ZMQ_REQ);
  // Назначение идентификатора соединения (строка случайных символов)
  s_set_id(fSocketClient);
  zmq_connect(fSocketClient, c_url_clients); // Коннект к брокеру

  for i := 0 to Pred(c_NMBR_REQ) do begin
  // Отправка запроса, получение ответа
    s_send(fSocketClient, 'HELLO');
    fReply := s_recv(fSocketClient);
    z_Log(Format('Client : %s', [fReply]));
  end;
  zmq_close(fSocketClient);
end;


procedure worker_thread_proc(aContext: Pointer);
// Процедура рабочего потока
var
  fSocketWorker: Pointer;
  fIdentity: string;
  fEmpty: string;
  fRequest: string;

begin
  fSocketWorker := zmq_socket(aContext, ZMQ_REQ);
   // Назначение идентификатора соединения
  s_set_id(fSocketWorker);
  zmq_connect(fSocketWorker, c_url_workers); // Коннект к брокеру

  // Соощаем брокеру, что рабочий поток запущен и готов к работе
  s_send(fSocketWorker, 'READY');

  while true do // Рабочий цикл
  begin
    // Читаем и запоминаем все кадры вплоть пустого (fEmpty)
    // В данном примере кадров всего один, но реально их может быть больше
    fIdentity := s_recv(fSocketWorker); // Идентификатор клиента
    if zmq_errno() = ETERM then
      Break; // Уходим, если контекст в процессе завершения

    fEmpty := s_recv(fSocketWorker); // Кадр - разделитель
    Assert(fEmpty = '');
    // Получение запроса, отправка ответа
    fRequest := s_recv(fSocketWorker);
    z_Log(Format('Worker : %s', [fRequest]));

    Sleep(5); // Имитируем выполнение полезной работы

    // Формирование конверта составного сообщения:
    s_send(fSocketWorker, fIdentity, ZMQ_SNDMORE);
      // Идентификатор клиента
    s_send(fSocketWorker, '', ZMQ_SNDMORE); // Разделитель
    s_send(fSocketWorker, 'OK'); // Результат работы
  end;
  zmq_close(fSocketWorker);
end;

procedure thread_proc(aContext: Pointer); cdecl;
var
  fVal: Integer;
begin
  fVal := Integer(aContext^);
  z_Log(IntToStr(fVal));
end;


var
  fContext: Pointer;
  fSocketClients: Pointer;
  fSocketWorkers: Pointer;
  i: Integer;
  fThrId: Cardinal;
  fAvailableWorkers: Integer;
  fZMQPoll: array[0..1] of zmq_pollitem_t;
  fWrkrs_Que: array[0..Pred(c_NBR_WORKERS)] of string; // Очередь рабочих
  fRC: Integer;
  fWorker_id: string;
  fClient_id: string;
  fEmpty: string;
  fReplay: string;
  fRequest: string;
  fCliReqNmbr: Integer; // Номер клиентского запроса
  fThrWIds: array[0..Pred(c_NBR_WORKERS)] of Cardinal;
  fThrCIds: array[0..Pred(c_NBR_CLIENTS)] of Cardinal;
  fP: Pointer;
begin
 // Подготовка контекста и сокетов
  fContext := zmq_ctx_new();
  fSocketClients := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketWorkers := zmq_socket(fContext, ZMQ_ROUTER);
  zmq_bind(fSocketClients, c_url_clients);
  zmq_bind(fSocketWorkers, c_url_workers);

  for i := 0 to Pred(c_NBR_WORKERS) do // Создаем рабочих
    fThrWIds[i] := BeginThread(nil, 0, @worker_thread_proc, fContext, 0, fThrId);

  for i := 0 to Pred(c_NBR_CLIENTS) do // Создаем клиентов
    fThrCIds[i] := BeginThread(nil, 0, @client_thread_proc, fContext, 0, fThrId);


// Главный цикл для LRU очереди. Используется два сокета: fSocketClients для
// клиентов и fSocketWorkers для рабочих. Опрос fSocketWorkers
// выполняется всегда, а fSocketClients - только тогда, когда есть один или
// больше готовых рабочих.
// Сообщения, которые еще не готовы к обработке, в ZMQ хранятся
// во встроенной очередей сообщений.
// Когда мы получаем запрос клиента, мы берем рабочего из начала
// очереди (fWrkrs_Que[0]) и посылаем ему запрос, которых включаеи исходный
// идентификатор клиента.
// Когда же приходит запрос от рабочего, этого рабочего ставим в конец очереди,
// а ответ переправляем исходному клиенту (используя идинтификатор в конверте).

  fAvailableWorkers := 0; // Число доступных рабочих

  fCliReqNmbr := 0;

  // Подготовка пула сокетов
  fZMQPoll[0].socket := fSocketWorkers;
  fZMQPoll[0].fd := 0;
  fZMQPoll[0].events := ZMQ_POLLIN;
  fZMQPoll[1].socket := fSocketClients;
  fZMQPoll[1].fd := 0;
  fZMQPoll[1].events := ZMQ_POLLIN;

  while fCliReqNmbr < c_NBR_CLIENTS * c_NMBR_REQ do begin

    fZMQPoll[0].revents := 0; // Сброс результатов опроса
    fZMQPoll[1].revents := 0;


    // Читать из fSocketClients только тогда, когда есть свободные рабочие
    // Если нет свободных - читать только из fSocketWorkers
    if fAvailableWorkers = 0 then
      fRC := zmq_poll(@fZMQPoll, 1, 11)
        // Только ждем готовности от рабочих
    else
      fRC := zmq_poll(@fZMQPoll, 2, 11); // Читаем оба сокета

    if fRC = -1 then
      Break; // Цикл прерван

    // Обработка действий рабочих на fSocketWorkers
    if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then begin
      fWorker_id := s_recv(fSocketWorkers);
      Assert(fAvailableWorkers < c_NBR_WORKERS);
      // Помещаем рабочего в конец очереди
      fWrkrs_Que[fAvailableWorkers] := fWorker_id;
      Inc(fAvailableWorkers);

      // Второй кадр - пустой
      fEmpty := s_recv(fSocketWorkers);
      assert(fEmpty = '');

      // Третий кадр - готовность ("READY"), иначе это Id клиента в ответе
      fClient_id := s_recv(fSocketWorkers);

      // Если это ответ клиенту, отправить ответ в fSocketClients
      if fClient_id <> 'READY' then begin
        fEmpty := s_recv(fSocketWorkers);
        Assert(fEmpty = '');
        fReplay := s_recv(fSocketWorkers);
        s_send(fSocketClients, fClient_id, ZMQ_SNDMORE);
        s_send(fSocketClients, '', ZMQ_SNDMORE);
        s_send(fSocketClients, fReplay);
        Inc(fCliReqNmbr); // Число обслуженных запросов

      end;
    end;

    // Обработка запросов клиентов:
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
    begin
      // Получаем очередной клиентский запрос, отправляем его рабочему из
      // начала очереди
      // Конверт запроса клиента: [identity][empty][request]
      fClient_id := s_recv(fSocketClients);
      fEmpty := s_recv(fSocketClients);
      Assert(fEmpty = '');
      fRequest := s_recv(fSocketClients);

      s_send(fSocketWorkers, fWrkrs_Que[0], ZMQ_SNDMORE);
      s_send(fSocketWorkers, '', ZMQ_SNDMORE);
      s_send(fSocketWorkers, fClient_id, ZMQ_SNDMORE);
      s_send(fSocketWorkers, '', ZMQ_SNDMORE);
      s_send(fSocketWorkers, fRequest);

      // Извлечение из очереди
      Dec(fAvailableWorkers);
      for i := 0 to Pred(fAvailableWorkers) do
        fWrkrs_Que[i] := fWrkrs_Que[i + 1];
    end;
  end;

  for I := 0 to High(fThrCIds) do // Ждем завершения клиентов
    WaitForSingleObject(fThrCIds[i], INFINITE);
  zmq_threadstart(thread_proc, @i);
  Sleep(1000);
  zmq_close(fSocketClients);
  zmq_close(fSocketWorkers);
  zmq_ctx_destroy(fContext);

//  for I := 0 to High(fThrWIds) do
//    WaitForSingleObject(fThrWIds[i], INFINITE);

  Readln;

end.


Как видим, кода довольно много, чтобы быстро во всем разобраться. Это связано с тем, что использовались вызовы API ZMQ низкого уровня.

Вот основной рабочий цикл:
+ Основной рабочий цикл брокера


  while fCliReqNmbr < c_NBR_CLIENTS * c_NMBR_REQ do begin

    fZMQPoll[0].revents := 0; // Сброс результатов опроса
    fZMQPoll[1].revents := 0;


    // Читать из fSocketClients только тогда, когда есть свободные рабочие
    // Если нет свободных - читать только из fSocketWorkers
    if fAvailableWorkers = 0 then
      fRC := zmq_poll(@fZMQPoll, 1, 11)
        // Только ждем готовности от рабочих
    else
      fRC := zmq_poll(@fZMQPoll, 2, 11); // Читаем оба сокета

    if fRC = -1 then
      Break; // Цикл прерван

    // Обрабока действий рабочих на fSocketWorkers
    if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then begin
      fWorker_id := s_recv(fSocketWorkers);
      Assert(fAvailableWorkers < c_NBR_WORKERS);
      // Помещаем рабочего в конец очереди
      fWrkrs_Que[fAvailableWorkers] := fWorker_id;
      Inc(fAvailableWorkers);

      // Второй кадр - пустой
      fEmpty := s_recv(fSocketWorkers);
      assert(fEmpty = '');

      // Третий кадр - готовность ("READY"), иначе это Id клиента в ответе
      fClient_id := s_recv(fSocketWorkers);

      // Если это ответ клиенту, отправить ответ в fSocketClients
      if fClient_id <> 'READY' then begin
        fEmpty := s_recv(fSocketWorkers);
        Assert(fEmpty = '');
        fReplay := s_recv(fSocketWorkers);
        s_send(fSocketClients, fClient_id, ZMQ_SNDMORE);
        s_send(fSocketClients, '', ZMQ_SNDMORE);
        s_send(fSocketClients, fReplay);
        Inc(fCliReqNmbr); // Число обслуженных запросов

      end;
    end;

    // Обработка запросов клиентов:
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
    begin
      // Получаем очередной клиентский запрос, отправляем его рабочему из
      // начала очереди
      // Конверт запроса клиента: [identity][empty][request]
      fClient_id := s_recv(fSocketClients);
      fEmpty := s_recv(fSocketClients);
      Assert(fEmpty = '');
      fRequest := s_recv(fSocketClients);

      s_send(fSocketWorkers, fWrkrs_Que[0], ZMQ_SNDMORE);
      s_send(fSocketWorkers, '', ZMQ_SNDMORE);
      s_send(fSocketWorkers, fClient_id, ZMQ_SNDMORE);
      s_send(fSocketWorkers, '', ZMQ_SNDMORE);
      s_send(fSocketWorkers, fRequest);

      // Извлечение из очереди
      Dec(fAvailableWorkers);
      for i := 0 to Pred(fAvailableWorkers) do
        fWrkrs_Que[i] := fWrkrs_Que[i + 1];
    end;
  end;


Громоздко, да. И это мы еще использовали наши вспомогательные процедурки вроде s_recv()/s_send(), а без них бы пришлось пересылать сообщения ZMQ и заниматься упаковкой-распоковкой данных.

В общем, если использовать низкуровневое API в сложных задачах, то и кодировать долго, и потом разбираться непросто.

Нужно переходить к более высоким уровням абстракции.

Ранее упоминалась библиотека Delphi: https://github.com/bvarga/delphizmq

Из этой библиотеки мы использовали только модуль zmq.pas, который и обеспечивает API низкого уровня. Модуль zmqapi.pas предоставляет объектный интерфейс более высокого уровня в соответствии видением прекрвсного с создателя библиотеки и, надо полагать, в соответствии с задачами, которые стояли перед ним в момент написания.

К сожалению, больше года библиотека почти не обновляется, и зависла на поддержке ZeroMQ версий 2.* и 3.*.
~~~~~~~~~~~~

К счастью, выход есть: iMatrix (контора, которая и разрабатывает ZeroMQ) создала и развивает библиотеку API высокого уровня: http://czmq.zeromq.org/
24 окт 14, 02:11    [16751832]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
High Level API: http://czmq.zeromq.org/

Вот так на Delphi будет выглядеть "Hello, Worrld!"
program hl_HelloWorld;

{$APPTYPE CONSOLE}

uses
  SysUtils,
  czmq_h;

var
  push: p_zsock_t;
  pull: p_zsock_t;
  fpStr: PChar;
begin

  push := zsock_new_push('inproc://example');
  pull := zsock_new_pull('inproc://example');

  zstr_send(push, 'Hello, World!');

  fpStr := zstr_recv(pull);
  Writeln(fpStr);

  zstr_free(fpStr);

  zsock_destroy(pull);
  zsock_destroy(push);

  Readln;
end.

Здесь пара сокетов: PUSH и PULL, взаимодействующих между собой по inproc - протоколу.
В сокет push посылается строка 'Hello, World!', которая принимается из сокета pull.

Как видим, строка здесь - "Сишная", то есть наш привычный PChar:
var
...
  fpStr: PChar;


Так сделано из-за того, что и эта библиотека (как и libzmq.dll) реализована в виде dll.
Более того, чтобы избежать возможных проблем с менеджером памяти, освобождение памяти, выделенной для строки, также выполняется специальным методом:

  zstr_free(fpStr);


Т.обр, не требуется создание/освобождение контекста, настройки сокетов, биндингов, коннектов и всей возни с упаковкой/распаковкой строк.

К сообщению приложены файлы библиотеки czmq.dll, libzmq.dll, скомпилированные с помощью MS VS 2012 U4 (должны работать на Win XP и более новых).

О том, как использовать эту красоту, расскажу чуть позже.

К сообщению приложен файл (bin.7z - 125Kb) cкачать
24 окт 14, 03:03    [16751865]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Использование.

Для использования CZMQ следует подключить файл czmq.h, приложенный к сообщению.

При импорте .h файлов я старался избегать использования устаревших (deprecated) элементов, однако впоследствии, в процессе изучения ZMQ по не очень новым руководствам, пришлось кое-что добавить.

Возможно, позднее файл биндинга будет изменен.

К сообщению приложен файл (ZMQ_4.7z - 33Kb) cкачать
24 окт 14, 03:27    [16751887]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
PPA
Member

Откуда: Караганда -> Липецк
Сообщений: 805
ZeroMQ
High Level API: http://czmq.zeromq.org/
К сообщению приложены файлы библиотеки czmq.dll, libzmq.dll, скомпилированные с помощью MS VS 2012 U4 (должны работать на Win XP и более новых).
О том, как использовать эту красоту, расскажу чуть позже.


zbeacon использовал? про нее будет рассказ?

http://czmq.zeromq.org/manual:zbeacon
24 окт 14, 08:51    [16752072]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
PPA,

обязательно будет.
24 окт 14, 15:08    [16754360]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Особенности High-level API CZMQ:

Автоматизация обслуживания сокетов. К примеру, при закрытии контекста все его сокеты обработка будут также закрываться. При этом в некоторых случаях для сокетов можно назначить таймайт.

Кроссплатформенное управление тредами.

Передача потоков сообщений от родительских тредов к дочерним. (При этом автоматически будут использоваться сокеты PAIR по протоколу inproc).

Кроссплатформенный доступ к системным часам.

Специальный реактор для замены zmq_poll(). Цикл опроса прост, громоздок. Каждый раз приходится писать один и тот же код: обсчет таймеров и вызов процедур обработки (ридеров) по мере готовности сокетов. Простой реактор с ридерами сокетов и таймерами позволит сократить время написания цикла обработки.

Правильная обработка нажатия Ctrl-C для консольных приложений.
...
Кроме того, в CZMQ версии 3.* добавлены:

Класс zsock, который работает вообще без контекста и с конструкторами, совмещающими операции создания и коннекта/биндинга.
Класс zactor для multithreaded - разработки.
Класс zgossip для исследования конфигурации сети.
Класс zrex для регулярок.
Функций управления процессами - zsys.
29 окт 14, 20:37    [16774869]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Вариант брокера с балансировкой нагрузки, реализованного с помощью High-level API CZMQ:

+ Брокер с балансировкой нагрузки & CZMQ

program hl_LoadBalancingBroker;

{$APPTYPE CONSOLE}
// Брокер с балансировкой нагрузки.
// Демонстрируется использование CZMQ и реактора
//==============================================
// Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт).
// Каждый рабочий при запуске сообщает брокеру о своей готовности (c_WORKER_READY).
// Рабочий ставится в очередь.
//
// Клиент при запуске обращается к брокеру с заданием. Брокер отправляет
// задание первому свободному рабочему. Рабочий, выполнив задание, возвращает
// результат брокеру, брокер пересылает результат клиенту - заказчику.

uses
  SysUtils
  , zmq_h
  , czmq_h
  , ZMQ_Utils
  ;

const
  c_NBR_CLIENTS = 10; // Число клиентов
  c_NBR_WORKERS = 3; // // Число рабочих

  // Конечные точки подключения
  c_url_clients = 'tcp://%s:5555'; //'inproc://clients';
  c_url_workers = 'tcp://%s:5556'; //'inproc://workers';
  c_domain = 'localhost'; // Сетевой адрес брокера
  c_interf: string = '*'; // Адрес для биндинга

  c_WORKER_READY: byte = 1; // Сигнал готовности рабочего


function client_task(args: Pointer): Pointer; cdecl;
// Функция треда клиента
var
  client: Pointer;
  ctx: p_zctx_t;
  reply: PChar;
begin
  ctx := args; // zctx_new();
  client := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(client, c_url_clients, c_domain);
// Запрос - ответ
  while true do begin
    zstr_send(client, 'HELLO');
    reply := zstr_recv(client);
    if (reply = nil) then
      break;
    z_Log('Client: ' + reply);
    zstr_free(reply);
    sleep(1);
  end;
//  zctx_destroy(ctx);
  Result := nil;
end;

function worker_task(args: Pointer): Pointer; cdecl;
// Функция треда рабочего
var
  ctx: p_zctx_t;
  frame: p_zframe_t;
  msg: p_zmsg_t;
  worker: Pointer;
begin
  ctx := args; // zctx_new();
  worker := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(worker, c_url_workers, c_domain);

// Сообщаем брокеру о готовности работать
  frame := zframe_new(@c_WORKER_READY, 1);
  zframe_send(frame, worker, 0);

// ОБработка сообщений по мере их получения
  while True do begin
    msg := zmsg_recv(worker);
    if msg = nil then
      break; // Interrupted
    zframe_reset(zmsg_last(msg), PChar('OK'), 2);
    zmsg_send(msg, worker);
  end;
//  zctx_destroy(ctx);
  Result := nil;
end;


// Структура, передаваемая в реактор
type
  p_lbbroker_t = ^lbbroker_t;
  lbbroker_t = packed record
    frontend: Pointer; // Сокет - слушать клиентов
    backend: Pointer; // Сокет - слушать рабочих
    workers: p_zlist_t; // Список свободных рабочих
  end;

// Устройство реактора таково, что все сообщения, приходящие в сокет,
// передаются ректором в функцию обработки. У нас - два обработчика:
// для фронтэнда (клиенты) и для бэкэнда (рабочие)

// Обработка ввода от клиентjd (на фронтэнд)

function s_handle_frontend(loop: p_zloop_t; poller: p_zmq_pollitem_t; arg: Pointer):
  Integer; cdecl;
var
  msg: p_zmsg_t;
  self: p_lbbroker_t;
begin
  self := p_lbbroker_t(arg);
  msg := zmsg_recv(self.frontend); // Сообщение от клиента
  if msg <> nil then begin
    zmsg_wrap(msg, p_zframe_t(zlist_pop(self.workers)));
    zmsg_send(msg, self.backend);
// Завершение обработчика, если нет доступных рабочих
    if zlist_size(self.workers) = 0 then begin
      poller.socket := self.frontend;
      poller.fd := 0;
      poller.events := ZMQ_POLLIN;
      poller.revents := 0;
      zloop_poller_end(loop, poller);
    end;
  end;
  Result := 0;
end;

// Обработка ввода от рабочих (на бэкэнд)

function s_handle_backend(loop: p_zloop_t; poller: p_zmq_pollitem_t; arg: Pointer):
  Integer; cdecl;
var
  frame: p_zframe_t;
  identity: p_zframe_t;
  msg: p_zmsg_t;
  self: p_lbbroker_t;
begin
// Для балансировки нагрузки снова используем идентификацию
  self := p_lbbroker_t(arg);
  msg := zmsg_recv(self.backend);
  if msg <> nil then begin
    identity := zmsg_unwrap(msg);
    zlist_append(self.workers, identity);


    if zlist_size(self.workers) = 1 then begin
    // Разрешить ридер фронтэнда
      poller.socket := self.frontend;
      poller.fd := 0;
      poller.events := ZMQ_POLLIN;
      poller.revents := 0;
      zloop_poller(loop, poller, @s_handle_frontend, self);
    end;
// Переброска сообщения клиенту, если это не "ГОТОВ".
    frame := zmsg_first(msg);
    if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
      zmsg_destroy(msg)
    else
      zmsg_send(msg, self.frontend);
  end;
  result := 0;
end;

// Основной тред, порождающий дочерние, запускающий затем реактор.
// Если нажать Ctrl-C, реактор завершится и главный тред тоже завершится.

procedure DoMain;
var
  ctx: p_zctx_t;
  frame: p_zframe_t;
  i: Integer;
  poller: zmq_pollitem_t;
  reactor: p_zloop_t;
  self: p_lbbroker_t;
begin
  ctx := zctx_new(); // Контекст
  New(self); // Данные реактора

  self.frontend := zsocket_new(ctx, ZMQ_ROUTER); // Сокеты реактора
  self.backend := zsocket_new(ctx, ZMQ_ROUTER);

  zsocket_bind(self.frontend, c_url_clients, c_interf);
    // Привязка к интерфейсу
  zsocket_bind(self.backend, c_url_workers, c_interf);

  for i := 0 to pred(c_NBR_CLIENTS) do // Запуск тредов клиентов
    zthread_new(@client_task, ctx);
  for i := 0 to Pred(c_NBR_WORKERS) do // Запуск тредов рабочих
    zthread_new(@worker_task, ctx);

// Очередь доступных рабочих
  self.workers := zlist_new();

// Подготовка и запуск реактора
  reactor := zloop_new();

  poller.socket := self.backend;
  poller.fd := 0;
  poller.events := ZMQ_POLLIN;
  poller.revents := 0;

  zloop_poller(reactor, @poller, @s_handle_backend, self);
  zloop_start(reactor);
  zloop_destroy(reactor);

// Аккуратно завершаем все при выходе
  while zlist_size(self.workers) > 0 do begin
    frame := zlist_pop(self.workers);
    zframe_destroy(frame);
  end;
  zlist_destroy(self.workers);
  zctx_destroy(ctx);
  Dispose(self);
end;

begin
  DoMain;
  Readln;
end.



"Чистый" ZeroMQ в случае прерывания (по Ctrl+C) вернет код завершения операции -1 и установит код ошибка EINTR.
Здесь же (в CZMQ) операция просто вернет nil:

  while true do begin
    zstr_send(client, 'HELLO');
    reply := zstr_recv(client);
    if (reply = nil) then
      break;
    z_Log('Client: ' + reply);
    zstr_free(reply);
    sleep(1);
  end;


Если используем zmq_poll() (16619033), то следим за кодом завершения:

if  zmq_poll (items, 2, 1000 * 1000) = -1 then
  break; // Прерывание


Но в данном примере мы используем РЕАКТОР CZMQ.

Вот что он позволяет.

Любому сокету может быть назначить ридер: код, который вызывается всякий раз, когда в сокете появляются данные, готовые для чтения.
Отключить ридер для сокета.
Назначить таймер, который будет запускаться один или несколько раз через указанные промежутки времени.
Отключить таймер.

Естественно, цикл опроса внутри использует zmq_poll(). Всегда, когда происходит добавление или удаление ридеров, выполняется перестраивание структуру данных для опроса и пересчитываются вычисляются таймауты, чтобы найти следующий таймер.
После этого вызывается ридер и обработчики таймера для каждого сокета и таймера, которые требуют обслуживания.

Когда мы используем шаблон "Реактор", код как бы выворачивается наизнанку. Код внешне выглядит примерно так:

var
  reactor :=  p_zloop_t;
...
begin
...
  reactor := zloop_new ();
  zloop_reader(reactor, self.backend, @s_handle_backend, self);
  zloop_start (reactor);
  zloop_destroy (reactor);


Таким образом, обработка сообщений ZMQ размещена в коде специальных методов - обработчиков. При этом один и тот же обработчик может обрабатывать как активность сокета (попступление данных), так и таймеров сокета.
29 окт 14, 21:41    [16775038]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Что-то у меня обработка Ctrl+C работает неправильно. В "чистом" ZMQ все ОК, а с CZMQ - крутые глюки какие-то.

И не то чтобы это меня особо напрягает, но все же.
29 окт 14, 21:43    [16775046]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Что-то не очень вразумительно про реактор написал.

Ладно, полшага назад.

Вот версия брокера, использующего CZMQ, но без реактора.

+ Брокер с балансировкой нагрузки & CZMQ без реактора
program hl_LoadBalancingBrokerSimple;

{$APPTYPE CONSOLE}
// Брокер с балансировкой нагрузки.
// Демонстрируется использование CZMQ
//======================================================
// Имеется клиенты (c_NBR_CLIENTS шт) и рабочие (c_NBR_WORKERS шт).
// Каждый рабочий при запуске сообщает брокеру о своей готовности (c_WORKER_READY).
// Рабочий ставится в очередь.
//
// Клиент при запуске обращается к брокеру с заданием. Брокер отправляет
// задание первому свободному рабочему. Рабочий, выполнив задание, возвращает
// результат брокеру, брокер пересылает результат клиенту - заказчику.


uses
  SysUtils,
  zmq_h,
  czmq_h,
  ZMQ_Utils;

const
  c_NBR_CLIENTS = 1; // Число клиентов
  c_NBR_WORKERS = 1; // // Число рабочих

  // Конечные точки подключения
  c_url_clients = 'tcp://%s:5555'; //'inproc://clients';
  c_url_workers = 'tcp://%s:5556'; //'inproc://workers';
  c_domain = 'localhost'; // Сетевой адрес брокера
  c_interf: string = '*'; // Адрес для биндинга

  c_WORKER_READY: byte = 1;
    // Сигнал готовности рабочего         ё


function client_task(args: Pointer): Pointer; cdecl;
// Функция треда клиента
var
  client: Pointer;
  ctx: p_zctx_t;
  reply: PChar;
begin
  ctx := args; // zctx_new();
  client := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(client, c_url_clients, c_domain);
// Запрос - ответ
  while true do begin
    zstr_send(client, 'HELLO');
    reply := zstr_recv(client);
    if (reply = nil) then
      break;
    z_Log('Client: ' + reply);
    zstr_free(reply);
    sleep(1);
  end;
//  zctx_destroy(ctx);
  Result := nil;
end;

function worker_task(args: Pointer): Pointer; cdecl;
// Функция треда рабочего
var
  ctx: p_zctx_t;
  frame: p_zframe_t;
  msg: p_zmsg_t;
  worker: Pointer;
begin
  ctx := args; // zctx_new();
  worker := zsocket_new(ctx, ZMQ_REQ);
  zsocket_connect(worker, c_url_workers, c_domain);

// Сообщаем брокеру о готовности работать
  frame := zframe_new(@c_WORKER_READY, 1);
  zframe_send(frame, worker, 0);

// ОБработка сообщений по мере их получения
  while True do begin
    msg := zmsg_recv(worker);
    if msg = nil then
      break; // Interrupted
    zframe_reset(zmsg_last(msg), PChar('OK'), 2);
    zmsg_send(msg, worker);
  end;
//  zctx_destroy(ctx);
  Result := nil;
end;




// Основной тред, порождающий дочерние, запускающий затем реактор.
// Если нажать Ctrl-C, реактор завершится и главный тред тоже завершится.

procedure DoMain;
var
  backend: Pointer;
  ctx: p_zctx_t;
  frame: p_zframe_t;
  frontend: Pointer;
  i: Integer;
  identity: p_zframe_t;
  poller: zmq_pollitem_t;
  reactor: p_zloop_t;
  items: array[0..1] of zmq_pollitem_t;
  msg: p_zmsg_t;
  rc: Integer;
  workers: p_zlist_t;
begin
  ctx := zctx_new(); // Контекст

  frontend := zsocket_new(ctx, ZMQ_ROUTER); // Сокеты реактора
  backend := zsocket_new(ctx, ZMQ_ROUTER);

  zsocket_bind(frontend, c_url_clients, c_interf);
    // Привязка к интерфейсу
  zsocket_bind(backend, c_url_workers, c_interf);

  for i := 0 to pred(c_NBR_CLIENTS) do // Запуск тредов клиентов
    zthread_new(@client_task, ctx);
  for i := 0 to Pred(c_NBR_WORKERS) do // Запуск тредов рабочих
    zthread_new(@worker_task, ctx);

// Очередь доступных рабочих
  workers := zlist_new();

 // Главный цикл балансировщика нагрузок.
 // Длинне, чем с реактором, но короче, чем на "чистом" ZMQ
  while True do begin
    items[0].socket := backend;
    items[0].fd := 0;
    items[0].events := ZMQ_POLLIN;
    items[0].revents := 0;
    items[1].socket := frontend;
    items[1].fd := 0;
    items[1].events := ZMQ_POLLIN;
    items[1].revents := 0;

// Опрашиваем клиентов только если есть незанятые рабочие
    if zlist_size(workers) > 0 then
      rc := zmq_poll(@items[0], 2, -1)
    else
      rc := zmq_poll(@items[0], 1, -1);
    if rc = -1 then
      break; // прерывание

// Обработка данных рабочих (от backend)
    if (items[0].revents and ZMQ_POLLIN) <> 0 then begin
  // Используем идентификацию
      msg := zmsg_recv(backend);
      if msg = nil then
        break; // Interrupted
      identity := zmsg_unwrap(msg);
      zlist_append(workers, identity);

// Если это не сообщение о готовности, переслать сообщение клиенту
      frame := zmsg_first(msg);
      if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
        zmsg_destroy(msg)
      else
        zmsg_send(msg, frontend);
    end;
    if (items[1].revents and ZMQ_POLLIN) <> 0 then begin
// Получение запроса от клиента, передача первому незанятому рабочему
      msg := zmsg_recv(frontend);
      if msg <> nil then begin
        zmsg_wrap(msg, zlist_pop(workers));
        zmsg_send(msg, backend);
      end;
    end;
  end;
// Аккуратно завершаем все при выходе
  while zlist_size(workers) > 0 do begin
    frame := zlist_pop(workers);
    zframe_destroy(frame);
  end;
  zlist_destroy(workers);
  zctx_destroy(ctx);
end;

begin
  DoMain;
  Readln;
end.


Отличие от версии 16775038 - в главном цикле:
// Очередь доступных рабочих
  workers := zlist_new();

 // Главный цикл балансировщика нагрузок.
 // Длинне, чем с реактором, но короче, чем на "чистом" ZMQ
  while True do begin
    items[0].socket := backend;
    items[0].fd := 0;
    items[0].events := ZMQ_POLLIN;
    items[0].revents := 0;
    items[1].socket := frontend;
    items[1].fd := 0;
    items[1].events := ZMQ_POLLIN;
    items[1].revents := 0;

// Опрашиваем клиентов только если есть незанятые рабочие
    if zlist_size(workers) > 0 then
      rc := zmq_poll(@items[0], 2, -1)
    else
      rc := zmq_poll(@items[0], 1, -1);
    if rc = -1 then
      break; // Прерывание

// Обработка данных рабочих (от backend)
    if (items[0].revents and ZMQ_POLLIN) <> 0 then begin
  // Используем идентификацию
      msg := zmsg_recv(backend);
      if msg = nil then
        break; // Прерывание
      identity := zmsg_unwrap(msg);
      zlist_append(workers, identity);

// Если это не сообщение о готовности, переслать сообщение клиенту
      frame := zmsg_first(msg);
      if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
        zmsg_destroy(msg)
      else
        zmsg_send(msg, frontend);
    end;
    if (items[1].revents and ZMQ_POLLIN) <> 0 then begin
// Получение запроса от клиента, передача первому незанятому рабочему
      msg := zmsg_recv(frontend);
      if msg <> nil then begin
        zmsg_wrap(msg, zlist_pop(workers));
        zmsg_send(msg, backend);
      end;
    end;
  end;
// Аккуратно завершаем все при выходе
  while zlist_size(workers) > 0 do begin
    frame := zlist_pop(workers);
    zframe_destroy(frame);
  end;
  zlist_destroy(workers);
  zctx_destroy(ctx);
end;

Объект workers теперь - не массив, а список типа p_zlist_t.
Видна работа с фреймами:

 identity := zmsg_unwrap(msg);
 zlist_append(workers, identity);

Составные сообщения теперь принимаются целиком, а не частями:
 msg := zmsg_recv(frontend);


Ну и забавные вещи вроде строк с форматированием ("Си"шный format()):

const
  // Конечные точки подключения
  c_url_clients = 'tcp://%s:5555'; //'inproc://clients';
  c_url_workers = 'tcp://%s:5556'; //'inproc://workers';
  c_domain = 'localhost'; // Сетевой адрес брокера
  c_interf: string = '*'; // Адрес для биндинга.

begin

...
  zsocket_connect(client, c_url_clients, c_domain);
...
  zsocket_bind(frontend, c_url_clients, c_interf);

Отмечу, что zsocket_connect() / zsocket_bind() - процедуры с переменным числом параметров:

//  Connect a socket to a formatted endpoint
//  Returns 0 if OK, -1 if the endpoint was invalid.
function zsocket_connect(self: pointer; format: PChar): Integer; varargs; cdecl; external   cZMQ_DllName;

А так как процедуры сишные - не забываем, в случае использования констант в параметрах, указывать модификатор типа для односимвольных строк:

const
  c_interf: string = '*'; // Адрес для биндинга.

То же самое:

const
  c_WORKER_READY: byte = 1;
...

begin
...
    frame := zframe_new(@c_WORKER_READY, 1);
...
    if CompareMem(zframe_data(frame), @c_WORKER_READY, 1) then
...


То есть там, где требуется "настоящий" адрес данных - в объявлении константы указываем модификатор типа.
Или просто используем переменную.
...
...
...
Теперь, имхо, разобраться с моим предыдущим сообщением (16775038) гораздо проще.
30 окт 14, 00:22    [16775473]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Шаблон "Асинхронный клиент/сервер".

Будем создавать архитектуру сети N-1, когда несколько разных клиентов асинхронно общаются с одним сервером.

Работать это будет вот так:

- клиенты коннектятся к серверу и отправляют запросы;
- на каждый запрос сервер отправляет 0 или больше ответов;
- клиенты могут отправлять множество запросов без ожидания ответов;
- серверы могут отправлять множество ответов без ожидания новых запросов.

Топология:

К сообщению приложен файл. Размер - 5Kb
30 окт 14, 00:42    [16775525]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
В целях упрощения тестирования, кодировать будем в рамках одного процесса, как мы уже делали несколько раз.
Т.е., множество тредов будет имитировать многопроцессную архитектуру.
При запуске примера видно, что три клиента (каждый со случайным id) выводят на консоль ответы от сервера.
Если присмотреться, то можно заметить, что каждая задача клиента порождает ноль или больше ответов на каждый запрос.

+ Модель асинхронного клиент-сервера

program hl_AsynchClientServer;

{$APPTYPE CONSOLE}

uses
  SysUtils
  , zmq_h
  , czmq_h
  ;


// Асинхронные клиент - сервер ( от DEALER к ROUTER)
//
// Так как это пример, для облегчения запуска все работает в рамках одного процесса.
// В реальности каждая задача должна быть отдельным процессом.


// Задача клиента
// Клиент коннектится к серверу и шлет ему запросы по одному в секунду.
// Собирает ответы в порядке поступления а потом распечатывает.
// Несколько клиентов работаеют параллельно, каждый со своим Id.

function client_task(args: Pointer): Pointer; cdecl;
var
  centitick: Integer;
  client: Pointer;
  ctx: p_zctx_t;
  identity: string;
  items: zmq_pollitem_t;
  msg: p_zmsg_t;
  request_nbr: Integer;
begin
  ctx := zctx_new();
  client := zsocket_new(ctx, ZMQ_DEALER);

// Случайный идентификатор (текст: для облегчения трассировки)
  identity := Format('%4x - %4x', [Random($10000), Random($10000)]);
  zsocket_set_identity(client, PChar(identity));
  zsocket_connect(client, 'tcp://localhost:5570');

  items.socket := client;
  items.fd := 0;
  items.events := ZMQ_POLLIN;
  items.revents := 0;
  request_nbr := 0;
  while true do begin
// Тики по одному в секунду: получаем приходящие сообщения
    for centitick := 0 to 99 do begin
      zmq_poll(@items, 1, 10 * ZMQ_POLL_MSEC);
        // Опрос с таймаутом 0,01 сек
      if (items.revents and ZMQ_POLLIN) <> 0 then begin // Что-то есть
        msg := zmsg_recv(client);
        zframe_print(zmsg_last(msg), PChar(identity));
        zmsg_destroy(&msg);
      end;
    end;
    Inc(request_nbr);
    zstr_sendf(client, PChar('request # %d'), request_nbr);
  end;
  zctx_destroy(ctx);
  result := nil;
end;

// Задача сервера
// Используется многонитевая модель. Для вытягивания запросов в пул
// рабочих и распределения ответов обратно клиентам. Один рабочий может
// одновременно обработать один запрос, но один клиент может общаться с
// множеством рабочих одновременно.

procedure server_worker(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl; forward;

function server_task(args: Pointer): Pointer;
var
  backend: Pointer;
  ctx: p_zctx_t;
  frontend: Pointer;
  thread_nbr: Integer;
begin
// Фронтенд сокет общается с клиентами по tcp
  ctx := zctx_new();
  frontend := zsocket_new(ctx, ZMQ_ROUTER);
  zsocket_bind(frontend, 'tcp://*:5570');

// Бэкенд сокет общается с рабочими по inproc
  backend := zsocket_new(ctx, ZMQ_DEALER);
  zsocket_bind(backend, 'inproc://backend');

// Запуск пула рабочих нитей, точное количество не важно

  for thread_nbr := 0 to 4 do
    zthread_fork(ctx, @server_worker, nil);

// Коннект бэкэнда к фронтэнду через прокси
  zmq_proxy(frontend, backend, nil);

  zctx_destroy(ctx);
  result := nil;

end;

// Каждая задача рабочего работает одновременно над одним запросом и
// отправляет случайное число ответов со случайными паузами между ответами:

procedure server_worker(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
var
  content: p_zframe_t;
  identity: p_zframe_t;
  msg: p_zmsg_t;
  replies: Integer;
  reply: Integer;
  worker: Pointer;
begin
  worker := zsocket_new(ctx, ZMQ_DEALER);
  zsocket_connect(worker, 'inproc://backend');

  while true do begin
// Сокет DEALER дает нам конверт ответа и мообщение
    msg := zmsg_recv(worker);
    identity := zmsg_pop(msg);
    content := zmsg_pop(msg);
    assert(content <> nil);
    zmsg_destroy(msg);

// Отправка обратно 0..4 ответов
    replies := Random(5);
    for reply := 0 to pred(replies) do begin
// Sleep какое-то случайное время
      zclock_sleep(Random(1000) + 1);
      zframe_send(identity, worker, c_ZFRAME_REUSE + c_ZFRAME_MORE);
      zframe_send(&content, worker, c_ZFRAME_REUSE);
    end;
    zframe_destroy(&identity);
    zframe_destroy(&content);
  end;
end;

// Главный тред просто запускает 3 клиента и 1 сервер и ждет; затем сервер завершается.

procedure DoMain;
begin
  Randomize;
  zthread_new(@client_task, nil);
  zthread_new(@client_task, nil);
  zthread_new(@client_task, nil);
  zthread_new(@server_task, nil);
  zclock_sleep(5 * 1000);
    // Работаем 5 секунд, потом завершение
end;

begin
  DoMain;
  Readln; // Для отладки
end.



Некоторые замечания к коду примера.

Клиенты шлют запросы раз в секунду и получают обратно ноль или несколько ответов. Чтобы такое сделать с помощью zmq_poll(), мы не может просто опрашивать с 1-секундным таймаутом, или мы завершим отправку нового запроса только через 1 секунду после того, как мы примем последний ответ. Поэтому мы опрашиваем с высокой частотой (100 раз в секунду, по 1/100 секунде на опрос ), что можно считать достаточно точным.

Сервер использует пул рабочих нитей, и каждая из них обрабатывает один запрос синхронно. Он соединяет их со своими фронтэнд сокетами с помощью внутренней очереди. Соединение фронтэнд и бэкэнд сокетов выполняется с помощью вызова zmq_proxy().

Таким образом, более детальная структура асинхронного сервера такова:

К сообщению приложен файл. Размер - 13Kb
30 окт 14, 02:15    [16775691]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Стоит подчеркнуть, что для диалога между клиентами и сервером используются сокеты DEALER -> ROUTER, в то время как внутри сервера, для общения между главной нитью сервера и рабочими используются сокеты DEALER -> DEALER. Если бы сокеты были строго синхронными, то мы бы использовали сокет REPORT. Однако, так как мы хоти отправлять множество ответов, нам нужен асинхронный сокет. Мы не хотим маршрутизировать ответы, так как они всегда идут к одной серверной нити, который слал нам запрос.


Посмотрим на маршрутизацию конверта сообщения. Клиент посылает сообщение, содержащие один кадр. Нить сервера принимает сообщение из двух кадров: исходное сообщение с префиксом из идентификатора клиента. Мы отправляем эти два кадра рабочему, который рассматривает его как обычный конверт ответа и возвращает его нам как двухкадровое сообщение. Затем мы первый кадр используем как идентификатор для маршрутизации второго кадра при отправке ответа клиенту в качестве ответа.

Схема примерно такая:
Client Server frontendWorker
[ DEALER ]<----->[ ROUTER <-----> DEALER <-----> DEALER ]
1 часть2 части2 части



Теперь насчет сокетов. Для реализации балансировщика нагрузуки для общения с рабочими мы могли бы использовать схему ROUTER -> DEALER, но это повлекло бы много дополнительной работы. В данном случае схема DEALER -> DEALER очевидно предпочтительнее, обеспечивая компромисс между низкой латентностью для каждого запроса и повышенным риском разбалансировки нагрузки рабочих. В данном случае было сделано "как проще".

При построении серверов, которые сохраняют состояние клиентов, возникают классические проблемы. Если сервер хранит состояние каждого клиента, а клиентов подключается все больше и больше, в конце концов наступает момент исчерпания ресурсов. Даже если одни и те же клиенты сохраняют коннект, и если вы используете идентификацию по умолчанию, то каждый соединение будет выглядеть как новое.
В примере выше мы немного сжульничали, сохраняя состояние только в течении короткого времени (время, необходимое работнику для обработки запроса), а затем сбрасывая состояние.
Но во многих случаев это непрактично.

Для правильного управления состоянием клиента в асинхронном сервере, необходимо:

- От клиента к серверу следует реализовать heartbeating. В нашем примере, мы шлем запросы по одному в секунду, и это достоверно можно считать хартбитингом.

+ Heartbeat - сердцебиение.
Хартбит - heartbeat - это периодический сигнал, генерируемый аппаратно или программно, предназначенный для определения нормального функционирования системы либо для синхронизации других компонентов системы. Обычно хартбит - это посылка между машинами с регулярными интервалами времени порядка секунды. Если хартбит не принимается в течении какого-то вреиени - обычно в течении нескольких интервалов хартбита - то считается, что машина, которая должна посылать хартбит, неисправна.

Пример: Система "Периметр".

- Сохраняйте состояние клиента, используя идентификатор (сгенерированный автоматически или как-либо иначе) в качестве ключа.

- Следите и реагируйте на остановку хартбита. Если от клиента не приходо запроса, скажем, в течении двух секунд, сервер может это обнаружить и уничтожить все состояния, связанные с данным клиентом.
30 окт 14, 03:36    [16775781]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Рабочий пример. Межброкерная маршрутизация.

Постановка задачи.
Только что нам позвонил лучший клиент и попросил срочно спроектировать мощную систему облачных вычислений. В качестве облака он представляет себе совокупность множества датацентров, каждый кластер клиентов и рабочих которого работают вместе, как единое целое.

Для нас такая задача - как два байта переслать. Мы ж дельфисты.

Смоделирует эту задачу, используя ZeroMQ.
30 окт 14, 03:56    [16775787]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
PPA
Member

Откуда: Караганда -> Липецк
Сообщений: 805
ZeroMQ,

А почему эту тему не ведешь в виде отдельного профильного ресурса?
заведи свой сайт-блог - контент ведь качественный.
а тут ведь затеряется в "мусорных вопросах" если не пинать каждый раз в топ :)
31 окт 14, 10:23    [16781203]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
PPA
ZeroMQ,

А почему эту тему не ведешь в виде отдельного профильного ресурса?
заведи свой сайт-блог - контент ведь качественный.
а тут ведь затеряется в "мусорных вопросах" если не пинать каждый раз в топ :)

Пожалуй, ты прав.

Я вот тут :https://www.sql.ru/blogs/blogs.aspx - оставлял заявку, но воз и ныне там.

Ну и изначально я не предполагал, что материала будет так много.
31 окт 14, 15:10    [16783978]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
чччД
...
Сокеты в Delphi представлены просто указателями (Pointer).
А сообщения zmq_msg_t - структурой:

zmq_msg_t = record
    _: Array[0..32-1] of Byte;
  end; 


...


Ты-дыщь! Начиная с версии 4.1.0, структура zmq_msg_t имеет длину 48 байт!

type
  zmq_msg_t = packed record
    _: array[0..47] of Byte;
  end;


На уровне протоколов обмена данными проблем совместимости "снизу вверх" нет, а на уровне софта - еще как может быть.

Вот тут можно взять свежие .dll и .pas.

Или работать со старыми (.dll и .pas): см. аттачи к 16751865 и 16751887 соответственно.
1 ноя 14, 22:34    [16788831]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Вычитал про форки этой библиотеки: "Crossroad I/O" (уже вроде умер) и NanoMSG .

Форкнул библиотеку Мартин Сюстрик (Martin Su'strik) - один из создателей, с целью:
- изменить лицензию использования;
- сделать сокеты thread-safe;
- расширить режимы работы;
- упростить использование;
- изменить механизмы использования;
- поднять нагрузочную способность;
- добавить несколько полезных утилит


Хотя одной из задач была сделать библиотеку частью ядра Linux, на страничке загрузки есть версия и для Windows.
В составе библиотеки, кроме самой .dll, есть разные утилиты, которые позволяю прямо из командной строки организовать общение с сетью, тестирование, опрос и т.п.

API библиотеки существенно похудел. Это просто супер.
Больше нет работы с контекстом, а сокеты теперь могут общаться в дополнительных режимах. Например, схема "Издатель - Подписчик" расширена до уровня не просто "Слушайте меня все, кто хочет", а еще: "А ну-ка ответьте мне, мои подписчики...".

Забавно, что одним из недостатков ZeroMQ (которое решено в NanoMSG) названа возможность без проблем работать всего () с 10 000 клиентов, а дальше, типа, нужно вызывать процедуры настройки, а еще дальше - глядишь, и ядро придется перекомпилировать... :)
...
К сожалению, библиотека NanoMSG пока только в стадии beta.
4 ноя 14, 07:09    [16795137]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
Краткий обзор NanoMSG: https://hguemar.fedorapeople.org/slides/nanomsg/presentation.html
4 ноя 14, 07:09    [16795138]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
За выходные разобрался с непонятками, исправил ошибки и собрал все здесь: http://delphi-and-zeromq.blogspot.ru/2014/10/zeromq.html
5 ноя 14, 07:21    [16798628]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
PPA
Member

Откуда: Караганда -> Липецк
Сообщений: 805
ZeroMQ
Краткий обзор NanoMSG: https://hguemar.fedorapeople.org/slides/nanomsg/presentation.html


NanoMSG под XP не пашет.
CancelIoEx используется
https://github.com/nanomsg/nanomsg/issues/102
6 ноя 14, 09:33    [16804437]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
PPA
ZeroMQ
Краткий обзор NanoMSG: https://hguemar.fedorapeople.org/slides/nanomsg/presentation.html


NanoMSG под XP не пашет.
CancelIoEx используется
https://github.com/nanomsg/nanomsg/issues/102

Мда. Под Win7 все ОК.

libzmq.dll и czmq.dll я старательно под MS VS 2008 компилю, чтобы и на Win 2000 запускалось. Хотя, не пробовал запускать. Надо будет в виртмашине потестить.
6 ноя 14, 10:43    [16804806]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
ZeroMQ
PPA
пропущено...


NanoMSG под XP не пашет.
CancelIoEx используется
https://github.com/nanomsg/nanomsg/issues/102

Мда. Под Win7 все ОК.

libzmq.dll и czmq.dll я старательно под MS VS 2008 компилю, чтобы и на Win 2000 запускалось. Хотя, не пробовал запускать. Надо будет в виртмашине потестить.


Попробовал ZeroMQ на Win2K: фиквам.

ZeroMQ:
автор
---------------------------
Точка входа в процедуру getaddrinfo не найдена в библиотеке DLL WS2_32.dll.
---------------------------

Это еще лечится, добавлением в проект одной строчкой

#include <Wspiapi.h> // For W2K: поможет, только если czmq  не использовать.


А если использовать czmq.dll, то этого мало, там еще

автор
---------------------------
Точка входа в процедуру GetAdaptersAddresses не найдена в библиотеке DLL IPHLPAPI.DLL.
---------------------------


До WinXP вместо GetAdaptersAddresses использовалась GetAdaptersInfo, функциональности которого, к сожалению, недостаточно.
...
Ну и ладно. Пора на XP.
7 ноя 14, 06:19    [16809823]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
ZeroMQ
Member

Откуда: Оттуда.
Сообщений: 10125
ZeroMQ
Рабочий пример. Межброкерная маршрутизация.

Постановка задачи.
Только что нам позвонил лучший клиент и попросил срочно спроектировать мощную систему облачных вычислений. В качестве облака он представляет себе совокупность множества датацентров, каждый кластер клиентов и рабочих которого работают вместе, как единое целое.

Для нас такая задача - как два байта переслать. Мы ж дельфисты.

Смоделирует эту задачу, используя ZeroMQ.


Долго разбирался и портировал примеры на Delphi.

Здесь подробный рабочий пример, имитирующий сетевые "кластеры".
...
Дальше буду разбираться с обеспечением надежности доставки сообщений.
4 дек 14, 04:22    [16947200]     Ответить | Цитировать Сообщить модератору
Топик располагается на нескольких страницах: Ctrl  назад   1 2 3 [4] 5 6 7 8 9 10 11   вперед  Ctrl      все
Все форумы / Delphi Ответить