Справочная информация: я планирую быстро вызывать различные конечные точки API веб-сайта, но политика использования подразумевает, что я могу совершать только пять вызовов в секунду. Я не могу полагаться на вызовы, занимающие в среднем 200 мс, поэтому я написал следующее.
План состоит в том, чтобы любые другие потоки, которые хотят вызвать API, отправляли большую часть Callables
исполнителю ниже. Каждый Callable
делает вызов API, и исполнитель отправляет их в соответствии с максимальной скоростью API. Затем результаты могут быть собраны позже отправляющим потоком через FutureTask
.
// RateLimitedExecutorService.java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
public class RateLimitedExecutorService extends TimerTask {
private final long creditDelay;
private ExecutorService exec;
private Timer timer;
private LinkedBlockingQueue<FutureTask<?>> tasks;
public RateLimitedExecutorService(int ratePerSecs, int capacity) throws InvalidParameterException {
if (ratePerSecs <= 0) {
throw new InvalidParameterException("Rate is <= 0");
}
this.exec = Executors.newCachedThreadPool();
this.tasks = new LinkedBlockingQueue<>(capacity);
this.timer = new Timer(true);
this.timer.scheduleAtFixedRate(this, 0, this.creditDelay);
}
public <T> FutureTask<T> submit(Callable<T> task) throws NullPointerException, InterruptedException {
FutureTask<T> futureTask = new FutureTask<T>(task);
tasks.put(futureTask);
return futureTask;
}
@Override
public void run() {
FutureTask<?> task = tasks.poll();
try {
exec.submit(task);
} catch (NullPointerException npe) {
// swallow: to be expected when task list is empty
} catch (RejectedExecutionException ree) {
System.out.println("RLExec: can't schedule task!");
}
}
public void shutdown() {
this.exec.shutdownNow();
}
}
Вот пример кода драйвера:
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class TestMain {
public static void main(String[] args) throws Exception {
ArrayList<TestCallable> tasks = new ArrayList<>();
ArrayList<FutureTask<Integer>> res = new ArrayList<>();
RateLimitedExecutorService exec = new RateLimitedExecutorService(5, 100);
for (int i = 0; i < 20; i++) {
tasks.add(new TestCallable(i, (int) (Math.random() * 20)));
}
for (TestCallable tc : tasks) {
res.add(exec.submit(tc));
}
for (FutureTask<Integer> ft : res) {
System.out.println("res: " + ft.get());
}
exec.shutdown();
}
private static class TestCallable implements Callable<Integer> {
private int x, y;
public TestCallable(int a, int b) {
this.x = a;
this.y = b;
}
@Override
public Integer call() throws InterruptedException {
Thread.sleep(y * 10);
return x + y;
}
}
}
У меня нет слишком большого опыта работы с многопоточностью Java, кроме использования встроенных исполнителей. Любая обратная связь (и советы о том, как справиться с InterruptedExceptions
) приветствуется!
1 ответ
Этот код будет работать, но несколько предложений:
- Здесь мы используем таймер, а также исполнители, бок о бок, проблема здесь в том, что вы без необходимости создаете 2 потока, таймер просто передает вызов API другому потоку. Теперь может быть сценарий, когда по какой-то причине установление соединения может занять время, и начальная задержка выше 200 мс, тогда в этом случае, поскольку таймер будет продолжать ставить в очередь другие задачи, может накопиться более 5 задач, и скорость ограничение может не соблюдаться, если позже задержка улучшится.
- Даже если вы хотели 5 вызовов в 1 секунду, мы фактически делаем 1 вызов каждые 200 мс, хотя это то же самое, но это приводит к ненужному регулированию на вашем конце.
Вы, чтобы взглянуть на Запланированный поток пула исполнителейон делает то же самое, что и вы, но это также будет страдать от проблемы 1. Чтобы решить, что у вас может быть алгоритм типа tokenBucket, вы можете использовать семафори таймер возвращает токен обратно в семафор после задержки x.
Вишва