Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Delphi Новый топик    Ответить
Топик располагается на нескольких страницах: Ctrl  назад   1 2 3 4 5 [6] 7 8 9 10 11   вперед  Ctrl      все
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
defecator
Member

Откуда:
Сообщений: 39122
В Windows 8.1 "искаропки" интерфейса DirectPlay не оказалось.
А в Windows 7 "искаропки" он ещё есть
25 мар 16, 13:50    [18977739]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Feg16
Member [заблокирован]

Откуда: دولة اسرا
Сообщений: 5414
Блог
defecator
А в Windows 7 "искаропки" он ещё есть
Warning: Microsoft DirectPlay has been deprecated. 
Этого достаточно чтобы сморщив нос закрыть страницу :(
25 мар 16, 13:59    [18977792]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Feg16
Member [заблокирован]

Откуда: دولة اسرا
Сообщений: 5414
Блог
А то будет потом как с RedAlert - поставьте тот компонент, настройте протокол IPX
25 мар 16, 14:00    [18977800]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
defecator
Member

Откуда:
Сообщений: 39122
Feg16
defecator
А в Windows 7 "искаропки" он ещё есть
Warning: Microsoft DirectPlay has been deprecated. 
Этого достаточно чтобы сморщив нос закрыть страницу :(

какие тонкие нюхательные ощущения, я поражаюсь.
А с древним кодом нет, не работал никогда ? Тошнило тебя ?
25 мар 16, 14:01    [18977804]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Feg16
Member [заблокирован]

Откуда: دولة اسرا
Сообщений: 5414
Блог
defecator
А с древним кодом нет, не работал никогда ? Тошнило тебя ?
Одно дело пилить на дядю говнокод на практике, а второе заменять один движок на другой. Смысл мне ставить раком проект, если он у пользователей попросту не станет работать?
25 мар 16, 16:28    [18978797]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
ZeroMQ
Особенности High-level API CZMQ:

Автоматизация обслуживания сокетов. К примеру, при закрытии контекста все его сокеты обработка будут также закрываться. При этом в некоторых случаях для сокетов можно назначить таймайт.
...


Интересно:

//  Create an attached thread. An attached thread gets a ctx and a PAIR
//  pipe back to its parent. It must monitor its pipe, and exit if the
//  pipe becomes unreadable. Do not destroy the ctx, the thread does this
//  automatically when it ends.
type 
  p_zthread_attached_fn = ^zthread_attached_fn;
  zthread_attached_fn = procedure(args: Pointer; ctx: p_zctx_t; pipe: Pointer); cdecl;
...
interface

  function zthread_fork(ctx: p_zctx_t; thread_fn: p_zthread_attached_fn; args: Pointer):
    Pointer; cdecl; external cZMQ_DllName;


zthread_fork создает и запускает "прикрепленную нить", фактически запуская cdecl - функцию типа zthread_attached_fn и автоматически создавая пару сокетов для общения с нитью, как и сказано в описании выше.

Так вот, контекст ZMQ (параметр ctx: p_zctx_t) не просто передается, а создается его копия в блоке данных нити.
В рамках нового контекста в нити и будет создаваться новый пул сокетов и связанных с ними коннектов, очередей и т.п.
Т.е., по завершении нити произойдет корректная финализация копии контекста и связанных с ним сокетов и проч.
31 мар 16, 14:22    [19000503]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Dima T
Member

Откуда:
Сообщений: 13193
чччД, с аутентификацией не разбирался?

В тестах есть пример, но слишком простой.

С сервером понятно, непонятно как узнать клиенту что у него "Invalid username or password"?
На клиенте libzmq.dll получает "400", рвет соединение, больше его не пытается установить, но наружу никак это не сообщает. Снаружи мой код работает как будто все хорошо. zmq_send() отправляет, zmq_recv() висит в ожидании приема.

Нагуглил это, вроде тот же вопрос, но мало что там понял, у меня с английским не очень хорошо.
21 окт 16, 17:37    [19810451]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Dima T
Member

Откуда:
Сообщений: 13193
Может кому пригодиться. Все что нарыл - это включить мониторинг (zmq_socket_monitor ()) и анализировать что будет после ZMQ_EVENT_DISCONNECTED, если после попытки установить соединение прекратятся, то считаем что сервер прислал команду отключиться.
24 окт 16, 16:17    [19816581]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Dima T,

выстраивая свой протокол, я сделал так, что в момент handshake клиент передает серверу свои текстовые идентификационные данные (имя компа, юзернэйм windows и т.п.) в отдельном фрейме. А после handshake, обменявшись открытыми ключами для шифрования (алг. Диффи-Хеллмана), в целях экономии трафика использую собственный открытый Д-Х ключ заодно и как 8-байтный UID для identity коннекта.
Сервер кэширует UID'ы и хранит их в течении заданного времени (5 сек без активности -> "свободен!).
24 окт 16, 16:46    [19816728]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Т.е., аутентификацией на сервере занимаюсь я сам.
Что удобно и позволяет выстраивать различные, удобные для моей левой пятки схемы.
24 окт 16, 16:49    [19816747]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Схема примерно вот такая.

Клиент получает от сервера сообщения:
- хартбит - посылается периодически, чтобы клиент знал: сервер жив;
- запрос выполнен - успешный ответ сервера на любой запрос клиента (кроме запроса "гуд бай, сервер!").
- запрос об идентификации - ответ сервера на любой запрос, когда сервер не знает о данном identity ничего, клиент должен представиться;

Сервер получает запросы от клиента:
- хартбит - посылается периодически, чтобы сервер знал: клиент жив.
- запрос - посылается клиентом по мере надобности.
- запрос с блоком идентификации (высылается на запрос сервера).
24 окт 16, 17:12    [19816857]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
kealon(Ruslan)
Member

Откуда: Нижневартовск
Сообщений: 4070
чччД,

я вот одного не понял, если что бы получить сообщение нужно сперва выделить под него память где тогда асинхронность?
25 окт 16, 12:55    [19819465]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
kealon(Ruslan)
чччД,

я вот одного не понял, если что бы получить сообщение нужно сперва выделить под него память где тогда асинхронность?

Нет, конечно.

Не обязательно. Используй zmq_msg_recv():

длина_сообщения := zmq_msg_recv(сокет, сообщение, флаги);

Если длина_сообщения >= 0, значит, все ОК.
А данные после приема можно получить вот так: zmq_msg_data(fMessage).

А асинхронность - в поллинге: http://delphi-and-zeromq.blogspot.ru/2014/11/05-zeromq-zmqpoll.html
25 окт 16, 13:35    [19819677]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Dima T
Member

Откуда:
Сообщений: 13193
чччД
Т.е., аутентификацией на сервере занимаюсь я сам.
Что удобно и позволяет выстраивать различные, удобные для моей левой пятки схемы.

Тоже склоняюсь к самодельной аутентификации.
Свой протокол это хорошо, возможно даже лучше для безопасности чем встроенный, т.к. можно всунуть перехват zmq_setsockopt() установки логина/пароля и готово. Или трафик послушать. В PLAIN режиме оно открытым текстом идет.

Единственный плюс встроенной что сервер рвет соединение если зацепились вовсе не не через ZMQ, если встроеная отключена - не рвет, просто не отвечает.
Еще встроенная показывает IP откуда коннект, без нее можно через монитор попробовать сопоставить. Я не определился, нужен ли вообще мне этот IP или без него проживу.

Хочу задействовать новые сокеты ZMQ_CLIENT, ZMQ_SERVER. Управление идентификацией гибче чем с DILLER-ROUTER. И побыстрее должно работать теоретически, т.к. маршрутизация по ID сессии (int32), а не по имени клиента (строка произвольной длины)
Смущает что они подключаются по
#define ZMQ_BUILD_DRAFT_API

т.е. вроде как еще в варианте черновика. Есть опыт использования? Не глючат?
25 окт 16, 14:40    [19820075]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
чччД
Guest
Dima T
...
Хочу задействовать новые сокеты ZMQ_CLIENT, ZMQ_SERVER...
[/src]
т.е. вроде как еще в варианте черновика. Есть опыт использования? Не глючат?

Не пользовался пока, чуть-чуть почитал только.
25 окт 16, 15:12    [19820207]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
kealon(Ruslan)
Member

Откуда: Нижневартовск
Сообщений: 4070
чччД,

нда, не густо - нулевая обёртка над апи практически
25 окт 16, 21:45    [19821745]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
kealon(Ruslan)
Member

Откуда: Нижневартовск
Сообщений: 4070
чччД,
всё больше склоняюсь, что вот такой подход более практичен и в плане результата и в плане лёгкости разработки.
ещё бы для асинхронных операций чтения память выделять когда она требуется, а не заранее - тогда наверное можно и с классическим пулингом посостязаться по затратам ресурсов
25 окт 16, 22:11    [19821801]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Товарищ младший сержант
Member [заблокирован]

Откуда:
Сообщений: 5122
kealon(Ruslan)
чччД,

нда, не густо - нулевая обёртка над апи практически

Где нулевая обертка над апи?
25 окт 16, 23:59    [19822012]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
kealon(Ruslan)
Member

Откуда: Нижневартовск
Сообщений: 4070
Товарищ младший сержант
kealon(Ruslan)
чччД,

нда, не густо - нулевая обёртка над апи практически

Где нулевая обертка над апи?


чччД
А асинхронность - в поллинге: http://delphi-and-zeromq.blogspot.ru/2014/11/05-zeromq-zmqpoll.html

// Процесс обработки для обоих сокетов
  while true do begin
 
    zmq_poll(@fItems[0], Length(fItems), -1);
    if ((fItems[0].revents and ZMQ_POLLIN) <> 0) then begin
      fSize := zmq_recv(fSocketReceiver, @fMsgBuff, SizeOf(fMsgBuff), 0);
      if fSize >= 0 then
        // Выполнение задания
      else
        break;
      if ((fItems[1].revents and ZMQ_POLLIN) <> 0) then begin
        fSize := zmq_recv(fSocketSubscriber, @fMsgBuff, SizeOf(fMsgBuff), 0);
        if fSize >= 0 then
    // Обработка данных о погоде
        else
          break;
      end;
    end;
  end;
26 окт 16, 07:02    [19822161]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Dima T
Member

Откуда:
Сообщений: 13193
kealon(Ruslan)
чччД,
всё больше склоняюсь, что вот такой подход более практичен и в плане результата и в плане лёгкости разработки.

ХЗ что проще. Ниже код сервера с многопоточной обработкой, букав поменьше и нет необходимости синхронизации доступа к очереди на обработку.
kealon(Ruslan)
ещё бы для асинхронных операций чтения память выделять когда она требуется, а не заранее - тогда наверное можно и с классическим пулингом посостязаться по затратам ресурсов

zmq_msg_recv() выделяет ровно столько сколько надо по приходу сообщения.

+ Многопоточный эхо-сервер
Сообщения принимаются и раздаются трем потокам на обработку.
#define ZMQ_BUILD_DRAFT_API
#include "zmq.h"

#include <assert.h>
#include <stdlib.h>
#include <thread>

void *ctx = NULL;
void *sock = NULL;

// поток обработки сообщений
void worker(int num) {
	printf("worker %d start\n", num);
	void* queue = zmq_socket(ctx, ZMQ_PULL);	assert(queue);
	int rc = zmq_connect(queue, "inproc://echo-queue"); assert(rc == 0);
	while(1) {
		// получение указателя из очереди
		zmq_msg_t p;
		rc = zmq_msg_init(&p); assert(rc == 0);
		rc = zmq_msg_recv(&p, queue, 0); assert(rc == sizeof(zmq_msg_t*));
		zmq_msg_t* msg = NULL;
		memcpy(&msg, zmq_msg_data(&p), sizeof(zmq_msg_t*));
		rc = zmq_msg_close(&p); assert(rc == 0);
		// обработка сообщения
		//printf("worker %d recv %d bytes from %X\n", num, zmq_msg_size(msg), zmq_msg_routing_id(msg));
		rc = zmq_msg_send(msg, sock, 0); assert(rc != -1); // отправка msg и освобождение памяти
		free(msg);
	}
	zmq_close(queue);
}

int main (void) {
	ctx = zmq_ctx_new (); assert (ctx);
	// сокет для сообщений от клиентов
	sock = zmq_socket(ctx, ZMQ_SERVER);	assert(sock);
	int rc = zmq_bind(sock, "tcp://*:12345"); assert(rc == 0); 
	// сокет для очереди обработки
	void* queue = zmq_socket(ctx, ZMQ_PUSH);	assert(queue);
	rc = zmq_bind(queue, "inproc://echo-queue"); assert(rc == 0);
	// Запуск потоков обработки
	std::thread w1(worker, 1), w2(worker, 2), w3(worker, 3);
	// прием сообщений от клиентов
	while (1) {
		zmq_msg_t* msg = (zmq_msg_t*)malloc(sizeof(zmq_msg_t)); assert(msg); // инициализация пустого сообщения
		rc = zmq_msg_init(msg); assert(rc == 0);// инициализация пустого сообщения
		rc = zmq_msg_recv(msg, sock, 0); assert(rc > 0); // ожидание приема, прием с выделением памяти
		// Постановка в очередь на обработку
		zmq_msg_t p;
		rc = zmq_msg_init_size(&p, sizeof(zmq_msg_t*)); assert(rc == 0);
		memcpy(zmq_msg_data(&p), &msg, sizeof(zmq_msg_t*));
		rc = zmq_msg_send(&p, queue, 0); assert(rc == sizeof(zmq_msg_t*));
	}
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}

Для переделки в полноценный сервер просто дописать поток обработки worker().

Провел небольшой тест на скорость. Клиент открывает 1000 соединений к серверу и в каждое шлет 100 запросов.
Соединение через 127.0.0.1. Проц i7 4 ядра. Win10
СоединенийВремя мсСкорость запрос/секПамять сервера Мб
10004 82820 70038 Мб
300023 57012 700122 Мб
в состоянии простоя (без соединений) память, используемая сервером, 5-6 Мб.
+ код клиента
#define COUNT 1000 // предел 1023, для 3000 запускал три client.exe
int main (void)
{
	printf("Connecting to ECHO server\n");
	void *ctx = zmq_ctx_new(); assert(ctx);

	void *sock[COUNT];
	int rc;
	for(int i = 0; i < COUNT; i++) {
		sock[i] = zmq_socket(ctx, ZMQ_CLIENT); assert(sock[i]);
		rc = zmq_connect(sock[i], "tcp://127.0.0.1:12345"); assert(rc == 0);
	}


	int t = GetTickCount();

	for (int i = 0; i < 100; i++) {
		for(int j = 0; j < COUNT; j++) {
			zmq_msg_t msg;
			rc = zmq_msg_init_size(&msg, 5); assert(rc == 0);
			memcpy(zmq_msg_data(&msg), "Hello", 5);
			rc = zmq_msg_send(&msg, sock[j], 0); assert(rc == 5);
		}
		for (int j = 0; j < COUNT; j++) {
			zmq_msg_t msg;
			rc = zmq_msg_init(&msg); assert(rc == 0);
			rc = zmq_msg_recv(&msg, sock[j], 0); assert(rc == 5);
		}
	}
	printf("Time: %d\n", GetTickCount() - t);
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}
26 окт 16, 09:10    [19822312]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Dima T
Member

Откуда:
Сообщений: 13193
Перемудрил я в коде сервера с указателями и похоже утечка памяти есть.
Надо передавать содержимое zmq_msg_t, это как понимаю структура-заголовок (64 байта) где все ссылки на данные сообщения.
Переделал, немного побыстрее стало. Сервер при простое занимает 2-3 Мб памяти.
+ исходник
#define ZMQ_BUILD_DRAFT_API
#include "zmq.h"

#include <assert.h>
#include <stdlib.h>
#include <thread>

void *ctx = NULL;
void *sock = NULL;

// рабочий поток обработки сообщений
void worker(int num) {
	printf("worker %d start\n", num);
	void* queue = zmq_socket(ctx, ZMQ_PULL);	assert(queue);
	int rc = zmq_connect(queue, "inproc://echo-queue"); assert(rc == 0);
	while(1) {
		// получение указателя из очереди
		zmq_msg_t p;
		rc = zmq_msg_init(&p); assert(rc == 0);
		rc = zmq_msg_recv(&p, queue, 0); assert(rc == sizeof(zmq_msg_t));
		zmq_msg_t* msg = (zmq_msg_t*)zmq_msg_data(&p);
		// обработка сообщения
		//printf("worker %d recv %d bytes from %X\n", num, zmq_msg_size(msg), zmq_msg_routing_id(msg));
		rc = zmq_msg_send(msg, sock, 0); assert(rc != -1); // отправка msg и освобождение памяти
		rc = zmq_msg_close(&p); assert(rc == 0); // освобождение памяти сообщения с указателем
	}
	zmq_close(queue);
}

int main (void) {
	ctx = zmq_ctx_new (); assert (ctx);
	// сокет для сообщений от клиентов
	sock = zmq_socket(ctx, ZMQ_SERVER);	assert(sock);
	int rc = zmq_bind(sock, "tcp://*:12345"); assert(rc == 0); 
	// сокет для очереди обработки
	void* queue = zmq_socket(ctx, ZMQ_PUSH);	assert(queue);
	rc = zmq_bind(queue, "inproc://echo-queue"); assert(rc == 0);
	// Запуск потоков обработки
	std::thread w1(worker, 1), w2(worker, 2), w3(worker, 3);
	// прием сообщений от клиентов
	while (1) {
		zmq_msg_t msg;
		rc = zmq_msg_init(&msg); assert(rc == 0);// инициализация пустого сообщения
		rc = zmq_msg_recv(&msg, sock, 0); assert(rc > 0); // ожидание приема, прием с выделением памяти
		// Постановка в очередь на обработку
		zmq_msg_t p;
		rc = zmq_msg_init_size(&p, sizeof(zmq_msg_t)); assert(rc == 0);
		memcpy(zmq_msg_data(&p), &msg, sizeof(zmq_msg_t));
		rc = zmq_msg_send(&p, queue, 0); assert(rc == sizeof(zmq_msg_t));
	}
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}
26 окт 16, 10:08    [19822557]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Dima T
Member

Откуда:
Сообщений: 13193
kealon(Ruslan)
я вот одного не понял, если что бы получить сообщение нужно сперва выделить под него память где тогда асинхронность?

ИМХУ ты тут теплое с мягким в кучу замешал.
Асинхронность это алгоритмический подход, несинхронно, т.е. не ждать выполнения каждого шага, т.е. дал команду сделать то-то до тогда-то и забыл. Получил ответ "то-то сделано", запустил следующее, истек таймаут "тогда-то наступило и ничего не сделано", повторил команду "сделать то-то" или еще какие действия. А когда выделилась память - пофиг. Мысли ширше или ширее )))
26 окт 16, 20:07    [19825613]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
kealon(Ruslan)
Member

Откуда: Нижневартовск
Сообщений: 4070
Dima T
kealon(Ruslan)
я вот одного не понял, если что бы получить сообщение нужно сперва выделить под него память где тогда асинхронность?

ИМХУ ты тут теплое с мягким в кучу замешал.
Асинхронность это алгоритмический подход, несинхронно, т.е. не ждать выполнения каждого шага, т.е. дал команду сделать то-то до тогда-то и забыл. Получил ответ "то-то сделано", запустил следующее, истек таймаут "тогда-то наступило и ничего не сделано", повторил команду "сделать то-то" или еще какие действия. А когда выделилась память - пофиг. Мысли ширше или ширее )))

ага, коревато, но ты же понял, что я хотел спросить :-)

Dima T
kealon(Ruslan)
чччД,
всё больше склоняюсь, что вот такой подход более практичен и в плане результата и в плане лёгкости разработки.

ХЗ что проще. Ниже код сервера с многопоточной обработкой, букав поменьше и нет необходимости синхронизации доступа к очереди на обработку.
kealon(Ruslan)
ещё бы для асинхронных операций чтения память выделять когда она требуется, а не заранее - тогда наверное можно и с классическим пулингом посостязаться по затратам ресурсов

zmq_msg_recv() выделяет ровно столько сколько надо по приходу сообщения.

пример зачётный, именно то, что хотел
к твоему примеру очень неплохо должно ложиться, то что описано в статье
PS: мда, что-то слишком много вокруг этих корутин последнее время крутится, случайности неслучайны...
26 окт 16, 22:28    [19825981]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Dima T
Member

Откуда:
Сообщений: 13193
kealon(Ruslan)
PS: мда, что-то слишком много вокруг этих корутин последнее время крутится, случайности неслучайны...

ИМХУ это просто удобный синтаксис. Адаптированный к асинхронным алгоритмам.
Нынче повышение производительности железа де факто остановилось для синхронных алгоритмов, т.е. производительность одного ядра не меняется, а дополнительные ядра пользы не дают. Поэтому все направились на смену подходов, чтобы хоть как-то эти ядра хоть чем-то загрузить. Плюс сетевого взаимодействия стало больше.

А дальше возникает проблема удобства разработки и отладки, т.к. асинхронные алгоритмы в разы сложнее чем синхронные аналоги.
Например в C# много в эту сторону наработано: await/async, встроенный пул потоков, асинхронная модель на основе задач (TAP) и т.д. Все это создано в т.ч. чтобы убрать асинхронные сложности от глаз разработчика.
Там где подобного нет: один из вариантов - очереди сообщений, ZeroMQ тут очень даже в тему, особенно если требуется работа по сети.

+ PS Во втором варианте с копированием zmq_msg_t я тоже накосячил. Разработчики не советуют туда лезть напрямую.

Во всех описаниях функций zmq_msg_*() пишут
Never access zmq_msg_t members directly, instead always use the zmq_msg family of functions.

Поправил так
#define ZMQ_BUILD_DRAFT_API
#include "zmq.h"

#include <assert.h>
#include <stdlib.h>
#include <thread>

void *ctx = NULL;
void *sock = NULL;

// рабочий поток обработки сообщений
void worker(int num) {
	printf("worker %d start\n", num);
	void* queue = zmq_socket(ctx, ZMQ_PULL);	assert(queue);
	int rc = zmq_connect(queue, "inproc://echo-queue"); assert(rc == 0);
	while(1) {
		// получение указателя из очереди
		zmq_msg_t p;
		rc = zmq_msg_init(&p); assert(rc == 0);
		rc = zmq_msg_recv(&p, queue, 0); assert(rc == sizeof(zmq_msg_t));
		zmq_msg_t* msg = (zmq_msg_t*)zmq_msg_data(&p);
		// обработка сообщения
		//printf("worker %d recv %d bytes from %X\n", num, zmq_msg_size(msg), zmq_msg_routing_id(msg));
		rc = zmq_msg_send(msg, sock, 0); assert(rc != -1); // отправка msg и освобождение памяти
		rc = zmq_msg_close(&p); assert(rc == 0); // освобождение памяти сообщения с указателем
	}
	zmq_close(queue);
}

int main (void) {
	ctx = zmq_ctx_new (); assert (ctx);
	// сокет для сообщений от клиентов
	sock = zmq_socket(ctx, ZMQ_SERVER);	assert(sock);
	int rc = zmq_bind(sock, "tcp://*:12345"); assert(rc == 0); 
	// сокет для очереди обработки
	void* queue = zmq_socket(ctx, ZMQ_PUSH);	assert(queue);
	rc = zmq_bind(queue, "inproc://echo-queue"); assert(rc == 0);
	// Запуск потоков обработки
	std::thread w1(worker, 1), w2(worker, 2), w3(worker, 3);
	// прием сообщений от клиентов
	while (1) {
		// Сообщение для отправки в очередь
		zmq_msg_t p;
		rc = zmq_msg_init_size(&p, sizeof(zmq_msg_t)); assert(rc == 0);
		zmq_msg_t* msg = (zmq_msg_t*)zmq_msg_data(&p);

		// Ожидание сообщений от клиентов
		rc = zmq_msg_init(msg); assert(rc == 0);// инициализация пустого сообщения
		rc = zmq_msg_recv(msg, sock, 0); assert(rc > 0); // ожидание приема, прием с выделением памяти

		// Постановка в очередь на обработку
		rc = zmq_msg_send(&p, queue, 0); assert(rc == sizeof(zmq_msg_t));
	}
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}
27 окт 16, 08:04    [19826377]     Ответить | Цитировать Сообщить модератору
 Re: ZeroMQ - сокеты на стероидах, часть 3 (а для чего?).  [new]
Dima T
Member

Откуда:
Сообщений: 13193
Поизучал что реально происходит в inproc - пришел к выводу что не надо никаких указателей для inproc соединений, там и так указатели передаются.
Точнее если сообщение маленькое (5 байт например), то само сообщение, если большое (100 байт пробовал), то указатель, а содержимое сообщения остается в памяти там же где и было. Т.е. ZMQ сама отлично решает как быстрее доставить.

Итого: сообщение можно принимать с TCP соединения и сразу отправлять в inproc очередь на обработку без какого-либо шаманства.
+ Код сервера стал меньше, понятнее и быстрее на 5%
#define ZMQ_BUILD_DRAFT_API
#include "zmq.h"

#include <assert.h>
#include <stdlib.h>
#include <thread>

void *ctx = NULL;
void *sock = NULL;

// поток обработки сообщений
void worker(int num) {
	printf("worker %d start\n", num);
	void* queue = zmq_socket(ctx, ZMQ_PULL); assert(queue);
	int rc = zmq_connect(queue, "inproc://echo-queue"); assert(rc == 0);

	while (1) {
		// получение сообщения из очереди
		zmq_msg_t msg;
		rc = zmq_msg_init(&msg); assert(rc == 0);
		rc = zmq_msg_recv(&msg, queue, 0); assert(rc > 0);
		// обработка сообщения
		printf("worker %d recv %d bytes at %X from %X\n", num, zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
		rc = zmq_msg_send(&msg, sock, 0); assert(rc != -1); // отправка msg и освобождение памяти
	}
	zmq_close(queue);
}

int main (void) {
	ctx = zmq_ctx_new (); assert (ctx);
	// сокет для сообщений от клиентов
	sock = zmq_socket(ctx, ZMQ_SERVER); assert(sock);
	int rc = zmq_bind(sock, "tcp://*:12345"); assert(rc == 0); 
	// сокет для очереди обработки
	void* queue = zmq_socket(ctx, ZMQ_PUSH);	assert(queue);
	rc = zmq_bind(queue, "inproc://echo-queue"); assert(rc == 0);
	// запуск потоков обработки
	std::thread w1(worker, 1), w2(worker, 2), w3(worker, 3);
	// прием сообщений от клиентов
	while (1) {
		// Ожидание сообщений от клиентов
		zmq_msg_t msg;
		rc = zmq_msg_init(&msg); assert(rc == 0);// инициализация пустого сообщения
		rc = zmq_msg_recv(&msg, sock, 0); assert(rc > 0); // ожидание приема, прием с выделением памяти
		printf("recv %d bytes at %X from %X\n", zmq_msg_size(&msg), zmq_msg_data(&msg), zmq_msg_routing_id(&msg));
		// Постановка в очередь на обработку
		rc = zmq_msg_send(&msg, queue, 0); assert(rc > 0);
	}
	zmq_close(sock);
	zmq_ctx_destroy(ctx);
	return 0;
}
28 окт 16, 07:59    [19831560]     Ответить | Цитировать Сообщить модератору
Топик располагается на нескольких страницах: Ctrl  назад   1 2 3 4 5 [6] 7 8 9 10 11   вперед  Ctrl      все
Все форумы / Delphi Ответить