Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / Java Новый топик    Ответить
Топик располагается на нескольких страницах: [1] 2   вперед  Ctrl      все
 Механизм обработки очереди  [new]
qi_ip
Member

Откуда:
Сообщений: 591
Приветствую!

Подскажите, пожалуйста, какие есть механизмы/приложения/фреймворки, чтобы реализовать параллельный механизм обработки сообщений из очереди.

Собственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вскоре, из этой таблицы А строки забираются отсортированные по дате записи в БД и передаются парсеру, который обрабатывает их, модифицирует и складывает модифицированные записи в другую таблицу Б.

	
	   |  	 | 	          |--------|	|--------|
------> | 4	 |	          |	       |	| 1 MOD  |
------> | 3	 | 1-2-3-4-5 |	       |	| 2 MOD  |
------> | 1	 |---------->|PARSER |====>  3 MOD  |
------> | 2	 | THREAD1  |	       |	| 4 MOD  |
------> | 5	 |	          |	       |	| 5 MOD  |
	   |___|	          |______  |	|_______|


Есть необходимость разделить THREAD1 на несколько потоков, чтобы быстрее забирать данные на обработку. Но не совсем понимаю, в какую сторону копать, чтобы не нарушить логику обработки данных на выходе из парсера. В частности, каждая запись обрабатывается парсером разное время, но отдаваться на выходе должна строго в отсортированном порядке по дате изначальной записи.

Спасибо!
3 ноя 19, 21:02    [22009112]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
fixxer
Member

Откуда:
Сообщений: 736
qi_ip, Нужно партиционировать входной поток. Это можно сделать по-разному, например поделить отрезок времени за который запрашиваются данные или взять остаток от деления таймстампа или идентификатора на количество партиций. Вопрос, если вы все равно результат пишете в таблицу, которую можно как угодно сортировать, зачем сохранять порядок при записи?
3 ноя 19, 21:21    [22009120]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
PetroNotC Sharp
Member

Откуда:
Сообщений: 2481
qi_ip,
1. Зачем промежуточно писать в бд?
2. Какая очередность при параллельной работе?
Шутите?
3 ноя 19, 21:48    [22009131]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
qi_ip
Member

Откуда:
Сообщений: 591
fixxer
qi_ip, Нужно партиционировать входной поток. Это можно сделать по-разному, например поделить отрезок времени за который запрашиваются данные или взять остаток от деления таймстампа или идентификатора на количество партиций. Вопрос, если вы все равно результат пишете в таблицу, которую можно как угодно сортировать, зачем сохранять порядок при записи?

Потому что результат с таблицы Б сразу же забирается и отправляется дальше. И получается, что если, например, запись под номером 1, все еще в обработке, а запись под номером 2 уже обработалась, то нарушается последовательность команд.

PetroNotC Sharp
qi_ip,
1. Зачем промежуточно писать в бд?

Изначально при получении сообщений есть минимальные проверки и отправка подтверждения получения сообщения. Плюс нужно сохранять данные на случай, если вдруг приложение упадет.

PetroNotC Sharp
qi_ip,
2. Какая очередность при параллельной работе?
Шутите?

Поэтому и создал топик, так как хотел узнать, возможность реализации этого момента )))) Если бы шутил...хех
3 ноя 19, 22:05    [22009137]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
fixxer
Member

Откуда:
Сообщений: 736
qi_ip,

Возможность есть. Так называемый механизм watermark. Это значение максимального таймстампа на всех партициях. Партиции безопасно писать записи старше текущего значения watermark.
3 ноя 19, 22:21    [22009141]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
PetroNotC Sharp
Member

Откуда:
Сообщений: 2481
qi_ip
Изначально при получении сообщений есть минимальные проверки и отправка подтверждения получения сообщения. Плюс нужно сохранять данные на случай, если вдруг приложение упадет.
это же логи. Или архивация. Получил фио из сокета, отдай копию в фоновый поток и пусть он в фоне сохраняет. А основной от сокета сразу это ФИО отдал на обработку функции
function бизнесЛогика(фио)
Так?

qi_ip
PetroNotC Sharp
2. Какая очередность при параллельной работе?
Шутите?

Поэтому и создал топик, так как хотел узнать, возможность реализации этого момента )))) Если бы шутил...хех

Вас еще раз спросить?
Какой дурак ставит задачу параллельной работы требуя очередности?
4 ноя 19, 00:54    [22009173]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
Sergunka
Member

Откуда: Bay Area, CA
Сообщений: 2001
qi_ip
Приветствую!

Подскажите, пожалуйста, какие есть механизмы/приложения/фреймворки, чтобы реализовать параллельный механизм обработки сообщений из очереди.

Собственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вскоре, из этой таблицы А строки забираются отсортированные по дате записи в БД и передаются парсеру, который обрабатывает их, модифицирует и складывает модифицированные записи в другую таблицу Б.

	
	   |  	 | 	          |--------|	|--------|
------> | 4	 |	          |	       |	| 1 MOD  |
------> | 3	 | 1-2-3-4-5 |	       |	| 2 MOD  |
------> | 1	 |---------->|PARSER |====>  3 MOD  |
------> | 2	 | THREAD1  |	       |	| 4 MOD  |
------> | 5	 |	          |	       |	| 5 MOD  |
	   |___|	          |______  |	|_______|


Есть необходимость разделить THREAD1 на несколько потоков, чтобы быстрее забирать данные на обработку. Но не совсем понимаю, в какую сторону копать, чтобы не нарушить логику обработки данных на выходе из парсера. В частности, каждая запись обрабатывается парсером разное время, но отдаваться на выходе должна строго в отсортированном порядке по дате изначальной записи.

Спасибо!



Посмотрите на очередь с приоритетом - приоритет понятно по дате.
https://www.rabbitmq.com/priority.html

Но это все одно не даст Вам требуемого результата так как в вашем случае в любой момент может прийти данные за прошлую неделю

Вы можете буферизовать в очереди несколько тысяч записей и потом уже их раздать на обработку как вариант, но на мой взгляд сортировать данные проще когда уже они попали в БД если у вас там не стоит в постановки к примеру аномали детекшин перед тем как в базу писать ну и тд.

Поговорите с архитектором может он чего не договаривает
4 ноя 19, 02:44    [22009180]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
qi_ip
Member

Откуда:
Сообщений: 591
PetroNotC Sharp
это же логи. Или архивация. Получил фио из сокета, отдай копию в фоновый поток и пусть он в фоне сохраняет. А основной от сокета сразу это ФИО отдал на обработку функции
function бизнесЛогика(фио)
Так?

Буду посмотреть такой вариант

Какой дурак ставит задачу параллельной работы требуя очередности?

Никто не ставит такую задачу ))) Основная цель - ускорить обработку данных парсером, так как на входе накапливаются данные. Поэтому это и был вопрос :)

Sergunka
Но это все одно не даст Вам требуемого результата так как в вашем случае в любой момент может прийти данные за прошлую неделю

Сотрировка идет по времени поступления запроса, то есть, если даже данные содержат информацию прошлой недели, все равно будет сортироваться по времени поступления запроса.
4 ноя 19, 08:45    [22009207]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
yvprod
Member

Откуда:
Сообщений: 16
qi_ip,

Расскажите подробнее, что за данные
4 ноя 19, 09:05    [22009212]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
PetroNotC Sharp
Member

Откуда:
Сообщений: 2481
qi_ip
Никто не ставит такую задачу ))) Основная цель - ускорить обработку данных парсером, так как на входе накапливаются данные. Поэтому это и был вопрос :)

Значит сортировку исключаем из вопроса. Нечего сортировать.
Парсер потокобезопасный?
Если да, то в поток его.
Если нет, то несколько экземпляров парсера.
Это узкое место по вашим словам.
Если ДВЕРЬ узкая, то либо строите рядом три двери, либо три дома (процесса) с такой узкой дверью.
fio = getSoket()
setLogDbAsync(fio
parserA.businessLayer(fio
4 ноя 19, 09:17    [22009217]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
fixxer
Member

Откуда:
Сообщений: 736
Потом, как всегда, окажется, что узким местом была база. И если предварительно не писать или писать асинхронно, то обработчик вытягивает.
4 ноя 19, 11:34    [22009266]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
qi_ip
Member

Откуда:
Сообщений: 591
yvprod
qi_ip,

Расскажите подробнее, что за данные

Данные - обычные текстовые сообщения-команды, но их большой поток (минимум 30-50 команд в секунду) и разных размеров (от 100 байт до 65 кб).

PetroNotC Sharp
Значит сортировку исключаем из вопроса. Нечего сортировать.

Исключить не получиться. Вот смотрите: допустим есть клиент, который постоянно посылает команды. Эти команды обрабатываются парсером, модифицируются и передаются/возвращаются другим клиентам. И если, например, пришли команды:

1. Включить
2. Выгрузить данные
3. Остановить выгрузку
4. Выключить

, то и на другой клиент после парсера они должны попасть в том же порядке.
В случае, если команды 2 и 3 обрабатываются в среднем 1 секунду, а 1 и 4 500 мс может получиться так, что сначала придут команды 1 и 4, а потом только 2 и 3.

Причем, комнда от клиента 1 пришла одна, а после обработки парсером она посылается сразу клиента 2-10, в зависимости, от того для скольких клиентов она предназначена.

fixxer
Потом, как всегда, окажется, что узким местом была база. И если предварительно не писать или писать асинхронно, то обработчик вытягивает.

Сейчас добавляю в код тайминги, чтобы понять, где идет затык, на обработке в Java или же на insert/update в БД.
4 ноя 19, 11:49    [22009272]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
забыл ник
Member

Откуда:
Сообщений: 3048
Еще никто не предлагал партиционировать по userId и пихать все в кафку?
4 ноя 19, 11:55    [22009275]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
yvprod
Member

Откуда:
Сообщений: 16
забыл ник,

я к этому и вел, партиционировать по какому нибудь бизнес-правилу и писать в разные очереди
4 ноя 19, 12:05    [22009283]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
qi_ip
Member

Откуда:
Сообщений: 591
Подскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.
4 ноя 19, 12:06    [22009284]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
mayton
Member

Откуда: loopback
Сообщений: 42941
qi_ip
Собственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вот с этого момента - фигня какая-то. Никто такую постановку не примет.

С сокетами никто не работает. Должен быть над-сокетный протокол. SOAP, например.
4 ноя 19, 12:09    [22009285]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
yvprod
Member

Откуда:
Сообщений: 16
qi_ip,

на joker Олег Анастасьев рассказывал про фреймворк мессенджера в одноклассниках, но в открытом доступе вроде еще нет

Сообщение было отредактировано: 4 ноя 19, 12:12
4 ноя 19, 12:12    [22009286]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
fixxer
Member

Откуда:
Сообщений: 736
yvprod
забыл ник,

я к этому и вел, партиционировать по какому нибудь бизнес-правилу и писать в разные очереди


+1
4 ноя 19, 12:19    [22009290]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
mad_nazgul
Member

Откуда:
Сообщений: 4926
qi_ip
Подскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.


Jabber?!
4 ноя 19, 12:21    [22009291]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
qi_ip
Member

Откуда:
Сообщений: 591
mayton
qi_ip
Собственно сама задача: есть несколько сокетов, по которым постоянно приходят сообщения. Все сообщения сохраняются в таблицу А в БД.

Вот с этого момента - фигня какая-то. Никто такую постановку не примет.

С сокетами никто не работает. Должен быть над-сокетный протокол. SOAP, например.

Обмен данных идет по вебсокету
4 ноя 19, 12:24    [22009293]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
mayton
Member

Откуда: loopback
Сообщений: 42941
Посмотри на это https://docs.oracle.com/javase/8/docs/api/java/util/PriorityQueue.html

Может поможет. И давай модульный тест или макет что ты уже написал.

Трудно говорить об ангелах на кончике иглы...
4 ноя 19, 12:56    [22009305]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
PetroNotC Sharp
Member

Откуда:
Сообщений: 2481
qi_ip
И если, например, пришли команды:

1. Включить
2. Выгрузить данные
3. Остановить выгрузку
4. Выключить

, то и на другой клиент после парсера они должны попасть в том же порядке.
mayton прав.
Ты изобретаешь ПРИКЛАДНОЙ ПРОТОКОЛ.
Нижнего уровня у тебя сокет. Но ты должен сохранить очередность пачки команд.
А если отправили вперемежку команды?
4 ноя 19, 12:56    [22009306]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
PetroNotC Sharp
Member

Откуда:
Сообщений: 2481
PetroNotC Sharp
fio = getSoket()
setLogDbAsync(fio
parserA.businessLayer(fio
покажи тут как команда 4 придет перед командой 1?
1 и 4 идут в разные сокеты?
4 ноя 19, 12:59    [22009308]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
PetroNotC Sharp
Member

Откуда:
Сообщений: 2481
qi_ip
Подскажите, может кто читал, где можно найти опенсорс коды или просто описание работы серверных приложений, например, таких как телеграм или ватцап или чем-то похожих мессенджеров.

Сначала постановку вменяемую дай. Почему парсер тормозит, если его задача тупо передать строку клиенту?
4 ноя 19, 13:01    [22009309]     Ответить | Цитировать Сообщить модератору
 Re: Механизм обработки очереди  [new]
PetroNotC Sharp
Member

Откуда:
Сообщений: 2481
Вадя эту задачу давно решил)))
4 ноя 19, 13:01    [22009310]     Ответить | Цитировать Сообщить модератору
Топик располагается на нескольких страницах: [1] 2   вперед  Ctrl      все
Все форумы / Java Ответить