26.2 函数式数据处理:基本用法

26.2 函数式数据处理:基本用法

上一节介绍了Lambda表达式和函数式接口,本节探讨它们的应用:函数式数据处理,针对常见的集合数据处理,Java 8引入了一套新的类库,位于包java.util.stream下,称为Stream API。这套API操作数据的思路不同于我们之前介绍的容器类API,它们是函数式的,非常简洁、灵活、易读。具体有什么不同呢?本节先介绍一些基本的API,下节讨论一些高级功能。

接口Stream类似于一个迭代器,但提供了更为丰富的操作,Stream API的主要操作就定义在该接口中。Java 8给Collection接口增加了两个默认方法,它们可以返回一个Stream,如下所示:

1
2
3
4
5
6
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}

stream()返回的是一个顺序流,parallelStream()返回的是一个并行流。顺序流就是由一个线程执行操作。而并行流背后可能有多个线程并行执行,与之前介绍的并发技术不同,使用并行流不需要显式管理线程,使用方法与顺序流是一样的。

下面我们主要针对顺序流学习Stream接口,包括其用法和基本原理,随后我们再介绍并行流,先来看一些简单的示例。

26.2.1 基本示例

上一节演示时使用了学生类Student和学生列表List<Student> lists,本节继续使用它们,看一些基本的过滤、转换以及过滤和转换组合的例子。

1.基本过滤

返回学生列表中90分以上的,传统上的代码一般是这样:

1
2
3
4
5
6
List<Student> above90List = new ArrayList<>();
for(Student t : students) {
if(t.getScore() > 90) {
above90List.add(t);
}
}

使用Stream API,代码可以这样:

1
2
List<Student> above90List = students.stream()
.filter(t->t.getScore()>90).collect(Collectors.toList());

先通过stream()得到一个Stream对象,然后调用Stream上的方法,filter()过滤得到90分以上的,它的返回值依然是一个Stream,为了转换为List,调用了collect方法并传递了一个Collectors.toList(),表示将结果收集到一个List中。

代码更为简洁易读了,这种数据处理方式称为函数式数据处理。与传统代码相比,其特点是:
1)没有显式的循环迭代,循环过程被Stream的方法隐藏了。
2)提供了声明式的处理函数,比如filter,它封装了数据过滤的功能,而传统代码是命令式的,需要一步步的操作指令。
3)流畅式接口,方法调用链接在一起,清晰易读。

2.基本转换

根据学生列表返回名称列表,传统上的代码一般是这样:

1
2
3
4
List<String> nameList = new ArrayList<>(students.size());
for(Student t : students) {
nameList.add(t.getName());
}

使用Stream API,代码可以这样:

1
2
List<String> nameList = students.stream()
.map(Student::getName).collect(Collectors.toList());

这里使用了Stream的map函数,它的参数是一个Function函数式接口,这里传递了方法引用。

3.基本的过滤和转换组合

返回90分以上的学生名称列表,传统上的代码一般是这样:

1
2
3
4
5
6
List<String> nameList = new ArrayList<>();
for(Student t : students) {
if(t.getScore() > 90) {
nameList.add(t.getName());
}
}

使用函数式数据处理的思路,可以将这个问题分解为由两个基本函数实现:
1)过滤:得到90分以上的学生列表。
2)转换:将学生列表转换为名称列表。

使用Stream API,可以将基本函数filter()和map()结合起来,代码可以这样:

1
2
3
List<String> above90Names = students.stream()
.filter(t->t.getScore()>90).map(Student::getName)
.collect(Collectors.toList());

这种组合利用基本函数、声明式实现集合数据处理功能的编程风格,就是函数式数据处理。

代码更为直观易读了,但你可能会担心它的性能有问题。filter()和map()都需要对流中的每个元素操作一次,一起使用会不会就需要遍历两次呢?答案是否定的,只需要一次。实际上,调用filter()和map()都不会执行任何实际的操作,它们只是在构建操作的流水线,调用collect才会触发实际的遍历执行,在一次遍历中完成过滤、转换以及收集结果的任务

像filter和map这种不实际触发执行、用于构建流水线、返回Stream的操作称为中间操作(intermediate operation),而像collect这种触发实际执行、返回具体结果的操作称为终端操作(terminal operation)。Stream API中还有更多的中间和终端操作,下面我们具体介绍。

26.2.2 中间操作

除了filter和map, Stream API的中间操作还有distinct、sorted、skip、limit、peek、mapToLong、mapToInt、mapToDouble、flatMap等,我们逐个介绍。

1. distinct

distinct返回一个新的Stream,过滤重复的元素,只留下唯一的元素,是否重复是根据equals方法来比较的,distinct可以与其他函数(如filter、map)结合使用。比如,返回字符串列表中长度小于3的字符串、转换为小写、只保留唯一的,代码可以为:

1
2
3
4
List<String> list = Arrays.asList(new String[]{"abc", "def", "hello", "Abc"});
List<String> retList = list.stream()
.filter(s->s.length()<=3).map(String::toLowerCase).distinct()
.collect(Collectors.toList());

虽然都是中间操作,但distinct与filter和map是不同的。filter和map都是无状态的,对于流中的每一个元素,处理都是独立的,处理后即交给流水线中的下一个操作;distinct不同,它是有状态的,在处理过程中,它需要在内部记录之前出现过的元素,如果已经出现过,即重复元素,它就会过滤掉,不传递给流水线中的下一个操作。对于顺序流,内部实现时,distinct操作会使用HashSet记录出现过的元素,如果流是有顺序的,需要保留顺序,会使用LinkedHashSet。

2. sorted

有两个sorted方法:

1
2
Stream<T> sorted()
Stream<T> sorted(Comparator<? super T> comparator)

它们都对流中的元素排序,都返回一个排序后的Stream。第一个方法假定元素实现了Comparable接口,第二个方法接受一个自定义的Comparator。比如,过滤得到90分以上的学生,然后按分数从高到低排序,分数一样的按名称排序,代码为:

1
2
3
4
List<Student> list = students.stream().filter(t->t.getScore()>90)
.sorted(Comparator.comparing(Student::getScore)
.reversed().thenComparing(Student::getName))
.collect(Collectors.toList());

这里,使用了Comparator的comparing、reversed和thenComparing构建了Comparator。

与distinct一样,sorted也是一个有状态的中间操作,在处理过程中,需要在内部记录出现过的元素。其不同是,每碰到流中的一个元素,distinct都能立即做出处理,要么过滤,要么马上传递给下一个操作;sorted需要先排序,为了排序,它需要先在内部数组中保存碰到的每一个元素,到流结尾时再对数组排序,然后再将排序后的元素逐个传递给流水线中的下一个操作。

3. skip/limit

它们的定义为:

1
2
Stream<T> skip(long n)
Stream<T> limit(long maxSize)

skip跳过流中的n个元素,如果流中元素不足n个,返回一个空流,limit限制流的长度为maxSize。比如,将学生列表按照分数排序,返回第3名到第5名,代码为:

1
2
3
List<Student> list = students.stream()
.sorted(Comparator.comparing(Student::getScore).reversed())
.skip(2).limit(3).collect(Collectors.toList());

skip和limit都是有状态的中间操作。对前n个元素,skip的操作就是过滤,对后面的元素,skip就是传递给流水线中的下一个操作。limit的一个特点是:它不需要处理流中的所有元素,只要处理的元素个数达到maxSize,后面的元素就不需要处理了,这种可以提前结束的操作称为短路操作

skip和limit只能根据元素数目进行操作,Java 9增加了两个新方法,相当于更为通用的skip和limit:

1
2
3
4
//通用的skip,在谓词返回为true的情况下一直进行skip操作,直到某次返回false
default Stream<T> dropWhile(Predicate<? super T> predicate)
//通用的limit,在谓词返回为true的情况下一直接受,直到某次返回false
default Stream<T> takeWhile(Predicate<? super T> predicate)

4. peek

peek的定义为:

1
Stream<T> peek(Consumer<? super T> action)

它返回的流与之前的流是一样的,没有变化,但它提供了一个Consumer,会将流中的每一个元素传给该Consumer。这个方法的主要目的是支持调试,可以使用该方法观察在流水线中流转的元素,比如:

1
2
3
List<String> above90Names = students.stream().filter(t->t.getScore()>90)
.peek(System.out::println).map(Student::getName)
.collect(Collectors.toList());

5. mapToLong/mapToInt/mapToDouble

map函数接受的参数是一个Function<T, R>,为避免装箱/拆箱,提高性能,Stream还有如下返回基本类型特定流的方法:

1
2
3
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper)
IntStream mapToInt(ToIntFunction<? super T> mapper)
LongStream mapToLong(ToLongFunction<? super T> mapper)

DoubleStream/IntStream/LongStream是基本类型特定的流,有一些专门的更为高效的方法。比如,求学生列表的分数总和,代码为:

1
double sum = students.stream().mapToDouble(Student::getScore).sum();

6. flatMap

flatMap的定义为:

1
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)

它接受一个函数mapper,对流中的每一个元素,mapper会将该元素转换为一个流Stream,然后把新生成流的每一个元素传递给下一个操作。比如:

1
2
3
4
5
6
List<String> lines = Arrays.asList(new String[]{
"hello abc", "老马 编程"});
List<String> words = lines.stream()
.flatMap(line -> Arrays.stream(line.split("\\s+")))
.collect(Collectors.toList());
System.out.println(words);

这里的mapper将一行字符串按空白符分隔为了一个单词流,Arrays.stream可以将一个数组转换为一个流,输出为:

1
[hello, abc, 老马, 编程]

可以看出,实际上,flatMap完成了一个1到n的映射。

26.2.3 终端操作

中间操作不触发实际的执行,返回值是Stream,而终端操作触发执行,返回一个具体的值,除了collect, Stream API的终端操作还有max、min、count、allMatch、anyMatch、noneMatch、findFirst、findAny、forEach、toArray、reduce等,我们逐个介绍。

1. max/min

max/min的定义为:

1
2
Optional<T> max(Comparator<? super T> comparator)
Optional<T> min(Comparator<? super T> comparator)

它们返回流中的最大值/最小值,它们的返回值类型是Optional<T>,而不是T。

java.util.Optional是Java 8引入的一个新类,它是一个泛型容器类,内部只有一个类型为T的单一变量value,可能为null,也可能不为null。Optional有什么用呢?它用于准确地传递程序的语义,它清楚地表明,其代表的值可能为null,程序员应该进行适当的处理

Optional定义了一些方法,比如:

1
2
3
4
5
6
7
8
9
10
11
12
//value不为null时返回true
public boolean isPresent()
//返回实际的值,如果为null,抛出异常NoSuchElementException
public T get()
//如果value不为null,返回value,否则返回other
public T orElse(T other)
//构建一个空的Optional, value为null
public static<T> Optional<T> empty()
//构建一个非空的Optional, 参数value不能为null
public static <T> Optional<T> of(T value)
//构建一个Optional,参数value可以为null,也可以不为null
public static <T> Optional<T> ofNullable(T value)

在max/min的例子中,通过声明返回值为Optional,我们可以知道具体的返回值不一定存在,这发生在流中不含任何元素的情况下。

看个简单的例子,返回分数最高的学生,代码为:

1
2
Student student = students.stream()
.max(Comparator.comparing(Student::getScore).reversed()).get();

这里,假定students不为空。

2. count

count很简单,就是返回流中元素的个数。比如,统计大于90分的学生个数,代码为:

1
long above90Count = students.stream().filter(t->t.getScore()>90).count();

3. allMatch/anyMatch/noneMatch

这几个函数都接受一个谓词Predicate,返回一个boolean值,用于判定流中的元素是否满足一定的条件。它们的区别是:

  • allMatch:只有在流中所有元素都满足条件的情况下才返回true。
  • anyMatch:只要流中有一个元素满足条件就返回true。
  • noneMatch:只有流中所有元素都不满足条件才返回true。

如果流为空,那么这几个函数的返回值都是true。

比如,判断是不是所有学生都及格了(不小于60分),代码可以为:

1
boolean allPass = students.stream().allMatch(t->t.getScore()>=60);

这几个操作都是短路操作,不一定需要处理所有元素就能得出结果,比如,对于all-Match,只要有一个元素不满足条件,就能返回false。

4. findFirst/findAny

它们的定义为:

1
2
Optional<T> findFirst()
Optional<T> findAny()

它们的返回类型都是Optional,如果流为空,返回Optional.empty()。findFirst返回第一个元素,而findAny返回任一元素,它们都是短路操作。随便找一个不及格的学生,代码可以为:

1
2
3
4
5
Optional<Student> student = students.stream().filter(t->t.getScore()<60)
.findAny();
if(student.isPresent()){
//处理不及格的学生
}

5. forEach

有两个forEach方法:

1
2
void forEach(Consumer<? super T> action)
void forEachOrdered(Consumer<? super T> action)

它们都接受一个Consumer,对流中的每一个元素,传递元素给Consumer。区别在于:在并行流中,forEach不保证处理的顺序,而forEachOrdered会保证按照流中元素的出现顺序进行处理。

比如,逐行打印大于90分的学生,代码可以为:

1
students.stream().filter(t->t.getScore()>90).forEach(System.out::println);

6. toArray

toArray将流转换为数组,有两个方法:

1
2
Object[] toArray()
<A> A[] toArray(IntFunction<A[]> generator)

不带参数的toArray返回的数组类型为Object[],这通常不是期望的结果,如果希望得到正确类型的数组,需要传递一个类型为IntFunction的generator。IntFunction的定义为:

1
2
3
public interface IntFunction<R> {
R apply(int value);
}

generator接受的参数是流的元素个数,它应该返回对应大小的正确类型的数组。

比如,获取90分以上的学生数组,代码可以为:

1
2
Student[] above90Arr = students.stream().filter(t->t.getScore()>90)
.toArray(Student[]::new);

Student[]::new就是一个类型为IntFunction<Student[]>的generator。

7. reduce

reduce代表归约或者叫折叠,它是max/min/count的更为通用的函数,将流中的元素归约为一个值。有三个reduce函数:

1
2
3
4
Optional<T> reduce(BinaryOperator<T> accumulator);
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);

第一个reduce函数基本等同于调用:

1
2
3
4
5
6
7
8
9
10
11
boolean foundAny = false;
T result = null;
for(T element : this stream) {
if(! foundAny) {
foundAny = true;
result = element;
}
else
result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();

比如,使用reduce函数求分数最高的学生,代码可以为:

1
2
3
4
5
6
7
Student topStudent = students.stream().reduce((accu, t) -> {
if(accu.getScore() >= t.getScore()) {
return accu;
} else {
return t;
}
}).get();

第二个reduce函数多了一个identity参数,表示初始值,它基本等同于调用:

1
2
3
4
T result = identity;
for(T element : this stream)
result = accumulator.apply(result, element)
return result;

第一个和第二个reduce函数的返回类型只能是流中元素的类型,而第三个reduce函数更为通用,它的归约类型可以自定义,另外,它多了一个combiner参数。combiner用在并行流中,用于合并子线程的结果。对于顺序流,它基本等同于调用:

1
2
3
4
U result = identity;
for(T element : this stream)
result = accumulator.apply(result, element)
return result;

注意与第二个reduce函数相区分,它的结果类型不是T,而是U。比如,使用reduce函数计算学生分数的和,代码可以为:

1
2
3
4
double sumScore = students.stream().reduce(0d,
(sum, t) -> sum += t.getScore(),
(sum1, sum2) -> sum1 += sum2
);

从以上可以看出,reduce函数虽然更为通用,但比较费解,难以使用,一般情况下应该优先使用其他函数。collect函数比reduce函数更为通用、强大和易用,关于它,我们稍后再详细介绍。

26.2.4 构建流

前面我们主要使用的是Collection的stream方法,换做parallelStream方法,就会使用并行流,接口方法都是通用的。但并行流内部会使用多线程,线程个数一般与系统的CPU核数一样,以充分利用CPU的计算能力。

进一步来说,并行流内部会使用Java 7引入的fork/join框架,即处理由fork和join两个阶段组成,fork就是将要处理的数据拆分为小块,多线程按小块进行并行计算,join就是将小块的计算结果进行合并,具体我们就不探讨了。使用并行流,不需要任何线程管理的代码,就能实现并行。

除了通过Collection接口的stream/parallelStream获取流,还有一些其他方式可以获取流。Arrays有一些stream方法,可以将数组或子数组转换为流,比如:

1
2
3
4
public static IntStream stream(int[] array)
public static DoubleStream stream(double[] array, int startInclusive,
int endExclusive)
public static <T> Stream<T> stream(T[] array)

输出当前目录下所有普通文件的名字,代码可以为:

1
2
3
File[] files = new File(".").listFiles();
Arrays.stream(files).filter(File::isFile).map(File::getName)
.forEach(System.out::println);

Stream也有一些静态方法,可以构建流,比如:

1
2
3
4
5
6
7
8
9
10
//返回一个空流
public static<T> Stream<T> empty()
//返回只包含一个元素t的流
public static<T> Stream<T> of(T t)
//返回包含多个元素values的流
public static<T> Stream<T> of(T... values)
//通过Supplier生成流,流的元素个数是无限的
public static<T> Stream<T> generate(Supplier<T> s)
//同样生成无限流,第一个元素为seed,第二个为f(seed),第三个为f(f(seed)),以此类推
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f)

输出10个随机数,代码可以为:

1
Stream.generate(()->Math.random()).limit(10).forEach(System.out::println);

输出100个递增的奇数,代码可以为:

1
Stream.iterate(1, t->t+2).limit(100).forEach(System.out::println);

26.2.5 函数式数据处理思维

可以看出,使用Stream API处理数据集合,与直接使用容器类API处理数据的思路是完全不一样的。流定义了很多数据处理的基本函数,对于一个具体的数据处理问题,解决的主要思路就是组合利用这些基本函数,以声明式的方式简洁地实现期望的功能,这种思路就是函数式数据处理思维,相比直接利用容器类API的命令式思维,思考的层次更高

Stream API的这种思路也不是新发明,它与数据库查询语言SQL是很像的,都是声明式地操作集合数据,很多函数都能在SQL中找到对应,比如filter对应SQL的where, sorted对应order by等。SQL一般都支持分组(group by)功能,StreamAPI也支持,但关于分组,我们下节再介绍。

Stream API也与各种基于Unix系统的管道命令类似。熟悉Unix系统的都知道,Unix有很多命令,大部分命令只是专注于完成一件事情,但可以通过管道的方式将多个命令链接起来,完成一些复杂的功能,比如:

1
cat nginx_access.log | awk '{print $1}' | sort | uniq -c | sort -rnk 1 | head -n 20

以上命令可以分析nginx访问日志,统计出访问次数最多的前20个IP地址及其访问次数。具体来说,cat命令输出nginx访问日志到流,一行为一个元素,awk输出行的第一列,这里为IP地址,sort按IP进行排序,”uniq -c”按IP统计计数,”sort -rnk 1”按计数从高到低排序,”head -n 20”输出前20行。