JDK源码分析——java流式处理实例分析

M君 6月前 ⋅ 337 阅读

由于在项目中使用了很多流式的处理,但是却对什么是流式处理没有概念,所以闲暇之余看了一下ArrayList的stream的实现,并按照该实现并结合Splitterator的样例实现了一个简单的可以并发的List,该List也支持流式处理的方式。

由于本来是为了了解深入的分析一下流式处理而编写的样例,但是个人觉得通过调试分析该样例可以帮助大家对什么流式处理由一个初步的印象,而且可以参考该样例编写自己的流式处理结构,所以把相关的源代码分享给大家。可以通过对tryAdvance和trySplit打断点的方式,对普通流和并发流进行分析,可以加深对两个任务划分机制的理解。

进阶2:如果需要更加深入的分析流的工作方式,建议通过对ArrayList的ArrayListSpliterator的trySplit打断点,然后进行并发流的调试,可以更加直观的看到任务的生成和划分机制。

 

自定义简单的list实现

package stream;

import java.util.AbstractList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* 不可变数组,用于测试流处理
* @param
*/
public class ImmutableArrayList extends AbstractList implements List {

/** 元素存储数组 */
private final Object[] elements;

public ImmutableArrayList(Object[] data, Object[] tags){
int dataSize = data.length;
int tagSize = tags.length;
if(dataSize != tagSize){
throw new IllegalArgumentException("Array size not equal data size:"
+dataSize+",tag length:"+tagSize);
}
this.elements = new Object[2*dataSize];
for (int i = 0, j = 0; i < dataSize; i++){
elements[j++] = data[i];
elements[j++] = tags[i];
}
}

@Override
public Stream stream() {
// 普通流
return StreamSupport.stream(spliterator(), false);
}

@Override
public Stream parallelStream() {
// 并发流
return StreamSupport.stream(spliterator(), true);
}

@Override
public Spliterator spliterator() {
return new ImmutableSpliterator<>(elements, 0, elements.length);
}

static final class ImmutableSpliterator implements Spliterator{

private Object[] array;
private int origin;
private final int fence;

ImmutableSpliterator(Object[] array, int origin, int fence){
this.array = array;
this.origin = origin;
this.fence = fence;
}

@Override
public void forEachRemaining(Consumer<? super E> action) {
// 1、自定义遍历方式
// for (; origin < fence; origin++){
// action.accept((E)array[origin]);
// }
// 2、tryAdvance
do { }while (tryAdvance(action));
}

@Override
public boolean tryAdvance(Consumer<? super E> action) {
// 数据访问限制判断
if(origin < fence){
action.accept((E)array[origin]);
origin += 2;
return true;
}
return false;
}

@Override
public Spliterator trySplit() {
// 任务粒度划分判读
int lo = origin;
int mid = ((lo + fence) >>> 1) & ~1;
if(lo < mid){
origin = mid;
return new ImmutableSpliterator<>(array, lo, mid);
}
return null;
}

@Override
public long estimateSize() {

return (long)((fence - origin) / 2);
}

@Override
public int characteristics() {
return ORDERED | SIZED | IMMUTABLE | SUBSIZED;
}
}

@Override
public E get(int index) {
rangeCheck(index);
return (E)elements[index];
}

@Override
public int size() {
return elements.length;
}

private void rangeCheck(int index){
if(index >= elements.length){
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}
}

private String outOfBoundsMsg(int index){
return "Index:"+index+",Size:"+elements.length;
}
}

 

流测试

package stream;

import org.junit.Before;
import org.junit.Test;

public class TestMain {

ImmutableArrayList immutableArrayList;

@Before
public void init(){
int dataSize = 4;
String[] data = new String[dataSize];
String[] tags = new String[dataSize];
for (int i = 0; i < dataSize; i++){
data[i] = "Data:" + i;
tags[i] = "Tags" + i;
}
immutableArrayList = new ImmutableArrayList(data, tags);
}

@Test
public void testSteam(){
System.out.println("普通流测试");
System.out.println("打印所有信息:");
immutableArrayList.stream().forEach(System.out::println);
System.out.println("过滤并打印Data信息:");
immutableArrayList.stream().filter(
param -> {
String str = (String)param;
return str.startsWith("Data");
}
).forEach(System.out::println);
}

@Test
public void testParallelStream(){
System.out.println("并发流测试");
System.out.println("打印所有信息:");
immutableArrayList.parallelStream().forEach(System.out::println);
System.out.println("过滤并打印Data信息:");
immutableArrayList.stream().filter(
param -> {
String str = (String)param;
return str.startsWith("Data");
}
).forEach(System.out::println);
}
}

 

JDK中流式任务划分过程

// Similar to AbstractTask but doesn't need to track child tasks
public void compute() {
Spliterator rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink taskSink = sink;
ForEachTask<S, T> task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask<S, T> taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}

 

 


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: