Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Delphi Новый топик    Ответить
 ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Часть 1 - здесь.

Совсем забыл. Картинка с другого сайта.

Всякая "серебряная пуля" требует соответствующего вступления с целью охмурения потенциальных адептов.

И так.

Как это началось
Мы взяли нормальные TCP сокеты, внедрили в них смесь радиоактивных изотопов, похищенные из секретного советского атомного исследовательского проекта, облучили их космическими лучами эры 1950х и передали их в руки обдолбанного дикой смесью наркотиков автора комиксов про супергероев с плохо скрываемой склонностью к фетишизму в отношении к бодибилдингу в обтягивающих трико.
Да, сокеты ZeroMQ - супергерои, спасающие сетевой мир.


Так и написано.
18 сен 14, 17:10    [16591748]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Картинка, полностью все проясняющая:

К сообщению приложен файл. Размер - 9Kb
18 сен 14, 17:11    [16591756]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
И так, в первой части были рассмотрены схемы "Запрос - Ответ" (Request-Replay):

К сообщению приложен файл. Размер - 4Kb
18 сен 14, 17:14    [16591775]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
А так же схема "Издатель - Подписчик" (Publisher - Subscriber).

К сообщению приложен файл. Размер - 8Kb
18 сен 14, 17:15    [16591781]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Теперь попробуем разобраться с более удивительными вещами.
+ Примеры

Примеры.



Схема "Параллельный трубопровод" (Parallel Pipeline).

К сообщению приложен файл. Размер - 10Kb
18 сен 14, 17:20    [16591809]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
В общем, задача для суперкомпьютеров. :)

Рассматриваем последнюю картинку.

Процесс "Ventilator" - ставит задачи, которые будут решаться в параллельно выполняемых процессах "Worker".
Процесс "Worker" - рабочий процесс, выполняющий поставленную задачу.
Процесс "Synk" - сборщик результатов от процессов "Worker".

Считаем, что рабочие процессы типа "Worker" работают зверски быстро. Наверное, используя квантовые графические чипы с сигнальными процессорами, объединенными в нейронную сеть.

Практические задачи, которые в первую очередь приходят в голову:
- проверка натуральных чисел "на простоту";
- преобразование изображений к нужному формату;
- генерация биткоинов;
- ...

В нашем примере каждый процесс Worker просто будет "спать" в течении указанного ему времени. Картинка с другого сайта..
Причем, спать они начнут - по команде. После того как все построятся запустятся.

Кто здесь "сервер", кто "клиент"? Имхо, те, чьи сетевые адреса более-менее фиксированы. В данном случае - это процессы "Ventilator" и "Synk". Хотя задачи выполнять будут все же процессы "Work"... клиенты... Хм.

И так.
"Ventilator" сообщает системе ZeroMQ, что к нему можно цепляться по tcp к порту 5557, через который он будет раздавать задания. К этому порту будут цепляться процессы "Worker", которые мугут располагаться где угодно в сети. Всем, кто прицепился, будет отправлено задание, каждому - свое.
"Synk" сообщает системе, что он готов принимать сообщения по tcp на порт 5558. В этот порт процессы "Worker" будут посылать сообщения о завершении очередной задачи. Туда же будет посылать сигнал "Ventilator" о начале процесса пакетной обработки.

Поехали, создаем "Ventilator".
23 сен 14, 04:32    [16607833]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
+
program PL_Ventilator;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ;
const
  c_task_count = 100;
var
  fContext: Pointer;
  fSocketSender: Pointer;
  fSocketSink: Pointer;

  fTotal_Time: Cardinal;
  fWork_Load: Cardinal;
  i: Integer;
  fStrMsg: string;

begin
  // Инициализация
  fContext := zmq_ctx_new();
  // Сокет, рассылающий задания рабочим процессам
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_bind(fSocketSender, 'tcp://*:5557');

  // Сокет, отсылающий сборщику результатов сигнал о начале пакетной обработки
  fSocketSink := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSink, 'tcp://localhost:5558');

  Writeln('Press Enter when the workers are ready:');
  Readln(fStrMsg);

  Writeln('Sending tasks to workers...');

 // Первое сообщение - '0', просто сигнад начинать прием и обработку заданий
  fStrMsg := '0';
  zmq_send(fSocketSink, PChar(fStrMsg)^, Length(fStrMsg) * SizeOf(Char), 0);

  Randomize;

 // Отправляем 100 заданий
  fTotal_Time := 0;
    // Суммарная оценка времени выполнения всех заданий в ms
  for i := 0 to Pred(c_task_count) do
  begin
    // Случайное значение fWork_Load от 1 to 100 ms
    fWork_Load := Random(100) + 1;
    fTotal_Time := fTotal_Time + fWork_Load;
      // Накопление значения суммарной общего времени
    fStrMsg := IntToStr(fWork_Load);
    zmq_send(fSocketSender, PChar(fStrMsg)^, Length(fStrMsg) + SizeOf(Char), 0);
  end;
  Writeln('Total expected time:', fTotal_Time, 'ms');
  sleep(1000); // Даем системе 0MQ время на доставку

  zmq_close(fSocketSink);
  zmq_close(fSocketSender);
  zmq_ctx_destroy(fContext);
  Readln(fStrMsg);

end.


В соответствии с заданием, создается два сокета. fSocketSender - через него будут отправляться задания. fSocketSink - на него сборщику результатов будет отправлен сигнал о начале пакетной обработки.

После запуска "Ventilator"-а следует запустить некоторое число процессов "Worker" и один процесс - сборщик "Synk".
Ну и нажать Enter в консоли процесса "Ventilator". :)

После этого сборщику ("Synk") отсылается сигнал о начале процесса пакетной обработки.
И в цикле, 100 раз на сокет fSocketSender отправляются сообщения - задания ("спать ... мсек").
Сообщение генерируется как случайное значение от 1 до 100 и пересылается в виде строки (просто так).

При этом, задания распределяются циклически, (round-robin), равномерно нагружая процессы "Worker".
Таким образом, процессов "Worker" должно быть не менее одного. Если процессов "Worker" доступных нет, рассылка блокируется (процесс "Ventilator" переходит в синхронный режим).
23 сен 14, 04:43    [16607834]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Процесс "Worker".
+
program PL_Worker;

{$APPTYPE CONSOLE}

uses
  SysUtils, ZMQ;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fSocketSender: Pointer;
  fStrMsg: string;
  fLen: Integer;
  fMsg: zmq_msg_t;
  fDummy : string;
begin
  fContext := zmq_ctx_new();

 // Сокет для приема сообщений
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');

// Сокет для отправки сообщений
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSender, 'tcp://localhost:5558');

// Бесконечный цикл выполнения заданий
  while True do begin
    zmq_msg_init(fMsg);
    fLen := zmq_msg_recv(fMsg, fSocketReceiver, 0); // Получение задания

    SetLength(fStrMsg, fLen div SizeOf(Char));   // Перевод буфера сообщения в строку
    Move(zmq_msg_data(fMsg)^, PChar(fStrMsg)^, fLen);

    Writeln(fStrMsg); // Отображение в консоли процесса "работы"

    Sleep(StrToInt(fStrMsg)); // "Полезная" работа...

    zmq_send(fSocketSender, PChar(fDummy)^, 0, 0); // Отправка сигнала сборщику результата
  end;
  zmq_close(fSocketReceiver);
  zmq_close(fSocketSender);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);

end.


Процесс "Worker" подключается к двум известным tcp адресам - к процессу "Ventilator":
   fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_connect(fSocketReceiver, 'tcp://localhost:5557');

... и к процессу "Synk":
  fSocketSender := zmq_socket(fContext, ZMQ_PUSH);
  zmq_connect(fSocketSender, 'tcp://localhost:5558');

Затем в бесконечном цикле получает сообщения от процесса "Ventilator", копирует их в строку, выводит в консоль и выполняет задание ("спит"), по звершению которого отсылает сигнал сборщику результатов:
    zmq_msg_init(fMsg);
    fLen := zmq_msg_recv(fMsg, fSocketReceiver, 0); // Получение задания

    SetLength(fStrMsg, fLen div SizeOf(Char));   // Перевод буфера сообщения в строку
    Move(zmq_msg_data(fMsg)^, PChar(fStrMsg)^, fLen);

    Writeln(fStrMsg); // Отображение в консоли процесса "работы"

    Sleep(StrToInt(fStrMsg)); // "Полезная" работа...

    zmq_send(fSocketSender, PChar(fDummy)^, 0, 0); // Отправка сигнала сборщику результата


Сборщику результатов ничего полезного не отправляют, просто сигнал как факт.
23 сен 14, 04:51    [16607836]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Последнее: код сборщике сообщений "Synk".
+
program PL_Sink;

{$APPTYPE CONSOLE}

uses
  SysUtils, Windows, ZMQ;

const
  c_task_count = 100;

var
  fContext: Pointer;
  fSocketReceiver: Pointer;
  fStart: Cardinal;
  fTaskCount: Integer;
  fDummy: string;
begin
 // Инициализация
  fContext := zmq_ctx_new();

  // Сокет для приема сигналов
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_bind(fSocketReceiver, 'tcp://*:5558');

  // Ожидание первого сигнала о старте пакета задач
  zmq_recv(fSocketReceiver, PChar(nil)^, 0, 0);

  // Фиксация времени начала выполнения пакета
  fStart := GetTickCount();
  // Ждем подтверждения выполнения от 100 рабочих процессов
  for fTaskCount := 0 to Pred(c_task_count) do begin
    zmq_recv(fSocketReceiver, PChar(nil)^, 0, 0);

    if (fTaskCount mod 10 = 0) then // "Статус - бар" :)
      Write(':')
    else
      Write('.')
  end;
// Вычисление и показ времени выполнения пакета задач
  Writeln('Total elapsed time: ', GetTickCount() - fStart, 'ms');

  zmq_close(fSocketReceiver);
  zmq_ctx_destroy(fContext);
  Readln(fDummy);

end.

Тоже все просто. Сборщик слушает сокет
  fSocketReceiver := zmq_socket(fContext, ZMQ_PULL);
  zmq_bind(fSocketReceiver, 'tcp://*:5558');

Первое сообщение он получает от процесса "Ventilator", чтобы зафиксировать время начала пакетной обработки.
Далее сообщения поступают от исполнителей "Worker".
Содержание сообщений нас не интересует, поэтому длина ожидаемого сообщения указана равной нулю.

После приема 100 сообщений выводится общее время выполнения.
23 сен 14, 04:56    [16607839]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Проверяем.
Запускаем все три приложения, в консоли приложения "Ventilator" жмем Enter, наблюдаем процесс исполнения 100 задач.
Видим, что расчетное время выполнения примерно равно реальному:

К сообщению приложен файл. Размер - 60Kb
23 сен 14, 05:03    [16607841]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Если же запустить не один, а три экземпляра приложения "Worker", то станет видно, что общее время выполнения задания из 100 задач уменьшилась чуть ли не втрое:

К сообщению приложен файл. Размер - 57Kb
23 сен 14, 05:08    [16607843]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Для 5 запущенных процессов "Worker" время уменьшится ещё больше.
23 сен 14, 05:11    [16607844]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest


К сообщению приложен файл. Размер - 46Kb
23 сен 14, 05:14    [16607848]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
Таким образом, библиотека ZeroMQ предоставляет простое средство в том числе и для создания системы распределенных вычислений.
Вы создаете систему, которую обслуживают, изначально одна вычислительная установки "Worker".
Если не хватает мощности - вы просто подключаете еще две, прямо без прерывания рабочего процесса, и обслуживает вас не одна, а уже три установки.
23 сен 14, 05:24    [16607851]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
В общем случае, установки могут находиться где угодно. Главное - чтобы они могли подключиться к процессам "Ventilator" и "Synk" по tcp. То есть, получаем то, о чем любят говорить "энерпрайзники": масштабирование в ширину. Когда мощность системы увеличивается простым наращиванием (конечно, до определенного уровня) параллельных элементов.
23 сен 14, 05:29    [16607852]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
По поводу процедуры синхронизации.

Процессы "Worker" коннектятся с помощью метода zmq_connect(), который выполняется не мгновенно.
Поэтому, если "Ventilator" начнет раздавать задачи сразу, то первые из подключившихся получат "под завязку", а последние так и не загрузятся. Это связано в том числе и с тем, что сообщения встают в очередь как на стороне "Ventilator", так и на стороне "Worker".

PUSH - сокет процесса "Ventilator" равномерно распределяет задания по процессам "Worker" (если последние успели проконнектиться к "Ventilator"). Это называется "балансировка нагрузки".

PULL - сокет сборщика (процесса "Synk") собирает результаты от процессов "Worker" равномерно. Процесс называется "справедливой очередью" (fair-queuing).


Соединение по схеме "Трубопровод" (Pipeline) так же страдает синдромом "запаздывания подключения". Это приводит к тому, что PUSH - сокеты не могут правильно балансировать нагрузку. Если используются сокеты PUSH и PULL и какой-то из рабочих процессов (вроде рассмотренного выше "Worker") получает больше сообщений, чем другие, то это потому, что PULL-сокет коннектится быстрее, чем другие сокеты и выгребает все сообщения еще до того, как законнектятся все остальные.

В общем, разбираться надо. Тем более ,что в документации есть целый раздел, посвященный управлению балансировкой нагрузки.
23 сен 14, 05:54    [16607857]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
По поводу создаваемых объектов ZeroMQ.

Использование ZeroMQ всегда начинается с создания контекста, с помощью которого создаются сокеты.
Контекст создается вызовом метода zmq_ctx_new (). В приложение нужно создать и использовать ровно один контекст для всего процесса.
Технически, контекст является контейнером для всех сокетов в одном процессе, и выступает в качестве транспорта для InProc - протокола сокетов (inproc протокол является самым быстрым для связи между сокетами в одном процессе).

Контекст является thread-safe переменной, то есть, может использоваться разными потоками процесса безо всякой синхронизации.

И так, обычно при старте приложения один раз создаем (zmq_ctx_new ()) контекст ZeroMQ, и один раз освобождаем при завершении приложения (zmq_ctx_destroy ())

Сокеты. Метод zmq_socket(2) создает новый сокет в заданном контексте. Сокеты ZeroMQ не thread-safe, поэтому должны использоваться только в тех потоках, в которых они были созданы.

~~~~~~~~~~~~~~~~

По поводу подчистки хвостов.

Кроме очевидной возможной утечки памяти, ZeroMQ довольно привередлива к процессу завершения. Если, например, остались незакрытыми сокеты, то метод zmq_ctx_destroy () будет висеть вечно. И даже если вы закроете все сокеты, то zmq_ctx_destroy () по умолчанию будет ждать вечно, если есть ожидающие соединений или идет процесс отсылки, и вы не сбросили сокеты перед их закрытием.
23 сен 14, 06:22    [16607865]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 2.  [new]
чччД
Guest
В документации рекумендуется:

- По возможности использовать методы zmq_send() и zmq_recv(). Этот позволит не заморачиваться со структурой zmq_msg_t.

- Если вы все же используете метода zmq_msg_recv(), не забывайте освобождайть сообщение вызовом zmq_msg_close().

- Если вы открываете и закрываете много сокетов - это, скорее всего говорит от неправильно спроектированном приложении. В некоторых случаях хэндлы сокетов не закрываются до самого освобождения контекста.

- При завершении приложения всегда освобождайте контекст вызовом zmq_ctx_destroy().


Если приложение многопоточное, все сложнее.
1. Не пытаемся использовать одни и теже сокеты в разных потоках. Просто не пытаемся.
2. Каждый сокет завершаем специальным запросом. Правильно - установить для сокета малое значение таймаута (опция сокета ZMQ_LINGER) (1 секунду), а затем закрыть сокет.

3. Освобождение контекста. В случае выполнения любой блокирующей операции отправки или получения сообщений в подключенных потоках(которые используют тот же контекст) операция завершится с ошибкой. Ловим эту ошибку, а затем устанавливаем таймаут, потом закрываем сокеты в потоках, потом завершаемся. Не освобождаем один и тот же контекст дважды. Метод zmq_ctx_destroy() в главном потоке будет заблокирован до тех пор, пока все сокеты контекста не будут аккуратно закрыты.
...
~~~~~~~~~~~~~~~~~~~~

Всё, типа. Можно покорять сетевой мир.
23 сен 14, 06:45    [16607870]     Ответить | Цитировать Сообщить модератору
Все форумы / Delphi Ответить