Планировщик потоков с ограничением скорости в Java

Справочная информация: я планирую быстро вызывать различные конечные точки API веб-сайта, но политика использования подразумевает, что я могу совершать только пять вызовов в секунду. Я не могу полагаться на вызовы, занимающие в среднем 200 мс, поэтому я написал следующее.

План состоит в том, чтобы любые другие потоки, которые хотят вызвать API, отправляли большую часть Callables исполнителю ниже. Каждый Callable делает вызов API, и исполнитель отправляет их в соответствии с максимальной скоростью API. Затем результаты могут быть собраны позже отправляющим потоком через FutureTask.

// RateLimitedExecutorService.java

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

public class RateLimitedExecutorService extends TimerTask {

    private final long creditDelay;
    private ExecutorService exec;
    private Timer timer;
    private LinkedBlockingQueue<FutureTask<?>> tasks;

    public RateLimitedExecutorService(int ratePerSecs, int capacity) throws InvalidParameterException {

        if (ratePerSecs <= 0) {
            throw new InvalidParameterException("Rate is <= 0");
        }

        this.exec = Executors.newCachedThreadPool();
        this.tasks = new LinkedBlockingQueue<>(capacity);

        this.timer = new Timer(true);
        this.timer.scheduleAtFixedRate(this, 0, this.creditDelay);
    }

    public <T> FutureTask<T> submit(Callable<T> task) throws NullPointerException, InterruptedException {
        FutureTask<T> futureTask = new FutureTask<T>(task);
        tasks.put(futureTask);
        return futureTask;
    }

    @Override
    public void run() {
        FutureTask<?> task = tasks.poll();
        try {
            exec.submit(task);
        } catch (NullPointerException npe) {
            // swallow: to be expected when task list is empty
        } catch (RejectedExecutionException ree) {
            System.out.println("RLExec: can't schedule task!");
        }
    }

    public void shutdown() {
        this.exec.shutdownNow();
    }
}

Вот пример кода драйвера:

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class TestMain {

    public static void main(String[] args) throws Exception {

        ArrayList<TestCallable> tasks = new ArrayList<>();
        ArrayList<FutureTask<Integer>> res = new ArrayList<>();
        RateLimitedExecutorService exec = new RateLimitedExecutorService(5, 100);

        for (int i = 0; i < 20; i++) {
            tasks.add(new TestCallable(i, (int) (Math.random() * 20)));
        }
        for (TestCallable tc : tasks) {
            res.add(exec.submit(tc));
        }
        for (FutureTask<Integer> ft : res) {
            System.out.println("res: " + ft.get());
        }
        exec.shutdown();
    }

    private static class TestCallable implements Callable<Integer> {

        private int x, y;

        public TestCallable(int a, int b) {
            this.x = a;
            this.y = b;
        }

        @Override
        public Integer call() throws InterruptedException {
            Thread.sleep(y * 10);
            return x + y;
        }
    }
}

У меня нет слишком большого опыта работы с многопоточностью Java, кроме использования встроенных исполнителей. Любая обратная связь (и советы о том, как справиться с InterruptedExceptions) приветствуется!

1 ответ
1

Этот код будет работать, но несколько предложений:

  1. Здесь мы используем таймер, а также исполнители, бок о бок, проблема здесь в том, что вы без необходимости создаете 2 потока, таймер просто передает вызов API другому потоку. Теперь может быть сценарий, когда по какой-то причине установление соединения может занять время, и начальная задержка выше 200 мс, тогда в этом случае, поскольку таймер будет продолжать ставить в очередь другие задачи, может накопиться более 5 задач, и скорость ограничение может не соблюдаться, если позже задержка улучшится.
  2. Даже если вы хотели 5 вызовов в 1 секунду, мы фактически делаем 1 вызов каждые 200 мс, хотя это то же самое, но это приводит к ненужному регулированию на вашем конце.

Вы, чтобы взглянуть на Запланированный поток пула исполнителейон делает то же самое, что и вы, но это также будет страдать от проблемы 1. Чтобы решить, что у вас может быть алгоритм типа tokenBucket, вы можете использовать семафори таймер возвращает токен обратно в семафор после задержки x.

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *