Пул пулов потоков

У меня на работе была проблема, что у нас есть некоторые задачи, которые нужно выполнить как можно быстрее. Для этого мы реализовали их так, чтобы они были многопоточными в ExecutorService. Первоначально для каждого типа задач у нас был свой ExecutorService (простой Executors.newFixedThreadPool (cpuCount)). Однако эти задачи выполняются очень короткими пакетами, между которыми может быть много времени. Поэтому вместо того, чтобы все потоки работали одновременно, мы хотели, чтобы у них был тайм-аут. Кроме того, очень маловероятно, что разные типы задач выполняются одновременно.

Решение, которое я придумал, заключалось в реализации моей собственной службы AbstractExecutorService, которая делегирует работу пулу ThreadPools, чтобы они могли истекать по тайм-ауту, а также у вас есть минимальное количество простаивающих потоков. Пожалуйста, дайте мне отзывы о концепции, реализации, стиле кодирования, комментариях и обо всем, что вы можете придумать.

package com.anon.exec;

import static java.util.stream.Collectors.*;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * This is a utility class to have a pool of thread pools.
 * It is used so that you can have the minimal number of threads running at the same time while at the same time providing multiple thread pools with equal priority
 * so that tasks scheduled together are similarly finished instead of having to wait for unrelated tasks.
 *
 * The class is designed to time out the threads in the thread pool.
 */
public class ThreadPoolPool extends AbstractExecutorService {

    private final List<ThreadPoolExecutor> internalExecutors;

    /**
     * Create a new pool of Thread Pools. You can specify the number of pools and the number of threads per pool.
     * @param poolCount the number of pools to create
     * @param poolSize the number of threads per pool
     * @param nameScheme this name is appended to the front of the thread name
     * @param threadTimeout how long until the thread times out
     * @param threadTimeoutUnit what unit the threadTimeout is in
     */
    public ThreadPoolPool(final int poolCount, final int poolSize, String nameScheme, final long threadTimeout, final TimeUnit threadTimeoutUnit) {
        if (poolCount < 1) {
            throw new IllegalArgumentException("poolCount must be at least 1");
        }
        if (poolSize < 1) {
            throw new IllegalArgumentException("poolSize must be at least 1");
        }
        if (nameScheme == null) {
            nameScheme = "ThreadPoolPool-" + hashCode();
        }
        if (threadTimeout < 1) {
            throw new IllegalArgumentException("threadTimeout must be at least 1");
        }
        Objects.requireNonNull(threadTimeoutUnit, "threadTimeoutUnit must not be null");

        this.internalExecutors = Collections.synchronizedList(new ArrayList<>(poolCount));
        for (int i = 0; i < poolCount; i++) {
            final ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(nameScheme + "-pool-" + i + "-thread-%d").build();
            ThreadPoolExecutor exec = new ThreadPoolExecutor(poolSize, poolSize, threadTimeout, threadTimeoutUnit, new LinkedBlockingQueue<>(), factory);
            exec.allowCoreThreadTimeOut(true);
            this.internalExecutors.add(exec);
        }
    }

    @Override
    public void shutdown() {
        synchronized (this.internalExecutors) {
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                pool.shutdown();
            }
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        synchronized (this.internalExecutors) {
            return this.internalExecutors.stream().map(ExecutorService::shutdownNow).flatMap(List::stream).collect(toList());
        }
    }

    @Override
    public boolean isShutdown() {
        synchronized (this.internalExecutors) {
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                if (!pool.isShutdown()) {
                    return false;
                }
            }
            return true;
        }
    }

    @Override
    public boolean isTerminated() {
        synchronized (this.internalExecutors) {
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                if (!pool.isTerminated()) {
                    return false;
                }
            }
            return true;
        }
    }

    @Override
    public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
        synchronized (this.internalExecutors) {
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                if (!pool.awaitTermination(timeout, unit)) {
                    return false;
                }
            }
            return true;
        }
    }

    @Override
    public void execute(final Runnable command) {
        ThreadPoolExecutor exec = getAvailableExecutor(1);
        exec.execute(command);
    }

    @Override
    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getAvailableExecutor(tasks.size());
        return exec.invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        ThreadPoolExecutor exec = getAvailableExecutor(tasks.size());
        return exec.invokeAny(tasks, timeout, unit);
    }

    @Override
    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException {
        ThreadPoolExecutor exec = getAvailableExecutor(tasks.size());
        return exec.invokeAll(tasks);
    }

    @Override
    public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException {
        ThreadPoolExecutor exec = getAvailableExecutor(tasks.size());
        return exec.invokeAll(tasks, timeout, unit);
    }

    /**
     * Finds an executor that can execute the given number of tasks. If there isn't one that can fit all tasks, the executor with the smallest current number of active tasks is returned.
     * @param taskCount the number of tasks you want to execute
     * @return a executor to execute your tasks
     */
    public ThreadPoolExecutor getAvailableExecutor(final int taskCount) {
        synchronized (this.internalExecutors) {
            int minActiveCount = Integer.MAX_VALUE;
            ThreadPoolExecutor minPool = null;
            for (ThreadPoolExecutor pool : this.internalExecutors) {
                int activeCount = pool.getActiveCount();
                if (activeCount < minActiveCount) {
                    minPool = pool;
                    minActiveCount = activeCount;
                }
                if (activeCount + taskCount < pool.getMaximumPoolSize()) {
                    return pool;
                }
            }
            if (minPool != null) {
                return minPool;
            }
            else {
                return this.internalExecutors.stream().findAny() // we check in the constructor that we have at least one executor
                        .orElseThrow(() -> new IllegalStateException("ThreadPoolPool has no Executors, this is an illegal state"));
            }
        }
    }

}

```

0

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *