当前位置:网站首页>Collector原理解析
Collector原理解析
2022-07-28 05:52:00 【要有价值】
一、collect方法使用
先介绍一段代码
Person p1 = new Person("hehe", 20);
Person p2 = new Person("wang", 20);
Person p6 = new Person("hou", 25);
Person p5 = new Person("hou", 25);
Person p3 = new Person("mabi", 30);
Person p4 = new Person("li", 30);
List<Person> personList = Arrays.asList(p1, p2, p3, p4, p5, p6);
List<String> per1 = personList.stream()
.map(Person::getName).filter(String::toUpperCase).collect(toList()); --收集成集合
复制代码
这是最常用 最简单的函数式编程,经过包装后甚至类似于执行一个SQL;你只需要传入一些Predicate等 函数,就可以达到预期。 下面我们来研究下 collect(toList()) 究竟发生了什么。
二、collect方法的疑问
collect方法主要做收集使用,但并不意味着就要收集成集合,请记住这句话。
- collect 它是个重载方法:
<R, A> R collect(Collector<? super T, A, R> collector);->这是大多数人使用的 该方法要求传入一个 Collector ,返回一个R
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner); ->这是另一个稍微底层的API
- 看这个方法瞬间懵逼,Collector是什么? 范型 T ,A ,R 分别代表了什么? 但你直接使用toList(),却觉得好像理所应当(我就想把person的Name收集成个集合嘛),这也是java8中函数式编程的意义之一,非常非常容易理解代码的含义。
- –但你是否想过,它收集成了什么集合? ArrayList 、LinkedList 又或是其他实现?
三、 Collector 解析
一切都要源于 collect(Collector<? super T, A, R> collector);中的Collector
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
BinaryOperator<A> combiner();
Function<A, R> finisher();
Set<Characteristics> characteristics();
//...略
}
复制代码
Collector是一个接口,其中有几个抽象方法 supplier、accumulator、combiner、finisher、characteristics,当你构造一个它的实现类,传给collec(),就会按照你的要求来进行收集!
Supplier<A> supplier()提供一个函数式接口 T get();
- 就是生产者,我们需要给Collector提供一个容器来存放Stream中的元素。
Supplier s1 = HashSet::newSupplier s2 = ArrayList::new
类比:StringBuilder::new, 创建一个StringBuilder来收集CharSequence我们完成了容器初步的搭建- 思考:并发流会创建多个容器吗?
2.BiConsumer<A, T> accumulator() 提供一个计算器, void accept(T t, U u);
- 他是一个消费者,而且还是二元消费者,需要提供 T U 它进行消费; 它的意义是:每次stream中的新元素传来,需要怎么将它归入supplier创建好的容器中。 比如:
List::add->将元素加入ListStringBuilder::append-〉将CharSequence加入StringBuilder 我们完成了流中元素和容器间的关系- 思考:并发流会怎么执行呢?
Function<A, R> finisher()提供一个最终的返回结果
- A 就是supplier中提供的容器, R是要最终返回的,A=R也是完全可以的。 比如说:
- Function.identity(); 会直接返回你传入的 return t -> t;
- 将List -> Set; List -> Map 都是OK的,完全看你想怎么把容器A返回。
4.Set<Characteristics> characteristics(); 这是一个声明: 1、排序问题UNORDERED,集合中是否按照顺序排呀? 2、并发问题CONCURRENT,可以支持多个线程操作(accumulator)同一个 A容器 3、容器和最终返回结果是否一致IDENTITY_FINISH finisher()中,A==R ?
BinaryOperator<A> combiner()提供一个融合器,这个是用在并发流中的;
- BinaryOperator 是个二元处理器,接受 2个参数,返回1个,
R apply(T t, U u)这个是严格的 - 来看看我的使用方法吧:
- (List left, List right) -> { left.addAll(right); ->这是集合的addALl方法,将两个集合合二为一
- (StringBuiler r1, StringBuiler r2) -> { r1.append(r2); return r1; } ->这是将两个StringBuiler拼接,返回其中一个
- stream(),其实有个兄弟,叫stream().parallel() 或者 parallelStream(); 他们可以使得程序并发对stream进行处理,默认是看你cpu的核数 + cpu的超线程技术的总和;
- 1、如果使用了并发流,那么每个线程将调用一次supplier,创建一个A, 每个线程都会去执行accumulator()把流中自己获取到的元素,加入自己的容器A。
- 2、如果使用串行流,那么只会创建一个容器A;单线程accumulator。
- 3、如果开启了parallelStream ,characteristics声明了CONCURRENT,那么不得了,程序将支持多个线程操作同一个容器A,A只会创建一个,这就直接导致了线程不安全问题:如果A是一个List,一个线程在遍历集合,另一个在添加元素,会抛出并发修改异常,如果同时都在set元素,会产生覆盖效果,此时,你提供的容器A,必须是线程安全的集合!! StringBuilder 就应该换成StringBuffer。 (请注意:此时就不会进行combiner,毕竟只有一个容器A)
- 4、如果开启了parallelStream characteristics没声明CONCURRENT,那么就会使线程隔离,每个线程创建一个容器A,自己accumulate,再进行combiner,将多个容器融合(串行融合)。
- 5、如果使用了 IDENTITY_FINISH 那么程序根本不会执行finisher,因为认为 A == R,直接返回A就得了,没必要执行。
四、 Collector实现
public class Collector_custom {
public static void main(String[] args) {
Set<Integer> collect = Arrays.asList(1, 2, 3, 3, 4, 5, 6).stream().collect(new MyCollector<Integer>());
System.out.println(collect);
}
public static class MyCollector<T> implements Collector<T, Set<T>, Set<T>> {
@Override
public Supplier<Set<T>> supplier() {
System.out.println("MyCollector.supplier");
return HashSet::new; -->我们提供一个HashSet
}
@Override
public BiConsumer<Set<T>, T> accumulator() {
System.out.println("MyCollector.accumulator");
return Set::add; -->我们处理Set 和流中元素T的关系
}
@Override
public BinaryOperator<Set<T>> combiner() {
System.out.println("MyCollector.combiner");
return (st1, st2) -> {
st1.addAll(st2);
return st1; ->如果是并发流,创建了多个容器,我们处理多个容器间的关系
};
}
@Override
public Function<Set<T>, Set<T>> finisher() {
System.out.println("MyCollector.finisher");
return Function.identity(); -> 处理 容器和最终返回的规约,我们选择都是返回Set<T>
}
@Override
public Set<Characteristics> characteristics() {
System.out.println("MyCollector.characteristics");
return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, UNORDERED));
--> 当我们使用了 IDENTITY_FINISH ,其实就不用再写finisher();不知道你明不明白?
--> UNORDERED 不追求顺序,我们毕竟用的HashSet
}
}
}
复制代码
java8自己也有实现: 和我们实现的有点区别,我们已经对5个参数实现好了, 它是需要用户自己传,这就是函数式编程!
static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
private final Supplier<A> supplier;
private final BiConsumer<A, T> accumulator;
private final BinaryOperator<A> combiner;
private final Function<A, R> finisher;
private final Set<Characteristics> characteristics;
//自己传入 5个参数,
CollectorImpl(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Function<A,R> finisher,
Set<Characteristics> characteristics) {
this.supplier = supplier;
this.accumulator = accumulator;
this.combiner = combiner;
this.finisher = finisher;
this.characteristics = characteristics;
}
CollectorImpl(Supplier<A> supplier,
BiConsumer<A, T> accumulator,
BinaryOperator<A> combiner,
Set<Characteristics> characteristics) {
this(supplier, accumulator, combiner, castingIdentity(), characteristics);
}
@Override
public BiConsumer<A, T> accumulator() {
return accumulator;
}
@Override
public Supplier<A> supplier() {
return supplier;
}
@Override
public BinaryOperator<A> combiner() {
return combiner;
}
@Override
public Function<A, R> finisher() {
return finisher;
}
@Override
public Set<Characteristics> characteristics() {
return characteristics;
}
}
复制代码
五 并发流总结
- 如果是stream(),串行流,那么supplier创建一次容器A, accumulator 对每个元素累加,N个元素N次,combiner不执行,finisher主要看是否使用IDENTITY_FINISH;
- 如果是parallelStream(),并行,主要看是否开启CONCURRENT,此枚举支持多个线程操作同一个容器A, 有可能导致并发安全问题 ;如果不开启,则每个线程创建一个容器A,自己对自己accumulate,最后再combiner,很安全。
六 toList()的实现也太简单了吧
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> {
left.addAll(right); return left; },
CH_ID);
//用了内置的静态类CollectorImpl,传入了ArrayList::new,List::add、(left, right) -> { left.addAll(right); return left;
//CH_ID 就是一个实现好了的Set枚举集合
}
复制代码
七:collect 源码的小探究
1、finisher()执行的时机
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
//假设我们是串行
container = evaluate(ReduceOps.makeRef(collector));
}
//这里看到了IDENTITY_FINISH的作用吗? 如果有就返回 container,没有才去finisher()
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
复制代码
2、其他方法的执行时机 ->ReduceOps.makeRef(collector)
makeRef(Collector<? super T, I, ?> collector) {
//执行了!
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
BiConsumer<I, ? super T> accumulator = collector.accumulator();
BinaryOperator<I> combiner = collector.combiner();
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
state = supplier.get();
}
@Override
public void accept(T t) {
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
@Override
public int getOpFlags() {
//这里对UNORDERED 进行判断
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}
复制代码
八:总结
熟悉了Collector后,看Collectors的所有方法,都会非常简单。
边栏推荐
- Don't be afraid of ESD static electricity. This article tells you some solutions
- 登录heroku出现 IP address mismatch的解决方案
- 调整数组顺序使奇数位于偶数前面——每日两题
- uniapp 移动端 两种横竖屏切换方案
- Isolation level RR, gap lock, unreal reading
- LeNet5、AlexNet、VGGNet、ResNet
- CAS vs 数据库乐观锁
- 软考证书还能这样用!拿到证书=获得职称?
- Log in to heroku and the solution of IP address mismatch appears
- EMC中class A和class B哪个更严格?
猜你喜欢

Summary of project experience

教程篇(7.0) 06. 零信任网络访问ZTNA * FortiClient EMS * Fortinet 网络安全专家 NSE 5

Daily question - split equal sum subset

Principle and configuration of redis master-slave replication

.NET 6.0中使用Identity框架实现JWT身份认证与授权

xmpp 服务研究(二) prosody 创建账户

深入剖析单例模式的实现

ASP. Net core technology insider and project practice after reading

Big talk persistence and redolog

Isolation level RR, gap lock, unreal reading
随机推荐
和为s的两个数字——每日两题
Eslint FAQ solutions collection
Why is ESD protection so important for integrated circuits? How to protect?
The cornerstone of EMC - complete knowledge of electromagnetic compatibility filtering!
Heroku operation summary
Modify the conf file through sed
Log in to heroku and the solution of IP address mismatch appears
EMC rectification ideas
C语言详解系列——数组详解,一维数组、二维数组
磁环选型攻略及EMC整改技巧
短作业优先SJF
What is the root cause of EMC's problems?
Qucs preliminary use guide (not Multism)
Overview of distributed system development
EMC中的基石-电磁兼容滤波知识大全!
flowable工作流所有业务概念
Safflower STL
.NET 6.0中使用Identity框架实现JWT身份认证与授权
再次出现用户净流失,大失颜面的中国移动推出超低价套餐争取用户
Deeply analyze the implementation of singleton mode