1/*
2 * Copyright (c) 2014, Oracle and/or its affiliates. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 *
8 *   - Redistributions of source code must retain the above copyright
9 *     notice, this list of conditions and the following disclaimer.
10 *
11 *   - Redistributions in binary form must reproduce the above copyright
12 *     notice, this list of conditions and the following disclaimer in the
13 *     documentation and/or other materials provided with the distribution.
14 *
15 *   - Neither the name of Oracle nor the names of its
16 *     contributors may be used to endorse or promote products derived
17 *     from this software without specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
20 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
24 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
25 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
26 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
27 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
28 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
29 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32/*
33 * This source code is provided to illustrate the usage of a given feature
34 * or technique and has been deliberately simplified. Additional steps
35 * required for a production-quality application, such as security checks,
36 * input validation, and proper error handling, might not be present in
37 * this sample code.
38 */
39
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.function.*;
import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static java.lang.Double.parseDouble;
import static java.util.stream.Collectors.*;
52
53/**
54 * CSVProcessor is a tool for processing CSV files. There are several
55 * command-line options. Consult the {@link #printUsageAndExit} method for
56 * instructions and command line parameters. This sample shows examples of the
57 * following features:
58 * <ul>
59 * <li>Lambda and bulk operations. Working with streams: map(...), filter(...),
60 * sorted(...) methods. The collect(...) method with different collectors:
61 * Collectors.maxBy(...), Collectors.minBy(...), Collectors.toList(),
62 * Collectors.toCollection(...), Collectors.groupingBy(...),
 * Collectors.summarizingDouble(...), and a custom Collector.</li>
64 * <li>Static method reference for printing values.</li>
65 * <li>Try-with-resources feature for closing files.</li>
66 * <li>Switch by String feature.</li>
 * <li>Other new APIs: Pattern.asPredicate(), BinaryOperator,
68 * BufferedReader.lines(), Collection.forEach(...), Comparator.comparing(...),
69 * Comparator.reversed(), Arrays.stream(...).</li>
70 * </ul>
71 *
72 */
public class CSVProcessor {

    /*
     * Upper bound on the number of characters that may be read while the
     * mark set by the "stat" command is still preserved.
     */
    private static final int READ_AHEAD_LIMIT = 100_000_000;

    /**
     * The main method for the CSVProcessor program. Run the program with an
     * empty argument list to see possible arguments.
     * <p>
     * The last argument is always the CSV file, the first one is the command
     * and the second one is the name of the column the command works on. The
     * first line of the file is expected to contain the column names. Any
     * usage or I/O problem is reported through {@link #printUsageAndExit},
     * which terminates the JVM.
     *
     * @param args the argument list for CSVProcessor.
     */
    public static void main(String[] args) {
        if (args.length < 2) {
            printUsageAndExit();
            return;//Should not be reached.
        }
        Path file = Paths.get(args[args.length - 1]);
        //Files.newBufferedReader already returns a BufferedReader.
        try (BufferedReader br = Files.newBufferedReader(file)) {
            //Assume that the first line contains column names.
            String headerLine = br.readLine();
            if (headerLine == null) {
                //readLine() returns null when the file has no lines at all.
                printUsageAndExit("File is empty: " + file);
                return;//Should not be reached.
            }
            List<String> header = Arrays.stream(headerLine.split(","))
                    .map(String::trim).collect(toList());
            //Calculate an index of the column in question.
            int column = getColumnNumber(header, args[1]);
            switch (args[0]) {
                case "sort":
                    verifyArgumentNumber(args, 4);
                    sort(br, column, args[2]);
                    break;
                case "search":
                    verifyArgumentNumber(args, 4);
                    search(br, column, args[2]);
                    break;
                case "groupby":
                    verifyArgumentNumber(args, 3);
                    groupBy(br, column);
                    break;
                case "stat":
                    verifyArgumentNumber(args, 3);
                    stat(br, file, column);
                    break;
                default:
                    printUsageAndExit("Illegal argument: " + args[0]);
            }
        } catch (IOException | UncheckedIOException e) {
            /*
             * BufferedReader.lines() wraps I/O failures that happen while
             * the stream is consumed into UncheckedIOException, so both
             * flavours have to be handled here.
             */
            printUsageAndExit(e.toString());
        } catch (NumberFormatException e) {
            //Only the "stat" command parses cells as numbers.
            printUsageAndExit("Column " + args[1]
                    + " contains a non-numeric value: " + e.getMessage());
        }
    }

    /*
     * Prints the remaining lines of the reader sorted by the value in the
     * given column (case-insensitively). The order argument must be ASC or
     * DESC in any letter case; anything else ends in printUsageAndExit.
     */
    private static void sort(BufferedReader br, int column, String order) {
        //Define the sort order.
        boolean isAsc;
        switch (order.toUpperCase(Locale.ROOT)) {
            case "ASC":
                isAsc = true;
                break;
            case "DESC":
                isAsc = false;
                break;
            default:
                printUsageAndExit("Illegal argument: " + order);
                return;//Should not be reached.
        }
        /*
         * Create a comparator that compares lines by comparing
         * values in the specified column.
         */
        Comparator<String> cmp
                = Comparator.comparing((String line) -> getCell(line, column),
                        String.CASE_INSENSITIVE_ORDER);
        /*
         * sorted(...) is used to sort records.
         * forEach(...) is used to output sorted records.
         */
        br.lines().sorted(isAsc ? cmp : cmp.reversed())
                .forEach(System.out::println);
    }

    /*
     * Prints the remaining lines of the reader whose cell in the given
     * column contains a match for the regular expression.
     */
    private static void search(BufferedReader br, int column, String regex) {
        /*
         * Records are filtered by a regex.
         * forEach(...) is used to output filtered records.
         */
        Predicate<String> pattern = Pattern.compile(regex).asPredicate();
        br.lines().filter(line -> pattern.test(getCell(line, column)))
                .forEach(System.out::println);
    }

    /*
     * Groups the remaining lines of the reader by the value in the given
     * column and prints every group. Groups are kept in a TreeMap so they
     * are printed in the natural order of their keys; each group is a
     * TreeSet, so its lines are sorted and identical lines appear once.
     */
    private static void groupBy(BufferedReader br, int column) {
        Map<String, Set<String>> groups = br.lines().collect(
                groupingBy((String line) -> getCell(line, column),
                        TreeMap::new,
                        toCollection(TreeSet::new)));
        groups.forEach((key, lines) -> {
            System.out.println(key + ":");
            lines.forEach(System.out::println);
        });
    }

    /*
     * Prints max/min/sum/average statistics for the given column twice:
     * first collected in one pass by the custom Statistics collector, then
     * collected by built-in collectors in several passes.
     */
    private static void stat(BufferedReader br, Path file, int column)
            throws IOException {
        /*
         * BufferedReader will be read several times.
         * Mark this point to return here after each pass.
         * BufferedReader will be read right after the headers line
         * because it is already read.
         */
        br.mark(readAheadLimit(file));

        /*
         * Statistics can be collected by a custom collector in one
         * pass. One pass is preferable.
         */
        System.out.println(br.lines().collect(new Statistics(column)));

        /*
         * Alternatively, statistics can be collected
         * by a built-in API in several passes.
         * This method demonstrates how separate operations can be
         * implemented using a built-in API.
         */
        br.reset();
        statInSeveralPasses(br, column);
    }

    /*
     * Chooses the read-ahead limit for mark(...). BufferedReader may
     * allocate a buffer as large as the limit, so the limit is bounded by
     * the file size when it is known: the reader decodes UTF-8, where a
     * character never takes less than one byte, hence the number of
     * characters cannot exceed the number of bytes. One is added because
     * the limit has to be greater than the number of characters read.
     * READ_AHEAD_LIMIT stays the upper bound and the fallback for files
     * whose size is not meaningful (pipes, devices).
     */
    private static int readAheadLimit(Path file) throws IOException {
        if (Files.isRegularFile(file)) {
            return (int) Math.min(Files.size(file) + 1, READ_AHEAD_LIMIT);
        }
        return READ_AHEAD_LIMIT;
    }

    /*
     * Computes the same statistics as the Statistics collector but with
     * built-in collectors, one pass over the reader per collector. The
     * reader must have been marked at the first data line. When there are
     * no data lines, "null" is printed for the max/min record, just as the
     * one-pass variant does.
     */
    private static void statInSeveralPasses(BufferedReader br, int column)
            throws IOException {
        System.out.println("#-----Statistics in several passes-------#");
        //Create a comparator to compare records by the column.
        Comparator<String> comparator = Comparator.comparingDouble(
                (String line) -> parseDouble(getCell(line, column)));
        //Find max record by using Collectors.maxBy(...)
        System.out.println(
                "Max: " + br.lines().collect(maxBy(comparator)).orElse(null));
        br.reset();
        //Find min record by using Collectors.minBy(...)
        System.out.println(
                "Min: " + br.lines().collect(minBy(comparator)).orElse(null));
        br.reset();
        //Compute the average value and sum with
        //Collectors.summarizingDouble(...)
        DoubleSummaryStatistics doubleSummaryStatistics
                = br.lines().collect(summarizingDouble(
                    (String line) -> parseDouble(getCell(line, column))));
        System.out.println("Average: " + doubleSummaryStatistics.getAverage());
        System.out.println("Sum: " + doubleSummaryStatistics.getSum());
    }

    /*
     * Terminates with the usage message unless exactly n arguments
     * were passed.
     */
    private static void verifyArgumentNumber(String[] args, int n) {
        if (args.length != n) {
            printUsageAndExit("Expected " + n + " arguments but was "
                    + args.length);
        }
    }

    /*
     * Returns the index of the column with the given name, or terminates
     * with the usage message if the header has no such column.
     */
    private static int getColumnNumber(List<String> header, String name) {
        int column = header.indexOf(name);
        if (column == -1) {
            printUsageAndExit("There is no column with name " + name);
        }
        return column;
    }

    /*
     * Returns the trimmed value of the cell in the given column of the
     * record. The negative split limit keeps trailing empty cells, and a
     * record that is shorter than the header yields an empty string for
     * the missing cells instead of failing.
     */
    private static String getCell(String record, int column) {
        String[] cells = record.split(",", -1);
        return column < cells.length ? cells[column].trim() : "";
    }

    /*
     * Prints the usage text to the standard output and the given messages
     * to the standard error stream, then exits with status 1.
     */
    private static void printUsageAndExit(String... str) {
        System.out.println("Usages:");

        System.out.println("CSVProcessor sort COLUMN_NAME ASC|DESC FILE");
        System.out.println("Sort lines by column COLUMN_NAME in CSV FILE\n");

        System.out.println("CSVProcessor search COLUMN_NAME REGEX FILE");
        System.out.println("Search for REGEX in column COLUMN_NAME in CSV FILE\n");

        System.out.println("CSVProcessor groupby COLUMN_NAME FILE");
        System.out.println("Split lines into different groups according to column "
                + "COLUMN_NAME value\n");

        System.out.println("CSVProcessor stat COLUMN_NAME FILE");
        System.out.println("Compute max/min/average/sum  statistics by column "
                + "COLUMN_NAME\n");

        Arrays.asList(str).forEach(System.err::println);
        System.exit(1);
    }

    /*
     * This is a custom implementation of the Collector interface.
     * A Statistics object gathers the records with the maximal and the
     * minimal value in a column together with the sum and the average of
     * that column. It serves both as the collector and as its own
     * accumulation/result container.
     */
    private static class Statistics
            implements Collector<String, Statistics, Statistics> {

        /*
         * This implementation does not need to be thread safe because
         * the parallel implementation of
         * {@link java.util.stream.Stream#collect Stream.collect()}
         * provides the necessary partitioning and isolation for safe parallel
         * execution.
         */
        private String maxRecord;
        private String minRecord;

        private double sum;
        private long lineCount;
        private final BinaryOperator<String> maxOperator;
        private final BinaryOperator<String> minOperator;
        private final int column;

        public Statistics(int column) {
            this.column = column;
            Comparator<String> cmp = Comparator.comparingDouble(
                    (String line) -> parseDouble(getCell(line, column)));
            maxOperator = BinaryOperator.maxBy(cmp);
            minOperator = BinaryOperator.minBy(cmp);
        }

        /*
         * Process line.
         */
        public Statistics accept(String line) {
            maxRecord = pick(maxOperator, maxRecord, line);
            minRecord = pick(minOperator, minRecord, line);

            sum += parseDouble(getCell(line, column));
            lineCount++;
            return this;
        }

        /*
         * Merge two Statistics. Either side may have seen no lines at all,
         * in which case its max/min records are still null.
         */
        public Statistics combine(Statistics stat) {
            maxRecord = pick(maxOperator, maxRecord, stat.getMaxRecord());
            minRecord = pick(minOperator, minRecord, stat.getMinRecord());
            sum += stat.getSum();
            lineCount += stat.getLineCount();
            return this;
        }

        /*
         * Applies the operator to two records, treating null as "no record
         * yet": the operator is consulted only when both records exist.
         */
        private static String pick(BinaryOperator<String> operator,
                String left, String right) {
            if (left == null) {
                return right;
            }
            if (right == null) {
                return left;
            }
            return operator.apply(left, right);
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("#------Statistics------#\n");
            sb.append("Max: ").append(getMaxRecord()).append("\n");
            sb.append("Min: ").append(getMinRecord()).append("\n");
            sb.append("Sum = ").append(getSum()).append("\n");
            sb.append("Average = ").append(average()).append("\n");
            sb.append("#------Statistics------#\n");
            return sb.toString();
        }

        @Override
        public Supplier<Statistics> supplier() {
            return () -> new Statistics(column);
        }

        @Override
        public BiConsumer<Statistics, String> accumulator() {
            return Statistics::accept;
        }

        @Override
        public BinaryOperator<Statistics> combiner() {
            return Statistics::combine;
        }

        @Override
        public Function<Statistics, Statistics> finisher() {
            return stat -> stat;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return EnumSet.of(Characteristics.IDENTITY_FINISH);
        }

        private String getMaxRecord() {
            return maxRecord;
        }

        private String getMinRecord() {
            return minRecord;
        }

        private double getSum() {
            return sum;
        }

        /*
         * Returns zero when no line was processed, which is what
         * DoubleSummaryStatistics.getAverage() reports in that case.
         */
        private double average() {
            return lineCount == 0 ? 0.0 : sum / lineCount;
        }

        private long getLineCount() {
            return lineCount;
        }

    }

}
369