Вопрос-ответ

Custom thread pool in Java 8 parallel stream

Пользовательский пул потоков в параллельном потоке Java 8

Возможно ли указать пользовательский пул потоков для Java 8 parallel stream? Я нигде не могу его найти.

Представьте, что у меня есть серверное приложение, и я хотел бы использовать параллельные потоки. Но приложение большое и многопоточное, поэтому я хочу разделить его на части. Я не хочу, чтобы медленно выполняемая задача в одном модуле приложения блокировала задачи из другого модуля.

Если я не могу использовать разные пулы потоков для разных модулей, это означает, что я не могу безопасно использовать параллельные потоки в большинстве ситуаций реального мира.

Попробуйте следующий пример. Некоторые задачи с интенсивным использованием ЦП выполняются отдельными потоками. В задачах используются параллельные потоки. Первая задача прерывается, поэтому каждый шаг занимает 1 секунду (имитируется переходом потока в режим ожидания). Проблема в том, что другие потоки застревают и ждут завершения прерванной задачи. Это надуманный пример, но представьте приложение-сервлет и кого-то, отправляющего длительно выполняющуюся задачу в общий пул fork join.

public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();

es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));


es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}

private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}

public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
Переведено автоматически
Ответ 1

На самом деле есть хитрость, как выполнить параллельную операцию в определенном пуле ответвлений. Если вы выполняете ее как задачу в пуле ответвлений, она остается там и не использует обычный пул.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
final List<Integer> primes = forkJoinPool.submit(() ->
// Parallel task here, for example
IntStream.range(1, 1_000_000).parallel()
.filter(PrimesPrint::isPrime)
.boxed().collect(Collectors.toList())
).get();
System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}

Трюк основан на ForkJoinTask.fork который указывает: "Организует асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача, если применимо, или с использованием ForkJoinPool.commonPool() если нет inForkJoinPool()"

Ответ 2

Параллельные потоки используют значение по умолчанию, ForkJoinPool.commonPool которое по умолчанию имеет на один поток меньше, чем у вас процессоров, возвращаемое Runtime.getRuntime().availableProcessors() (это означает, что параллельные потоки оставляют один процессор для вызывающего потока).


Для приложений, которым требуются отдельные или настраиваемые пулы, ForkJoinPool может быть создан с заданным целевым уровнем параллелизма; по умолчанию он равен количеству доступных процессоров.


Это также означает, что если у вас есть вложенные параллельные потоки или несколько параллельных потоков, запущенных одновременно, все они будут совместно использовать один и тот же пул. Преимущество: вы никогда не будете использовать больше, чем по умолчанию (количество доступных процессоров). Недостаток: вы можете не назначить "все процессоры" каждому инициируемому вами параллельному потоку (если у вас их несколько). (Очевидно, вы можете использовать ManagedBlocker, чтобы обойти это.)

Чтобы изменить способ выполнения параллельных потоков, вы можете либо


  • отправьте выполнение параллельного потока в свой собственный ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); или

  • вы можете изменить размер общего пула, используя системные свойства: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") для целевого параллелизма в 20 потоков.


Пример последнего на моей машине с 8 процессорами. Если я запущу следующую программу:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.print((System.currentTimeMillis() - start) + " ");
});

Результат таков:


215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416


Итак, вы можете видеть, что параллельный поток обрабатывает 8 элементов одновременно, т. Е. Использует 8 потоков. Однако, если я раскомментирую прокомментированную строку, результат будет:


215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216


На этот раз параллельный поток использовал 20 потоков, и все 20 элементов в потоке обрабатывались одновременно.

Ответ 3

В качестве альтернативы трюку с запуском параллельных вычислений внутри вашего собственного ForkJoinPool вы также можете передать этот пул методу CompletableFuture.supplyAsync, как в:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()),
forkJoinPool
);
Ответ 4

Исходное решение (установка общего свойства параллелизма ForkJoinPool) больше не работает. Судя по ссылкам в исходном ответе, обновление, которое устраняет эту проблему, было обратно перенесено на Java 8. Как упоминалось в связанных потоках, не было гарантии, что это решение будет работать вечно. Исходя из этого, решением является решение forkjoinpool.submit with .get, обсуждаемое в принятом ответе. Я думаю, что backport также исправляет ненадежность этого решения.

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
.forEach((int theInt) ->
{
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.println(Thread.currentThread().getName() + " -- " + theInt);
})).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
.forEach((theInt) ->
{
try { Thread.sleep(100); } catch (Exception ignore) {}
System.out.println(Thread.currentThread().getName() + " -- " + theInt);
})).get();
java concurrency java-8 java-stream