Регулировка выполнения перечисления Задач

Пример использования:

var users = await usernames.SelectTaskResults(GetUserDetails, 4);

где GetUserDetails(string username) – это метод, который вызывает HttpClient для доступа к API и возвращает объект User. В SelectTaskResults возвращает ожидаемые задачи. В лучшем случае для этого примера будет 4 параллельных потока, вызывающих API одновременно.

Проблемы: я волнуюсь .Wait() может вызвать тупик.

Код:

public static async Task<IEnumerable<TResult>> SelectTaskResults<TInput, TResult>(
        this IEnumerable<TInput> source,
        Func<TInput, Task<TResult>> taskFunc,
        int degreesOfParallelism = 1,
        bool throwFaulted = false,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        // Task.Run creates the task but it doesn't start executing immediately - for debugging
        var tasks = source
            .Select(input => Task.Run(() => taskFunc(input), cancellationToken))
            .ToArray();
        
        Parallel.ForEach(tasks,
            new ParallelOptions { MaxDegreeOfParallelism = degreesOfParallelism }
            , task =>
            {
                try
                {
                    task.Wait(cancellationToken); // .Start() doesn't work for promise-like tasks
                }
                catch (Exception)
                {
                    if (throwFaulted)
                    {
                        throw;
                    }
                }
            }
        );
        
        var output = await Task.WhenAll(tasks.Where(x=> !x.IsFaulted));
        
        return output.AsEnumerable();
    }

2 ответа
2

ОСАГО внутри async реализация: смешение концепций реализации делает код сложным или даже ошибочным. Ну наконец то Parallel.ForEach блокирует текущий поток, что запрещено в async программирование.

Также избегайте блокировки операций на Task например .Wait() или же .Result для незавершенного Task.

Но я предлагаю совершенно другой подход. Рассмотрим упрощенный пример.

public class ThrottledApiClient : IDisposable
{
    private readonly HttpClient _client;
    private readonly SemaphoreSlim _semaphore;

    public ThrottledApiClient(string baseUrl, int concurrentDegree = 0)
    {
        _client = new HttpClient();
        _client.BaseAddress = new Uri(baseUrl);
        _semaphore = new SemaphoreSlim(concurrentDegree == 0 ? Environment.ProcessorCount * 2 : concurrentDegree);
    }

    public async Task<string[]> PostJsonRequestsAsync(IEnumerable<(string, string)> requests, CancellationToken token)
    {
        List<Task<string>> tasks = new List<Task<string>>();
        // wrap the loop with try-catch (OperationCanceledException) to handle Cancellation right here if necessary
        foreach ((string path, string json) in requests)
        {
            await _semaphore.WaitAsync(token);
            tasks.Add(PostJsonAsync(path, json, token));
        }

        return await Task.WhenAll(tasks);

        // or with filter
        //await Task.WhenAll(tasks);
        //return tasks.Where(t => t.IsCompletedSucessfully).Select(t => t.Result).ToArray();
    }

    private async Task<string> PostJsonAsync(string url, string json, CancellationToken token)
    {
        try
        {
            using HttpContent content = new StringContent(json, Encoding.UTF8, "application/json");
            using HttpResponseMessage response = await _client.PostAsync(url, content, token);
            // response.EnsureSuccessStatusCode(); // to throw on not success instead of returning result
            return await response.Content.ReadAsStringAsync(token);
        }
        // catch (HttpRequestException) 
        // catch (OperationCanceledException) { return null; } // to swallow cancellation
        finally
        {
            _semaphore.Release();
        }
    }

    public void Dispose()
    {
        _client.Dispose();
        _semaphore.Dispose();
    }
}

И демо.

(string, string)[] requests = new (string, string)[]
{
    ("/getAccounts", "{...}"),
    ("/getMediaPath", "{...}")
};

using (var api = new ThrottledApiClient("https://api.example.com", 4))
using (var cts = new CancellationTokenSource())
{
    var results = await api.PostJsonRequestsAsync(requests, cts.Token);
    //...
}

Вы можете вызвать PostJsonRequestsAsync даже одновременно несколько раз, даже из разных потоков, но только 4 запросы будут активны одновременно.

SemaphoreSlim инструмент синхронизации по умолчанию в async программирование на данный момент.

  • Спасибо за ответ. SemaphoreSlim – это то, что я попробовал в первую очередь. Первый аргумент SemaphoreSlim устанавливает начальное значение concurrentDegree, а второй – максимальное значение. Когда я установил максимум, у меня появилось исключение, в котором говорилось, что SemaphoreSlim превышает его максимум.

    – Пол Тоцке

  • @PaulTotzke У тебя нет причин доверять мне, правильно. Но first argument sets the initial concurrentDegree and the 2nd sets the max = нет. Один аргумент означает «у вас 4 свободных слота», два аргумента означают «у вас 4 слота и изначально 2 из них заняты», где занятые слоты – первый аргумент, общая степень – второй. Когда у тебя все слоты свободны и звони .Release(), вы получите исключение. Используйте один аргумент для конструктора.

    – эспот


  • @PaulTotzke исправление: public SemaphoreSlim (int initialCount, int maxCount) средства new SemaphoreSlim(freeSlots, maxSlots)

    – эспот


  • Я снова тестирую SemaphoreSlim. Это исключение отключило меня от его использования, но, возможно, я неправильно понял: System.Threading.SemaphoreFullException: Adding the specified count to the semaphore would cause it to exceed its maximum count. at System.Threading.SemaphoreSlim.Release(Int32 releaseCount) at System.Threading.SemaphoreSlim.Release()

    – Пол Тоцке

  • 1

    Разрешите нам продолжить обсуждение в чате.

    – Пол Тоцке

Мое текущее решение:

public static async Task<IEnumerable<TResult>> SelectTaskResults<TInput, TResult>(
        this IEnumerable<TInput> source,
        Func<TInput, Task<TResult>> task,
        int degreesOfParallelism = 1,
        bool throwFaulted = false,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        var throttle = new SemaphoreSlim(degreesOfParallelism, degreesOfParallelism);

        var tasks = new List<Task<TResult>>();

        foreach(var input in source)
        {
            await throttle.WaitAsync(cancellationToken);

            tasks.Add(Run(input));
        }
        
        var output = await Task.WhenAll(tasks.Where(x=> throwFaulted || !x.IsFaulted));
        
        return output.AsEnumerable();

        async Task<TResult> Run(TInput value)
        {
            try
            {
                return await task(value);
            }
            finally
            {
                throttle.Release();
            }
        }
    }

  • Добро пожаловать в Code Review! Хотя это альтернативное решение, оно не дает хорошего обзор. пожалуйста, объясните, что вы сделали и почему вы это сделали. так как этот ответ может получить отрицательные голоса и / или быть удален.

    – Малахия

  • @Malachi, это сообщение OP согласно If you have changed your code you can either post it as an answer в комментарии под вопросом.

    – эспот

  • 1

    @aepot Все ответы должны быть обзорами. Совершенно нормально загрузить новое решение как часть этого. Если ответ хотя бы расскажет, что отличается от оригинала, и кратко объяснит, почему он лучше, это будет прекрасно.

    – мачта

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *