欢迎你来读这篇博客,这篇博客主要是关于Java Stream API。
其中包括了关于我的见解和收集的知识分享。
序言
Java 8 引入 Lambda 和 Stream API 之后,集合处理的写法发生了很大变化。
以前我们处理一个集合,大多数时候都是:
1 2 3 4 5 6 7 8 9
| List<UserDTO> result = new ArrayList<>(); for (User user : userList) { if (user.getAge() >= 18) { UserDTO dto = new UserDTO(); dto.setUserId(user.getUserId()); dto.setUserName(user.getUserName()); result.add(dto); } }
|
现在可以写成:
1 2 3 4 5 6 7 8 9
| List<UserDTO> result = userList.stream() .filter(user -> user.getAge() >= 18) .map(user -> { UserDTO dto = new UserDTO(); dto.setUserId(user.getUserId()); dto.setUserName(user.getUserName()); return dto; }) .collect(Collectors.toList());
|
Stream API 让集合处理更像是在描述“我要做什么”,而不是一步步告诉程序“你怎么做”。
但是,Stream 不是银弹。写得好,是优雅;写不好,是高级版迷魂阵。尤其是 parallelStream,看起来一行代码开启多线程,实际可能把公共线程池打爆,甚至让整个应用莫名变慢。
所以这篇博客不只讲 Stream 怎么用,还会重点讲:
- Stream 的基本概念;
- Stream 的操作分类;
- 常用 API;
map、flatMap、reduce、collect 的真实用法;
parallelStream 的底层原理;
parallelStream 自定义线程池;
- 什么时候适合用并行流,什么时候千万别用;
- 业务代码里怎么写才稳。
正文
chapter 1:Stream 是什么
Java 8 引入了Stream API,提供了一种高效、简洁的方式来处理集合数据,它与 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。
Stream 是一种用于处理数据流的抽象,它允许我们以声明性的方式对数据进行操作,如过滤、排序、转换等。本文将详细介绍 Java Stream 的基本概念、核心操作、常见用法及其内部工作机制。
Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作(bulk data operation)。
Stream API 借助于同样新出现的 Lambda 表达式,极大地提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。
传统的集合操作使用外部迭代(如 for 循环)来遍历集合,而 Stream 使用内部迭代,通过声明性的方法定义需要对数据进行的操作,由 Stream 框架负责具体的迭代过程。这种方式使代码更加简洁和易读。
1.1 Stream 不是集合
这是很多人刚接触 Stream 时容易混淆的点。
List、Set、Map 是存储数据的容器,而 Stream 不是容器。
Stream 更像是一条流水线:
1
| 数据源 -> 中间操作1 -> 中间操作2 -> 中间操作3 -> 终端操作
|
例如:
1 2 3 4 5
| List<String> result = names.stream() .filter(name -> name.startsWith("A")) .map(String::toUpperCase) .sorted() .collect(Collectors.toList());
|
这段代码中:
names 是数据源;
filter 是过滤;
map 是转换;
sorted 是排序;
collect 是收集结果;
- Stream 本身不存储数据,只负责组织计算过程。
1.2 Stream 的几个核心特点
Stream 有几个非常重要的特点:
第一,Stream 不存储数据。
它只是从集合、数组、文件、生成器等数据源中读取数据,然后经过管道处理。
第二,Stream 不会直接修改原始数据源。
例如:
1 2 3 4 5 6 7 8
| List<String> names = Arrays.asList("tom", "jack", "rose");
List<String> upperNames = names.stream() .map(String::toUpperCase) .collect(Collectors.toList());
System.out.println(names); System.out.println(upperNames);
|
输出:
1 2
| [tom, jack, rose] [TOM, JACK, ROSE]
|
原集合没有被修改。
第三,Stream 是惰性执行的。
中间操作不会立即执行,只有遇到终端操作时,整个流水线才会真正执行。
1 2 3 4 5 6 7 8 9
| Stream<String> stream = names.stream() .filter(name -> { System.out.println("filter: " + name); return name.startsWith("t"); });
System.out.println("还没有执行终端操作");
List<String> result = stream.collect(Collectors.toList());
|
在 collect 调用之前,filter 里面的打印不会执行。
第四,Stream 只能被消费一次。
1 2 3 4 5 6
| Stream<String> stream = names.stream();
stream.forEach(System.out::println);
stream.count();
|
会出现:
1
| java.lang.IllegalStateException: stream has already been operated upon or closed
|
所以 Stream 不是集合对象,不能反复遍历。用完一次,这条流水线就结束了。
chapter 2:Stream 的生命周期
这是因为流的生命周期有三个阶段:
- 起始生成阶段。
- 中间操作:会逐一获取元素并进行处理。可有可无。所有中间操作都是惰性的,因此,流在管道中流动之前,任何操作都不会产生任何影响。Stream 的惰性求值特性使得中间操作不会立即执行,只有在执行终端操作时,整个操作链才会开始计算。这种机制可以有效地减少不必要的计算,提高性能。
- 终端操作。通常分为最终的消费(foreach 之类的)和归纳(collect)两类。还有重要的一点就是终端操作启动了流在管道中的流动。
完整结构大概是:
1 2 3 4 5
| source.stream() .filter(...) .map(...) .sorted(...) .collect(...);
|
可以理解为:
1
| 数据源 -> 中间操作 -> 中间操作 -> 中间操作 -> 终端操作
|
其中,中间操作可以有多个,终端操作只能有一个。
例如:
1 2 3 4 5
| List<String> result = Arrays.asList("java", "spring", "mysql", "redis") .stream() .filter(s -> s.length() > 4) .map(String::toUpperCase) .collect(Collectors.toList());
|
在这里:
Arrays.asList(...) 是数据源;
stream() 是创建流;
filter() 是中间操作;
map() 是中间操作;
collect() 是终端操作。
chapter 3:操作分类
Stream 的操作可以分为两大类:中间操作、终结操作。
中间操作可分为:
- 无状态(Stateless)操作:指元素的处理不受之前元素的影响;
- 有状态(Stateful)操作:指该操作只有拿到所有元素之后才能继续下去。
终结操作可分为:
- 短路(Short-circuiting)操作:指遇到某些符合条件的元素就可以得到最终结果;
- 非短路(Unshort-circuiting)操作:指必须处理完所有元素才能得到最终结果。
3.1 中间操作
常见中间操作:
filter:过滤元素;
map:映射元素;
flatMap:平铺流;
distinct:去重;
sorted:排序;
peek:查看元素;
limit:截取前 N 个元素;
skip:跳过前 N 个元素;
unordered:去除有序约束。
3.2 终端操作
常见终端操作:
forEach:遍历元素;
collect:收集结果;
reduce:归约操作;
toArray:转换为数组;
min:最小值;
max:最大值;
count:计数;
anyMatch:是否有任意匹配;
allMatch:是否全部匹配;
noneMatch:是否全部不匹配;
findFirst:找到第一个元素;
findAny:找到任意一个元素。
3.3 更准确的分类表
| 类型 |
操作 |
特点 |
| 无状态中间操作 |
filter、map、flatMap、peek、unordered |
每个元素可以独立处理 |
| 有状态中间操作 |
distinct、sorted、limit、skip |
需要维护状态,甚至需要看到多个元素 |
| 短路终端操作 |
anyMatch、allMatch、noneMatch、findFirst、findAny |
不一定处理完所有元素 |
| 非短路终端操作 |
forEach、collect、reduce、count、max、min、toArray |
通常需要处理完整个流 |
补充一句:limit 也带有短路特征,它虽然是中间操作,但遇到足够数量的元素后,就可以停止继续向后获取数据。
chapter 4:Stream API 创建操作
4.1 通过 Collection 创建流
通过 java.util.Collection.stream() 方法用集合创建流。
1 2 3 4 5 6 7
| List<String> list = Arrays.asList("hello", "world", "stream");
Stream<String> stream = list.stream();
Stream<String> parallelStream = list.parallelStream();
|
4.2 通过数组创建流
使用 java.util.Arrays.stream(T[] array) 方法用数组创建流。
1 2 3
| String[] array = {"h", "e", "l", "l", "o"};
Stream<String> arrayStream = Arrays.stream(array);
|
4.3 使用 Stream 静态方法创建流
Stream 的静态方法包括:of()、iterate()、generate()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Stream<Integer> stream1 = Stream.of(1, 2, 3, 4, 5, 6);
Stream<Integer> stream2 = Stream.iterate(0, (x) -> x + 2).limit(3); stream2.forEach(System.out::println);
Stream<Double> stream3 = Stream.generate(Math::random).limit(3); stream3.forEach(System.out::println);
0 2 4 0.9620319103852426 0.8303672905658537 0.09203215202737569
|
4.4 使用基本类型流
为了避免装箱、拆箱带来的性能损耗,Java 提供了基本类型流:
IntStream
LongStream
DoubleStream
例如:
1 2 3 4
| int sum = IntStream.rangeClosed(1, 100) .sum();
System.out.println(sum);
|
再比如:
1 2 3 4 5 6
| double average = userList.stream() .mapToInt(User::getAge) .average() .orElse(0);
System.out.println(average);
|
能用 mapToInt、mapToLong、mapToDouble 的地方,不一定非要用 map。尤其是数据量大时,基本类型流会更稳一点。
4.5 创建 parallelStream
stream 和 parallelStream 的简单区分:stream 是顺序流,由主线程按顺序对流执行操作,而 parallelStream 是并行流,内部以多线程并行执行的方式对流进行操作,需要注意使用并行流的前提是流中的数据处理没有顺序要求。
1 2 3 4 5 6 7 8 9 10 11 12
| List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
numbers.parallelStream() .filter(n -> n % 2 == 0) .forEach(System.out::println);
Optional<Integer> findFirst = numbers.stream() .parallel() .filter(x -> x > 4) .findFirst();
|
还可以通过 sequential() 把并行流转回顺序流:
1 2 3 4
| List<Integer> result = numbers.parallelStream() .filter(n -> n > 3) .sequential() .collect(Collectors.toList());
|
chapter 5:无状态操作
5.1 filter
filter 是筛选操作,是按照一定的规则校验流中的元素,将符合条件的元素提取到新的流中的操作。
1
| Stream<T> filter(Predicate<? super T> predicate);
|
示例:
1 2 3 4 5 6
| List<Integer> list = Arrays.asList(6, 7, 3, 8, 1, 2);
Stream<Integer> stream = list.stream();
stream.filter(x -> x > 5) .forEach(System.out::println);
|
输出:
业务中常见写法:
1 2 3 4
| List<Order> validOrders = orderList.stream() .filter(order -> order.getDeleted() == 0) .filter(order -> order.getStatus() == OrderStatus.PAID) .collect(Collectors.toList());
|
如果过滤条件很多,建议抽成方法,不然 Lambda 会变成意大利面条。
1 2 3 4 5 6 7 8 9
| List<Order> validOrders = orderList.stream() .filter(this::isValidPaidOrder) .collect(Collectors.toList());
private boolean isValidPaidOrder(Order order) { return order.getDeleted() == 0 && order.getStatus() == OrderStatus.PAID && order.getAmount().compareTo(BigDecimal.ZERO) > 0; }
|
5.2 map
map 可以把一个元素类型为 T 的流转换成元素类型为 R 的流。这个方法传入一个 Function 函数式接口,接收一个泛型 T,返回泛型 R。
1
| <R> Stream<R> map(Function<? super T, ? extends R> mapper);
|
将集合中的元素 A 转换成想要得到的 B。
1 2 3
| List<String> output = wordList.stream() .map(String::toUpperCase) .collect(Collectors.toList());
|
业务中最常见的是实体转 DTO:
1 2 3 4 5 6 7 8 9
| List<UserDTO> dtoList = userList.stream() .map(user -> { UserDTO dto = new UserDTO(); dto.setUserId(user.getUserId()); dto.setUserName(user.getUserName()); dto.setPhone(user.getPhone()); return dto; }) .collect(Collectors.toList());
|
如果转换逻辑复杂,建议封装方法:
1 2 3 4 5 6 7 8 9 10 11
| List<UserDTO> dtoList = userList.stream() .map(this::convertToDTO) .collect(Collectors.toList());
private UserDTO convertToDTO(User user) { UserDTO dto = new UserDTO(); dto.setUserId(user.getUserId()); dto.setUserName(user.getUserName()); dto.setPhone(user.getPhone()); return dto; }
|
5.3 flatMap
flatMap 接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。
与 map 功能类似,区别在于它可以将多个子流压平成一个流。
1
| <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
|
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| List<String> list1 = Arrays.asList("m,k,l,a", "1,3,5,7");
List<String> listNew = list1.stream() .flatMap(s -> { String[] split = s.split(","); Stream<String> s2 = Arrays.stream(split); return s2; }) .collect(Collectors.toList());
System.out.println("处理前的集合:" + list1); System.out.println("处理后的集合:" + listNew);
|
更常见的业务场景是订单和明细:
1 2 3
| List<OrderItem> allItems = orderList.stream() .flatMap(order -> order.getItemList().stream()) .collect(Collectors.toList());
|
如果用 map,结果会变成:
而使用 flatMap,结果是:
一句话:map 是一对一转换,flatMap 是一对多之后再摊平。
5.4 peek
peek 操作接收的是一个 Consumer<T> 函数。顾名思义,peek 操作会按照 Consumer<T> 函数提供的逻辑去消费流中的每一个元素,同时有可能改变元素内部的一些属性。
1
| Stream<T> peek(Consumer<? super T> action);
|
peek 操作一般用于不想改变流中元素本身的类型,或者只想查看元素、调试链路时。
1 2 3 4 5
| List<String> result = Stream.of("java", "spring", "mysql") .peek(s -> System.out.println("before map: " + s)) .map(String::toUpperCase) .peek(s -> System.out.println("after map: " + s)) .collect(Collectors.toList());
|
但不要滥用 peek 做业务逻辑。
不推荐:
1 2 3
| List<User> users = userList.stream() .peek(user -> user.setStatus(1)) .collect(Collectors.toList());
|
更推荐:
1 2 3 4 5 6
| List<User> users = userList.stream() .map(user -> { user.setStatus(1); return user; }) .collect(Collectors.toList());
|
当然,如果追求不可变对象,那就创建新对象,而不是修改原对象。
5.5 mapToInt、mapToLong、mapToDouble
mapToInt、mapToLong、mapToDouble、flatMapToDouble、flatMapToInt、flatMapToLong 是 map 和 flatMap 的特例版,也就是针对特定的数据类型进行映射处理。
对应的方法接口如下:
1 2 3 4 5 6 7 8 9 10 11
| IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);
|
示例:
1 2 3 4
| Stream<String> stream = Stream.of("hello", "felord.cn");
stream.mapToInt(s -> s.length()) .forEach(System.out::println);
|
输出:
除此之外,还有封装好的 Stream,如 IntStream、LongStream、DoubleStream。
5.6 unordered
unordered() 操作不会执行任何操作来显式地对流进行排序。它的作用是消除了流必须保持有序的约束,从而允许后续操作使用不必考虑排序的优化。
对于顺序流,顺序的存在与否不会影响性能,只影响确定性。如果流是顺序的,则在相同的源上重复执行相同的流管道将产生相同的结果;如果是非顺序流,重复执行可能会产生不同的结果。
对于并行流,放宽排序约束有时可以实现更高效的执行。在流有序时,但用户不特别关心该顺序的情况下,使用 unordered 明确地去除有序约束,可以改善某些有状态或终端操作的并行性能。
1 2 3 4 5 6 7 8
| Stream.of(5, 1, 2, 6, 3, 7, 4) .unordered() .forEach(System.out::println);
Stream.of(5, 1, 2, 6, 3, 7, 4) .unordered() .parallel() .forEach(System.out::println);
|
两次输出结果对比:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| 第一遍: 第二遍:
5 5 1 1 2 2 6 6 3 3 7 7 4 4
3 3 6 6 4 7 7 5 2 4 1 1 5 2
|
并行流下,顺序不稳定是正常现象。你要是既想并行,又想严格有序,还想飞快,那就有点像既想马儿跑,又想马儿不吃草,还要它顺便给你写单元测试。
chapter 6:有状态操作
6.1 distinct
distinct 返回由该流的不同元素组成的流,根据 Object.equals(Object) 判断。
distinct() 使用 hashCode() 和 equals() 方法来获取不同的元素。因此,如果是自定义类,需要正确实现 hashCode() 和 equals() 方法。
示例:
1 2 3 4
| Stream<String> stream = Stream.of("1", "3", "4", "10", "4", "6", "23", "3");
stream.distinct() .forEach(System.out::println);
|
输出:
如果是对象去重:
1 2 3 4 5 6 7
| @Data @AllArgsConstructor @NoArgsConstructor public class User { private Long userId; private String userName; }
|
如果没有重写 equals 和 hashCode,distinct() 可能达不到预期。
业务中经常按某个字段去重,可以这样写:
1 2 3 4 5 6 7 8
| public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) { Set<Object> seen = ConcurrentHashMap.newKeySet(); return t -> seen.add(keyExtractor.apply(t)); }
List<User> uniqueUsers = userList.stream() .filter(distinctByKey(User::getUserId)) .collect(Collectors.toList());
|
如果是并行流,这个方法里用了 ConcurrentHashMap.newKeySet(),相对安全一些。但是,能不用副作用就不用副作用,越简单越不容易炸。
6.2 sorted
sorted 返回由该流的元素组成的流,并根据自然顺序排序。
1 2 3
| Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
|
示例:
1 2 3 4
| Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);
stream.sorted() .forEach(System.out::println);
|
对象排序:
1 2 3
| List<User> users = userList.stream() .sorted(Comparator.comparing(User::getAge)) .collect(Collectors.toList());
|
倒序:
1 2 3
| List<User> users = userList.stream() .sorted(Comparator.comparing(User::getAge).reversed()) .collect(Collectors.toList());
|
多个字段排序:
1 2 3 4 5 6
| List<User> users = userList.stream() .sorted( Comparator.comparing(User::getAge) .thenComparing(User::getUserName) ) .collect(Collectors.toList());
|
注意:sorted 是有状态操作,需要看到多个元素后才能排序。大数据量时要关注内存和性能。
6.3 limit
limit 获取流中前 n 个元素返回的流。
1
| Stream<T> limit(long maxSize);
|
示例:
1 2 3 4
| Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);
stream.limit(3) .forEach(System.out::println);
|
输出:
常见场景:取 Top N。
1 2 3 4
| List<Order> top10Orders = orderList.stream() .sorted(Comparator.comparing(Order::getAmount).reversed()) .limit(10) .collect(Collectors.toList());
|
6.4 skip
skip 在丢弃流的前 n 个元素之后,返回由该流的其余元素组成的流。
示例:
1 2 3 4
| Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);
stream.skip(3) .forEach(System.out::println);
|
输出:
分页示例:
1 2 3 4
| List<Order> pageList = orderList.stream() .skip((long) (pageNo - 1) * pageSize) .limit(pageSize) .collect(Collectors.toList());
|
但是注意,真实业务中大分页不要在内存里这么玩,应该交给数据库分页。不然数据量一大,内存会笑着笑着就哭了。
chapter 7:短路操作
7.1 anyMatch
anyMatch 表示 Stream 中只要有一个元素符合传入的 predicate,就返回 true。
1
| boolean anyMatch(Predicate<? super T> predicate);
|
示例:
1 2 3 4 5 6
| Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);
System.out.println("result=" + stream.anyMatch(s -> s == 2));
result=false
|
业务示例:
1 2
| boolean hasInvalidOrder = orderList.stream() .anyMatch(order -> order.getAmount().compareTo(BigDecimal.ZERO) < 0);
|
7.2 allMatch
allMatch 表示 Stream 中全部元素符合传入的 predicate,返回 true。
1
| boolean allMatch(Predicate<? super T> predicate);
|
示例:
1 2 3 4 5 6
| Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);
System.out.println("result=" + stream.allMatch(s -> s > 0));
result=true
|
业务示例:
1 2
| boolean allPaid = orderList.stream() .allMatch(order -> order.getStatus() == OrderStatus.PAID);
|
7.3 noneMatch
noneMatch 表示 Stream 中没有任何元素符合传入的 predicate,返回 true。
1
| boolean noneMatch(Predicate<? super T> predicate);
|
示例:
1 2 3 4 5 6
| Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);
System.out.println("result=" + stream.noneMatch(s -> s >= 17));
result=true
|
业务示例:
1 2
| boolean noDeletedOrder = orderList.stream() .noneMatch(order -> order.getDeleted() == 1);
|
7.4 findFirst
findFirst 用于返回满足条件的第一个元素,但是该元素是封装在 Optional 类中。
1
| Optional<T> findFirst();
|
示例:
1 2 3 4 5 6
| Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);
System.out.println("result=" + stream.findFirst().get());
result=3
|
再看过滤之后:
1 2 3 4 5 6
| Stream<Integer> stream = Stream.of(3, 1, 10, 16, 8, 4, 9);
System.out.println("result=" + stream.filter(s -> s > 3).findFirst().get());
result=10
|
建议不要直接 .get(),更稳的是:
1 2 3 4
| Integer result = Stream.of(3, 1, 10, 16, 8, 4, 9) .filter(s -> s > 3) .findFirst() .orElse(null);
|
或者:
1 2 3 4
| Integer result = Stream.of(3, 1, 10, 16, 8, 4, 9) .filter(s -> s > 3) .findFirst() .orElseThrow(() -> new IllegalArgumentException("未找到符合条件的数据"));
|
7.5 findAny
findAny 返回流中的任意元素,该元素也是封装在 Optional 中。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13
| List<String> strAry = Arrays.asList( "Jhonny", "David", "Jack", "Duke", "Jill", "Dany", "Julia", "Jenish", "Divya" );
String result = strAry.parallelStream() .filter(s -> s.startsWith("J")) .findAny() .get();
System.out.println("result = " + result);
result = Jill
|
在顺序流中,findAny 很多时候看起来和 findFirst 类似。但在并行流中,findAny 可以返回任意一个符合条件的元素,通常性能更好一些。
如果你不关心顺序,只关心有没有一个结果,用 findAny。
如果你必须拿第一个,用 findFirst。
chapter 8:非短路终端操作
8.1 forEach
forEach 接收一个 Lambda 表达式,然后在 Stream 的每一个元素上执行该表达式。
1
| void forEach(Consumer<? super T> action);
|
示例:
1 2 3 4 5 6 7 8 9 10
| List<String> strAry = Arrays.asList( "Jhonny", "David", "Jack", "Duke", "Jill", "Dany", "Julia", "Jenish", "Divya" );
strAry.stream() .forEach(s -> { if ("Jack".equalsIgnoreCase(s)) { System.out.println(s); } });
|
注意,forEach 是终端操作,通常用于消费数据,而不是返回结果。
不推荐:
1 2 3 4 5
| List<String> result = new ArrayList<>();
list.stream() .filter(s -> s.length() > 3) .forEach(result::add);
|
推荐:
1 2 3
| List<String> result = list.stream() .filter(s -> s.length() > 3) .collect(Collectors.toList());
|
尤其是并行流中,不要对普通 ArrayList 做 forEach(result::add),这是线程不安全的。
8.2 forEachOrdered
forEachOrdered 接收一个 Lambda 表达式,然后按顺序在 Stream 的每一个元素上执行该表达式。
1
| void forEachOrdered(Consumer<? super T> action);
|
示例:
1 2 3
| Stream.of("AAA,", "BBB,", "CCC,", "DDD,") .parallel() .forEach(System.out::print);
|
输出顺序可能是:
如果使用 forEachOrdered:
1 2 3
| Stream.of("AAA,", "BBB,", "CCC,", "DDD,") .parallel() .forEachOrdered(System.out::print);
|
输出顺序更稳定:
但要注意,forEachOrdered 为了保证顺序,可能牺牲并行性能。
8.3 toArray
toArray 返回包含此流元素的数组。当有参数时,则使用提供的 generator 函数分配返回的数组,以及分区执行或调整大小可能需要的任何其他数组。
1 2 3
| Object[] toArray();
<A> A[] toArray(IntFunction<A[]> generator);
|
示例:
1 2 3 4 5 6 7
| List<String> strList = Arrays.asList( "Jhonny", "David", "Jack", "Duke", "Jill", "Dany", "Julia", "Jenish", "Divya" );
Object[] strAryNoArg = strList.stream().toArray();
String[] strAry = strList.stream().toArray(String[]::new);
|
更推荐使用带类型的写法:
1 2
| String[] strAry = strList.stream() .toArray(String[]::new);
|
8.4 max
根据提供的 Comparator 返回此流的最大元素。
1
| Optional<T> max(Comparator<? super T> comparator);
|
示例:
1 2 3 4 5
| List<Integer> num = Arrays.asList(4, 5, 6);
num.stream() .max(Integer::compareTo) .ifPresent(System.out::println);
|
业务示例:
1 2
| Optional<Order> maxOrder = orderList.stream() .max(Comparator.comparing(Order::getAmount));
|
8.5 min
根据提供的 Comparator 返回此流的最小元素。
1
| Optional<T> min(Comparator<? super T> comparator);
|
示例:
1 2 3 4 5
| List<Integer> num = Arrays.asList(4, 5, 6);
num.stream() .min(Integer::compareTo) .ifPresent(System.out::println);
|
业务示例:
1 2
| Optional<Order> minOrder = orderList.stream() .min(Comparator.comparing(Order::getAmount));
|
8.6 count
返回此流中的元素计数。
示例:
1 2 3
| List<Integer> num = Arrays.asList(4, 5, 6);
System.out.println(num.stream().count());
|
业务示例:
1 2 3
| long paidCount = orderList.stream() .filter(order -> order.getStatus() == OrderStatus.PAID) .count();
|
chapter 9:reduce 归约操作
reduce 接收一个函数作为累加器,数组中的每个值从左到右开始缩减,最终计算为一个值。
1 2 3 4 5 6 7
| Optional<T> reduce(BinaryOperator<T> accumulator);
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
|
参数的定义结构如下:
1 2 3 4 5 6 7 8 9 10
| @FunctionalInterface public interface BinaryOperator<T> extends BiFunction<T, T, T> { }
@FunctionalInterface public interface BiFunction<T, U, R> { R apply(T t, U u); }
|
9.1 reduce 示例一:无初始值
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| List<Integer> num = Arrays.asList(1, 2, 4, 5, 6, 7);
Integer integer = num.stream() .reduce(new BinaryOperator<Integer>() { @Override public Integer apply(Integer a, Integer b) { System.out.println("x:" + a); return a + b; } }) .get();
System.out.println("result:" + integer);
|
等效写法:
1 2 3 4 5 6 7 8
| Integer result = num.stream() .reduce((x, y) -> { System.out.println("x:" + x); return x + y; }) .get();
System.out.println("result:" + result);
|
等效的普通写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| boolean flag = false; int temp = 0;
for (Integer integer : num) { if (!flag) { temp = integer; flag = true; } else { System.out.println("x:" + temp); temp += integer; } }
System.out.println("result:" + temp);
|
9.2 reduce 示例二:有初始值
1 2 3 4 5 6 7 8 9 10 11 12 13
| List<Integer> num = Arrays.asList(1, 2, 4, 5, 6, 7);
Integer integer = num.stream() .reduce(1, new BinaryOperator<Integer>() { @Override public Integer apply(Integer a, Integer b) { System.out.println("a=" + a); return a + b; } });
System.out.println("result:" + integer);
|
普通 for 循环写法:
1 2 3 4 5 6 7 8
| int temp = 1;
for (Integer integer : num) { System.out.println("a=" + temp); temp += integer; }
System.out.println("result:" + temp);
|
9.3 reduce 示例三:串行流中的三个参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| List<Integer> num = Arrays.asList(1, 2, 3, 4, 5, 6);
List<Integer> other = new ArrayList<>(); other.addAll(Arrays.asList(7, 8, 9, 10));
num.stream() .reduce( other, (x, y) -> { System.out.println(JSON.toJSONString(x)); x.add(y); return x; }, (x, y) -> { System.out.println("并行才会出现:" + JSON.toJSONString(x)); return x; } );
|
输出结果:
1 2 3 4 5 6
| [7,8,9,10,1] [7,8,9,10,1,2] [7,8,9,10,1,2,3] [7,8,9,10,1,2,3,4] [7,8,9,10,1,2,3,4,5] [7,8,9,10,1,2,3,4,5,6]
|
9.4 reduce 示例四:并行流中的错误写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| List<Integer> num = Arrays.asList(4, 5, 6);
List<Integer> other = new ArrayList<>(); other.addAll(Arrays.asList(1, 2, 3));
num.parallelStream() .reduce( other, (x, y) -> { x.add(y); System.out.println(JSON.toJSONString(x)); return x; }, (x, y) -> { x.addAll(y); System.out.println("结合:" + JSON.toJSONString(x)); return x; } );
|
输出结果可能是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| [1,2,3,4,5,6] [1,2,3,4,5,6] [1,2,3,4,5,6] 结合:[1,2,3,4,5,6,1,2,3,4,5,6] 结合:[1,2,3,4,5,6,1,2,3,4,5,6,1,2,3,4,5,6,1,2,3,4,5,6]
[1,2,3,4,6] [1,2,3,4,6] [1,2,3,4,6] 结合:[1,2,3,4,6,1,2,3,4,6] 结合:[1,2,3,4,6,1,2,3,4,6,1,2,3,4,6,1,2,3,4,6]
[1,2,3,5,4,6] [1,2,3,5,4,6] [1,2,3,5,4,6] 结合:[1,2,3,5,4,6,1,2,3,5,4,6] 结合:[1,2,3,5,4,6,1,2,3,5,4,6,1,2,3,5,4,6,1,2,3,5,4,6]
|
每个结果都是乱序的,并且多执行几次,都会出现不同的结果。并且第三个参数组合器内的代码也得到了执行。
这就是因为并行时,使用多线程时顺序性没有保障所产生的结果。
通过实践可以看到:组合器的作用,其实是对参数 2 中各个线程产生的结果进行再一遍的归约操作。
并且仔细看第二遍的执行结果:每一组都少了 1 个值。
所以,对于并行流 parallelStream 操作,必须慎用。
9.5 reduce 的核心原则
reduce 不是用来随便改集合的。它更适合做不可变归约。
适合:
1 2
| Integer sum = numbers.stream() .reduce(0, Integer::sum);
|
适合:
1 2 3 4
| BigDecimal totalAmount = orderList.stream() .map(Order::getAmount) .filter(Objects::nonNull) .reduce(BigDecimal.ZERO, BigDecimal::add);
|
不适合:
1 2 3 4 5 6 7 8 9 10 11 12
| List<Integer> result = numbers.parallelStream() .reduce( new ArrayList<>(), (list, number) -> { list.add(number); return list; }, (list1, list2) -> { list1.addAll(list2); return list1; } );
|
如果你想把元素收集到集合,请用 collect,不要用 reduce 硬凹。
正确写法:
1 2
| List<Integer> result = numbers.parallelStream() .collect(Collectors.toList());
|
9.6 reduce 的三个要求
写 reduce 时,要记住三个要求:
第一,identity 必须是归约操作的单位元。
比如加法的单位元是 0:
1 2
| int sum = numbers.stream() .reduce(0, Integer::sum);
|
乘法的单位元是 1:
1 2
| int multiply = numbers.stream() .reduce(1, (a, b) -> a * b);
|
第二,accumulator 最好是无副作用的。
不要在 accumulator 里面修改外部变量。
不推荐:
1 2 3 4 5 6 7
| List<Integer> result = new ArrayList<>();
numbers.stream() .reduce(0, (a, b) -> { result.add(b); return a + b; });
|
第三,combiner 必须能正确合并并行结果。
在顺序流里,combiner 可能不执行;在并行流里,combiner 会执行。
所以你不能写一个在顺序流里看起来没问题、在并行流里直接炸锅的 combiner。
chapter 10:collect 收集操作
collect 是一个终端操作,它接收的参数是将流中的元素累积到汇总结果的各种方式。
1 2 3 4 5
| <R, A> R collect(Collector<? super T, A, R> collector);
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
|
第一种方式会比较经常使用到,也比较方便使用。
常用方法包括:
toList
toSet
toCollection
counting
summingInt
averagingInt
joining
maxBy
minBy
reducing
collectingAndThen
groupingBy
partitioningBy
mapping
filtering
toMap
10.1 toList
1 2 3
| List<String> result = Stream.of("java", "spring", "mysql") .filter(s -> s.length() > 4) .collect(Collectors.toList());
|
10.2 toSet
1 2
| Set<String> result = Stream.of("java", "spring", "mysql", "java") .collect(Collectors.toSet());
|
10.3 toCollection
如果你想指定集合类型,可以用 toCollection。
1 2
| LinkedList<String> result = Stream.of("java", "spring", "mysql") .collect(Collectors.toCollection(LinkedList::new));
|
10.4 joining
拼接字符串:
1 2 3 4
| String result = Stream.of("java", "spring", "mysql") .collect(Collectors.joining(","));
System.out.println(result);
|
输出:
带前后缀:
1 2 3 4
| String result = Stream.of("java", "spring", "mysql") .collect(Collectors.joining(",", "[", "]"));
System.out.println(result);
|
输出:
10.5 counting
1 2
| Long count = Stream.of("java", "spring", "mysql") .collect(Collectors.counting());
|
当然,很多时候直接用 count() 更简单:
1 2
| long count = Stream.of("java", "spring", "mysql") .count();
|
10.6 summingInt、averagingInt
1 2 3 4 5
| Integer totalAge = userList.stream() .collect(Collectors.summingInt(User::getAge));
Double avgAge = userList.stream() .collect(Collectors.averagingInt(User::getAge));
|
也可以用基本类型流:
1 2 3 4 5 6 7 8
| int totalAge = userList.stream() .mapToInt(User::getAge) .sum();
double avgAge = userList.stream() .mapToInt(User::getAge) .average() .orElse(0);
|
10.7 groupingBy
按字段分组:
1 2
| Map<Integer, List<User>> groupByAge = userList.stream() .collect(Collectors.groupingBy(User::getAge));
|
按状态分组:
1 2
| Map<OrderStatus, List<Order>> orderMap = orderList.stream() .collect(Collectors.groupingBy(Order::getStatus));
|
分组后统计数量:
1 2 3 4 5
| Map<OrderStatus, Long> countMap = orderList.stream() .collect(Collectors.groupingBy( Order::getStatus, Collectors.counting() ));
|
分组后金额求和:
1 2 3 4 5 6 7 8
| Map<OrderStatus, BigDecimal> amountMap = orderList.stream() .collect(Collectors.groupingBy( Order::getStatus, Collectors.mapping( Order::getAmount, Collectors.reducing(BigDecimal.ZERO, BigDecimal::add) ) ));
|
10.8 多字段分组
可以创建一个 key 对象:
1 2 3 4 5 6 7 8
| @Data @AllArgsConstructor @NoArgsConstructor public class OrderGroupKey { private Long shopId; private Long supplierId; private Integer billType; }
|
使用:
1 2 3 4 5 6
| Map<OrderGroupKey, List<Order>> groupMap = orderList.stream() .collect(Collectors.groupingBy(order -> new OrderGroupKey( order.getShopId(), order.getSupplierId(), order.getBillType() )));
|
如果只是临时脚本,也可以拼字符串:
1 2 3 4
| Map<String, List<Order>> groupMap = orderList.stream() .collect(Collectors.groupingBy(order -> order.getShopId() + "_" + order.getSupplierId() + "_" + order.getBillType() ));
|
但正式业务里更建议使用明确的 key 对象,少用字符串硬拼。硬拼 key 写多了,以后排查问题就像考古。
10.9 partitioningBy
partitioningBy 是特殊的分组,只分成 true 和 false 两组。
1 2
| Map<Boolean, List<User>> partitionMap = userList.stream() .collect(Collectors.partitioningBy(user -> user.getAge() >= 18));
|
结果:
1 2 3 4
| { true: 成年用户列表, false: 未成年用户列表 }
|
10.10 toMap
把 List 转成 Map:
1 2
| Map<Long, User> userMap = userList.stream() .collect(Collectors.toMap(User::getUserId, Function.identity()));
|
但注意,如果 key 重复,会抛异常。
1
| java.lang.IllegalStateException: Duplicate key
|
所以业务里更推荐写 merge 函数:
1 2 3 4 5 6
| Map<Long, User> userMap = userList.stream() .collect(Collectors.toMap( User::getUserId, Function.identity(), (oldValue, newValue) -> newValue ));
|
保留旧值:
1 2 3 4 5 6
| Map<Long, User> userMap = userList.stream() .collect(Collectors.toMap( User::getUserId, Function.identity(), (oldValue, newValue) -> oldValue ));
|
如果需要指定 Map 类型:
1 2 3 4 5 6 7
| Map<Long, User> userMap = userList.stream() .collect(Collectors.toMap( User::getUserId, Function.identity(), (oldValue, newValue) -> newValue, LinkedHashMap::new ));
|
10.11 collectingAndThen
collectingAndThen 可以在收集之后再做一次转换。
例如收集成不可变集合:
1 2 3 4 5 6
| List<User> result = userList.stream() .filter(user -> user.getAge() >= 18) .collect(Collectors.collectingAndThen( Collectors.toList(), Collections::unmodifiableList ));
|
10.12 collect 三参数写法
collect 的三参数形式:
1 2 3
| <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
|
示例:
1 2 3 4 5 6 7 8
| List<String> result = Stream.of("java", "spring", "mysql") .collect( ArrayList::new, ArrayList::add, ArrayList::addAll );
System.out.println(result);
|
这个写法可以理解为:
ArrayList::new:创建结果容器;
ArrayList::add:把元素放入当前容器;
ArrayList::addAll:并行流时合并多个容器。
这也是为什么在并行流中,collect(Collectors.toList()) 通常比 forEach(list::add) 更安全,因为 Collector 会为不同线程维护不同的中间容器,然后再合并。
chapter 11:Optional 补充
很多 Stream 的终端操作返回 Optional:
findFirst
findAny
max
min
reduce 无初始值版本
例如:
1 2 3
| Optional<User> optionalUser = userList.stream() .filter(user -> user.getUserId().equals(1001L)) .findFirst();
|
11.1 不推荐直接 get
不推荐:
1
| User user = optionalUser.get();
|
如果没有值,会抛出:
11.2 推荐写法
有默认值:
1
| User user = optionalUser.orElse(new User());
|
默认值需要延迟创建:
1
| User user = optionalUser.orElseGet(User::new);
|
不存在就抛业务异常:
1
| User user = optionalUser.orElseThrow(() -> new BizException("用户不存在"));
|
存在才执行:
1 2 3
| optionalUser.ifPresent(user -> { System.out.println(user.getUserName()); });
|
转换:
1 2 3
| String userName = optionalUser .map(User::getUserName) .orElse("未知用户");
|
chapter 12:ParallelStream
parallelStream 默认使用了 fork-join 框架,其默认线程数通常与 CPU 核心数相关。
12.1 设置 parallelStream 默认公用线程池的全局并发数
1
| System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
|
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| int cupNum = Runtime.getRuntime().availableProcessors(); log.info("CPU num:{}", cupNum);
long firstNum = 1; long lastNum = 10000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum) .boxed() .collect(Collectors.toList());
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
aList.parallelStream() .forEach(e -> { log.info("输出:{}", e); });
|
注意:这个方式是全局设置,影响的是 ForkJoinPool.commonPool()。在 Spring Boot 这种服务端应用里,不建议在业务代码中随便设置这个全局参数。
它更适合在 JVM 启动参数里设置,例如:
1
| -Djava.util.concurrent.ForkJoinPool.common.parallelism=4
|
并且这个配置应该在 commonPool 初始化之前生效。如果 commonPool 已经初始化,再设置这个属性,可能达不到预期效果。
12.2 通过 ForkJoinPool 定义私有线程池
采用自定义的 ForkJoinPool 线程池去提交任务,主线程不会参与计算。
ForkJoinPool 线程池采用 submit 异步提交任务,通过 get 方法阻塞主线程,直到任务执行完成,再调用 shutdown 方法关闭线程池。
注意,等待提交任务执行完毕不能采用 awaitTermination() 方法,该方法是等待指定时间后强制关闭线程池。
效率方面,针对高密度的 CPU 计算任务,提高线程池的并发数,反而会降低任务的执行效率,因为 CPU 抢占和大量线程频繁切换会增加任务的耗时。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| int cupNum = Runtime.getRuntime().availableProcessors(); log.info("CPU num:{}", cupNum);
long firstNum = 1; long lastNum = 10000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum) .boxed() .collect(Collectors.toList());
ForkJoinPool forkJoinPool = new ForkJoinPool(8);
try { List<Long> longs = forkJoinPool.submit(() -> aList.parallelStream() .map(e -> { return e + 1; }) .collect(Collectors.toList()) ).get();
System.out.println(longs.size()); System.out.println("执行结束");
} catch (InterruptedException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { forkJoinPool.shutdown(); }
|
更规范一点的写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ForkJoinPool forkJoinPool = new ForkJoinPool(8);
try { return forkJoinPool.submit(() -> list.parallelStream() .map(this::calculate) .collect(Collectors.toList()) ).get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("并行任务被中断", e); } catch (ExecutionException e) { throw new RuntimeException("并行任务执行失败", e); } finally { forkJoinPool.shutdown(); }
|
这里有两个点特别重要:
第一,InterruptedException 不能只打印日志,最好恢复中断标记。
1
| Thread.currentThread().interrupt();
|
第二,自定义线程池必须关闭。
1
| forkJoinPool.shutdown();
|
否则可能出现线程资源泄漏。
12.3 parallelStream 背后的 ForkJoinPool
parallelStream 背后使用的是 Fork/Join 思想。
Fork/Join 的核心是“分而治之”:
1 2 3 4 5 6
| 大任务 -> 子任务1 -> 子任务2 -> 子任务3 -> 子任务4 -> 合并结果
|
例如:
1 2 3
| List<Integer> result = list.parallelStream() .map(this::calculate) .collect(Collectors.toList());
|
底层会尝试把 list 拆成多个分片,不同线程处理不同分片,最后再合并结果。
ForkJoinPool 的典型特点是工作窃取。
简单理解:
- 每个 worker 线程有自己的任务队列;
- 如果自己的队列处理完了,就去别的线程队列里偷任务;
- 这样可以提升 CPU 利用率。
这也是为什么 ArrayList、数组、IntStream.range 这类容易拆分的数据源,更适合并行流。
而 LinkedList、IO 流、迭代器式数据源,拆分成本更高,并行效果往往不好。
12.4 commonPool 的问题
默认情况下,parallelStream 使用的是公共线程池:
1
| ForkJoinPool.commonPool()
|
这个池是整个 JVM 共享的。
这意味着,假如你的应用里多个地方都用了 parallelStream,它们可能会抢同一个 commonPool。
在服务端应用里,这个问题非常关键。
比如你有两个接口:
1 2 3 4 5 6 7 8 9
| list.parallelStream() .map(this::callThirdApi) .collect(Collectors.toList());
dataList.parallelStream() .map(this::heavyCalculate) .collect(Collectors.toList());
|
如果接口 A 里面是阻塞 IO,比如调用第三方 API、查数据库、查 Redis,它可能把 commonPool 的 worker 线程占住。
这时接口 B 即使只是普通 CPU 计算,也会受到影响。
所以,parallelStream 的坑不在于它不能用,而在于它默认共享 commonPool,影响范围不是当前方法,而可能是整个应用。
12.5 parallelStream 中的主线程参与问题
默认 commonPool 场景下,调用线程可能会参与一部分任务执行,ForkJoinPool 的 worker 线程也会参与执行。
这意味着日志里可能会看到:
1 2 3 4
| main ForkJoinPool.commonPool-worker-1 ForkJoinPool.commonPool-worker-2 ForkJoinPool.commonPool-worker-3
|
如果使用自定义 ForkJoinPool 包起来:
1 2 3 4 5
| forkJoinPool.submit(() -> list.parallelStream() .map(this::calculate) .collect(Collectors.toList()) ).get();
|
通常任务会在自定义 ForkJoinPool 的 worker 线程中执行,主线程主要是在外层等待结果。
这也是为什么自定义线程池可以起到隔离作用。
12.6 parallelStream 顺序问题
并行流不保证处理顺序。
1 2 3 4
| List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream() .forEach(System.out::println);
|
输出可能是:
如果要保证输出顺序:
1 2
| numbers.parallelStream() .forEachOrdered(System.out::println);
|
但这会牺牲并行性能。
所以要记住:
- 不关心顺序:
forEach
- 关心顺序:
forEachOrdered
- 关心性能:尽量不要强行要求顺序
12.7 parallelStream 线程安全问题
错误写法:
1 2 3 4
| List<Integer> result = new ArrayList<>();
numbers.parallelStream() .forEach(result::add);
|
这个代码在并行流中是线程不安全的。
可能出现:
- 数据丢失;
- 顺序混乱;
- 抛异常;
- 偶现问题,不好复现。
正确写法:
1 2 3
| List<Integer> result = numbers.parallelStream() .filter(n -> n > 3) .collect(Collectors.toList());
|
或者:
1 2 3
| Set<Integer> result = numbers.parallelStream() .filter(n -> n > 3) .collect(Collectors.toSet());
|
如果确实需要并发集合:
1 2 3 4
| Set<Integer> result = ConcurrentHashMap.newKeySet();
numbers.parallelStream() .forEach(result::add);
|
但更推荐能用 collect 就用 collect。
12.8 parallelStream 中不要做阻塞 IO
不推荐:
1 2 3
| List<UserInfo> result = userIds.parallelStream() .map(userId -> userApi.queryUserInfo(userId)) .collect(Collectors.toList());
|
看起来很爽,一行代码并发查用户信息。
但问题是:
- 这是阻塞 IO,不是 CPU 计算;
- 默认使用 commonPool,可能影响整个应用;
- 没有明确限流;
- 没有清晰的超时控制;
- 出问题时不好定位;
- 还可能打爆下游服务。
更推荐显式线程池或异步框架:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ExecutorService executorService = Executors.newFixedThreadPool(16);
try { List<CompletableFuture<UserInfo>> futures = userIds.stream() .map(userId -> CompletableFuture.supplyAsync( () -> userApi.queryUserInfo(userId), executorService )) .collect(Collectors.toList());
List<UserInfo> result = futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } finally { executorService.shutdown(); }
|
这样至少线程池、并发度、隔离边界都掌握在自己手里。
12.9 parallelStream 适合什么场景
比较适合:
- CPU 密集型计算;
- 数据量较大;
- 每个元素处理互不影响;
- 不依赖顺序;
- 没有共享可变状态;
- 数据源容易拆分,比如数组、ArrayList、IntStream.range;
- 每个元素的计算成本足够高,能抵消线程调度成本。
示例:
1 2 3
| List<Result> resultList = dataList.parallelStream() .map(this::heavyCalculate) .collect(Collectors.toList());
|
12.10 parallelStream 不适合什么场景
不适合:
- 数据量很小;
- 简单字段转换;
- IO 密集型任务;
- 调用数据库;
- 调用 Redis;
- 调用第三方接口;
- 依赖 ThreadLocal;
- 依赖 MDC 日志上下文;
- 依赖事务上下文;
- 操作共享集合;
- 严格要求顺序;
- 线上服务里没有明确隔离的公共能力。
例如:
1 2 3 4
| list.parallelStream() .forEach(item -> { orderMapper.updateById(item); });
|
这种写法不推荐。
数据库本来就是共享资源,并行把它打满,不代表你的接口性能提升了,很可能只是把数据库从“上班”变成“上刑”。
12.11 parallelStream 和 ThreadLocal、MDC
在 Spring Boot 项目里,经常会用:
ThreadLocal 存用户上下文;
MDC 存 traceId;
- 事务上下文;
- 租户上下文;
- 数据权限上下文。
但是 parallelStream 会切换到 ForkJoinPool 的线程中执行。
所以这里的上下文可能丢失。
例如:
1 2 3 4 5 6
| MDC.put("traceId", traceId);
list.parallelStream() .forEach(item -> { log.info("处理数据: {}", item); });
|
日志中可能拿不到 traceId。
如果你的业务强依赖上下文,不要轻易使用 parallelStream。
可以考虑:
- 显式传参;
- 使用自定义线程池并包装上下文;
- 使用 TransmittableThreadLocal;
- 干脆不要在这类场景中并行处理。
chapter 13:parallelStream 自定义线程池完整示例
13.1 求和示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class ParallelStreamPoolDemo {
public static void main(String[] args) { long firstNum = 1; long lastNum = 1_000_000;
List<Long> list = LongStream.rangeClosed(firstNum, lastNum) .boxed() .collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
try { Long total = customThreadPool.submit(() -> list.parallelStream() .reduce(0L, Long::sum) ).get();
System.out.println("total = " + total); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("任务被中断", e); } catch (ExecutionException e) { throw new RuntimeException("任务执行失败", e); } finally { customThreadPool.shutdown(); } } }
|
13.2 CPU 密集型任务示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class PrimeDemo {
public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(4);
try { List<Integer> primes = forkJoinPool.submit(() -> IntStream.range(1, 1_000_000) .parallel() .filter(PrimeDemo::isPrime) .boxed() .collect(Collectors.toList()) ).get();
System.out.println(primes.size()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("任务被中断", e); } catch (ExecutionException e) { throw new RuntimeException("任务执行失败", e); } finally { forkJoinPool.shutdown(); } }
private static boolean isPrime(int n) { if (n <= 1) { return false; }
for (int i = 2; i <= Math.sqrt(n); i++) { if (n % i == 0) { return false; } }
return true; } }
|
13.3 为什么线程数不是越大越好
假设机器只有 8 核 CPU,你设置:
不一定更快,甚至可能更慢。
原因:
- CPU 核心有限;
- 线程越多,上下文切换越多;
- 缓存命中率可能下降;
- 任务拆分和合并本身也有成本;
- 如果任务很小,调度成本可能比计算成本还高。
经验上:
- CPU 密集型:线程数接近 CPU 核心数;
- IO 密集型:不要用 parallelStream,改用明确的线程池或异步模型;
- 线上业务:必须压测,不要凭感觉。
chapter 14:Stream 在业务中的常见写法
14.1 List 转 Map
1 2 3 4 5 6
| Map<Long, User> userMap = userList.stream() .collect(Collectors.toMap( User::getUserId, Function.identity(), (oldValue, newValue) -> newValue ));
|
14.2 提取 ID 列表
1 2 3 4 5
| List<Long> userIds = userList.stream() .map(User::getUserId) .filter(Objects::nonNull) .distinct() .collect(Collectors.toList());
|
14.3 按状态分组
1 2
| Map<Integer, List<Order>> statusOrderMap = orderList.stream() .collect(Collectors.groupingBy(Order::getStatus));
|
14.4 按状态统计数量
1 2 3 4 5
| Map<Integer, Long> statusCountMap = orderList.stream() .collect(Collectors.groupingBy( Order::getStatus, Collectors.counting() ));
|
14.5 金额求和
1 2 3 4
| BigDecimal totalAmount = orderList.stream() .map(Order::getAmount) .filter(Objects::nonNull) .reduce(BigDecimal.ZERO, BigDecimal::add);
|
14.6 多条件过滤
1 2 3 4 5
| List<Order> result = orderList.stream() .filter(order -> order.getDeleted() == 0) .filter(order -> order.getStatus() == OrderStatus.PAID) .filter(order -> order.getAmount().compareTo(BigDecimal.ZERO) > 0) .collect(Collectors.toList());
|
更推荐:
1 2 3 4 5 6 7 8 9
| List<Order> result = orderList.stream() .filter(this::isValidOrder) .collect(Collectors.toList());
private boolean isValidOrder(Order order) { return order.getDeleted() == 0 && order.getStatus() == OrderStatus.PAID && order.getAmount().compareTo(BigDecimal.ZERO) > 0; }
|
14.7 分组后求和
1 2 3 4 5 6 7 8
| Map<Long, BigDecimal> shopAmountMap = orderList.stream() .collect(Collectors.groupingBy( Order::getShopId, Collectors.mapping( Order::getAmount, Collectors.reducing(BigDecimal.ZERO, BigDecimal::add) ) ));
|
也可以这样写,更直观一点:
1 2 3 4 5 6 7 8 9
| Map<Long, BigDecimal> shopAmountMap = orderList.stream() .collect(Collectors.groupingBy( Order::getShopId, Collectors.reducing( BigDecimal.ZERO, Order::getAmount, BigDecimal::add ) ));
|
14.8 分组后取最大值
1 2 3 4 5
| Map<Long, Optional<Order>> shopMaxOrderMap = orderList.stream() .collect(Collectors.groupingBy( Order::getShopId, Collectors.maxBy(Comparator.comparing(Order::getAmount)) ));
|
如果不想要 Optional:
1 2 3 4 5 6 7 8
| Map<Long, Order> shopMaxOrderMap = orderList.stream() .collect(Collectors.groupingBy( Order::getShopId, Collectors.collectingAndThen( Collectors.maxBy(Comparator.comparing(Order::getAmount)), optional -> optional.orElse(null) ) ));
|
14.9 财务结算类聚合示例
比如有结算明细:
1 2 3 4 5 6 7 8
| @Data public class SettlementDetail { private Long shopId; private Long supplierId; private Integer billType; private BigDecimal taxAmount; private BigDecimal noTaxAmount; }
|
想按店铺、供应商、单据类型聚合:
1 2 3 4 5 6 7 8
| @Data @AllArgsConstructor @NoArgsConstructor public class SettlementGroupKey { private Long shopId; private Long supplierId; private Integer billType; }
|
聚合:
1 2 3 4 5 6 7 8 9 10 11 12
| Map<SettlementGroupKey, BigDecimal> amountMap = detailList.stream() .collect(Collectors.groupingBy( detail -> new SettlementGroupKey( detail.getShopId(), detail.getSupplierId(), detail.getBillType() ), Collectors.mapping( detail -> Optional.ofNullable(detail.getTaxAmount()).orElse(BigDecimal.ZERO), Collectors.reducing(BigDecimal.ZERO, BigDecimal::add) ) ));
|
如果需要聚合成对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| Map<SettlementGroupKey, SettlementSummary> summaryMap = detailList.stream() .collect(Collectors.groupingBy( detail -> new SettlementGroupKey( detail.getShopId(), detail.getSupplierId(), detail.getBillType() ), Collector.of( SettlementSummary::new, (summary, detail) -> { summary.setTaxAmount( summary.getTaxAmount().add( Optional.ofNullable(detail.getTaxAmount()).orElse(BigDecimal.ZERO) ) ); summary.setNoTaxAmount( summary.getNoTaxAmount().add( Optional.ofNullable(detail.getNoTaxAmount()).orElse(BigDecimal.ZERO) ) ); }, (left, right) -> { left.setTaxAmount(left.getTaxAmount().add(right.getTaxAmount())); left.setNoTaxAmount(left.getNoTaxAmount().add(right.getNoTaxAmount())); return left; } ) ));
|
SettlementSummary 可以这样定义:
1 2 3 4 5 6 7
| @Data public class SettlementSummary {
private BigDecimal taxAmount = BigDecimal.ZERO;
private BigDecimal noTaxAmount = BigDecimal.ZERO; }
|
这种写法适合中小规模内存聚合。如果数据量非常大,还是优先考虑 SQL 聚合、分批处理、临时表、预计算等方案。
chapter 15:Stream 常见坑
15.1 Stream 重复使用
错误:
1 2 3 4 5
| Stream<String> stream = list.stream();
stream.forEach(System.out::println);
long count = stream.count();
|
会报错。
正确:
1 2 3
| list.stream().forEach(System.out::println);
long count = list.stream().count();
|
15.2 修改外部变量
错误:
1 2 3 4
| int sum = 0;
numbers.stream() .forEach(n -> sum += n);
|
局部变量在 Lambda 中必须是 effectively final。
有人可能会改成数组:
1 2 3 4
| int[] sum = {0};
numbers.stream() .forEach(n -> sum[0] += n);
|
这在并行流中不安全。
正确:
1 2 3
| int sum = numbers.stream() .mapToInt(Integer::intValue) .sum();
|
15.3 forEach 添加到外部集合
错误:
1 2 3 4
| List<Integer> result = new ArrayList<>();
numbers.parallelStream() .forEach(result::add);
|
正确:
1 2
| List<Integer> result = numbers.parallelStream() .collect(Collectors.toList());
|
15.4 toMap key 重复
错误:
1 2
| Map<Long, User> userMap = userList.stream() .collect(Collectors.toMap(User::getUserId, Function.identity()));
|
如果 userId 重复,会抛异常。
正确:
1 2 3 4 5 6
| Map<Long, User> userMap = userList.stream() .collect(Collectors.toMap( User::getUserId, Function.identity(), (oldValue, newValue) -> newValue ));
|
15.5 空指针问题
错误:
1 2 3 4
| List<String> names = userList.stream() .map(User::getUserName) .map(String::toUpperCase) .collect(Collectors.toList());
|
如果 userName 为 null,会 NPE。
正确:
1 2 3 4 5
| List<String> names = userList.stream() .map(User::getUserName) .filter(Objects::nonNull) .map(String::toUpperCase) .collect(Collectors.toList());
|
15.6 BigDecimal 求和空值问题
错误:
1 2 3
| BigDecimal total = orderList.stream() .map(Order::getAmount) .reduce(BigDecimal.ZERO, BigDecimal::add);
|
如果 amount 有 null,会 NPE。
正确:
1 2 3
| BigDecimal total = orderList.stream() .map(order -> Optional.ofNullable(order.getAmount()).orElse(BigDecimal.ZERO)) .reduce(BigDecimal.ZERO, BigDecimal::add);
|
或者:
1 2 3 4
| BigDecimal total = orderList.stream() .map(Order::getAmount) .filter(Objects::nonNull) .reduce(BigDecimal.ZERO, BigDecimal::add);
|
15.7 滥用 Stream
不是所有循环都要改成 Stream。
普通写法更清晰时,就用普通写法。
例如:
1 2 3 4 5 6 7 8 9 10 11
| for (Order order : orderList) { if (order == null) { continue; }
if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) { continue; }
doSomething(order); }
|
如果硬改成 Stream:
1 2 3 4
| orderList.stream() .filter(Objects::nonNull) .filter(order -> order.getAmount().compareTo(BigDecimal.ZERO) > 0) .forEach(this::doSomething);
|
这还行。
但如果里面充满复杂判断、异常处理、分支逻辑,就不一定比 for 循环更清晰了。
代码是写给人看的,机器只是顺便执行一下。
chapter 16:性能建议
16.1 小集合不一定要用 Stream
小集合用 for 循环可能更快、更直接。
Stream 的优势是表达能力,不是无条件性能更高。
16.2 避免不必要的装箱拆箱
不推荐:
1 2
| Integer sum = numbers.stream() .reduce(0, Integer::sum);
|
推荐:
1 2 3
| int sum = numbers.stream() .mapToInt(Integer::intValue) .sum();
|
16.3 filter 尽量放前面
如果后面的 map 很重,先过滤可以减少处理量。
推荐:
1 2 3 4
| List<Result> result = list.stream() .filter(this::isValid) .map(this::heavyConvert) .collect(Collectors.toList());
|
不推荐:
1 2 3 4
| List<Result> result = list.stream() .map(this::heavyConvert) .filter(this::isValidResult) .collect(Collectors.toList());
|
16.4 distinct、sorted 要慎用
distinct 和 sorted 都是有状态操作。
数据量大时,它们可能成为性能瓶颈。
16.5 parallelStream 要压测
不要觉得并行一定更快。
是否更快取决于:
- 数据量;
- 任务计算成本;
- CPU 核数;
- 数据源是否容易拆分;
- 是否有共享状态;
- 是否有锁;
- 是否阻塞;
- 是否要求顺序。
结论只有一个:压测。别靠直觉,直觉经常比线上事故来得还快。
16.6 高性能场景用 JMH 测
如果你真的关心性能,不要用 System.currentTimeMillis() 随便测。
应该使用 JMH。
简单说,JMH 可以帮你避免:
- JVM 预热问题;
- JIT 优化影响;
- 死代码消除;
- 单次运行偶然性。
Stream 写法优雅,但在极致性能场景下,for 循环依然可能更强。
chapter 17:Stream 与 Spring Boot 项目实践建议
17.1 Controller 层少写复杂 Stream
Controller 层应该尽量薄。
不推荐:
1 2 3 4 5 6 7 8
| @GetMapping("/list") public List<UserDTO> list() { return userService.list().stream() .filter(...) .map(...) .sorted(...) .collect(Collectors.toList()); }
|
推荐把复杂逻辑放到 Service:
1 2 3 4
| @GetMapping("/list") public List<UserDTO> list() { return userService.queryUserDTOList(); }
|
17.2 Service 层可以使用 Stream 做内存转换
1 2 3 4 5 6 7
| public List<UserDTO> queryUserDTOList() { List<User> userList = userRepository.queryValidUsers();
return userList.stream() .map(this::convertToDTO) .collect(Collectors.toList()); }
|
17.3 大数据量优先数据库处理
如果要过滤、排序、分组、分页,优先考虑数据库:
where
group by
order by
limit
不要把几十万数据查到 JVM 再 Stream。
不推荐:
1 2 3 4 5 6 7 8
| List<Order> allOrders = orderMapper.selectAll();
List<Order> result = allOrders.stream() .filter(...) .sorted(...) .skip(...) .limit(...) .collect(Collectors.toList());
|
推荐:
1 2 3 4 5
| select * from order_table where status = ? order by create_time desc limit ?, ?
|
17.4 并行流不要包数据库操作
不推荐:
1 2
| orderList.parallelStream() .forEach(order -> orderMapper.updateById(order));
|
如果确实要批量更新,考虑:
- 批量 SQL;
- 分批提交;
- 消息队列削峰;
- 明确线程池;
- 限流;
- 事务边界;
- 重试和幂等。
17.5 日志上下文要特别注意
如果使用 parallelStream,MDC 可能丢失。
例如:
1
| log.info("traceId={}", MDC.get("traceId"));
|
在并行线程里可能拿不到。
如果你的系统依赖 traceId、requestId、tenantId,最好不要在这类链路里随意使用 parallelStream。
chapter 18:Stream API 总结
Stream 的核心价值是让集合处理更加声明式、链式、可组合。
它适合:
- 过滤;
- 转换;
- 分组;
- 聚合;
- 去重;
- 排序;
- 查找;
- 统计。
但是,使用 Stream 要注意:
- 中间操作是惰性的;
- 终端操作会触发执行;
- Stream 不能重复使用;
map 是转换;
flatMap 是摊平;
reduce 适合不可变归约;
collect 适合可变容器收集;
parallelStream 不是免费午餐;
- 并行流不要乱碰共享变量;
- IO 密集型任务不要默认使用 parallelStream;
- 服务端应用要警惕 commonPool 共享问题。
如果用一句话总结:
Stream 是一把手术刀,不是电锯。用得准,代码优雅;用得猛,线上冒烟。
参考资料
- Oracle Java 官方文档:Stream API。
- Oracle Java 官方文档:Collectors。
- Oracle Java 官方文档:ForkJoinPool。
- CSDN:JAVA– 在 Java8 Parallel Stream 中如何自定义线程池?
- 博客园:Java8 并行流 parallelStream 原理分析及注意事项。
- 个人原文整理:Stream API 笔记与 parallelStream 示例。
启示录
富贵岂由人,时会高志须酬。
能成功于千载者,必以近察远。
Stream API 的价值,不只是少写几行 for 循环,而是让数据处理过程变得更像一条清晰的管道。
但是越高级的工具,越需要边界感。
顺序流追求表达力,并行流追求吞吐量;一个适合优雅地处理集合,一个适合在明确场景下压榨 CPU。真正成熟的写法,不是把所有循环都改成 Stream,也不是看到 parallelStream 就兴奋,而是知道什么时候该用,什么时候不该用。
写代码也是这样:能快是一种能力,能稳才是工程。