Пример использования:
var users = await usernames.SelectTaskResults(GetUserDetails, 4);
где GetUserDetails(string username)
— это метод, который вызывает HttpClient для доступа к API и возвращает объект User. В SelectTaskResults
возвращает ожидаемые задачи. В лучшем случае для этого примера будет 4 параллельных потока, вызывающих API одновременно.
Проблемы: я волнуюсь .Wait()
может вызвать тупик.
Код:
public static async Task<IEnumerable<TResult>> SelectTaskResults<TInput, TResult>(
this IEnumerable<TInput> source,
Func<TInput, Task<TResult>> taskFunc,
int degreesOfParallelism = 1,
bool throwFaulted = false,
CancellationToken cancellationToken = default(CancellationToken))
{
// Task.Run creates the task but it doesn't start executing immediately - for debugging
var tasks = source
.Select(input => Task.Run(() => taskFunc(input), cancellationToken))
.ToArray();
Parallel.ForEach(tasks,
new ParallelOptions { MaxDegreeOfParallelism = degreesOfParallelism }
, task =>
{
try
{
task.Wait(cancellationToken); // .Start() doesn't work for promise-like tasks
}
catch (Exception)
{
if (throwFaulted)
{
throw;
}
}
}
);
var output = await Task.WhenAll(tasks.Where(x=> !x.IsFaulted));
return output.AsEnumerable();
}
2 ответа
ОСАГО внутри async
реализация: смешение концепций реализации делает код сложным или даже ошибочным. Ну наконец то Parallel.ForEach
блокирует текущий поток, что запрещено в async
программирование.
Также избегайте блокировки операций на Task
например .Wait()
или же .Result
для незавершенного Task
.
Но я предлагаю совершенно другой подход. Рассмотрим упрощенный пример.
public class ThrottledApiClient : IDisposable
{
private readonly HttpClient _client;
private readonly SemaphoreSlim _semaphore;
public ThrottledApiClient(string baseUrl, int concurrentDegree = 0)
{
_client = new HttpClient();
_client.BaseAddress = new Uri(baseUrl);
_semaphore = new SemaphoreSlim(concurrentDegree == 0 ? Environment.ProcessorCount * 2 : concurrentDegree);
}
public async Task<string[]> PostJsonRequestsAsync(IEnumerable<(string, string)> requests, CancellationToken token)
{
List<Task<string>> tasks = new List<Task<string>>();
// wrap the loop with try-catch (OperationCanceledException) to handle Cancellation right here if necessary
foreach ((string path, string json) in requests)
{
await _semaphore.WaitAsync(token);
tasks.Add(PostJsonAsync(path, json, token));
}
return await Task.WhenAll(tasks);
// or with filter
//await Task.WhenAll(tasks);
//return tasks.Where(t => t.IsCompletedSucessfully).Select(t => t.Result).ToArray();
}
private async Task<string> PostJsonAsync(string url, string json, CancellationToken token)
{
try
{
using HttpContent content = new StringContent(json, Encoding.UTF8, "application/json");
using HttpResponseMessage response = await _client.PostAsync(url, content, token);
// response.EnsureSuccessStatusCode(); // to throw on not success instead of returning result
return await response.Content.ReadAsStringAsync(token);
}
// catch (HttpRequestException)
// catch (OperationCanceledException) { return null; } // to swallow cancellation
finally
{
_semaphore.Release();
}
}
public void Dispose()
{
_client.Dispose();
_semaphore.Dispose();
}
}
И демо.
(string, string)[] requests = new (string, string)[]
{
("/getAccounts", "{...}"),
("/getMediaPath", "{...}")
};
using (var api = new ThrottledApiClient("https://api.example.com", 4))
using (var cts = new CancellationTokenSource())
{
var results = await api.PostJsonRequestsAsync(requests, cts.Token);
//...
}
Вы можете вызвать PostJsonRequestsAsync
даже одновременно несколько раз, даже из разных потоков, но только 4
запросы будут активны одновременно.
SemaphoreSlim
инструмент синхронизации по умолчанию в async
программирование на данный момент.
Мое текущее решение:
public static async Task<IEnumerable<TResult>> SelectTaskResults<TInput, TResult>(
this IEnumerable<TInput> source,
Func<TInput, Task<TResult>> task,
int degreesOfParallelism = 1,
bool throwFaulted = false,
CancellationToken cancellationToken = default(CancellationToken))
{
var throttle = new SemaphoreSlim(degreesOfParallelism, degreesOfParallelism);
var tasks = new List<Task<TResult>>();
foreach(var input in source)
{
await throttle.WaitAsync(cancellationToken);
tasks.Add(Run(input));
}
var output = await Task.WhenAll(tasks.Where(x=> throwFaulted || !x.IsFaulted));
return output.AsEnumerable();
async Task<TResult> Run(TInput value)
{
try
{
return await task(value);
}
finally
{
throttle.Release();
}
}
}
Добро пожаловать в Code Review! Хотя это альтернативное решение, оно не дает хорошего обзор. пожалуйста, объясните, что вы сделали и почему вы это сделали. так как этот ответ может получить отрицательные голоса и / или быть удален.
— Малахия♦
@Malachi, это сообщение OP согласно
If you have changed your code you can either post it as an answer
в комментарии под вопросом.— эспот
- 1
@aepot Все ответы должны быть обзорами. Совершенно нормально загрузить новое решение как часть этого. Если ответ хотя бы расскажет, что отличается от оригинала, и кратко объяснит, почему он лучше, это будет прекрасно.
— мачта
Спасибо за ответ. SemaphoreSlim — это то, что я попробовал в первую очередь. Первый аргумент SemaphoreSlim устанавливает начальное значение concurrentDegree, а второй — максимальное значение. Когда я установил максимум, у меня появилось исключение, в котором говорилось, что SemaphoreSlim превышает его максимум.
— Пол Тоцке
@PaulTotzke У тебя нет причин доверять мне, правильно. Но
first argument sets the initial concurrentDegree and the 2nd sets the max
= нет. Один аргумент означает «у вас 4 свободных слота», два аргумента означают «у вас 4 слота и изначально 2 из них заняты», где занятые слоты — первый аргумент, общая степень — второй. Когда у тебя все слоты свободны и звони.Release()
, вы получите исключение. Используйте один аргумент для конструктора.— эспот
@PaulTotzke исправление:
public SemaphoreSlim (int initialCount, int maxCount)
средстваnew SemaphoreSlim(freeSlots, maxSlots)
— эспот
Я снова тестирую SemaphoreSlim. Это исключение отключило меня от его использования, но, возможно, я неправильно понял:
System.Threading.SemaphoreFullException: Adding the specified count to the semaphore would cause it to exceed its maximum count. at System.Threading.SemaphoreSlim.Release(Int32 releaseCount) at System.Threading.SemaphoreSlim.Release()
— Пол Тоцке
Разрешите нам продолжить обсуждение в чате.
— Пол Тоцке
Показывать 4 больше комментариев