关于How to skip even lines of a Stream obtained from the Files.lines问题,我遵循了公认的答案方法,基于filterEven()接口(interface)实现了自己的Spliterator<T>方法,例如:

public static <T> Stream<T> filterEven(Stream<T> src) { 
    Spliterator<T> iter = src.spliterator(); 
    AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) 
    { 
        @Override 
        public boolean tryAdvance(Consumer<? super T> action) { 
            iter.tryAdvance(item -> {});    // discard 
            return iter.tryAdvance(action); // use 
        } 
    }; 
    return StreamSupport.stream(res, false); 
} 

我可以通过以下方式使用:
Stream<DomainObject> res = Files.lines(src) 
filterEven(res) 
     .map(line -> toDomainObject(line)) 

但是,相对于使用带有副作用的 filter()的下一种方法来衡量这种方法的性能,我注意到下一种方法的效果更好:
final int[] counter = {0}; 
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0; 
Stream<DomainObject> res = Files.lines(src) 
     .filter(line -> isEvenLine ()) 
     .map(line -> toDomainObject(line)) 

我使用JMH测试了性能,但没有在基准测试中包含文件负载。我以前将其加载到数组中。然后,每个基准测试都从先前的数组创建 Stream<String>开始,然后过滤偶数行,然后应用 mapToInt()提取 int字段的值,最后是 max()操作。这是基准测试之一(您可以检查整个 Program here,在这里您有 data file with about 186 lines):
@Benchmark 
public int maxTempFilterEven(DataSource src){ 
    Stream<String> content = Arrays.stream(src.data) 
            .filter(s-> s.charAt(0) != '#') // Filter comments 
            .skip(1);                       // Skip line: Not available 
    return filterEven(content)              // Filter daily info and skip hourly 
            .mapToInt(line -> parseInt(line.substring(14, 16))) 
            .max() 
            .getAsInt(); 
} 

我不明白为什么 filter()方法具有比 filterEven()(〜50ops/ms)更好的性能(〜80ops/ms)?

请您参考如下方法:

简介

我想我知道原因,但是不幸的是,我不知道如何提高基于Spliterator的解决方案的性能(至少在不重写整个Streams API功能的情况下)。

旁注1 :设计Stream API时,性能并不是最重要的设计目标。如果性能至关重要,则最有可能在不使用Stream API的情况下重新编写代码将使代码更快。 (例如,Stream API不可避免地会增加内存分配,从而增加GC压力)。另一方面,在大多数情况下,Stream API以相对较小的性能下降为代价提供了更好的高级API。

部分 1 理论上的简短回答
Stream旨在实现一种内部迭代作为消耗的主要方式,而外部迭代(即基于Spliterator的方式)是一种“模拟”的附加手段。因此,外部迭代会涉及一些开销。懒惰对外部迭代的效率增加了一些限制,并且由于需要支持flatMap,因此有必要在此过程中使用某种动态缓冲区。

旁注2 在某些情况下,基于Spliterator的迭代可能与内部迭代一样快(在这种情况下为filter)。在直接从包含数据的Spliterator直接创建Stream的情况下尤其如此。要查看它,可以修改测试以将第一个过滤器具体化为String s数组:

String[] filteredData = Arrays.stream(src.data) 
            .filter(s-> s.charAt(0) != '#') // Filter comments 
            .skip(1)   
            .toArray(String[]::new); 

然后比较 maxTempFiltermaxTempFilterEven的性能,以接受该预先过滤的 String[] filteredData。如果您想知道为什么会这样,您可能应该阅读本长答案的其余部分,或者至少阅读第2部分。

部分 2 更长的理论答案:

流的设计主要是通过某些终端操作将其整体消耗掉。尽管被支持,但一个接一个的迭代元素并不是设计使用流的主要方式。

请注意,使用“功能性” Stream API(例如 mapflatMapfilterreducecollect),您不能在某个步骤中说“我已经有足够的数据,不再遍历源并推送值”。您可以丢弃某些传入数据(就像 filter一样),但不能停止迭代。 ( takeskip转换实际上是在内部使用 Spliterator来实现的; anyMatchallMatchnoneMatchfindFirstfindAny等使用非公共(public)API j.u.s.Sink.cancellationRequested,它们也更容易使用,因为不能进行多个终端操作。)如果管道中的所有转换都是同步的,则可以将它们组合为一个聚合函数( Consumer),然后在一个简单的循环中调用它(可以选择将循环执行拆分为多个线程)。这就是我简化的基于状态的过滤器所代表的含义(请参见 中的代码,向我显示一些代码部分)。如果管道中有一个 flatMap,它会变得更加复杂,但是思路仍然相同。

基于 Spliterator的转换从根本上有所不同,因为它向管道增加了异步的,由消费者驱动的步骤。现在, Spliterator而不是源 Stream驱动了迭代过程。如果您直接在源 Spliterator上请求 Stream,它也许可以返回一些实现,该实现只需对其内部数据结构进行迭代,这就是为什么实现预先过滤的数据应消除性能差异的原因。但是,如果您为某些非空管道创建 Spliterator,则除了要求源将元素逐一推送通过管道,直到某个元素通过所有过滤器之外,没有其他(简单)的选择(另请参见示例2中的第二个示例)。 ,向我显示一些部分代码)。源元素被逐个而不是分批推送的事实是使 Stream变得懒惰的基本决定的结果。需要缓冲区而不是仅一个元素是对 flatMap的支持的结果:从源中推送一个元素可以为 Spliterator生成许多元素。

部分 3 向我显示一些代码

本部分试图为“理论”部分中所描述的代码提供支持(包括链接到实际代码和模拟代码)。

首先,您应该知道当前的Streams API实现将非终端(中间)操作累积到单个惰性管道中(请参阅 j.u.s.AbstractPipeline及其子类(例如 j.u.s.ReferencePipeline)。然后,当应用了终端操作时,原始操作中的所有元素 Stream通过管道“推送”。

您看到的是两件事的结果:
  • 在您遇到的情况下,流管道不同的事实
    里面有一个基于Spliterator的步骤。
  • 您的OddLines不是
  • 管道中的第一步

    具有状态过滤器的代码或多或少类似于以下简单代码:
    static int similarToFilter(String[] data) 
    { 
        final int[] counter = {0}; 
        final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0; 
        int skip = 1; 
     
        boolean reduceEmpty = true; 
        int reduceState = 0; 
        for (String outerEl : data) 
        { 
            if (outerEl.charAt(0) != '#') 
            { 
                if (skip > 0) 
                    skip--; 
                else 
                { 
                    if (isEvenLine.test(outerEl)) 
                    { 
                        int intEl = parseInt(outerEl.substring(14, 16)); 
                        if (reduceEmpty) 
                        { 
                            reduceState = intEl; 
                            reduceEmpty = false; 
                        } 
                        else 
                        { 
                            reduceState = Math.max(reduceState, intEl); 
                        } 
                    } 
                } 
            } 
        } 
        return reduceState; 
    } 
    

    请注意,这实际上是一个内部包含一些计算(过滤/转换)的循环。

    另一方面,当您在管道中添加 Spliterator时,情况会发生很大变化,即使使用与实际发生的情况基本相似的简化代码,它也会变得更大,例如:
    interface Sp<T> 
    { 
        public boolean tryAdvance(Consumer<? super T> action); 
    } 
     
    static class ArraySp<T> implements Sp<T> 
    { 
        private final T[] array; 
        private int pos; 
     
        public ArraySp(T[] array) 
        { 
            this.array = array; 
        } 
     
        @Override 
        public boolean tryAdvance(Consumer<? super T> action) 
        { 
            if (pos < array.length) 
            { 
                action.accept(array[pos]); 
                pos++; 
                return true; 
            } 
            else 
            { 
                return false; 
            } 
        } 
    } 
     
    static class WrappingSp<T> implements Sp<T>, Consumer<T> 
    { 
        private final Sp<T> sourceSp; 
        private final Predicate<T> filter; 
     
        private final ArrayList<T> buffer = new ArrayList<T>(); 
        private int pos; 
     
     
        public WrappingSp(Sp<T> sourceSp, Predicate<T> filter) 
        { 
            this.sourceSp = sourceSp; 
            this.filter = filter; 
        } 
     
        @Override 
        public void accept(T t) 
        { 
            buffer.add(t); 
        } 
     
        @Override 
        public boolean tryAdvance(Consumer<? super T> action) 
        { 
            while (true) 
            { 
                if (pos >= buffer.size()) 
                { 
                    pos = 0; 
                    buffer.clear(); 
                    sourceSp.tryAdvance(this); 
                } 
                // failed to fill buffer 
                if (buffer.size() == 0) 
                    return false; 
     
                T nextElem = buffer.get(pos); 
                pos++; 
                if (filter.test(nextElem)) 
                { 
                    action.accept(nextElem); 
                    return true; 
                } 
            } 
        } 
    } 
     
    static class OddLineSp<T> implements Sp<T>, Consumer<T> 
    { 
        private Sp<T> sourceSp; 
     
        public OddLineSp(Sp<T> sourceSp) 
        { 
            this.sourceSp = sourceSp; 
        } 
     
        @Override 
        public boolean tryAdvance(Consumer<? super T> action) 
        { 
            if (sourceSp == null) 
                return false; 
     
            sourceSp.tryAdvance(this); 
            if (!sourceSp.tryAdvance(action)) 
            { 
                sourceSp = null; 
            } 
            return true; 
     
        } 
     
        @Override 
        public void accept(T t) 
        { 
     
        } 
    } 
     
    static class ReduceIntMax 
    { 
        boolean reduceEmpty = true; 
        int reduceState = 0; 
     
        public int getReduceState() 
        { 
            return reduceState; 
        } 
     
        public void accept(int t) 
        { 
            if (reduceEmpty) 
            { 
                reduceEmpty = false; 
                reduceState = t; 
            } 
            else 
            { 
                reduceState = Math.max(reduceState, t); 
            } 
        } 
    } 
     
     
    static int similarToSpliterator(String[] data) 
    { 
        ArraySp<String> src = new ArraySp<>(data); 
     
        int[] skip = new int[1]; 
        skip[0] = 1; 
        WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) -> 
        { 
            if (s.charAt(0) == '#') 
                return false; 
            if (skip[0] != 0) 
            { 
                skip[0]--; 
                return false; 
            } 
            return true; 
        }); 
        OddLineSp<String> oddLines = new OddLineSp<>(firstFilter); 
        final ReduceIntMax reduceIntMax = new ReduceIntMax(); 
        while (oddLines.tryAdvance(s -> 
                                   { 
                                       int intValue = parseInt(s.substring(14, 16)); 
                                       reduceIntMax.accept(intValue); 
                                   })) ; // do nothing in the loop body 
        return reduceIntMax.getReduceState(); 
    } 
    

    此代码较大,因为如果在循环内没有一些非平凡的有状态回调,则无法(或至少很难)表示逻辑。这里的 Sp接口(interface)是 j.u.s.Streamj.u.Spliterator接口(interface)的混合体。
  • ArraySp表示Arrays.stream的结果。
  • WrappingSpj.u.s.StreamSpliterators.WrappingSpliterator相似,后者在实际代码中表示针对任何非空管道的Spliterator接口(interface)的实现,即,至少应用了一个中间操作的Stream(请参阅j.u.s.AbstractPipeline.spliterator method)。在我的代码中,我将其与StatelessOp子类合并,并放置负责filter方法实现的逻辑。同样为了简单起见,我使用skip实现了filter
  • OddLineSp对应于您的OddLines及其产生的Stream
  • ReduceIntMax表示ReduceOpsMath.max终端操作int

  • 那么在此示例中重要的是什么?这里重要的是,因为您首先过滤了原始流,所以 OddLineSp是根据非空管道(即 WrappingSp)创建的。而且,如果您仔细观察 WrappingSp,您会注意到每次 tryAdvance都被调用时,它将调用委托(delegate)给 sourceSp并将结果累积到 buffer中。而且,由于管道中没有 flatMap,因此 buffer的元素将被一一复制。 IE。每次调用 WrappingSp.tryAdvance时,它将调用 ArraySp.tryAdvance,准确地返回一个元素(通过回调),并将其进一步传递给调用方提供的 consumer(除非该元素与过滤器不匹配,在这种情况下将再次调用 ArraySp.tryAdvance)再一次,但 buffer永远不会一次填充多个元素)。

    旁注3 :如果要查看实际代码,则最有趣的地方是 j.u.s.StreamSpliterators. WrappingSpliterator.tryAdvance ,它调用
    j.u.s.StreamSpliterators. AbstractWrappingSpliterator.doAdvance 依次调用 j.u.s.StreamSpliterators. AbstractWrappingSpliterator.fillBuffer j.u.s.StreamSpliterators. pusher 依次调用在ojit_a初始化的 WrappingSpliterator.initPartialTraversalState
    因此,影响性能的主要因素是复制到缓冲区中。
    对于我们这些普通的Java开发人员来说,不幸的是,Stream API的当前实现几乎是封闭的,您不能仅使用继承或组合来修改内部行为的某些方面。
    您可以使用基于反射的黑客手段,使针对特定情况的复制到缓冲区的效率更高,并获得一定的性能(但是牺牲了 Stream的惰性),但是您不能完全避免这种复制,因此基于 Spliterator的代码将是反正比较慢

    回到第2条 旁注的示例,基于 Spliterator的包含具体化 filteredData的测试工作得更快,因为在 WrappingSp之前的管道中没有 OddLineSp,因此不会复制到中间缓冲区中。


    评论关闭
    IT干货网

    微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!