Вопрос-ответ

Why filter() after flatMap() is "not completely" lazy in Java streams?

Почему filter() после flatMap() "не полностью" ленив в потоках Java?

У меня есть следующий пример кода:

System.out.println(
"Result: " +
Stream.of(1, 2, 3)
.filter(i -> {
System.out.println(i);
return true;
})
.findFirst()
.get()
);
System.out.println("-----------");
System.out.println(
"Result: " +
Stream.of(1, 2, 3)
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.filter(i -> {
System.out.println(i);
return true;
})
.findFirst()
.get()
);

Вывод выглядит следующим образом:

1
Result: 1
-----------
-1
0
1
0
1
2
1
2
3
Result: -1

Отсюда я вижу, что в первом случае stream действительно ведет себя лениво - мы используем findFirst() поэтому, как только у нас есть первый элемент, наша фильтрующая лямбда-функция не вызывается.
Однако во втором случае, который использует flatMaps, мы видим, что, несмотря на то, что найден первый элемент, который выполняет условие фильтрации (это просто любой первый элемент, поскольку lambda всегда возвращает true), дальнейшее содержимое потока все еще передается через функцию фильтрации.

Я пытаюсь понять, почему он ведет себя именно так, а не сдается после вычисления первого элемента, как в первом случае. Буду признателен за любую полезную информацию.

Переведено автоматически
Ответ 1

TL; DR, это было рассмотрено в JDK-8075939 и исправлено в Java 10 (и перенесено в Java 8 в JDK-8225328).

При просмотре реализации (ReferencePipeline.java) мы видим метод [ссылка]

@Override
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}

который будет вызываться для findFirst операции. Особая вещь, о которой следует позаботиться, - это sink.cancellationRequested() которая позволяет завершить цикл при первом совпадении. Сравнить с [ссылка]

@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}

@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
}
}
};
}
};
}

Метод для продвижения одного элемента заканчивается вызовом forEach в подпотоке без какой-либо возможности более раннего завершения, и комментарий в начале flatMap метода даже сообщает об этой отсутствующей функции.

Поскольку это больше, чем просто оптимизация, поскольку это подразумевает, что код просто прерывается, когда подпоток бесконечен, я надеюсь, что разработчики скоро докажут, что они “могут сделать лучше этого”…


Чтобы проиллюстрировать последствия, хотя Stream.iterate(0, i->i+1).findFirst() работает так, как ожидалось, Stream.of("").flatMap(x->Stream.iterate(0, i->i+1)).findFirst() в конечном итоге получится бесконечный цикл.

Что касается спецификации, большую ее часть можно найти в

глава “Потоковые операции и конвейеры” спецификации пакета:



Промежуточные операции возвращают новый поток. Они всегда ленивы;



... Лень также позволяет избежать проверки всех данных, когда в этом нет необходимости; для таких операций, как "найти первую строку длиной более 1000 символов", необходимо исследовать ровно столько строк, чтобы найти ту, которая обладает желаемыми характеристиками, без проверки всех строк, доступных из источника. (Это поведение становится еще более важным, когда входной поток бесконечен, а не просто велик.)



Кроме того, некоторые операции считаются операциями с коротким замыканием. Промежуточная операция является операцией с коротким замыканием, если при бесконечном вводе она может в результате создать конечный поток. Операция терминала вызывает короткое замыкание, если при бесконечном вводе она может завершиться за конечное время. Наличие операции короткого замыкания в конвейере является необходимым, но недостаточным условием для нормального завершения обработки бесконечного потока за конечное время.


Ясно, что операция короткого замыкания не гарантирует завершение за конечное время, например, когда фильтр не соответствует какому-либо элементу, обработка не может завершиться, но реализация, которая не поддерживает какое-либо завершение за конечное время, просто игнорируя природу короткого замыкания операции, далека от спецификации.

Ответ 2

Элементы входного потока лениво потребляются один за другим. Первый элемент, 1 преобразуется двумя flatMaps в поток -1, 0, 1, 0, 1, 2, 1, 2, 3, так что весь поток соответствует только первому входному элементу. Вложенные потоки с готовностью материализуются конвейером, затем сглаживаются, а затем передаются на filter этап. Это объясняет ваш вывод.

Вышесказанное не вытекает из фундаментального ограничения, но, вероятно, значительно усложнило бы задачу получения полномасштабной лени для вложенных потоков. Я подозреваю, что было бы еще большей проблемой сделать его производительным.

Для сравнения, ленивые продолжения Clojure получают еще один уровень переноса для каждого такого уровня вложенности. Из-за такой конструкции операции могут даже завершаться сбоем с StackOverflowError, когда вложенность используется до предела.

Ответ 3

Что касается прерывания работы с бесконечными подпотоками, поведение flatMap становится еще более удивительным, когда выполняется промежуточная (в отличие от терминальной) операция короткого замыкания.

В то время как следующее работает, как ожидалось, выводя бесконечную последовательность целых чисел

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).forEach(System.out::println);

следующий код выводит только "1", но все равно не завершает работу:

Stream.of("x").flatMap(_x -> Stream.iterate(1, i -> i + 1)).limit(1).forEach(System.out::println);

Я не могу представить чтение спецификации, в котором это не было бы ошибкой.

Ответ 4

В моей бесплатной библиотеке StreamEx я представил коллекторы с коротким замыканием. При сборе последовательного потока с помощью коллектора с коротким замыканием (например, MoreCollectors.first()) из источника потребляется ровно один элемент. Внутренне это реализовано довольно грязным способом: используется пользовательское исключение для прерывания потока управления. Используя мою библиотеку, ваш образец можно было бы переписать таким образом:

System.out.println(
"Result: " +
StreamEx.of(1, 2, 3)
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.flatMap(i -> Stream.of(i - 1, i, i + 1))
.filter(i -> {
System.out.println(i);
return true;
})
.collect(MoreCollectors.first())
.get()
);

Результат следующий:

-1
Result: -1
java lambda java-8 java-stream