Stream Api

Stream Api - способ работать со структурами данных в функциональном стиле. Уменьшает объем кода. Чаще всего с помощью stream в Java 8 работают с коллекциями, но на самом деле этот механизм может использоваться и для других задач. Не работает с примитивами за исключением int, long, double для которых есть отдельные Stream.

Каждый элемент в коллекции будет проходить по всей коллекции по очереди. Только после прохождения 1-го элемента через terminal оператор начнется обработка второго. Такой подход зовётся pull iteration. По этому skip(), filter(), и distinct() лучше ставить вначале.

Поток нельзя вызывать повторно. IllegalStateException.

Стримы бывают последовательными(sequential) и параллельными(parallel).

Примеры создания Stream

Виды операторов:

Intermediate(конвейерные, промежуточные) операторы

Конвейерные операторы - возвращают новый Stream и не выполняются(lazy) без терминального оператора.

  1. map(Function mapper) - преобразует каждый элемент стрима. Одно выходное значение из каждого входного.
  2. flatMap(Function<T, Stream <R>> mapper) - создаёт произвольное количество выходных значений(0 или больше) для каждого входного. Под капотом из каждого элемента создается новый поток и в конце все потоки создают исходный поток.
  3. peek(Consumer<T>) - Возвращает тот же стрим, но применяет функцию к каждому элементу стрима.
  4. filter(Predicate predicate) - фильтрует стрим, пропуская только те элементы, что проходят по условию. collection.stream().filter(«a1»::equals).count()
  5. dropWhile(Predicate predicate) — фильтрует стрим, пропуская только те элементы, что не проходят по условию.
  6. takeWhile(Predicate predicate) — пропускает далее элементы пока условие выполняется и при первом не выполнении, больше никого не пропускает.
  7. limit(long maxSize) – ограничивает стрим по количеству элементов.
  8. skip(long n) – пропускаем n элементов.
  9. sorted() – сортировка.
  10. sorted(Comparator comparator) – сортирует стрим (сортировка как у TreeMap).
  11. distinct() - убирает повторы элементов для метода equals.

Изменение потока в поток примитивных типов

Потоки примитивов используют специальные лямбда-выражения. Например, IntFunction вместо Function, или IntPredicate вместо Predicate.

Потоки примитивов поддерживают дополнительные терминальные методы: sum() и average()

Параллельность

При добавлении parallel или sequential будет изменен тип стрима, но єтот тип будет использован только после запуска терминальной операции. Можно несколько раз менять тип в одном stream, но будет выполнен только последний указанный.

Terminal операторы

Поток завершиться и возвращают другой объект, такой как коллекция, примитивы, объекты, Optional и т.д. Повторный вызов вернет ошибку IllegalStateException.

  1. forEach(Consumer action) - аналог for each.
  2. count() – возвращает количество элементов стрима.
  3. reduce - позволяет выполнять агрегатные функции на всей коллекцией и возвращать один результат.
  4. collect(Collector collector) – метод собирает все элементы в список, множество или другую коллекцию, сгруппировывает элементы по какому-нибудь критерию, объединяет всё в строку и т.д.
  5. toArray - возвращает массив значений стрима.
  6. min(Comparator comparator)
  7. max(Comparator comparator)
  8. findFirst() - получить первый элемент стрима.
  9. findAny() - Возвращает любой подходящий элемент из стрима (возвращает Optional). collection.stream().findAny().orElse(«1»)
  10. allMatch(Predicate predicate) - возвращает true, если все элементы стрима удовлетворяют условию или false.
  11. anyMatch(Predicate predicate) - вернет true, если хотя бы один элемент стрима удовлетворяет условию predicate.
  12. noneMatch(Predicate predicate) - вернёт true, если, пройдя все элементы стрима, ни один не удовлетворил условию.

Дополнительные методы у числовых стримов:

Примеры использования:

Проверка

var list = List.of("A", "B", "C");
var matchingElements = Stream.of("B", "C", "D")
        .anyMatch(list::contains);
        

Поиск совпадений элементов двух списков

var list = List.of("A", "B", "C");
var matchingElements = List.of("B", "C", "D")
    .stream()
    .filter(list::contains)
    .toList();
        

Поиск не совпадений элементов двух списков

var list = List.of("A", "B", "C");
var matchingElements = List.of("B", "C", "D")
    .stream()
    .filter(element -> !list.contains(element))
    .toList();
        

Нахождения процентиля(персентиль)

var latencies = Stream.of(10, 3, 6, 7, 8, 8, 9, 13, 15, 16, 20).toList();
System.out.println(percentile(latencies, 25));
System.out.println(percentile(latencies, 50));
System.out.println(percentile(latencies, 75));
System.out.println(percentile(latencies, 100));

public static long percentile(List<Integer> latencies, double percentile) {
    int index = (int) Math.ceil(percentile / 100.0 * latencies.size());
    return latencies.get(index-1);
}
        

Нахождения процентиля(персентиль)

var latencies = Stream.of(10, 3, 6, 7, 8, 8, 9, 13, 15, 16, 20).toList();
System.out.println(percentile(latencies, 25));
System.out.println(percentile(latencies, 50));
System.out.println(percentile(latencies, 75));
System.out.println(percentile(latencies, 100));

public static long percentile(List<Integer> latencies, double percentile) {
    int index = (int) Math.ceil(percentile / 100.0 * latencies.size());
    return latencies.get(index-1);
}
        

List to Map

var list = Stream.of("1", "2", "3").toList();
var map = list.stream().collect(Collectors.toMap(i -> i, i -> "sample"));
        

Parallel stream

Parallel stream - потоки данных которые обрабатываются параллельно на нескольких потоках, чтобы ускорить выполнение коллекций. Добавление parallelStream() бездумно — это не "оптимизация", а лотерея с шансом на баги и падение.

Выполняется в ForkJoinPool.commonPool() потоках. Если есть другие задачи, использующие этот же пул, они могут начать конкурировать за потоки, замедляя всё приложение.

Размер основного пула потоков может достигать 5 потоков выполнения — точное число зависит от количества доступных физических ядер процессора.

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // получить число потоков
        

Настройка

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
        

Когда не использовать

Преимущество

Как это работает:

  1. Коллекция разбивается на подзадачи.
  2. Каждая подзадача исполняется в потоках ForkJoinPool.commonPool() - по умолчанию количество потоков равное, количеству ядер процессора.
  3. Результаты собираются и объединяются.

Как создать

List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6);

// Последовательный поток
int sum1 = numbers.stream().mapToInt(Integer::intValue).sum();

// Параллельный поток
int sum2 = numbers.parallelStream().mapToInt(Integer::intValue).sum();