Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Delphi Новый топик    Ответить
Топик располагается на нескольких страницах: Ctrl  назад   1 [2] 3 4 5 6 7 8 9 10 11   вперед  Ctrl      все
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Таким образом, со статичным борокером сообщений задача решена.

Можно даже запустить общий брокер для всех типов сообщений, и строить сетевую архитектуру вокруг него.

Т.е., топология "Звезда" работает. Какое-то время.
...пока не возникнут вопросы вроде пропускной способности и расширения/усложнения логики брокера.

АДЪ неминуем.

ZMQ придет, порядок наведет!
26 сен 14, 00:12    [16623516]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Куда лучше реализовать брокер не в качестве транспорта сообщений, а в качестве поставщика адресной информации.

Для этого следует использовать сокеты ZMQ типа XPUB и XSUB, так как с ними ZMQ не пересылает сообщения от издателя к подписчику напрямую.
Сокеты XSUB и XPUB - точно такие же, как и сокеты типа SUB и PUB, за исключением того, что они обрабатывают подписки в форме специальных сообщений. А при подключении SUB и PUB сокетов к XPUB и XSUB сокетам первые связываются друг с другом уже по известным адресам.


То есть, основной поток данных идет, минуя брокер:
26 сен 14, 00:27    [16623545]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
чччД
...
То есть, основной поток данных идет, минуя брокер:


К сообщению приложен файл. Размер - 3Kb
26 сен 14, 00:27    [16623546]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
А ты еще спрашиваешь
defecator
...а для чего ?
...

26 сен 14, 00:30    [16623550]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
vavan
Member

Откуда: Казань
Сообщений: 3252
чччД
с винсокетами ты можешь послать всех, кто (например) неправильно отправил заголовок, еще "в процессе", на принимая сообщения целиком
"послать" ты можешь только закрыв сокет. а так пока тебе в него будут вваливать придется читать
26 сен 14, 09:08    [16623952]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
vavan
чччД
с винсокетами ты можешь послать всех, кто (например) неправильно отправил заголовок, еще "в процессе", на принимая сообщения целиком
"послать" ты можешь только закрыв сокет. а так пока тебе в него будут вваливать придется читать

Вот именно - c WinSockets ты можешь закрыть сокет в процессе чтения, не дочитав...

А в ZMQ - ты начинаешь читать, когда "все уже здеcь".
26 сен 14, 09:40    [16624062]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
vavan
Member

Откуда: Казань
Сообщений: 3252
чччД
в ZMQ - ты начинаешь читать, когда "все уже здеcь"
при условии что оно поместилось
26 сен 14, 09:48    [16624109]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
vavan
чччД
в ZMQ - ты начинаешь читать, когда "все уже здеcь"
при условии что оно поместилось

При всех условиях.
Если ты начал читать, значит - сообщение здесь. Они либо приходит, либо нет.
26 сен 14, 11:36    [16624826]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Шаблон "Разделяемая очередь".

Потихоньку перейдем к сокетам DEALER и ROUTER.
...

В реальности может потребоваться, чтобы множество клиентов могли подключаться к разным сервисам (например, для распределения нагрузки по сервисам).

Для реализации коннектов "много:много" есть два пути:
- каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.


Например, коннектим клинтский сокет к трем конечным точкам ("сервисам") :A, B, и C.
Вспомним этот (16562041) пример, поясняющий схему "Запрос - Ответ" (REQ - REP). Чтобы клиент (16562066) подключился к трем сервисам (например, по tpc на localhost, на трех разных портах), нужно

  zmq_connect(requester, 'tcp://localhost:5555');
  zmq_connect(requester, 'tcp://localhost:5556');
  zmq_connect(requester, 'tcp://localhost:5557');

Клиент последовательно выполняет запросы R1, R2, R3, R4. В результате запросы R1 и R4 отправляются к сервису A, R2 - к B, R3 - к C.
Циклически распределяя запросы (наш любимый roud-robin).

Такая конструкция позволяет добавлять без проблем добавлять сколько угодно клиентов.
Как показано выше, с помощью zmq_connect() можно добавлять сколько угодно сервисов. Беда в том, что клиент должен знать, где находится новый сервис. Если клиентов - 100, и в течении скажем, суток, добавляется всего три новых сервиса, то нужно в итоге триста раз переконфигурировать всех клиентов.
Что грустно.

К сообщению приложен файл. Размер - 7Kb
28 сен 14, 03:15    [16630265]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
В идеале, мы должны быть в состоянии добавлять и удалять сервисы или клиентов в любое время, не касаясь любой другой части топологии.
...
автор
Для реализации коннектов "много:много" есть два пути:
- каждый клиентский сокет может коннектиться ко множеству сервисных конечных точек. То есть, один клиентский сокет типа (REQ) коннектится к сервисным сокетам с известными адресами. После этого запросы будут распределяться между сервисами.

- второй путь - использование брокера запросов как промежуточного слоя.
...

Напишем крошечный брокер запросов, реализующий желаемую гибкость топологии.
Брокер соединит две конечные точки - фронтенд (сокет стороны клиентов) и бэкэнд (сокет стороны сервисов).
Затем брокер, используя zmq_poll(), будет отслеживать активность этих сокетов, и перебрасывать сообщения от одного сокета к другому. При чем, в ручном управлении очередности использования сервисов нет необходимости, т.к. ZeroMQ делает это автоматически для каждого сокета.
Когда мы построили приложение (16562041) по схеме "Запрос - Ответ (Req_Rep)", система получилась с синхронным диалогом обмена. Клиент шлет запрос. Сервис читает запрос и шлет ответ. Клиент читает ответ. Если клиент или сервис будут выполнять что-то другое (например, клиент пошлет два запроса подряд без ожидания ответа), система просто перестанет работать.

...

Конечно, раз теперь мы умеем пользоваться zmq_poll() (16619247), сделаем можно сделать брокер неблокирующим.
Но мы пойдем другим путем, и не станем использовать сокеты типа REP и REQ.

Так вот, есть схема использования пар сокетов, которые реализуют схему "Посредник - Маршрутизатор", режимы сокетов соответственно называются DEALER и ROUTER. Они позволяют получить неблокирующий режим для схемы "Запрос - Ответ".
В нашей схеме "Запрос - Ответ" сокет REQ будет "говорить" с сокетом ROUTER, а сокет DEALER - с сокетом REP.
Сокеты DEALER ROUTER будет как раз размещаться на нашем брокере, передачу сообщений между ними мы обеспечим с помощью кода. Будем извлекать сообщение из одного и передавать его другому сокету.

Наш брокер схемы "Запрос - Ответ" привязывается к двум конечным точкам: одна для коннектов к ней клиентов(фронтэнд сокет), вторая - для коннектов сервисов(бэкэнд сокет).

Задача та же, что и в первом примере: возведение в квадрат целых чисел. Клиент отсылает запросы, сервис (сервер) - выполняет.
Запрос - беззнаковое x32 целое число (UInt32) , ответ - квадрат беззнаковое x64 целого (UInt64).

Вот что у нас было:
+ Клиент

program BrRR_Client;

{$APPTYPE CONSOLE}
// Клиент
// Коннектится сокетом REQ к tcp://localhost:5559
// Шлет целое число сервису (серверу), обратно получает квадрат числа

uses
  SysUtils, ZMQ;
const
  c_ReqCnt = 100;
var
  fContext: Pointer;
  fSocketRequester: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketRequester := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketRequester, 'tcp://localhost:5559'); // Коннект к сервису
  Randomize();
  for i := 0 to Pred(c_ReqCnt) do begin
    fSrcVal := Cardinal(Random(-1)); // Генерация случайного целого
    zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln('Iter: ', i, ' src=', fSrcVal, ' result=', fResultVal);
  end;
  zmq_close(fSocketRequester);
  zmq_ctx_destroy(fContext);
  Readln;

end.



+ Сервис (сервер)
program BrRR_Service;

{$APPTYPE CONSOLE}
// Сервис (сервер)
// Находится в известной клиентам конечной точке, связывает сокет REP с tcp:*:5560,
// Получает целое число, возводит в квадрат и отправляет обратно результат

uses
  SysUtils, ZMQ;
var
  fContext: Pointer;
  fSocketResponder: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketResponder := zmq_socket(fContext, ZMQ_REP);
  zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке
  Writeln('Starting service...');
  i := 0;
  while True do begin
    zmq_recv(fSocketResponder, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    fResultVal := UInt64(fSrcVal) * UInt64(fSrcVal); // "Полезная работа"
    zmq_send(fSocketResponder, fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln('Iter: ', i, ' src=', fSrcVal, ' result=', fResultVal);
    Inc(i);
  end;
  zmq_close(fSocketResponder);
  zmq_ctx_destroy(fContext);
  Readln;

end.


Пока ничего нового не видим: продублирована функциональность схемы "Вопрос - Ответ". Клиент - коннектится к конечной точке - к сервису, сервис находится в этой конечной точке и ждет запросов клиентов. "Конечная точка" - это известный адрес (Например, "tcp://localhost:5560").
То есть, реализована вот эта схема: 16630265.

Будем улучшать мир. Воткнем между ними брокер.
28 сен 14, 05:04    [16630300]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Брокер.

Что-то новенькое: сокеты DIALER и ROUTER.

+ Брокер

program BrRR_Broker;

{$APPTYPE CONSOLE}
// Брокер
// Находится в известной клиентам и сервисам конечной точке,
// Находится в известной клиентам конечной точке,
// связывает сокет ZMQ_ROUTER с tcp:*:5559,
// связывает сокет ZMQ_DEALER с tcp:*:5560,
// Перекидывает сообщения между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils, ZMQ, Math;
var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
  fZMQPoll: array[0..1] of pollitem_t;
  fMsg: zmq_msg_t;
  fDoMore: Boolean;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  zmq_bind(fSocketFrontEnd, 'tcp://*:5559');
    // Конечная точка для клиентов
  zmq_bind(fSocketBackEnd, 'tcp://*:5560');
    // Кончная точка для сервисов
  fZMQPoll[0].socket := fSocketFrontEnd;
    // Инициализация пула сокетов
  fZMQPoll[0].fd := 0;
  fZMQPoll[0].events := ZMQ_POLLIN;
  fZMQPoll[0].revents := 0;
  fZMQPoll[1].socket := fSocketBackEnd;
  fZMQPoll[1].fd := 0;
  fZMQPoll[1].events := ZMQ_POLLIN;
  fZMQPoll[1].revents := 0;
  while true do begin
    zmq_poll(fZMQPoll[0], Length(fZMQPoll), -1);
      // Проверка состояния сокетов из пула
    if (fZMQPoll[0].revents and ZMQ_POLLIN) <> 0 then
      while True do
      begin // Трансляция сообщний от клиента к сервису
      // Обработка всх частей сообщения
        zmq_msg_init(fMsg);
        zmq_msg_recv(fMsg, fSocketFrontEnd, 0);
        fDoMore := zmq_msg_more(fMSG) <> 0;
        zmq_msg_send(fMsg, fSocketBackEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
    if (fZMQPoll[1].revents and ZMQ_POLLIN) <> 0 then
      while True do
        // Трансляция сообщний от сервиса к клиенту
      begin // Обработка всх частей сообщения
        zmq_msg_init(fMsg);
        zmq_msg_recv(fMsg, fSocketBackEnd, 0);
        fDoMore := zmq_msg_more(fMSG) <> 0;
        zmq_msg_send(fMsg, fSocketFrontEnd, IfThen(fDoMore, ZMQ_SNDMORE, 0));
        zmq_msg_close(fMsg);
        if not fDoMore then
          Break; // Это была последняя часть сообщения
      end;
  end;
  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.



Чтобы все заработало, нужно у сервиса из 16630300 заменить биндинг на коннект:

//  zmq_bind(fSocketResponder, 'tcp://*:5559'); // Привязка к конечной точке
  zmq_connect (fSocketResponder, 'tcp://localhost:5560'); // Коннект к конкретной точке


Брокер в асинхронном режиме слушает пул из сокетов с помощью zmq_poll(), затем читает (возможно, по частям) сообщение и транслирует его в выходной сокет. В обе стороны.

Чтение по частям реализовано "для общности". Чтобы работало с любыми сообщениями.
Интересно, что наши крошечные сообщения
     zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
...
     zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ
и
    zmq_recv(fSocketResponder, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
...
    zmq_send(fSocketResponder, fResultVal, SizeOf(fResultVal), 0); // Ответ

порождают на стороне брокера каскад из нескольких составных сообщений из структур zmq_msg_t, что хорошо видно в отладчике.


Теперь можно запускать сколько хочешь сервисов и сколько хочешь клиентов - все они будут общаться через брокер.

Такой брокер для схемы "Запрос- Ответ" существенно облегчает обслуживание сети , так как клиентам нет нужды знать, где размещены сервисы, а сервисам нет нужны знать, где размещены клиенты. Единственной статической точкой является брокер.
Жизнь налаживается. :)
28 сен 14, 07:08    [16630307]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
чччД
...
Теперь можно запускать сколько хочешь сервисов и сколько хочешь клиентов - все они будут общаться через брокер.

Такой брокер для схемы "Запрос- Ответ" существенно облегчает обслуживание сети , так как клиентам нет нужды знать, где размещены сервисы, а сервисам нет нужны знать, где размещены клиенты. Единственной статической точкой является брокер.
...


К сообщению приложен файл. Размер - 10Kb
28 сен 14, 07:09    [16630308]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Приведенный выше код трансляции сообщений представляется очень полезным для многих случаев для схемы "Запрос - Ответ".

Так вот, ZeroMQ есть даже специальный метод, реализующий все, что мы нако'дли в брокере.
+
См. ниже: "Та-да-мммм!"


function zmq_proxy( frontend, backend, capture: Pointer ): Integer; 


frontend - сокет фронтэнд
backend - сокет бэкэнд
capture - сокет для перехвата сообщений (nil, если не используется)

Технически нет никакой разницы между фронтэнд и бэкэнд сокетами.


+ Та-да-мммм!
program BrRR_Broker;

{$APPTYPE CONSOLE}
// Брокер
// Находится в известной клиентам и сервисам конечной точке,
// Находится в известной клиентам конечной точке,
// связывает сокет ZMQ_ROUTER с tcp:*:5559,
// связывает сокет ZMQ_DEALER с tcp:*:5560,
// Перекидывает сообщения между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils, ZMQ, Math;
var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  zmq_bind(fSocketFrontEnd, 'tcp://*:5559'); // Конечная точка для клиентов
  zmq_bind(fSocketBackEnd, 'tcp://*:5560'); // Кончная точка для сервисов

  zmq_proxy (fSocketFrontEnd, fSocketBackEnd, nil); // Старт прокси!

  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.


+

Дальше будет еще интереснее. Картинка с другого сайта.
28 сен 14, 07:26    [16630309]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Важно.
Видов сокетов, которые практически можно использовать в брокере:

ROUTER - DEALER
XSUB - XPUB
PULL - PUSH.
28 сен 14, 07:32    [16630312]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Мосты.

Вопрос: "как передать сообщения из одной подсети в другую?"
Или даже - из сети с протоколом tcp в сеть pgm.

Вариант решения - с помощью моста.

В качестве моста используем только что рассмотренный прокси (брокер сообщений).
То есть, "мост" - это маленькое приложение, которое общается одним сокетом по одному протоколу, а другим - по другому.
Ну и преобразовывает сообщения в подходящий для протокола вид, если нужно.

В качестве примера напишем крошечный прокси, который, находясь между издателем и множеством подписчиков, соединяет две разные сети.
Вернемся к примеру с метеостанцией. 16583825
Предположим, что сервер-издатель (которые измеряет температуру, давление и проч) работает во внутренней сети, часть подписчиков - тоже во внутренней. А еще часть - во внешней.
Создаем прокси, в которой фронтэнд сокет (SUB) будет общаться с внутренней сетью, а бэкэнд сокет (PUB) - с внешней.
Прокси будет подписываться фронтэнд-сокетом на события сервиса погоды и пере-публиковывать их на бэкэнд-сокете.
+ Прокси для метео: "мост в интернет"

program BrRR_BrokerMeteoInterNet;

{$APPTYPE CONSOLE}
// Брокер для проекта "Метео"
// Реализует мост между интрасаетью метеосервера
// и внешней сетью. Размещается во внутренней сети,
// для брокера открыт "наружу" tcp порт 8100.
// "Внешние" клиенты коннектятся к известной конечной точке tcp://10.1.1.0:8100
// Коннектит сокет ZMQ_XSUB с известной конечной точкой метеосервера tcp://192.168.55.210:5556
// Связывает сокет ZMQ_XPUB с tcp://10.1.1.0:8100,
// Перекидывает сообщения подписки между сокетами от FrontEnd к BackEnd
// и обратно

uses
  SysUtils, ZMQ;
var
  fContext: Pointer;
  fSocketFrontEnd: Pointer;
  fSocketBackEnd: Pointer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketFrontEnd := zmq_socket(fContext, ZMQ_XSUB);
  fSocketBackEnd := zmq_socket(fContext, ZMQ_XPUB);
  zmq_connect(fSocketFrontEnd, 'tcp://192.168.55.210:5556'); // Конечная точка метеосервиса
  zmq_bind(fSocketBackEnd, 'tcp://10.1.1.0:8100'); // Конечная точка для "внешних" подписчиков
//  zmq_bind(fSocketBackEnd, 'tcp://*:8100'); // Можно и так, главное - чтобы внешние клиенты знали адрес

  zmq_proxy (fSocketFrontEnd, fSocketBackEnd, nil); // Старт прокси

  zmq_close(fSocketFrontEnd);
  zmq_close(fSocketBackEnd);
  zmq_ctx_destroy(fContext);
  Readln;

end.



Очень похоже на код предыдущего прокси, но используется для трансляции сообщений из одной подсети в другую. Точно так же подобный прокси - мост можно использовать, например, для подключения подписчиков в мультикаст PGM сети с издателем в TCP.
28 сен 14, 20:33    [16631345]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
чччД
...прокси, который, находясь между издателем и множеством подписчиков, соединяет две разные сети...


К сообщению приложен файл. Размер - 16Kb
28 сен 14, 20:34    [16631347]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Кино нашел, на русском:

http://wiki.4intra.net/0MQ_—_Сокеты_на_стероидах_(Сергей_Гулько,_OSDN-UA-2012)
28 сен 14, 20:46    [16631355]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
чччД
Кино нашел, на русском:

http://wiki.4intra.net/0MQ_—_Сокеты_на_стероидах_(Сергей_Гулько,_OSDN-UA-2012)


...У ZeroMQ пропускная способность выше, чем у TCP/IP, хотя ZeroMQ работает over TCP/IP...

Поискал - "каким же образом?"
Нашел кое-какое описание, вроде специальной упаковки сообщений ZMQ в пакеты tcp.
28 сен 14, 20:57    [16631362]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Обработка ошибок. ETERM.

Обработка ошибок ZeroMQ основана на двух положениях:
- считается, что процессы уязвимы от внутренних ошибок
- считается, что внешние ошибки (и атаки) можно обработать (отразить атаки).

В качестве примера приводится функционирование живой клетки, которая самоуничтожается при внутреннем сбое и максимально долго борется с внешними угрозами.

Надежность кода должна обеспечиваться использованием Assert(). Срабатывание Assert() вызывает либо завершение приложения, либо исключительную ситуацию.
Когда ZeroMQ обнаруживает внешнюю проблему, она возвращает соответствующий код завершения. В редких случаях, ZMQ показывает сообщения "молча", если нет очевидной стратегии, позволяющей восстановиться после ошибки.

В рассмотренных ранее примерах обработки ошибок не было.
В реальном коде необходимо анализировать код завершения вызова каждого метода ZMQ.


Существует несколько простых правил, ноги которых растут еще с соглашений POSIX:

- Методы, которые создают объекты, в случае ошибки возвращают nil;
- Методы, которые обрабатывают данные, могут вернуть число обработанных байт, или -1 в случае ошибки;
- Другие методы возвращают 0, когда все ОК, и, в случае ошибки - ненулевой код ошибке;
- Код ошибки доступен через errno (для ОС POSIX) или через метод zmq_errno();
- Описание ошибки (например, для логирования) можно получить с помощью метода zmq_strerror().

Пример:
  fContext := zmq_ctx_new(); // Инициализация
  Assert(fContext <> nil);

  fSocketFrontEnd := zmq_socket(fContext, ZMQ_ROUTER);
  Assert(fSocketFrontEnd <> nil);

  fSocketBackEnd := zmq_socket(fContext, ZMQ_DEALER);
  Assert(fSocketBackEnd <> nil);

  fRC := zmq_bind(fSocketFrontEnd, 'tcp://*:5559'); // Конечная точка для клиентов
  Assert(fRC <> -1, 'Bind failed: tcp://*:5559');

  fRC := zmq_bind(fSocketBackEnd, 'tcp://*:5560'); // Кончная точка для сервисов
  Assert(fRC <> -1, 'Bind failed: tcp://*:5560');



Есть две исключительные ситуации, которые могут обрабатываться как некритические:
- когда ваш код принимает сообщение с ZMQ_NOWAIT ("асинхронно"), но данных не ожидается. ZMQ вернет -1 и установит код ошибка равным EAGAIN;
- когда один поток вызывает zmq_ctx_destroy(), а второй все еще выполняет работу в блокирующем режиме, то вызов zmq_ctx_destroy() вызывает закрытие контекста и всех блокирующих вызовов с кодом завершения -1, а код ошибки устанавливается равным ETERM.

После того, как код будет отлажен и "вылизан", Asserts() могут быть отключены опциями компиляции. Однако, не стоит компилировать саму библиотеку ZMQ с отключенными assert() - запросто можно прозевать проблему в самом неожиданном месте.


Рассмотрим способы "чистого" завершения процессов. В качестве примера возьмем параллельный трубопровод из примера: 16607833.

Мы возьмем пример параллельного газопровода из предыдущего раздела. Если мы в фоне стартовали множество рабочих процессов "Worker", то теперь мы хотим уничтожить их всех, когда все пакетное задание будет выполнено. Будем делать это, отправляя рабочим процессам сообщение "умри!". Этим будет заниматься процесс - сборщик "Synk" (так как он знает, когда завершается пакетное задание).

Каким же образом подключить сборщик "Synk" к рабочим процессам? Сокеты PUSH/PULL допускают передачу только в одну сторону. Можно использовать сокеты другого типа, или смешать несколько потоков. Попробуем следующее: используем схему "Издатель-Подписчик" (pub-sub) для отправки рабочим процессам сообщения "умри!".

Описание:
Сборщик ("Sink") создает сокет - издатель (PUB) в новой конечной точке.
Рабочие процессы ("Worker") связывают свои входные сокеты с этой конечной точкой.
Когда сборщик ("Sink") определяет, что задание выполнено, он посылает сигнал "умри!" в свой сокет - издатель (PUB).
Когда рабочий процесс ("Worker") обнаруживает сообщение "умри!", он завершается.

Сборщик почти не меняется, добавится еще один сокет и отправка сообщения "умри!":
var
...
  fSocketController: Pointer;
...
begin
...
  // Сокет для отправки сигнала "умри!"
  fSocketController := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocketController, 'tcp://*:5559');
...
  // Отправка сигнала "умри!" рабочим процессам
  zmq_send(fSocketController, PChar('KILL')^, 4, 0);
...


Вот полный код сборщика:
+ Сборщик "Sink"

program PL_Sink;

{$APPTYPE CONSOLE}

uses
  SysUtils, Windows, ZMQ;

const
  c_task_count = 100;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fSocketController: Pointer;
  fStart: Cardinal;
  fTaskCount: Integer;
  fDummy: string;
begin
 // Инициализация
  fContext := zmq_ctx_new();

  // Сокет для приема сигналов
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_bind(fSocketReceiver, 'tcp://*:5558');

  // Сокет для отправки сигнала "умри!"
  fSocketController := zmq_socket(fContext, ZMQ_PUB);
  zmq_bind(fSocketController, 'tcp://*:5559');

  // Ожидание первого сигнала о старте пакета задач
  zmq_recv(fSocketReceiver, PChar(nil)^, 0, 0);

  // Фиксация времени начала выполнения пакета
  fStart := GetTickCount();
  // Ждем подтверждения выполнения от 100 рабочих процессов
  for fTaskCount := 0 to Pred(c_task_count) do begin
    zmq_recv(fSocketReceiver, PChar(nil)^, 0, 0);

    if (fTaskCount mod 10 = 0) then // "Статус - бар" :)
      Write(':')
    else
      Write('.')
  end;
  // Вычисление и показ времени выполнения пакета задач
  Writeln('Total elapsed time: ', GetTickCount() - fStart, 'ms');

  // Отправка сигнала "умри!" рабочим процессам
  zmq_send(fSocketController, PChar('KILL')^, 4, 0);

  zmq_close(fSocketReceiver);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);

end.


Естественно, рабочий процесс "Worker" тоже придется переделать.
Теперь "Worker" управляется двумя сокетами: PULL - для получения задачи, и SUB - для получения команд управления.
Не забываем, что SUB сокет должен быть настроен:
  zmq_setsockopt(fSocketController, ZMQ_SUBSCRIBE, nil, 0);

Используем технику с zmq_poll (), которую уже применяли ранее.

+ Код рабочего процесса "Worker"
program PL_Worker;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fSocketSender: Pointer;
  fSocketController: Pointer;
  fStrMsg: string;
  fLen: Integer;
  fMsg: zmq_msg_t;
  fDummy: string;
  fPollItems: array[0..1] of pollitem_t;
begin
  fContext := zmq_ctx_new();

 // Сокет для приема сообщений
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');

// Сокет для отправки сообщений
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSender, 'tcp://localhost:5558');

// Сокет для приема управляющх сигналов
  fSocketController := zmq_socket(fContext, ZMQ_SUB);
  zmq_connect(fSocketController, 'tcp://localhost:5559');
  zmq_setsockopt(fSocketController, ZMQ_SUBSCRIBE, nil, 0);

  fPollItems[0].socket := fSocketReceiver;
  fPollItems[0].fd := 0;
  fPollItems[0].events := ZMQ_POLLIN;
  fPollItems[0].revents := 0;
  fPollItems[1].socket := fSocketController;
  fPollItems[1].fd := 0;
  fPollItems[1].events := ZMQ_POLLIN;
  fPollItems[1].revents := 0;

// Бесконечный цикл выполнения заданий
  while True do begin
    zmq_poll(fPollItems[0], 2, -1);
    if (fPollItems[0].revents and ZMQ_POLLIN) <> 0 then begin
      zmq_msg_init(fMsg);
      fLen := zmq_msg_recv(fMsg, fSocketReceiver, 0); // Получение задания
      SetLength(fStrMsg, fLen div SizeOf(Char)); // Перевод буфера сообщения в строку
      Move(zmq_msg_data(fMsg)^, PChar(fStrMsg)^, fLen);
      Writeln(fStrMsg); // Отображение в консоли процесса "работы"
      Sleep(StrToInt(fStrMsg)); // "Полезная" работа...
      zmq_send(fSocketSender, PChar(fDummy)^, 0, 0);   // Отправка сигнала сборщику результата
    end;
   // Все, что получаем из контроллера, считаем командой 'KILL'
    if (fPollItems[1].revents and ZMQ_POLLIN) <> 0 then
      Break; // Выход из цикла обработки
  end;
  zmq_close(fSocketReceiver);
  zmq_close(fSocketSender);
  zmq_close(fSocketController);
  zmq_ctx_destroy(fContext);
//  Readln(fDummy);

end.



Теперь запускаем: процесс "Sink", один или несколько процессов "Worker", процесс "Ventilator". В консоли процесса "Ventilator" жмем Enter и наблюдаем примерно такую картину: 16607848
За исключением того, что процессы "Worker" по завершению пакетного задания будут завершены: по завершению работы процесс "Sink" посылает сигнал "умри!" всем подписчикам (процессам "Worker").
29 сен 14, 07:53    [16632282]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Топология только что описанной сетевой задачи:

К сообщению приложен файл. Размер - 14Kb
29 сен 14, 07:54    [16632283]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Обработка Crtl+C для консольных приложений Windows.
Казалось бы, нажали и хрясь - приложение убито. Если бы.

Как было сказано выше - можно получить зависон, особенно если приложение многопоточное.
В общем, надо бы пройтись по всем тредам, сообщить им о своих намерениях, чтобы те завершили работу с соетами

В общем, "все не так однозначно".

Все, что написано в документации - касается всяческих Linux.

Нам, дельфистам, придется использовать специальные обрабочики: подключаем в uses модуль Windows и настраиваем:


uses
  ...Windows, ...;
...
begin
...
  Windows.SetConsoleCtrlHandler(CtrlCHandler, True);


Шаблон для CtrlCHandler:

function console_handler( dwCtrlType: DWORD ): BOOL;
var
  i: Integer;
begin
  if CTRL_C_EVENT = dwCtrlType then
  begin
    // Выполняем "мягкое" завершение
...
  end else 
    result := False;
end;

Как выполнить "мягкое" завершение - зависит от текущей задачи.
30 сен 14, 00:27    [16636514]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Мультипоточность с ZeroMQ.

ZeroMQ предоставляет простой способ создания многопоточных приложений.
При этом не потребуются ни мьютексы, ни блокировки, ни прочие дела для организации межпоточного взаимодействия, кроме сообщений, отправляемых через сокеты ZMQ.

Есть одно железное правило для успешного многопоточного кодинга - "не разделять между потоками изменяющиеся данные". Обычно, когда два потока в приложении пытаются между собой разделять данные, то это выглядит как будто два алкаша пытаются разделить пиво. И чем больше алкашей собирается за столом, тем хуже ситуация.
Большинство многопоточных приложений похоже сборище пьяных алоголиков, учинивших драку в баре.

Одна известная фирма, производящая то ли рамки, то ли форточки, даже опубликовала статью "Решение 11 вероятных проблем вашего многопоточного кода", в которой упомянуты всяческие ужастики вроде забытой синхронизации, незаблокированной модификации общих данных, и проч.

Для беспроблемного написания многопоточного кода с помощью ZeroMQ следует руководствоваться следующими правилами:

- Изолируйте данные внутри потока и никогда не разделяйте их между потоками. Единственным исключением являются контексты ZeroMQ, которые являются threadsafe.
- Держитесь подальше от классических механизмов параллелизма, таких как мьютексы, критические секции, семафоры и т.д. Это анти-паттерны в приложениях ZeroMQ.
- Создайте один ZeroMQ контекст в начале вашего процесса и передавайте его всем создаваемым потокам, с которыми будете взаимодействовать через InProc сокеты ZMQ.
- Для создания структуры вашего приложения используйте подключаемые (attach) потоки, и соедините их с их родительскими потоками через сокеты PAIR по протоколу InProc. Порядок работы: привязываем (zmq_bind()) сокет, а затем создаем дочерний поток, который коннектится к сокету родительского потока.
- Используйте отключаемые (detach) для имитации работы самостоятельных задач, с учетом своих условий. Подключите их по протоколу TCP. Позже вы можете переместить их в автономные процессы без значительного изменения кода.
- Все взаимодействия между потоками происходит как ZeroMQ сообщения, которые вы можете определить более или менее формально.
- Не разделять сокеты ZeroMQ между потоками. Сокеты ZeroMQ не потокобезопасны. Технически есть возможность передачи сокета от одного потока к другому, но это требует навыка. Единственное место, где разумно и оправдано разделение сокетов между потоками - это удаление сокетов в деструкторах ваших классовых оберток над ZMQ.


Например.

Предположим, в вашем приложении нужно более одного прокси, и вы хотите, чтобы каждый из них выполнятся в своем потоке. Очень легко допустить ошибку, создав фронтэнд и бэкэнд сокеты такого прокси в одном потоке, а затем передавая сокеты в прокси (который живет в другом потоке). Возможно, на первый раз все заработает, но глюки - неминуемы, причем в совершенно произвольные моменты времени.
Запоминаем: не используем (и не закрываем) сокеты нигде, кроме как потоках, их создавших.

Если следовать перечисленным правилам, можно легко создавать надежные многопоточные приложения. Логика приложения может размещаться потоках, процессах, или узлах сети, в соответствии с текущими планами по захвату мира.

ZeroMQ использует нативные потоки ОС (Windows), а не виртуальные "зеленые" потоки. Для отладки можно использовать стандартные средства, вроде ThreadChecker от Intel, чтобы увидеть, что ваше приложение делает. К недостаткам использования нативных потоков можно отнести, что API-интерфейсы "родной" многопоточности конкретной ОС не всегда портируются, и, к примеру, если вы используете огромное количество потоков (тысячи), то некоторые ОС просто не потянут такой нагрузки.
...
Перейдем к практике. Превратим наш старый сервер 16562041, в нечто более работоспособное.
Старый сервер был однопоточным. Если обслуживание каждого запроса было легким, то это нормально: один ZMQ поток может работать на полной скорости ядра процессора без ожидания и выполнять очень много работы.
Но серверы из реальной жизни на каждый запрос делают более сложную работу. Одноядерного сервера может оказаться недостаточно, когда по серверу жахнет сразу 10000 клиентов. Сервер из реальной жизни будет запускать несколько рабочих потоков. После чего он будет принимать запросы так быстро, как это возможно и раздавать их своим своих рабочим потокам.
Рабочие потоки будут "перемалывать цифирь" и отправлять результаты обратно.

Конечно, это можно сделать с помощью прокси-брокера и внешних рабочих процессов, но часто легче начать один процесс, который займет все шестнадцать ядер, чем шестнадцать процессов, каждый из которых жрет одно ядро. Кроме того, рабочие процессы будут занимать линии связи и поглощать сетевой трафик и просто тормозить.
30 сен 14, 03:25    [16636652]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Многопоточный сервис:
+ Сервис

program RR_Service;

{$APPTYPE CONSOLE}
// Многопоточный сервис (сервер)
// Находится в известной клиентам конечной точке, "слушает" tcp порт 5555,
// к которому привязан сокет ZMQ_ROUTER
// Получает целое число, возводит в квадрат и отправляет обратно результат

uses
  SysUtils, ZMQ;

procedure Worker_Routine(aContext: Pointer); // Процедура потока
var
  fSocketReceiver: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin
// Сокет для общения с диспетчером
  fSocketReceiver := zmq_socket(aContext, ZMQ_REP);
  zmq_connect(fSocketReceiver, 'inproc://worker');
  i := 0;
  while True do
  begin
    zmq_recv(fSocketReceiver, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    fResultVal := UInt64(fSrcVal) * UInt64(fSrcVal); // "Полезная работа"
    zmq_send(fSocketReceiver, fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln('In thread, Iter: ', i, ' src=', fSrcVal, ' result=', fResultVal);
    Inc(i);
  end;
  zmq_close(fSocketReceiver);
end;

var
  fContext: Pointer;
  fSocketClients: Pointer;
  fSocketWorkers: Pointer;
  i: integer;
  fThreadId : Cardinal;
  fRez : Integer;
begin

  fContext := zmq_ctx_new(); // Инициализация

  fSocketClients := zmq_socket(fContext, ZMQ_ROUTER);
  fRez := zmq_bind(fSocketClients, 'tcp://*:5555'); // Привязка к конечной точке

  fSocketWorkers := zmq_socket(fContext, ZMQ_DEALER);
  fRez := zmq_bind(fSocketWorkers, 'inproc://worker');// Привязка к конечной точке

  Writeln('Starting service...');
  for i := 0 to 2 do  // Запуск пула рабочих потоков
    BeginThread(nil, 0, @Worker_Routine, fContext, 0, fThreadId);

// Соединение рабочих потоков с потоками клиентов через очередь
  zmq_proxy(fSocketClients, fSocketWorkers, nil);

  zmq_close(fSocketClients);
  zmq_close(fSocketWorkers);
  zmq_ctx_destroy(fContext);
  Readln;

end.



Клиент - все тот же:
+ Клиент

program RR_Client;

{$APPTYPE CONSOLE}
// Клиент
// Коннектится сокетом REQ к tcp://localhost:5555
// Шлет целое число сервису (серверу), обратно получает квадрат числа

uses
  SysUtils, ZMQ;
const
  c_ReqCnt = 100;
var
  fContext: Pointer;
  fSocketRequester: Pointer;
  fSrcVal: Cardinal;
  fResultVal: UInt64;
  i: integer;
begin

  fContext := zmq_ctx_new(); // Инициализация
  fSocketRequester := zmq_socket(fContext, ZMQ_REQ);
  zmq_connect(fSocketRequester, 'tcp://localhost:5555');
    // Коннект к сервису
  Randomize();
  for i := 0 to Pred(c_ReqCnt) do begin
    fSrcVal := Cardinal(Random(-1));
      // Генерация случайного целого
    zmq_send(fSocketRequester, fSrcVal, SizeOf(fSrcVal), 0); // Запрос
    Write('Iter: ', i, ' src=', fSrcVal);
    zmq_recv(fSocketRequester, fResultVal, SizeOf(fResultVal), 0); // Ответ
    Writeln(' result=', fResultVal);
  end;
  zmq_close(fSocketRequester);
  zmq_ctx_destroy(fContext);
  Readln;

end.



Прежде всего интересен сервис ("сервер"). Основной поток приложения поочередно транслирует запросы от сокета fSocketClients (сокет типа ZMQ_ROUTER) к сокету fSocketWorkers (сокет типа ZMQ_DEALER) и обратно, используя знакомый метод zmq_proxy().
То есть, основной поток является брокером ("прокси") сообщений.
Для обработки данных запущено три потока. Процедура потока - Worker_Routine(). В процедуре потока создается сокет fSocketReceiver типа (ZMQ_REP). Сокет fSocketReceiver рабочего потока связан с сокетом fSocketWorkers основного потока по inproc протоколу.
Сокет рабочего потока:
// Сокет для общения с диспетчером
  fSocketReceiver := zmq_socket(aContext, ZMQ_REP);
  zmq_connect(fSocketReceiver, 'inproc://worker');


Сокет основного потока:
  fSocketWorkers := zmq_socket(fContext, ZMQ_DEALER);
  fRez := zmq_bind(fSocketWorkers, 'inproc://worker');// Привязка к конечной точке

Естественно, вместо inproc - протокола можно было бы использовать tcp, но пришлось бы занять порт и, кроме того, inproc гораздо быстрее.

Еще раз: работа сервиса.

Сервис запускает несколько рабочих потоков. Каждый рабочих поток создает сокет типа REP и затем в цикле обрабатывает запросы к сокету. Рабочие потоки представляют собой обычные однопоточные сервисы, разница - в транспорте (inproc вместо tcp), и в направлении операции "привязать - подключить" (bind-connect).

Сервер создает сокет типа ROUTER, чтобы общаться с клиентами и связывает его (сокет) с внешним интерфейсом по tcp.

Сервис создает сокет типа DEALER, чтобы общаться с рабочими процессами и связывает этот сокет с внутренним интерфейсом (с помощью inproc).

Сервер зарускает прокси, который соединяет оба сокета. Прокси получает входящие запросы от всех клиентов и распределяет эти запросы между рабочими потоками. При этом он правильно выполняет маршрутизацию ответов так, чтобы они вернулись туда, откуда приходили запросы.

То есть, цепочка запрос - ответ выглядит так: REQ-ROUTER-очередь-DEALER-REP.
30 сен 14, 06:03    [16636680]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Получилась вот такая структура:

К сообщению приложен файл. Размер - 7Kb
30 сен 14, 07:54    [16636743]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
В выводе сервиса видно, что номера итераций повторяются по три раза - по числу запущенных потоков.

К сообщению приложен файл. Размер - 103Kb
30 сен 14, 08:07    [16636759]     Ответить | Цитировать Сообщить модератору
Топик располагается на нескольких страницах: Ctrl  назад   1 [2] 3 4 5 6 7 8 9 10 11   вперед  Ctrl      все
Все форумы / Delphi Ответить