Добро пожаловать в форум, Guest  >>   Войти | Регистрация | Поиск | Правила | В избранное | Подписаться
Все форумы / WinForms, .Net Framework Новый топик    Ответить
 Отловить ошибки при многопоточном выполнении  [new]
vb_sub
Member

Откуда:
Сообщений: 894
Всем привет, у меня есть обработчик, для которого нужно выполнить следующую функциональность.
1) Получить список ID-документов через http-запрос
2) Для каждого ID документа сделать http-запрос и распарсить его.
3) Получить распарсенные данные из всех документов.

Я реализую многопоточную обработку посредством использования System.Threading.Channels.
       // Главная точка входа
	public async Task ProcessDocsAsync()
        {
	    //канал для получения и отправки ID-документов
            var docIDChannel = Channel.CreateUnbounded<Doc>(new UnboundedChannelOptions() { SingleReader = false,SingleWriter = true });
			
	    //канал, куда складываем значения из распарсенных JSON-документов
            var parseChannel = Channel.CreateUnbounded<(Doc docID, string uid)>();
			
            var taskList = new List<Task>();

            //читаем номера документов посредством http-запроса 
            taskList
            .Add(Task.Run(async () => await GetDocsNumbersAsync(docIDChannel.Writer, _apiProcessorSettings.GetDocsRoute()))
            .ContinueWith(t =>
            {
                docIDChannel.Writer.Complete();
                Console.ForegroundColor = ConsoleColor.Green;
                Console.WriteLine("DocReading Complete");
                Console.ForegroundColor = ConsoleColor.White;
            }));

           //создаем 5 задач, которые будут распарсивать документы
            var processList = Enumerable
                .Range(1, 5)
                .Select(_ => Task.Run(async () => await ParseDocByIsAsync(docIDChannel.Reader, parseChannel.Writer)));

            //информируем канал о том, что больше добавления элементов не произойдет
            var procTask = Task.WhenAll(processList).ContinueWith(t =>
            {
                parseChannel.Writer.Complete();
                Console.ForegroundColor = ConsoleColor.Green;
                Console.WriteLine("Parsing Complete");
                Console.ForegroundColor = ConsoleColor.White;
            });

            taskList.Add(procTask);

            // читаем распарсенные документы
            taskList
            .Add(Task.Run(async () => await ConsumeOutputProductsInConsoleAsync(parseChannel.Reader)));
			
            await Task.WhenAll(taskList);
            Console.WriteLine("Done");
        }
		
		
	//Получение документов
        private async Task GetDocsNumbersAsync(ChannelWriter<Doc> channelWriter, string route)
        {
            var сlient = _httpClientFactory.CreateClient("сlient");

            using var docStream = await сlient.GetStreamAsync(route);

            using JsonDocument jDocument = await JsonDocument.ParseAsync(docStream);
            var root = jDocument.RootElement.GetProperty("docs").EnumerateArray();

            foreach (var item in root)
            {
                await channelWriter.WriteAsync(new Doc(
                    item.GetProperty("number").GetString()));
            }
        }
			
	//парсинг json-документов	
        private async Task ParseDocByIsAsync(ChannelReader<Doc> docChannel,
            ChannelWriter<(Doc doc, string uid)> uitChannel)
        {
            while (await docChannel.WaitToReadAsync())
            {
                while (docChannel.TryRead(out var doc))
                {
                    try
                    {
                        Console.WriteLine($"start processing doc:{doc.DocID}, thread:{Thread.CurrentThread.ManagedThreadId}");

                        int accumulateCount = 0;
                        var сlient = _httpClientFactory.CreateClient("сlient");

                        var response = await сlient.GetAsync($"route/{doc.DocID}/", HttpCompletionOption.ResponseHeadersRead);

                        using Stream streamToReadFrom = await response.Content.ReadAsStreamAsync();
                        using JsonDocument jDocument = await JsonDocument.ParseAsync(streamToReadFrom);
                        var root = jDocument.RootElement;

                        //количество
                        var allCount = root.GetProperty("total").GetInt32(); /// вот здесь ошибка при многопоточном использовании

                        //парсим документ и в итоге отправляем распарсенные значение в канал
			///...
			await uitChannel.WriteAsync((doc, uit));
                    }
                    catch (Exception ex)
                    {
                        Console.ForegroundColor = ConsoleColor.Red;
                        Console.WriteLine("Error: " + ex.Message);
                        Console.ForegroundColor = ConsoleColor.White;
                    }
                }
            }
        }
			
        // чтение распарсенных документов			
        private async Task ConsumeOutputProductsInConsoleAsync(ChannelReader<(Doc doc, string uid)> uitChannel)
        {
            long cnt = 0;
            while (await uitChannel.WaitToReadAsync())
            {
                if (uitChannel.TryRead(out var doc))
                {
                    cnt++;
                }
            }
            Console.WriteLine($"Count is:{cnt}");
        }


Для вышеуказаного сниппета я получаю нежелательные side-эффекты, которые отсутствуют если я запускаю парсинг в режиме одной задачи
 var processList = Enumerable
                .Range(1, 1)
                .Select(_ => Task.Run(async () => await ParseDocByIsAsync(docIDChannel.Reader, parseChannel.Writer)));

В методе ParseDocByIsAsync я делаю http-запрос, в результате которого возвращается JSON- документ, у которого в обязательном порядке есть свойство "total"- проверял много раз через Postman.
В результате выполнения кода я попадаю в секцию "Exception" с текстом ошибки об отсутствии свойства/токена "total" у Json-документа.
Я проверяю url, по которому запрашивается Json- он приходит корректный. Проверяю значение переменной "root" при возникновении ошибки- действительно именно это свойство отсутствует. Как будто один поток прочитал свойство, закорраптил его, положил в пул для дальнейшего выполнения, другой поток подхватил таску и у же не может его нормально прочитать.
Ошибка носит вероятностный характер и её вероятность возрастает с увеличением количества Task для обработки парсинга. При 5 тасках на парсинг вероятность ошибки около 90%, хотя бывало и успешное выполнение.
Также заметил, что один документ может обрабатываться несколькими потоками- возможно это просто перекладывание таски из одного в другой, но использование Channels обещает потокобезопасное получение элементов из канала и то что элемент не может быть получен срузу несколькими потоками. Хоть бери и на BlockCollection откатывайся. Подскажите, кто с такой ситуацией сталкивался. Спасибо
19 апр 21, 16:06    [22311085]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
Где-то в степи
Member

Откуда: Под Таганрогом
Сообщений: 4370
vb_sub,
Зачем вы нам это показываете? Вы эксгибионист?
Зачем тут таски, при долгоиграющих операциях, в контексте их умопомрачительных колличествах?
У вас есть процедура загрузки, напишите ее тест и тестируйте, не разбираясь, глядя на хрустальный шар ... - чуйкой чувствую не реентерабельность(загадочно так)
19 апр 21, 23:35    [22311278]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
fkthat
Member

Откуда:
Сообщений: 4880
Для чего так люто смешивать async/await и ContinueWith. Для чего заворачивать вызовы асинхронных методов в асинхронные длегаты, а потом еще и вызывать их через Task.Run. Это не код, а говнокод.
20 апр 21, 01:16    [22311287]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
vb_sub
Member

Откуда:
Сообщений: 894
Где-то в степи,
создаю таски, чтобы каждый json-документ парсился в отдельной задаче - они друг от друга не зависят, поэтому хочу сделать их выполнение параллельным.
Рентабельность здесь очень-высока-выполнение множества независимых задач, если запускаешь в отдельных тасках, то время выполнения уменьшается кратно.

Сообщение было отредактировано: 20 апр 21, 06:57
20 апр 21, 06:56    [22311303]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
vb_sub
Member

Откуда:
Сообщений: 894
fkthat,
continueWith нужно для того, чтобы по завершению Task или группы Task закрыть канал и уведомить слушателей, что больше элементов не будет. Если это 1 Task, то канал можно закрыть прямо из метода, если же это группа Task, то необходимо дожидаться пока все задачи не завершатся.
Заворачивать в асинхронные делегаты приходится потому что внутри методов(ParseDocByIsAsync), для которых они вызываются есть await, соответственно я не могу сделать метод просто Task- приходится делать async Task.

Сообщение было отредактировано: 20 апр 21, 06:55
20 апр 21, 06:59    [22311304]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
Где-то в степи
Member

Откуда: Под Таганрогом
Сообщений: 4370
vb_sub,
ну если Вы не охотно создаете свои потоки.
воспользуйтесь WebClient
там множество изощренных рализаций, и есть неблокирующие выполнения, с возможностью отмены, событийная модель.
ваш код бы сократился до 10 строк, без вычурных каналов.
20 апр 21, 11:21    [22311382]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
hVostt
Member

Откуда:
Сообщений: 19318
vb_sub,

Лютая забористая дичь :)
20 апр 21, 22:57    [22311688]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
vb_sub
Member

Откуда:
Сообщений: 894
hVostt,
какой вариант не дичь?
21 апр 21, 07:51    [22311734]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
Где-то в степи
Member

Откуда: Под Таганрогом
Сообщений: 4370
vb_sub,
все придумали до нас, программировать не надо, просто копипаст и вперде.
код набранный копипастом
+

static void Main(string[] args)
        {
            MyDownloader myDownloader=new MyDownloader();
            myDownloader.
                AddUrl("http://soft.eurodir.ru/test-speed-100Mb.bin").
                AddUrl("http://soft-ru.eurodir.ru/test-speed-ru-100Mb.bin?w=22222").
                AddUrl("http://soft-ru.eurodir.ru/test-speed-ru-100Mb.bin?w=8888888888888").
                OnComplete+=(sender, dictionary) =>Console.WriteLine("************* download done ***********"); 
          
            myDownloader.Run();
            Console.ReadKey();
        }

        private sealed class MyDownloader
        {
            public event EventDownloadComplete OnComplete;
            private readonly Dictionary<string,string> _dictionaryResult=new Dictionary<string, string>();
            public MyDownloader AddUrl(string url) { 
                _dictionaryResult.Add(url,null);
                return this;
            }
            public void Run()
            {
                foreach (var keyValuePair in _dictionaryResult)
                   InnerDownloadWorker(keyValuePair.Key);
            }

            private void InnerDownloadWorker(string url)
            {
                var webClient = new WebClient();
                webClient.DownloadStringCompleted += (sender, args) =>
                {
                    lock (this)
                    {
                        _dictionaryResult[url] = args.Result;
                        if (_dictionaryResult.All(a => a.Value != null))
                        {
                            OnCompleteCore(_dictionaryResult);
                        }
                    }
                };
                webClient.DownloadProgressChanged += (sender, args) => { Console.WriteLine($"{url} .............................. {args.ProgressPercentage}"); }; 
                webClient.DownloadStringAsync(new Uri($"{url}"));
            }

            private void OnCompleteCore(Dictionary<string, string> args)
            {
                OnComplete?.Invoke(this, args);
            }
            public delegate void EventDownloadComplete(object sender, Dictionary<string,string> args);
        }


21 апр 21, 10:01    [22311766]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
fkthat
Member

Откуда:
Сообщений: 4880
Где-то в степи
код набранный копипастом

Оно и видно. Код из начала двухтысячных.
21 апр 21, 10:53    [22311809]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
hVostt
Member

Откуда:
Сообщений: 19318
vb_sub
hVostt,
какой вариант не дичь?


Попробуйте Dataflow, может понравится :)

Сообщение было отредактировано: 21 апр 21, 22:32
21 апр 21, 22:39    [22312177]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
love_bach
Member

Откуда:
Сообщений: 822
fkthat
Где-то в степи
код набранный копипастом

Оно и видно. Код из начала двухтысячных.


и что с ним не так?
22 апр 21, 18:47    [22312750]     Ответить | Цитировать Сообщить модератору
 Re: Отловить ошибки при многопоточном выполнении  [new]
Roman Mejtes
Member

Откуда: г. Пермь
Сообщений: 4206
hVostt,
я вообще с нее балдею, так удобно и просто делать многопоточные комбайны, просто сказка. для интеграции, трансформации данные, просто сказка. на днях в 1 проекте внедрил, надо было 100500 файлов обработать, трансформировать данные и объединить в файлах порциями. Лучше не придумаешь.
22 апр 21, 21:58    [22312817]     Ответить | Цитировать Сообщить модератору
Все форумы / WinForms, .Net Framework Ответить