Java Stream API 从入门到实践:从集合处理到 parallelStream

欢迎你来读这篇博客,这篇博客主要是关于Java Stream API

其中包括了关于我的见解和收集的知识分享。

序言

Java 8 引入 Lambda 和 Stream API 之后,集合处理的写法发生了很大变化。

以前我们处理一个集合,大多数时候都是:

1
2
3
4
5
6
7
8
9
List<UserDTO> result = new ArrayList<>();
for (User user : userList) {
if (user.getAge() >= 18) {
UserDTO dto = new UserDTO();
dto.setUserId(user.getUserId());
dto.setUserName(user.getUserName());
result.add(dto);
}
}

现在可以写成:

1
2
3
4
5
6
7
8
9
List<UserDTO> result = userList.stream()
.filter(user -> user.getAge() >= 18)
.map(user -> {
UserDTO dto = new UserDTO();
dto.setUserId(user.getUserId());
dto.setUserName(user.getUserName());
return dto;
})
.collect(Collectors.toList());

Stream API 让集合处理更像是在描述“我要做什么”,而不是一步步告诉程序“你怎么做”。

但是,Stream 不是银弹。写得好,是优雅;写不好,是高级版迷魂阵。尤其是 parallelStream,看起来一行代码开启多线程,实际可能把公共线程池打爆,甚至让整个应用莫名变慢。

所以这篇博客不只讲 Stream 怎么用,还会重点讲:

  • Stream 的基本概念;
  • Stream 的操作分类;
  • 常用 API;
  • mapflatMapreducecollect 的真实用法;
  • parallelStream 的底层原理;
  • parallelStream 自定义线程池;
  • 什么时候适合用并行流,什么时候千万别用;
  • 业务代码里怎么写才稳。

正文

chapter 1:Stream 是什么

Java 8 引入了Stream API,提供了一种高效、简洁的方式来处理集合数据,它与 java.io 包里的 InputStreamOutputStream 是完全不同的概念。

Stream 是一种用于处理数据流的抽象,它允许我们以声明性的方式对数据进行操作,如过滤、排序、转换等。本文将详细介绍 Java Stream 的基本概念、核心操作、常见用法及其内部工作机制。

Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作(bulk data operation)。

Stream API 借助于同样新出现的 Lambda 表达式,极大地提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。

传统的集合操作使用外部迭代(如 for 循环)来遍历集合,而 Stream 使用内部迭代,通过声明性的方法定义需要对数据进行的操作,由 Stream 框架负责具体的迭代过程。这种方式使代码更加简洁和易读。

1.1 Stream 不是集合

这是很多人刚接触 Stream 时容易混淆的点。

ListSetMap 是存储数据的容器,而 Stream 不是容器。

Stream 更像是一条流水线:

1
数据源 -> 中间操作1 -> 中间操作2 -> 中间操作3 -> 终端操作

例如:

1
2
3
4
5
List<String> result = names.stream()
.filter(name -> name.startsWith("A"))
.map(String::toUpperCase)
.sorted()
.collect(Collectors.toList());

这段代码中:

  • names 是数据源;
  • filter 是过滤;
  • map 是转换;
  • sorted 是排序;
  • collect 是收集结果;
  • Stream 本身不存储数据,只负责组织计算过程。

1.2 Stream 的几个核心特点

Stream 有几个非常重要的特点:

第一,Stream 不存储数据。

它只是从集合、数组、文件、生成器等数据源中读取数据,然后经过管道处理。

第二,Stream 不会直接修改原始数据源。

例如:

1
2
3
4
5
6
7
8
List<String> names = Arrays.asList("tom", "jack", "rose");

List<String> upperNames = names.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());

System.out.println(names);
System.out.println(upperNames);

输出:

1
2
[tom, jack, rose]
[TOM, JACK, ROSE]

原集合没有被修改。

第三,Stream 是惰性执行的。

中间操作不会立即执行,只有遇到终端操作时,整个流水线才会真正执行。

1
2
3
4
5
6
7
8
9
Stream<String> stream = names.stream()
.filter(name -> {
System.out.println("filter: " + name);
return name.startsWith("t");
});

System.out.println("还没有执行终端操作");

List<String> result = stream.collect(Collectors.toList());

collect 调用之前,filter 里面的打印不会执行。

第四,Stream 只能被消费一次。

1
2
3
4
5
6
Stream<String> stream = names.stream();

stream.forEach(System.out::println);

// 再次使用会报错
stream.count();

会出现:

1
java.lang.IllegalStateException: stream has already been operated upon or closed

所以 Stream 不是集合对象,不能反复遍历。用完一次,这条流水线就结束了。

chapter 2:Stream 的生命周期

这是因为流的生命周期有三个阶段:

  • 起始生成阶段。
  • 中间操作:会逐一获取元素并进行处理。可有可无。所有中间操作都是惰性的,因此,流在管道中流动之前,任何操作都不会产生任何影响。Stream 的惰性求值特性使得中间操作不会立即执行,只有在执行终端操作时,整个操作链才会开始计算。这种机制可以有效地减少不必要的计算,提高性能。
  • 终端操作。通常分为最终的消费(foreach 之类的)和归纳(collect)两类。还有重要的一点就是终端操作启动了流在管道中的流动。

完整结构大概是:

1
2
3
4
5
source.stream()
.filter(...)
.map(...)
.sorted(...)
.collect(...);

可以理解为:

1
数据源 -> 中间操作 -> 中间操作 -> 中间操作 -> 终端操作

其中,中间操作可以有多个,终端操作只能有一个。

例如:

1
2
3
4
5
List<String> result = Arrays.asList("java", "spring", "mysql", "redis")
.stream()
.filter(s -> s.length() > 4)
.map(String::toUpperCase)
.collect(Collectors.toList());

在这里:

  • Arrays.asList(...) 是数据源;
  • stream() 是创建流;
  • filter() 是中间操作;
  • map() 是中间操作;
  • collect() 是终端操作。

chapter 3:操作分类

Stream 的操作可以分为两大类:中间操作、终结操作。

中间操作可分为:

  • 无状态(Stateless)操作:指元素的处理不受之前元素的影响;
  • 有状态(Stateful)操作:指该操作只有拿到所有元素之后才能继续下去。

终结操作可分为:

  • 短路(Short-circuiting)操作:指遇到某些符合条件的元素就可以得到最终结果;
  • 非短路(Unshort-circuiting)操作:指必须处理完所有元素才能得到最终结果。

3.1 中间操作

常见中间操作:

  • filter:过滤元素;
  • map:映射元素;
  • flatMap:平铺流;
  • distinct:去重;
  • sorted:排序;
  • peek:查看元素;
  • limit:截取前 N 个元素;
  • skip:跳过前 N 个元素;
  • unordered:去除有序约束。

3.2 终端操作

常见终端操作:

  • forEach:遍历元素;
  • collect:收集结果;
  • reduce:归约操作;
  • toArray:转换为数组;
  • min:最小值;
  • max:最大值;
  • count:计数;
  • anyMatch:是否有任意匹配;
  • allMatch:是否全部匹配;
  • noneMatch:是否全部不匹配;
  • findFirst:找到第一个元素;
  • findAny:找到任意一个元素。

3.3 更准确的分类表

类型 操作 特点
无状态中间操作 filtermapflatMappeekunordered 每个元素可以独立处理
有状态中间操作 distinctsortedlimitskip 需要维护状态,甚至需要看到多个元素
短路终端操作 anyMatchallMatchnoneMatchfindFirstfindAny 不一定处理完所有元素
非短路终端操作 forEachcollectreducecountmaxmintoArray 通常需要处理完整个流

补充一句:limit 也带有短路特征,它虽然是中间操作,但遇到足够数量的元素后,就可以停止继续向后获取数据。

chapter 4:Stream API 创建操作

4.1 通过 Collection 创建流

通过 java.util.Collection.stream() 方法用集合创建流。

1
2
3
4
5
6
7
List<String> list = Arrays.asList("hello", "world", "stream");

// 创建顺序流
Stream<String> stream = list.stream();

// 创建并行流
Stream<String> parallelStream = list.parallelStream();

4.2 通过数组创建流

使用 java.util.Arrays.stream(T[] array) 方法用数组创建流。

1
2
3
String[] array = {"h", "e", "l", "l", "o"};

Stream<String> arrayStream = Arrays.stream(array);

4.3 使用 Stream 静态方法创建流

Stream 的静态方法包括:of()iterate()generate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Stream<Integer> stream1 = Stream.of(1, 2, 3, 4, 5, 6);

Stream<Integer> stream2 = Stream.iterate(0, (x) -> x + 2).limit(3);
stream2.forEach(System.out::println);

Stream<Double> stream3 = Stream.generate(Math::random).limit(3);
stream3.forEach(System.out::println);

// 输出结果如下:
0
2
4
0.9620319103852426
0.8303672905658537
0.09203215202737569

4.4 使用基本类型流

为了避免装箱、拆箱带来的性能损耗,Java 提供了基本类型流:

  • IntStream
  • LongStream
  • DoubleStream

例如:

1
2
3
4
int sum = IntStream.rangeClosed(1, 100)
.sum();

System.out.println(sum);

再比如:

1
2
3
4
5
6
double average = userList.stream()
.mapToInt(User::getAge)
.average()
.orElse(0);

System.out.println(average);

能用 mapToIntmapToLongmapToDouble 的地方,不一定非要用 map。尤其是数据量大时,基本类型流会更稳一点。

4.5 创建 parallelStream

stream 和 parallelStream 的简单区分:stream 是顺序流,由主线程按顺序对流执行操作,而 parallelStream 是并行流,内部以多线程并行执行的方式对流进行操作,需要注意使用并行流的前提是流中的数据处理没有顺序要求。

1
2
3
4
5
6
7
8
9
10
11
12
// 直接创建
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

numbers.parallelStream()
.filter(n -> n % 2 == 0)
.forEach(System.out::println);

// 通过 parallel() 把顺序流转换成并行流
Optional<Integer> findFirst = numbers.stream()
.parallel()
.filter(x -> x > 4)
.findFirst();

还可以通过 sequential() 把并行流转回顺序流:

1
2
3
4
List<Integer> result = numbers.parallelStream()
.filter(n -> n > 3)
.sequential()
.collect(Collectors.toList());

chapter 5:无状态操作

5.1 filter

filter 是筛选操作,是按照一定的规则校验流中的元素,将符合条件的元素提取到新的流中的操作。

1
Stream<T> filter(Predicate<? super T> predicate);

示例:

1
2
3
4
5
6
List<Integer> list = Arrays.asList(6, 7, 3, 8, 1, 2);

Stream<Integer> stream = list.stream();

stream.filter(x -> x > 5)
.forEach(System.out::println);

输出:

1
2
3
6
7
8

业务中常见写法:

1
2
3
4
List<Order> validOrders = orderList.stream()
.filter(order -> order.getDeleted() == 0)
.filter(order -> order.getStatus() == OrderStatus.PAID)
.collect(Collectors.toList());

如果过滤条件很多,建议抽成方法,不然 Lambda 会变成意大利面条。

1
2
3
4
5
6
7
8
9
List<Order> validOrders = orderList.stream()
.filter(this::isValidPaidOrder)
.collect(Collectors.toList());

private boolean isValidPaidOrder(Order order) {
return order.getDeleted() == 0
&& order.getStatus() == OrderStatus.PAID
&& order.getAmount().compareTo(BigDecimal.ZERO) > 0;
}

5.2 map

map 可以把一个元素类型为 T 的流转换成元素类型为 R 的流。这个方法传入一个 Function 函数式接口,接收一个泛型 T,返回泛型 R。

1
<R> Stream<R> map(Function<? super T, ? extends R> mapper);

将集合中的元素 A 转换成想要得到的 B。

1
2
3
List<String> output = wordList.stream()
.map(String::toUpperCase)
.collect(Collectors.toList());

业务中最常见的是实体转 DTO:

1
2
3
4
5
6
7
8
9
List<UserDTO> dtoList = userList.stream()
.map(user -> {
UserDTO dto = new UserDTO();
dto.setUserId(user.getUserId());
dto.setUserName(user.getUserName());
dto.setPhone(user.getPhone());
return dto;
})
.collect(Collectors.toList());

如果转换逻辑复杂,建议封装方法:

1
2
3
4
5
6
7
8
9
10
11
List<UserDTO> dtoList = userList.stream()
.map(this::convertToDTO)
.collect(Collectors.toList());

private UserDTO convertToDTO(User user) {
UserDTO dto = new UserDTO();
dto.setUserId(user.getUserId());
dto.setUserName(user.getUserName());
dto.setPhone(user.getPhone());
return dto;
}

5.3 flatMap

flatMap 接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。

map 功能类似,区别在于它可以将多个子流压平成一个流。

1
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
List<String> list1 = Arrays.asList("m,k,l,a", "1,3,5,7");

List<String> listNew = list1.stream()
.flatMap(s -> {
// 将每个元素转换成一个 stream
String[] split = s.split(",");
Stream<String> s2 = Arrays.stream(split);
return s2;
})
.collect(Collectors.toList());

System.out.println("处理前的集合:" + list1);
System.out.println("处理后的集合:" + listNew);

// 处理前的集合:["m,k,l,a", "1,3,5,7"]
// 处理后的集合:["m", "k", "l", "a", "1", "3", "5", "7"]

更常见的业务场景是订单和明细:

1
2
3
List<OrderItem> allItems = orderList.stream()
.flatMap(order -> order.getItemList().stream())
.collect(Collectors.toList());

如果用 map,结果会变成:

1
Stream<List<OrderItem>>

而使用 flatMap,结果是:

1
Stream<OrderItem>

一句话:map 是一对一转换,flatMap 是一对多之后再摊平。

5.4 peek

peek 操作接收的是一个 Consumer<T> 函数。顾名思义,peek 操作会按照 Consumer<T> 函数提供的逻辑去消费流中的每一个元素,同时有可能改变元素内部的一些属性。

1
Stream<T> peek(Consumer<? super T> action);

peek 操作一般用于不想改变流中元素本身的类型,或者只想查看元素、调试链路时。

1
2
3
4
5
List<String> result = Stream.of("java", "spring", "mysql")
.peek(s -> System.out.println("before map: " + s))
.map(String::toUpperCase)
.peek(s -> System.out.println("after map: " + s))
.collect(Collectors.toList());

但不要滥用 peek 做业务逻辑。

不推荐:

1
2
3
List<User> users = userList.stream()
.peek(user -> user.setStatus(1))
.collect(Collectors.toList());

更推荐:

1
2
3
4
5
6
List<User> users = userList.stream()
.map(user -> {
user.setStatus(1);
return user;
})
.collect(Collectors.toList());

当然,如果追求不可变对象,那就创建新对象,而不是修改原对象。

5.5 mapToInt、mapToLong、mapToDouble

mapToIntmapToLongmapToDoubleflatMapToDoubleflatMapToIntflatMapToLongmapflatMap 的特例版,也就是针对特定的数据类型进行映射处理。

对应的方法接口如下:

1
2
3
4
5
6
7
8
9
10
11
IntStream mapToInt(ToIntFunction<? super T> mapper);

LongStream mapToLong(ToLongFunction<? super T> mapper);

DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);

IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);

LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);

DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);

示例:

1
2
3
4
Stream<String> stream = Stream.of("hello", "felord.cn");

stream.mapToInt(s -> s.length())
.forEach(System.out::println);

输出:

1
2
5
9

除此之外,还有封装好的 Stream,如 IntStreamLongStreamDoubleStream

5.6 unordered

unordered() 操作不会执行任何操作来显式地对流进行排序。它的作用是消除了流必须保持有序的约束,从而允许后续操作使用不必考虑排序的优化。

对于顺序流,顺序的存在与否不会影响性能,只影响确定性。如果流是顺序的,则在相同的源上重复执行相同的流管道将产生相同的结果;如果是非顺序流,重复执行可能会产生不同的结果。

对于并行流,放宽排序约束有时可以实现更高效的执行。在流有序时,但用户不特别关心该顺序的情况下,使用 unordered 明确地去除有序约束,可以改善某些有状态或终端操作的并行性能。

1
2
3
4
5
6
7
8
Stream.of(5, 1, 2, 6, 3, 7, 4)
.unordered()
.forEach(System.out::println);

Stream.of(5, 1, 2, 6, 3, 7, 4)
.unordered()
.parallel()
.forEach(System.out::println);

两次输出结果对比:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
第一遍:          第二遍:

// 第一行代码输出 // 第一行代码输出
5 5
1 1
2 2
6 6
3 3
7 7
4 4

// 第二行代码输出 // 第二行代码输出
3 3
6 6
4 7
7 5
2 4
1 1
5 2

并行流下,顺序不稳定是正常现象。你要是既想并行,又想严格有序,还想飞快,那就有点像既想马儿跑,又想马儿不吃草,还要它顺便给你写单元测试。

chapter 6:有状态操作

6.1 distinct

distinct 返回由该流的不同元素组成的流,根据 Object.equals(Object) 判断。

distinct() 使用 hashCode()equals() 方法来获取不同的元素。因此,如果是自定义类,需要正确实现 hashCode()equals() 方法。

1
Stream<T> distinct();

示例:

1
2
3
4
Stream<String> stream = Stream.of("1", "3", "4", "10", "4", "6", "23", "3");

stream.distinct()
.forEach(System.out::println);

输出:

1
2
3
4
5
6
1
3
4
10
6
23

如果是对象去重:

1
2
3
4
5
6
7
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private Long userId;
private String userName;
}

如果没有重写 equalshashCodedistinct() 可能达不到预期。

业务中经常按某个字段去重,可以这样写:

1
2
3
4
5
6
7
8
public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
Set<Object> seen = ConcurrentHashMap.newKeySet();
return t -> seen.add(keyExtractor.apply(t));
}

List<User> uniqueUsers = userList.stream()
.filter(distinctByKey(User::getUserId))
.collect(Collectors.toList());

如果是并行流,这个方法里用了 ConcurrentHashMap.newKeySet(),相对安全一些。但是,能不用副作用就不用副作用,越简单越不容易炸。

6.2 sorted

sorted 返回由该流的元素组成的流,并根据自然顺序排序。

1
2
3
Stream<T> sorted();

Stream<T> sorted(Comparator<? super T> comparator);

示例:

1
2
3
4
Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);

stream.sorted()
.forEach(System.out::println);

对象排序:

1
2
3
List<User> users = userList.stream()
.sorted(Comparator.comparing(User::getAge))
.collect(Collectors.toList());

倒序:

1
2
3
List<User> users = userList.stream()
.sorted(Comparator.comparing(User::getAge).reversed())
.collect(Collectors.toList());

多个字段排序:

1
2
3
4
5
6
List<User> users = userList.stream()
.sorted(
Comparator.comparing(User::getAge)
.thenComparing(User::getUserName)
)
.collect(Collectors.toList());

注意:sorted 是有状态操作,需要看到多个元素后才能排序。大数据量时要关注内存和性能。

6.3 limit

limit 获取流中前 n 个元素返回的流。

1
Stream<T> limit(long maxSize);

示例:

1
2
3
4
Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);

stream.limit(3)
.forEach(System.out::println);

输出:

1
2
3
3
1
10

常见场景:取 Top N。

1
2
3
4
List<Order> top10Orders = orderList.stream()
.sorted(Comparator.comparing(Order::getAmount).reversed())
.limit(10)
.collect(Collectors.toList());

6.4 skip

skip 在丢弃流的前 n 个元素之后,返回由该流的其余元素组成的流。

1
Stream<T> skip(long n);

示例:

1
2
3
4
Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);

stream.skip(3)
.forEach(System.out::println);

输出:

1
2
3
4
16
8
4
9

分页示例:

1
2
3
4
List<Order> pageList = orderList.stream()
.skip((long) (pageNo - 1) * pageSize)
.limit(pageSize)
.collect(Collectors.toList());

但是注意,真实业务中大分页不要在内存里这么玩,应该交给数据库分页。不然数据量一大,内存会笑着笑着就哭了。

chapter 7:短路操作

7.1 anyMatch

anyMatch 表示 Stream 中只要有一个元素符合传入的 predicate,就返回 true。

1
boolean anyMatch(Predicate<? super T> predicate);

示例:

1
2
3
4
5
6
Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);

System.out.println("result=" + stream.anyMatch(s -> s == 2));

// 输出
result=false

业务示例:

1
2
boolean hasInvalidOrder = orderList.stream()
.anyMatch(order -> order.getAmount().compareTo(BigDecimal.ZERO) < 0);

7.2 allMatch

allMatch 表示 Stream 中全部元素符合传入的 predicate,返回 true。

1
boolean allMatch(Predicate<? super T> predicate);

示例:

1
2
3
4
5
6
Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);

System.out.println("result=" + stream.allMatch(s -> s > 0));

// 输出
result=true

业务示例:

1
2
boolean allPaid = orderList.stream()
.allMatch(order -> order.getStatus() == OrderStatus.PAID);

7.3 noneMatch

noneMatch 表示 Stream 中没有任何元素符合传入的 predicate,返回 true。

1
boolean noneMatch(Predicate<? super T> predicate);

示例:

1
2
3
4
5
6
Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);

System.out.println("result=" + stream.noneMatch(s -> s >= 17));

// 输出
result=true

业务示例:

1
2
boolean noDeletedOrder = orderList.stream()
.noneMatch(order -> order.getDeleted() == 1);

7.4 findFirst

findFirst 用于返回满足条件的第一个元素,但是该元素是封装在 Optional 类中。

1
Optional<T> findFirst();

示例:

1
2
3
4
5
6
Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);

System.out.println("result=" + stream.findFirst().get());

// 输出
result=3

再看过滤之后:

1
2
3
4
5
6
Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);

System.out.println("result=" + stream.filter(s -> s > 3).findFirst().get());

// 输出
result=10

建议不要直接 .get(),更稳的是:

1
2
3
4
Integer result = Stream.of(3, 1, 10, 16, 8, 4, 9)
.filter(s -> s > 3)
.findFirst()
.orElse(null);

或者:

1
2
3
4
Integer result = Stream.of(3, 1, 10, 16, 8, 4, 9)
.filter(s -> s > 3)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("未找到符合条件的数据"));

7.5 findAny

findAny 返回流中的任意元素,该元素也是封装在 Optional 中。

1
Optional<T> findAny();

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
List<String> strAry = Arrays.asList(
"Jhonny", "David", "Jack", "Duke", "Jill", "Dany", "Julia", "Jenish", "Divya"
);

String result = strAry.parallelStream()
.filter(s -> s.startsWith("J"))
.findAny()
.get();

System.out.println("result = " + result);

// 输出
result = Jill

在顺序流中,findAny 很多时候看起来和 findFirst 类似。但在并行流中,findAny 可以返回任意一个符合条件的元素,通常性能更好一些。

如果你不关心顺序,只关心有没有一个结果,用 findAny

如果你必须拿第一个,用 findFirst

chapter 8:非短路终端操作

8.1 forEach

forEach 接收一个 Lambda 表达式,然后在 Stream 的每一个元素上执行该表达式。

1
void forEach(Consumer<? super T> action);

示例:

1
2
3
4
5
6
7
8
9
10
List<String> strAry = Arrays.asList(
"Jhonny", "David", "Jack", "Duke", "Jill", "Dany", "Julia", "Jenish", "Divya"
);

strAry.stream()
.forEach(s -> {
if ("Jack".equalsIgnoreCase(s)) {
System.out.println(s);
}
});

注意,forEach 是终端操作,通常用于消费数据,而不是返回结果。

不推荐:

1
2
3
4
5
List<String> result = new ArrayList<>();

list.stream()
.filter(s -> s.length() > 3)
.forEach(result::add);

推荐:

1
2
3
List<String> result = list.stream()
.filter(s -> s.length() > 3)
.collect(Collectors.toList());

尤其是并行流中,不要对普通 ArrayListforEach(result::add),这是线程不安全的。

8.2 forEachOrdered

forEachOrdered 接收一个 Lambda 表达式,然后按顺序在 Stream 的每一个元素上执行该表达式。

1
void forEachOrdered(Consumer<? super T> action);

示例:

1
2
3
Stream.of("AAA,", "BBB,", "CCC,", "DDD,")
.parallel()
.forEach(System.out::print);

输出顺序可能是:

1
CCC,BBB,DDD,AAA,

如果使用 forEachOrdered

1
2
3
Stream.of("AAA,", "BBB,", "CCC,", "DDD,")
.parallel()
.forEachOrdered(System.out::print);

输出顺序更稳定:

1
AAA,BBB,CCC,DDD,

但要注意,forEachOrdered 为了保证顺序,可能牺牲并行性能。

8.3 toArray

toArray 返回包含此流元素的数组。当有参数时,则使用提供的 generator 函数分配返回的数组,以及分区执行或调整大小可能需要的任何其他数组。

1
2
3
Object[] toArray();

<A> A[] toArray(IntFunction<A[]> generator);

示例:

1
2
3
4
5
6
7
List<String> strList = Arrays.asList(
"Jhonny", "David", "Jack", "Duke", "Jill", "Dany", "Julia", "Jenish", "Divya"
);

Object[] strAryNoArg = strList.stream().toArray();

String[] strAry = strList.stream().toArray(String[]::new);

更推荐使用带类型的写法:

1
2
String[] strAry = strList.stream()
.toArray(String[]::new);

8.4 max

根据提供的 Comparator 返回此流的最大元素。

1
Optional<T> max(Comparator<? super T> comparator);

示例:

1
2
3
4
5
List<Integer> num = Arrays.asList(4, 5, 6);

num.stream()
.max(Integer::compareTo)
.ifPresent(System.out::println);

业务示例:

1
2
Optional<Order> maxOrder = orderList.stream()
.max(Comparator.comparing(Order::getAmount));

8.5 min

根据提供的 Comparator 返回此流的最小元素。

1
Optional<T> min(Comparator<? super T> comparator);

示例:

1
2
3
4
5
List<Integer> num = Arrays.asList(4, 5, 6);

num.stream()
.min(Integer::compareTo)
.ifPresent(System.out::println);

业务示例:

1
2
Optional<Order> minOrder = orderList.stream()
.min(Comparator.comparing(Order::getAmount));

8.6 count

返回此流中的元素计数。

1
long count();

示例:

1
2
3
List<Integer> num = Arrays.asList(4, 5, 6);

System.out.println(num.stream().count());

业务示例:

1
2
3
long paidCount = orderList.stream()
.filter(order -> order.getStatus() == OrderStatus.PAID)
.count();

chapter 9:reduce 归约操作

reduce 接收一个函数作为累加器,数组中的每个值从左到右开始缩减,最终计算为一个值。

1
2
3
4
5
6
7
Optional<T> reduce(BinaryOperator<T> accumulator);

T reduce(T identity, BinaryOperator<T> accumulator);

<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);

参数的定义结构如下:

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface BinaryOperator<T> extends BiFunction<T, T, T> {
// 两个静态方法,先进行忽略
}

@FunctionalInterface
public interface BiFunction<T, U, R> {
R apply(T t, U u);
// 一个默认方法,先进行忽略
}

9.1 reduce 示例一:无初始值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
List<Integer> num = Arrays.asList(1, 2, 4, 5, 6, 7);

// 原接口一比一原汁原味写法
Integer integer = num.stream()
.reduce(new BinaryOperator<Integer>() {
@Override
public Integer apply(Integer a, Integer b) {
System.out.println("x:" + a);
return a + b;
}
})
.get();

System.out.println("result:" + integer);

等效写法:

1
2
3
4
5
6
7
8
Integer result = num.stream()
.reduce((x, y) -> {
System.out.println("x:" + x);
return x + y;
})
.get();

System.out.println("result:" + result);

等效的普通写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
boolean flag = false;
int temp = 0;

for (Integer integer : num) {
if (!flag) {
temp = integer;
flag = true;
} else {
System.out.println("x:" + temp);
temp += integer;
}
}

System.out.println("result:" + temp);

9.2 reduce 示例二:有初始值

1
2
3
4
5
6
7
8
9
10
11
12
13
List<Integer> num = Arrays.asList(1, 2, 4, 5, 6, 7);

// 一比一原汁原味写法
Integer integer = num.stream()
.reduce(1, new BinaryOperator<Integer>() {
@Override
public Integer apply(Integer a, Integer b) {
System.out.println("a=" + a);
return a + b;
}
});

System.out.println("result:" + integer);

普通 for 循环写法:

1
2
3
4
5
6
7
8
int temp = 1;

for (Integer integer : num) {
System.out.println("a=" + temp);
temp += integer;
}

System.out.println("result:" + temp);

9.3 reduce 示例三:串行流中的三个参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<Integer> num = Arrays.asList(1, 2, 3, 4, 5, 6);

List<Integer> other = new ArrayList<>();
other.addAll(Arrays.asList(7, 8, 9, 10));

num.stream()
.reduce(
other,
(x, y) -> { // 第二个参数
System.out.println(JSON.toJSONString(x));
x.add(y);
return x;
},
(x, y) -> { // 第三个参数
System.out.println("并行才会出现:" + JSON.toJSONString(x));
return x;
}
);

输出结果:

1
2
3
4
5
6
[7,8,9,10,1]
[7,8,9,10,1,2]
[7,8,9,10,1,2,3]
[7,8,9,10,1,2,3,4]
[7,8,9,10,1,2,3,4,5]
[7,8,9,10,1,2,3,4,5,6]

9.4 reduce 示例四:并行流中的错误写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
List<Integer> num = Arrays.asList(4, 5, 6);

List<Integer> other = new ArrayList<>();
other.addAll(Arrays.asList(1, 2, 3));

num.parallelStream()
.reduce(
other,
(x, y) -> { // 第二个参数
x.add(y);
System.out.println(JSON.toJSONString(x));
return x;
},
(x, y) -> { // 第三个参数
x.addAll(y);
System.out.println("结合:" + JSON.toJSONString(x));
return x;
}
);

输出结果可能是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 第一遍
[1,2,3,4,5,6]
[1,2,3,4,5,6]
[1,2,3,4,5,6]
结合:[1,2,3,4,5,6,1,2,3,4,5,6]
结合:[1,2,3,4,5,6,1,2,3,4,5,6,1,2,3,4,5,6,1,2,3,4,5,6]

// 第二遍
[1,2,3,4,6]
[1,2,3,4,6]
[1,2,3,4,6]
结合:[1,2,3,4,6,1,2,3,4,6]
结合:[1,2,3,4,6,1,2,3,4,6,1,2,3,4,6,1,2,3,4,6]

// 第三遍
[1,2,3,5,4,6]
[1,2,3,5,4,6]
[1,2,3,5,4,6]
结合:[1,2,3,5,4,6,1,2,3,5,4,6]
结合:[1,2,3,5,4,6,1,2,3,5,4,6,1,2,3,5,4,6,1,2,3,5,4,6]

每个结果都是乱序的,并且多执行几次,都会出现不同的结果。并且第三个参数组合器内的代码也得到了执行。

这就是因为并行时,使用多线程时顺序性没有保障所产生的结果。

通过实践可以看到:组合器的作用,其实是对参数 2 中各个线程产生的结果进行再一遍的归约操作。

并且仔细看第二遍的执行结果:每一组都少了 1 个值。

所以,对于并行流 parallelStream 操作,必须慎用。

9.5 reduce 的核心原则

reduce 不是用来随便改集合的。它更适合做不可变归约。

适合:

1
2
Integer sum = numbers.stream()
.reduce(0, Integer::sum);

适合:

1
2
3
4
BigDecimal totalAmount = orderList.stream()
.map(Order::getAmount)
.filter(Objects::nonNull)
.reduce(BigDecimal.ZERO, BigDecimal::add);

不适合:

1
2
3
4
5
6
7
8
9
10
11
12
List<Integer> result = numbers.parallelStream()
.reduce(
new ArrayList<>(),
(list, number) -> {
list.add(number);
return list;
},
(list1, list2) -> {
list1.addAll(list2);
return list1;
}
);

如果你想把元素收集到集合,请用 collect,不要用 reduce 硬凹。

正确写法:

1
2
List<Integer> result = numbers.parallelStream()
.collect(Collectors.toList());

9.6 reduce 的三个要求

reduce 时,要记住三个要求:

第一,identity 必须是归约操作的单位元。

比如加法的单位元是 0:

1
2
int sum = numbers.stream()
.reduce(0, Integer::sum);

乘法的单位元是 1:

1
2
int multiply = numbers.stream()
.reduce(1, (a, b) -> a * b);

第二,accumulator 最好是无副作用的。

不要在 accumulator 里面修改外部变量。

不推荐:

1
2
3
4
5
6
7
List<Integer> result = new ArrayList<>();

numbers.stream()
.reduce(0, (a, b) -> {
result.add(b);
return a + b;
});

第三,combiner 必须能正确合并并行结果。

在顺序流里,combiner 可能不执行;在并行流里,combiner 会执行。

所以你不能写一个在顺序流里看起来没问题、在并行流里直接炸锅的 combiner。

chapter 10:collect 收集操作

collect 是一个终端操作,它接收的参数是将流中的元素累积到汇总结果的各种方式。

1
2
3
4
5
<R, A> R collect(Collector<? super T, A, R> collector);

<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);

第一种方式会比较经常使用到,也比较方便使用。

常用方法包括:

  • toList
  • toSet
  • toCollection
  • counting
  • summingInt
  • averagingInt
  • joining
  • maxBy
  • minBy
  • reducing
  • collectingAndThen
  • groupingBy
  • partitioningBy
  • mapping
  • filtering
  • toMap

10.1 toList

1
2
3
List<String> result = Stream.of("java", "spring", "mysql")
.filter(s -> s.length() > 4)
.collect(Collectors.toList());

10.2 toSet

1
2
Set<String> result = Stream.of("java", "spring", "mysql", "java")
.collect(Collectors.toSet());

10.3 toCollection

如果你想指定集合类型,可以用 toCollection

1
2
LinkedList<String> result = Stream.of("java", "spring", "mysql")
.collect(Collectors.toCollection(LinkedList::new));

10.4 joining

拼接字符串:

1
2
3
4
String result = Stream.of("java", "spring", "mysql")
.collect(Collectors.joining(","));

System.out.println(result);

输出:

1
java,spring,mysql

带前后缀:

1
2
3
4
String result = Stream.of("java", "spring", "mysql")
.collect(Collectors.joining(",", "[", "]"));

System.out.println(result);

输出:

1
[java,spring,mysql]

10.5 counting

1
2
Long count = Stream.of("java", "spring", "mysql")
.collect(Collectors.counting());

当然,很多时候直接用 count() 更简单:

1
2
long count = Stream.of("java", "spring", "mysql")
.count();

10.6 summingInt、averagingInt

1
2
3
4
5
Integer totalAge = userList.stream()
.collect(Collectors.summingInt(User::getAge));

Double avgAge = userList.stream()
.collect(Collectors.averagingInt(User::getAge));

也可以用基本类型流:

1
2
3
4
5
6
7
8
int totalAge = userList.stream()
.mapToInt(User::getAge)
.sum();

double avgAge = userList.stream()
.mapToInt(User::getAge)
.average()
.orElse(0);

10.7 groupingBy

按字段分组:

1
2
Map<Integer, List<User>> groupByAge = userList.stream()
.collect(Collectors.groupingBy(User::getAge));

按状态分组:

1
2
Map<OrderStatus, List<Order>> orderMap = orderList.stream()
.collect(Collectors.groupingBy(Order::getStatus));

分组后统计数量:

1
2
3
4
5
Map<OrderStatus, Long> countMap = orderList.stream()
.collect(Collectors.groupingBy(
Order::getStatus,
Collectors.counting()
));

分组后金额求和:

1
2
3
4
5
6
7
8
Map<OrderStatus, BigDecimal> amountMap = orderList.stream()
.collect(Collectors.groupingBy(
Order::getStatus,
Collectors.mapping(
Order::getAmount,
Collectors.reducing(BigDecimal.ZERO, BigDecimal::add)
)
));

10.8 多字段分组

可以创建一个 key 对象:

1
2
3
4
5
6
7
8
@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderGroupKey {
private Long shopId;
private Long supplierId;
private Integer billType;
}

使用:

1
2
3
4
5
6
Map<OrderGroupKey, List<Order>> groupMap = orderList.stream()
.collect(Collectors.groupingBy(order -> new OrderGroupKey(
order.getShopId(),
order.getSupplierId(),
order.getBillType()
)));

如果只是临时脚本,也可以拼字符串:

1
2
3
4
Map<String, List<Order>> groupMap = orderList.stream()
.collect(Collectors.groupingBy(order ->
order.getShopId() + "_" + order.getSupplierId() + "_" + order.getBillType()
));

但正式业务里更建议使用明确的 key 对象,少用字符串硬拼。硬拼 key 写多了,以后排查问题就像考古。

10.9 partitioningBy

partitioningBy 是特殊的分组,只分成 true 和 false 两组。

1
2
Map<Boolean, List<User>> partitionMap = userList.stream()
.collect(Collectors.partitioningBy(user -> user.getAge() >= 18));

结果:

1
2
3
4
{
true: 成年用户列表,
false: 未成年用户列表
}

10.10 toMap

把 List 转成 Map:

1
2
Map<Long, User> userMap = userList.stream()
.collect(Collectors.toMap(User::getUserId, Function.identity()));

但注意,如果 key 重复,会抛异常。

1
java.lang.IllegalStateException: Duplicate key

所以业务里更推荐写 merge 函数:

1
2
3
4
5
6
Map<Long, User> userMap = userList.stream()
.collect(Collectors.toMap(
User::getUserId,
Function.identity(),
(oldValue, newValue) -> newValue
));

保留旧值:

1
2
3
4
5
6
Map<Long, User> userMap = userList.stream()
.collect(Collectors.toMap(
User::getUserId,
Function.identity(),
(oldValue, newValue) -> oldValue
));

如果需要指定 Map 类型:

1
2
3
4
5
6
7
Map<Long, User> userMap = userList.stream()
.collect(Collectors.toMap(
User::getUserId,
Function.identity(),
(oldValue, newValue) -> newValue,
LinkedHashMap::new
));

10.11 collectingAndThen

collectingAndThen 可以在收集之后再做一次转换。

例如收集成不可变集合:

1
2
3
4
5
6
List<User> result = userList.stream()
.filter(user -> user.getAge() >= 18)
.collect(Collectors.collectingAndThen(
Collectors.toList(),
Collections::unmodifiableList
));

10.12 collect 三参数写法

collect 的三参数形式:

1
2
3
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);

示例:

1
2
3
4
5
6
7
8
List<String> result = Stream.of("java", "spring", "mysql")
.collect(
ArrayList::new,
ArrayList::add,
ArrayList::addAll
);

System.out.println(result);

这个写法可以理解为:

  • ArrayList::new:创建结果容器;
  • ArrayList::add:把元素放入当前容器;
  • ArrayList::addAll:并行流时合并多个容器。

这也是为什么在并行流中,collect(Collectors.toList()) 通常比 forEach(list::add) 更安全,因为 Collector 会为不同线程维护不同的中间容器,然后再合并。

chapter 11:Optional 补充

很多 Stream 的终端操作返回 Optional

  • findFirst
  • findAny
  • max
  • min
  • reduce 无初始值版本

例如:

1
2
3
Optional<User> optionalUser = userList.stream()
.filter(user -> user.getUserId().equals(1001L))
.findFirst();

11.1 不推荐直接 get

不推荐:

1
User user = optionalUser.get();

如果没有值,会抛出:

1
NoSuchElementException

11.2 推荐写法

有默认值:

1
User user = optionalUser.orElse(new User());

默认值需要延迟创建:

1
User user = optionalUser.orElseGet(User::new);

不存在就抛业务异常:

1
User user = optionalUser.orElseThrow(() -> new BizException("用户不存在"));

存在才执行:

1
2
3
optionalUser.ifPresent(user -> {
System.out.println(user.getUserName());
});

转换:

1
2
3
String userName = optionalUser
.map(User::getUserName)
.orElse("未知用户");

chapter 12:ParallelStream

parallelStream 默认使用了 fork-join 框架,其默认线程数通常与 CPU 核心数相关。

12.1 设置 parallelStream 默认公用线程池的全局并发数

1
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int cupNum = Runtime.getRuntime().availableProcessors();
log.info("CPU num:{}", cupNum);

long firstNum = 1;
long lastNum = 10000;

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum)
.boxed()
.collect(Collectors.toList());

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

aList.parallelStream()
.forEach(e -> {
log.info("输出:{}", e);
});

注意:这个方式是全局设置,影响的是 ForkJoinPool.commonPool()。在 Spring Boot 这种服务端应用里,不建议在业务代码中随便设置这个全局参数。

它更适合在 JVM 启动参数里设置,例如:

1
-Djava.util.concurrent.ForkJoinPool.common.parallelism=4

并且这个配置应该在 commonPool 初始化之前生效。如果 commonPool 已经初始化,再设置这个属性,可能达不到预期效果。

12.2 通过 ForkJoinPool 定义私有线程池

采用自定义的 ForkJoinPool 线程池去提交任务,主线程不会参与计算。

ForkJoinPool 线程池采用 submit 异步提交任务,通过 get 方法阻塞主线程,直到任务执行完成,再调用 shutdown 方法关闭线程池。

注意,等待提交任务执行完毕不能采用 awaitTermination() 方法,该方法是等待指定时间后强制关闭线程池。

效率方面,针对高密度的 CPU 计算任务,提高线程池的并发数,反而会降低任务的执行效率,因为 CPU 抢占和大量线程频繁切换会增加任务的耗时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int cupNum = Runtime.getRuntime().availableProcessors();
log.info("CPU num:{}", cupNum);

long firstNum = 1;
long lastNum = 10000;

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum)
.boxed()
.collect(Collectors.toList());

ForkJoinPool forkJoinPool = new ForkJoinPool(8);

try {
List<Long> longs = forkJoinPool.submit(() ->
aList.parallelStream()
.map(e -> {
return e + 1;
})
.collect(Collectors.toList())
).get();

// 通过调用 get 方法,等待任务执行完毕
System.out.println(longs.size());
System.out.println("执行结束");

// 错误示例:
// forkJoinPool.awaitTermination(20, TimeUnit.SECONDS);

// 错误示例:
// ForkJoinTask future = forkJoinPool.submit(() -> aList.parallelStream().forEach(e -> {
// log.info("输出:{}", e);
// }));
// future.get(10, TimeUnit.MINUTES);
// get 方法不能起到阻塞主线程、等待任务执行完毕的作用。
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
forkJoinPool.shutdown();
}

更规范一点的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ForkJoinPool forkJoinPool = new ForkJoinPool(8);

try {
return forkJoinPool.submit(() ->
list.parallelStream()
.map(this::calculate)
.collect(Collectors.toList())
).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("并行任务被中断", e);
} catch (ExecutionException e) {
throw new RuntimeException("并行任务执行失败", e);
} finally {
forkJoinPool.shutdown();
}

这里有两个点特别重要:

第一,InterruptedException 不能只打印日志,最好恢复中断标记。

1
Thread.currentThread().interrupt();

第二,自定义线程池必须关闭。

1
forkJoinPool.shutdown();

否则可能出现线程资源泄漏。

12.3 parallelStream 背后的 ForkJoinPool

parallelStream 背后使用的是 Fork/Join 思想。

Fork/Join 的核心是“分而治之”:

1
2
3
4
5
6
大任务
-> 子任务1
-> 子任务2
-> 子任务3
-> 子任务4
-> 合并结果

例如:

1
2
3
List<Integer> result = list.parallelStream()
.map(this::calculate)
.collect(Collectors.toList());

底层会尝试把 list 拆成多个分片,不同线程处理不同分片,最后再合并结果。

ForkJoinPool 的典型特点是工作窃取。

简单理解:

  • 每个 worker 线程有自己的任务队列;
  • 如果自己的队列处理完了,就去别的线程队列里偷任务;
  • 这样可以提升 CPU 利用率。

这也是为什么 ArrayList、数组、IntStream.range 这类容易拆分的数据源,更适合并行流。

LinkedList、IO 流、迭代器式数据源,拆分成本更高,并行效果往往不好。

12.4 commonPool 的问题

默认情况下,parallelStream 使用的是公共线程池:

1
ForkJoinPool.commonPool()

这个池是整个 JVM 共享的。

这意味着,假如你的应用里多个地方都用了 parallelStream,它们可能会抢同一个 commonPool。

在服务端应用里,这个问题非常关键。

比如你有两个接口:

1
2
3
4
5
6
7
8
9
// 接口 A
list.parallelStream()
.map(this::callThirdApi)
.collect(Collectors.toList());

// 接口 B
dataList.parallelStream()
.map(this::heavyCalculate)
.collect(Collectors.toList());

如果接口 A 里面是阻塞 IO,比如调用第三方 API、查数据库、查 Redis,它可能把 commonPool 的 worker 线程占住。

这时接口 B 即使只是普通 CPU 计算,也会受到影响。

所以,parallelStream 的坑不在于它不能用,而在于它默认共享 commonPool,影响范围不是当前方法,而可能是整个应用。

12.5 parallelStream 中的主线程参与问题

默认 commonPool 场景下,调用线程可能会参与一部分任务执行,ForkJoinPool 的 worker 线程也会参与执行。

这意味着日志里可能会看到:

1
2
3
4
main
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3

如果使用自定义 ForkJoinPool 包起来:

1
2
3
4
5
forkJoinPool.submit(() ->
list.parallelStream()
.map(this::calculate)
.collect(Collectors.toList())
).get();

通常任务会在自定义 ForkJoinPool 的 worker 线程中执行,主线程主要是在外层等待结果。

这也是为什么自定义线程池可以起到隔离作用。

12.6 parallelStream 顺序问题

并行流不保证处理顺序。

1
2
3
4
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);

numbers.parallelStream()
.forEach(System.out::println);

输出可能是:

1
2
3
4
5
6
7
8
9
6
3
8
1
5
2
9
4
7

如果要保证输出顺序:

1
2
numbers.parallelStream()
.forEachOrdered(System.out::println);

但这会牺牲并行性能。

所以要记住:

  • 不关心顺序:forEach
  • 关心顺序:forEachOrdered
  • 关心性能:尽量不要强行要求顺序

12.7 parallelStream 线程安全问题

错误写法:

1
2
3
4
List<Integer> result = new ArrayList<>();

numbers.parallelStream()
.forEach(result::add);

这个代码在并行流中是线程不安全的。

可能出现:

  • 数据丢失;
  • 顺序混乱;
  • 抛异常;
  • 偶现问题,不好复现。

正确写法:

1
2
3
List<Integer> result = numbers.parallelStream()
.filter(n -> n > 3)
.collect(Collectors.toList());

或者:

1
2
3
Set<Integer> result = numbers.parallelStream()
.filter(n -> n > 3)
.collect(Collectors.toSet());

如果确实需要并发集合:

1
2
3
4
Set<Integer> result = ConcurrentHashMap.newKeySet();

numbers.parallelStream()
.forEach(result::add);

但更推荐能用 collect 就用 collect

12.8 parallelStream 中不要做阻塞 IO

不推荐:

1
2
3
List<UserInfo> result = userIds.parallelStream()
.map(userId -> userApi.queryUserInfo(userId))
.collect(Collectors.toList());

看起来很爽,一行代码并发查用户信息。

但问题是:

  • 这是阻塞 IO,不是 CPU 计算;
  • 默认使用 commonPool,可能影响整个应用;
  • 没有明确限流;
  • 没有清晰的超时控制;
  • 出问题时不好定位;
  • 还可能打爆下游服务。

更推荐显式线程池或异步框架:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService executorService = Executors.newFixedThreadPool(16);

try {
List<CompletableFuture<UserInfo>> futures = userIds.stream()
.map(userId -> CompletableFuture.supplyAsync(
() -> userApi.queryUserInfo(userId),
executorService
))
.collect(Collectors.toList());

List<UserInfo> result = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
} finally {
executorService.shutdown();
}

这样至少线程池、并发度、隔离边界都掌握在自己手里。

12.9 parallelStream 适合什么场景

比较适合:

  • CPU 密集型计算;
  • 数据量较大;
  • 每个元素处理互不影响;
  • 不依赖顺序;
  • 没有共享可变状态;
  • 数据源容易拆分,比如数组、ArrayList、IntStream.range;
  • 每个元素的计算成本足够高,能抵消线程调度成本。

示例:

1
2
3
List<Result> resultList = dataList.parallelStream()
.map(this::heavyCalculate)
.collect(Collectors.toList());

12.10 parallelStream 不适合什么场景

不适合:

  • 数据量很小;
  • 简单字段转换;
  • IO 密集型任务;
  • 调用数据库;
  • 调用 Redis;
  • 调用第三方接口;
  • 依赖 ThreadLocal;
  • 依赖 MDC 日志上下文;
  • 依赖事务上下文;
  • 操作共享集合;
  • 严格要求顺序;
  • 线上服务里没有明确隔离的公共能力。

例如:

1
2
3
4
list.parallelStream()
.forEach(item -> {
orderMapper.updateById(item);
});

这种写法不推荐。

数据库本来就是共享资源,并行把它打满,不代表你的接口性能提升了,很可能只是把数据库从“上班”变成“上刑”。

12.11 parallelStream 和 ThreadLocal、MDC

在 Spring Boot 项目里,经常会用:

  • ThreadLocal 存用户上下文;
  • MDC 存 traceId;
  • 事务上下文;
  • 租户上下文;
  • 数据权限上下文。

但是 parallelStream 会切换到 ForkJoinPool 的线程中执行。

所以这里的上下文可能丢失。

例如:

1
2
3
4
5
6
MDC.put("traceId", traceId);

list.parallelStream()
.forEach(item -> {
log.info("处理数据: {}", item);
});

日志中可能拿不到 traceId

如果你的业务强依赖上下文,不要轻易使用 parallelStream

可以考虑:

  • 显式传参;
  • 使用自定义线程池并包装上下文;
  • 使用 TransmittableThreadLocal;
  • 干脆不要在这类场景中并行处理。

chapter 13:parallelStream 自定义线程池完整示例

13.1 求和示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ParallelStreamPoolDemo {

public static void main(String[] args) {
long firstNum = 1;
long lastNum = 1_000_000;

List<Long> list = LongStream.rangeClosed(firstNum, lastNum)
.boxed()
.collect(Collectors.toList());

ForkJoinPool customThreadPool = new ForkJoinPool(4);

try {
Long total = customThreadPool.submit(() ->
list.parallelStream()
.reduce(0L, Long::sum)
).get();

System.out.println("total = " + total);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断", e);
} catch (ExecutionException e) {
throw new RuntimeException("任务执行失败", e);
} finally {
customThreadPool.shutdown();
}
}
}

13.2 CPU 密集型任务示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class PrimeDemo {

public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

try {
List<Integer> primes = forkJoinPool.submit(() ->
IntStream.range(1, 1_000_000)
.parallel()
.filter(PrimeDemo::isPrime)
.boxed()
.collect(Collectors.toList())
).get();

System.out.println(primes.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务被中断", e);
} catch (ExecutionException e) {
throw new RuntimeException("任务执行失败", e);
} finally {
forkJoinPool.shutdown();
}
}

private static boolean isPrime(int n) {
if (n <= 1) {
return false;
}

for (int i = 2; i <= Math.sqrt(n); i++) {
if (n % i == 0) {
return false;
}
}

return true;
}
}

13.3 为什么线程数不是越大越好

假设机器只有 8 核 CPU,你设置:

1
new ForkJoinPool(64)

不一定更快,甚至可能更慢。

原因:

  • CPU 核心有限;
  • 线程越多,上下文切换越多;
  • 缓存命中率可能下降;
  • 任务拆分和合并本身也有成本;
  • 如果任务很小,调度成本可能比计算成本还高。

经验上:

  • CPU 密集型:线程数接近 CPU 核心数;
  • IO 密集型:不要用 parallelStream,改用明确的线程池或异步模型;
  • 线上业务:必须压测,不要凭感觉。

chapter 14:Stream 在业务中的常见写法

14.1 List 转 Map

1
2
3
4
5
6
Map<Long, User> userMap = userList.stream()
.collect(Collectors.toMap(
User::getUserId,
Function.identity(),
(oldValue, newValue) -> newValue
));

14.2 提取 ID 列表

1
2
3
4
5
List<Long> userIds = userList.stream()
.map(User::getUserId)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());

14.3 按状态分组

1
2
Map<Integer, List<Order>> statusOrderMap = orderList.stream()
.collect(Collectors.groupingBy(Order::getStatus));

14.4 按状态统计数量

1
2
3
4
5
Map<Integer, Long> statusCountMap = orderList.stream()
.collect(Collectors.groupingBy(
Order::getStatus,
Collectors.counting()
));

14.5 金额求和

1
2
3
4
BigDecimal totalAmount = orderList.stream()
.map(Order::getAmount)
.filter(Objects::nonNull)
.reduce(BigDecimal.ZERO, BigDecimal::add);

14.6 多条件过滤

1
2
3
4
5
List<Order> result = orderList.stream()
.filter(order -> order.getDeleted() == 0)
.filter(order -> order.getStatus() == OrderStatus.PAID)
.filter(order -> order.getAmount().compareTo(BigDecimal.ZERO) > 0)
.collect(Collectors.toList());

更推荐:

1
2
3
4
5
6
7
8
9
List<Order> result = orderList.stream()
.filter(this::isValidOrder)
.collect(Collectors.toList());

private boolean isValidOrder(Order order) {
return order.getDeleted() == 0
&& order.getStatus() == OrderStatus.PAID
&& order.getAmount().compareTo(BigDecimal.ZERO) > 0;
}

14.7 分组后求和

1
2
3
4
5
6
7
8
Map<Long, BigDecimal> shopAmountMap = orderList.stream()
.collect(Collectors.groupingBy(
Order::getShopId,
Collectors.mapping(
Order::getAmount,
Collectors.reducing(BigDecimal.ZERO, BigDecimal::add)
)
));

也可以这样写,更直观一点:

1
2
3
4
5
6
7
8
9
Map<Long, BigDecimal> shopAmountMap = orderList.stream()
.collect(Collectors.groupingBy(
Order::getShopId,
Collectors.reducing(
BigDecimal.ZERO,
Order::getAmount,
BigDecimal::add
)
));

14.8 分组后取最大值

1
2
3
4
5
Map<Long, Optional<Order>> shopMaxOrderMap = orderList.stream()
.collect(Collectors.groupingBy(
Order::getShopId,
Collectors.maxBy(Comparator.comparing(Order::getAmount))
));

如果不想要 Optional:

1
2
3
4
5
6
7
8
Map<Long, Order> shopMaxOrderMap = orderList.stream()
.collect(Collectors.groupingBy(
Order::getShopId,
Collectors.collectingAndThen(
Collectors.maxBy(Comparator.comparing(Order::getAmount)),
optional -> optional.orElse(null)
)
));

14.9 财务结算类聚合示例

比如有结算明细:

1
2
3
4
5
6
7
8
@Data
public class SettlementDetail {
private Long shopId;
private Long supplierId;
private Integer billType;
private BigDecimal taxAmount;
private BigDecimal noTaxAmount;
}

想按店铺、供应商、单据类型聚合:

1
2
3
4
5
6
7
8
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SettlementGroupKey {
private Long shopId;
private Long supplierId;
private Integer billType;
}

聚合:

1
2
3
4
5
6
7
8
9
10
11
12
Map<SettlementGroupKey, BigDecimal> amountMap = detailList.stream()
.collect(Collectors.groupingBy(
detail -> new SettlementGroupKey(
detail.getShopId(),
detail.getSupplierId(),
detail.getBillType()
),
Collectors.mapping(
detail -> Optional.ofNullable(detail.getTaxAmount()).orElse(BigDecimal.ZERO),
Collectors.reducing(BigDecimal.ZERO, BigDecimal::add)
)
));

如果需要聚合成对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Map<SettlementGroupKey, SettlementSummary> summaryMap = detailList.stream()
.collect(Collectors.groupingBy(
detail -> new SettlementGroupKey(
detail.getShopId(),
detail.getSupplierId(),
detail.getBillType()
),
Collector.of(
SettlementSummary::new,
(summary, detail) -> {
summary.setTaxAmount(
summary.getTaxAmount().add(
Optional.ofNullable(detail.getTaxAmount()).orElse(BigDecimal.ZERO)
)
);
summary.setNoTaxAmount(
summary.getNoTaxAmount().add(
Optional.ofNullable(detail.getNoTaxAmount()).orElse(BigDecimal.ZERO)
)
);
},
(left, right) -> {
left.setTaxAmount(left.getTaxAmount().add(right.getTaxAmount()));
left.setNoTaxAmount(left.getNoTaxAmount().add(right.getNoTaxAmount()));
return left;
}
)
));

SettlementSummary 可以这样定义:

1
2
3
4
5
6
7
@Data
public class SettlementSummary {

private BigDecimal taxAmount = BigDecimal.ZERO;

private BigDecimal noTaxAmount = BigDecimal.ZERO;
}

这种写法适合中小规模内存聚合。如果数据量非常大,还是优先考虑 SQL 聚合、分批处理、临时表、预计算等方案。

chapter 15:Stream 常见坑

15.1 Stream 重复使用

错误:

1
2
3
4
5
Stream<String> stream = list.stream();

stream.forEach(System.out::println);

long count = stream.count();

会报错。

正确:

1
2
3
list.stream().forEach(System.out::println);

long count = list.stream().count();

15.2 修改外部变量

错误:

1
2
3
4
int sum = 0;

numbers.stream()
.forEach(n -> sum += n);

局部变量在 Lambda 中必须是 effectively final。

有人可能会改成数组:

1
2
3
4
int[] sum = {0};

numbers.stream()
.forEach(n -> sum[0] += n);

这在并行流中不安全。

正确:

1
2
3
int sum = numbers.stream()
.mapToInt(Integer::intValue)
.sum();

15.3 forEach 添加到外部集合

错误:

1
2
3
4
List<Integer> result = new ArrayList<>();

numbers.parallelStream()
.forEach(result::add);

正确:

1
2
List<Integer> result = numbers.parallelStream()
.collect(Collectors.toList());

15.4 toMap key 重复

错误:

1
2
Map<Long, User> userMap = userList.stream()
.collect(Collectors.toMap(User::getUserId, Function.identity()));

如果 userId 重复,会抛异常。

正确:

1
2
3
4
5
6
Map<Long, User> userMap = userList.stream()
.collect(Collectors.toMap(
User::getUserId,
Function.identity(),
(oldValue, newValue) -> newValue
));

15.5 空指针问题

错误:

1
2
3
4
List<String> names = userList.stream()
.map(User::getUserName)
.map(String::toUpperCase)
.collect(Collectors.toList());

如果 userName 为 null,会 NPE。

正确:

1
2
3
4
5
List<String> names = userList.stream()
.map(User::getUserName)
.filter(Objects::nonNull)
.map(String::toUpperCase)
.collect(Collectors.toList());

15.6 BigDecimal 求和空值问题

错误:

1
2
3
BigDecimal total = orderList.stream()
.map(Order::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);

如果 amount 有 null,会 NPE。

正确:

1
2
3
BigDecimal total = orderList.stream()
.map(order -> Optional.ofNullable(order.getAmount()).orElse(BigDecimal.ZERO))
.reduce(BigDecimal.ZERO, BigDecimal::add);

或者:

1
2
3
4
BigDecimal total = orderList.stream()
.map(Order::getAmount)
.filter(Objects::nonNull)
.reduce(BigDecimal.ZERO, BigDecimal::add);

15.7 滥用 Stream

不是所有循环都要改成 Stream。

普通写法更清晰时,就用普通写法。

例如:

1
2
3
4
5
6
7
8
9
10
11
for (Order order : orderList) {
if (order == null) {
continue;
}

if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
continue;
}

doSomething(order);
}

如果硬改成 Stream:

1
2
3
4
orderList.stream()
.filter(Objects::nonNull)
.filter(order -> order.getAmount().compareTo(BigDecimal.ZERO) > 0)
.forEach(this::doSomething);

这还行。

但如果里面充满复杂判断、异常处理、分支逻辑,就不一定比 for 循环更清晰了。

代码是写给人看的,机器只是顺便执行一下。

chapter 16:性能建议

16.1 小集合不一定要用 Stream

小集合用 for 循环可能更快、更直接。

Stream 的优势是表达能力,不是无条件性能更高。

16.2 避免不必要的装箱拆箱

不推荐:

1
2
Integer sum = numbers.stream()
.reduce(0, Integer::sum);

推荐:

1
2
3
int sum = numbers.stream()
.mapToInt(Integer::intValue)
.sum();

16.3 filter 尽量放前面

如果后面的 map 很重,先过滤可以减少处理量。

推荐:

1
2
3
4
List<Result> result = list.stream()
.filter(this::isValid)
.map(this::heavyConvert)
.collect(Collectors.toList());

不推荐:

1
2
3
4
List<Result> result = list.stream()
.map(this::heavyConvert)
.filter(this::isValidResult)
.collect(Collectors.toList());

16.4 distinct、sorted 要慎用

distinctsorted 都是有状态操作。

数据量大时,它们可能成为性能瓶颈。

16.5 parallelStream 要压测

不要觉得并行一定更快。

是否更快取决于:

  • 数据量;
  • 任务计算成本;
  • CPU 核数;
  • 数据源是否容易拆分;
  • 是否有共享状态;
  • 是否有锁;
  • 是否阻塞;
  • 是否要求顺序。

结论只有一个:压测。别靠直觉,直觉经常比线上事故来得还快。

16.6 高性能场景用 JMH 测

如果你真的关心性能,不要用 System.currentTimeMillis() 随便测。

应该使用 JMH。

简单说,JMH 可以帮你避免:

  • JVM 预热问题;
  • JIT 优化影响;
  • 死代码消除;
  • 单次运行偶然性。

Stream 写法优雅,但在极致性能场景下,for 循环依然可能更强。

chapter 17:Stream 与 Spring Boot 项目实践建议

17.1 Controller 层少写复杂 Stream

Controller 层应该尽量薄。

不推荐:

1
2
3
4
5
6
7
8
@GetMapping("/list")
public List<UserDTO> list() {
return userService.list().stream()
.filter(...)
.map(...)
.sorted(...)
.collect(Collectors.toList());
}

推荐把复杂逻辑放到 Service:

1
2
3
4
@GetMapping("/list")
public List<UserDTO> list() {
return userService.queryUserDTOList();
}

17.2 Service 层可以使用 Stream 做内存转换

1
2
3
4
5
6
7
public List<UserDTO> queryUserDTOList() {
List<User> userList = userRepository.queryValidUsers();

return userList.stream()
.map(this::convertToDTO)
.collect(Collectors.toList());
}

17.3 大数据量优先数据库处理

如果要过滤、排序、分组、分页,优先考虑数据库:

  • where
  • group by
  • order by
  • limit

不要把几十万数据查到 JVM 再 Stream。

不推荐:

1
2
3
4
5
6
7
8
List<Order> allOrders = orderMapper.selectAll();

List<Order> result = allOrders.stream()
.filter(...)
.sorted(...)
.skip(...)
.limit(...)
.collect(Collectors.toList());

推荐:

1
2
3
4
5
select *
from order_table
where status = ?
order by create_time desc
limit ?, ?

17.4 并行流不要包数据库操作

不推荐:

1
2
orderList.parallelStream()
.forEach(order -> orderMapper.updateById(order));

如果确实要批量更新,考虑:

  • 批量 SQL;
  • 分批提交;
  • 消息队列削峰;
  • 明确线程池;
  • 限流;
  • 事务边界;
  • 重试和幂等。

17.5 日志上下文要特别注意

如果使用 parallelStream,MDC 可能丢失。

例如:

1
log.info("traceId={}", MDC.get("traceId"));

在并行线程里可能拿不到。

如果你的系统依赖 traceId、requestId、tenantId,最好不要在这类链路里随意使用 parallelStream。

chapter 18:Stream API 总结

Stream 的核心价值是让集合处理更加声明式、链式、可组合。

它适合:

  • 过滤;
  • 转换;
  • 分组;
  • 聚合;
  • 去重;
  • 排序;
  • 查找;
  • 统计。

但是,使用 Stream 要注意:

  • 中间操作是惰性的;
  • 终端操作会触发执行;
  • Stream 不能重复使用;
  • map 是转换;
  • flatMap 是摊平;
  • reduce 适合不可变归约;
  • collect 适合可变容器收集;
  • parallelStream 不是免费午餐;
  • 并行流不要乱碰共享变量;
  • IO 密集型任务不要默认使用 parallelStream;
  • 服务端应用要警惕 commonPool 共享问题。

如果用一句话总结:

Stream 是一把手术刀,不是电锯。用得准,代码优雅;用得猛,线上冒烟。

参考资料

  1. Oracle Java 官方文档:Stream API。
  2. Oracle Java 官方文档:Collectors。
  3. Oracle Java 官方文档:ForkJoinPool。
  4. CSDN:JAVA– 在 Java8 Parallel Stream 中如何自定义线程池?
  5. 博客园:Java8 并行流 parallelStream 原理分析及注意事项。
  6. 个人原文整理:Stream API 笔记与 parallelStream 示例。

启示录

富贵岂由人,时会高志须酬。

能成功于千载者,必以近察远。

Stream API 的价值,不只是少写几行 for 循环,而是让数据处理过程变得更像一条清晰的管道。

但是越高级的工具,越需要边界感。

顺序流追求表达力,并行流追求吞吐量;一个适合优雅地处理集合,一个适合在明确场景下压榨 CPU。真正成熟的写法,不是把所有循环都改成 Stream,也不是看到 parallelStream 就兴奋,而是知道什么时候该用,什么时候不该用。

写代码也是这样:能快是一种能力,能稳才是工程。


Java Stream API 从入门到实践:从集合处理到 parallelStream
https://allendericdalexander.github.io/2026/06/02/java_steam_api/
作者
AtLuoFu
发布于
2026年6月2日
许可协议