/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;

36ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin/**
37ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * Factory methods for transforming streams into duplicate-free streams, using
38ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * {@link Object#equals(Object)} to determine equality.
39ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin *
40ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin * @since 1.8
41ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin */
42ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkinfinal class DistinctOps {
43ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
44ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    private DistinctOps() { }
45ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
46ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    /**
47ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * Appends a "distinct" operation to the provided stream, and returns the
48ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * new stream.
49ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     *
50ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param <T> the type of both input and output elements
51ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @param upstream a reference stream with element type T
52ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     * @return the new stream
53ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin     */
54ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
55ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
56ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
57ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
58ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
59ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                // If the stream is SORTED then it should also be ORDERED so the following will also
60ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                // preserve the sort order
61ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                TerminalOp<T, LinkedHashSet<T>> reduceOp
62ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
63ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                                                 LinkedHashSet::addAll);
64ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
65ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            }
66ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
67ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            @Override
68289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin            public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
69ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                              Spliterator<P_IN> spliterator,
70ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                              IntFunction<T[]> generator) {
71ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
72ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // No-op
73ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return helper.evaluate(spliterator, false, generator);
74ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                }
75ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
76ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return reduce(helper, spliterator);
77ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                }
78ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                else {
79ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // Holder of null state since ConcurrentHashMap does not support null values
80ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    AtomicBoolean seenNull = new AtomicBoolean(false);
81ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
82ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
83ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        if (t == null)
84ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            seenNull.set(true);
85ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        else
86ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            map.putIfAbsent(t, Boolean.TRUE);
87ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    }, false);
88ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    forEachOp.evaluateParallel(helper, spliterator);
89ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
90ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // If null has been seen then copy the key set into a HashSet that supports null values
91ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // and add null
92ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    Set<T> keys = map.keySet();
93ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    if (seenNull.get()) {
94ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        // TODO Implement a more efficient set-union view, rather than copying
95ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        keys = new HashSet<>(keys);
96ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        keys.add(null);
97ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    }
98ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return Nodes.node(keys);
99ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                }
100ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            }
101ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
102ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            @Override
103289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin            public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
104ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
105ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // No-op
106ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return helper.wrapSpliterator(spliterator);
107ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                }
108ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
109ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // Not lazy, barrier required to preserve order
110ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return reduce(helper, spliterator).spliterator();
111ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                }
112ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                else {
113ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    // Lazy
114ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));
115ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                }
116ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            }
117ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
118ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            @Override
119289e51c2258b001f2aa6d6e67b21be7bb65d5102Igor Murashkin            public Sink<T> opWrapSink(int flags, Sink<T> sink) {
120ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                Objects.requireNonNull(sink);
121ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
122ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                if (StreamOpFlag.DISTINCT.isKnown(flags)) {
123ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return sink;
124ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                } else if (StreamOpFlag.SORTED.isKnown(flags)) {
125ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return new Sink.ChainedReference<T, T>(sink) {
126ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        boolean seenNull;
127ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        T lastSeen;
128ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
129ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        @Override
130ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        public void begin(long size) {
131ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            seenNull = false;
132ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            lastSeen = null;
133ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            downstream.begin(-1);
134ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        }
135ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
136ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        @Override
137ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        public void end() {
138ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            seenNull = false;
139ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            lastSeen = null;
140ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            downstream.end();
141ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        }
142ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
143ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        @Override
144ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        public void accept(T t) {
145ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            if (t == null) {
146ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                if (!seenNull) {
147ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                    seenNull = true;
148ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                    downstream.accept(lastSeen = null);
149ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                }
150ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            } else if (lastSeen == null || !t.equals(lastSeen)) {
151ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                downstream.accept(lastSeen = t);
152ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            }
153ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        }
154ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    };
155ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                } else {
156ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    return new Sink.ChainedReference<T, T>(sink) {
157ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        Set<T> seen;
158ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
159ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        @Override
160ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        public void begin(long size) {
161ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            seen = new HashSet<>();
162ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            downstream.begin(-1);
163ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        }
164ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
165ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        @Override
166ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        public void end() {
167ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            seen = null;
168ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            downstream.end();
169ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        }
170ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin
171ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        @Override
172ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        public void accept(T t) {
173ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            if (!seen.contains(t)) {
174ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                seen.add(t);
175ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                                downstream.accept(t);
176ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                            }
177ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                        }
178ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                    };
179ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin                }
180ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin            }
181ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin        };
182ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin    }
183ff18b5f136f92154f2e05217e3953d10f459e561Igor Murashkin}
184