weighted_quantiles_stream_test.cc revision 150a0aec64acc75d710fda67f802acebfe496cb8
1// Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// =============================================================================
15#include "tensorflow/contrib/boosted_trees/lib/quantiles/weighted_quantiles_stream.h"
16
17#include "tensorflow/core/lib/random/philox_random.h"
18#include "tensorflow/core/lib/random/simple_philox.h"
19#include "tensorflow/core/platform/test.h"
20#include "tensorflow/core/platform/test_benchmark.h"
21
22namespace tensorflow {
23namespace {
24using Tuple = std::tuple<int64, int64>;
25
26using Summary =
27    boosted_trees::quantiles::WeightedQuantilesSummary<double, double>;
28using SummaryEntry =
29    boosted_trees::quantiles::WeightedQuantilesSummary<double,
30                                                       double>::SummaryEntry;
31using Stream =
32    boosted_trees::quantiles::WeightedQuantilesStream<double, double>;
33
34TEST(GetQuantileSpecs, InvalidEps) {
35  EXPECT_DEATH({ Stream::GetQuantileSpecs(-0.01, 0L); }, "eps >= 0");
36  EXPECT_DEATH({ Stream::GetQuantileSpecs(1.01, 0L); }, "eps < 1");
37}
38
39TEST(GetQuantileSpecs, ZeroEps) {
40  EXPECT_DEATH({ Stream::GetQuantileSpecs(0.0, 0L); }, "max_elements > 0");
41  EXPECT_EQ(Stream::GetQuantileSpecs(0.0, 1LL), Tuple(1LL, 2LL));
42  EXPECT_EQ(Stream::GetQuantileSpecs(0.0, 20LL), Tuple(1LL, 20LL));
43}
44
45TEST(GetQuantileSpecs, NonZeroEps) {
46  EXPECT_DEATH({ Stream::GetQuantileSpecs(0.01, 0L); }, "max_elements > 0");
47  EXPECT_EQ(Stream::GetQuantileSpecs(0.1, 320LL), Tuple(4LL, 31LL));
48  EXPECT_EQ(Stream::GetQuantileSpecs(0.01, 25600LL), Tuple(6LL, 501LL));
49  EXPECT_EQ(Stream::GetQuantileSpecs(0.01, 104857600LL), Tuple(17LL, 1601LL));
50  EXPECT_EQ(Stream::GetQuantileSpecs(0.1, 104857600LL), Tuple(20LL, 191LL));
51  EXPECT_EQ(Stream::GetQuantileSpecs(0.01, 1LL << 40), Tuple(29LL, 2801LL));
52  EXPECT_EQ(Stream::GetQuantileSpecs(0.001, 1LL << 40), Tuple(26LL, 25001LL));
53}
54
55class WeightedQuantilesStreamTest : public ::testing::Test {};
56
57// Stream generators.
58void GenerateFixedUniformSummary(int32 worker_id, int64 max_elements,
59                                 double *total_weight, Stream *stream) {
60  for (int64 i = 0; i < max_elements; ++i) {
61    const double x = static_cast<double>(i) / max_elements;
62    stream->PushEntry(x, 1.0);
63    ++(*total_weight);
64  }
65  stream->Finalize();
66}
67
68void GenerateFixedNonUniformSummary(int32 worker_id, int64 max_elements,
69                                    double *total_weight, Stream *stream) {
70  for (int64 i = 0; i < max_elements; ++i) {
71    const double x = static_cast<double>(i) / max_elements;
72    stream->PushEntry(x, x);
73    (*total_weight) += x;
74  }
75  stream->Finalize();
76}
77
78void GenerateRandUniformFixedWeightsSummary(int32 worker_id, int64 max_elements,
79                                            double *total_weight,
80                                            Stream *stream) {
81  // Simulate uniform distribution stream.
82  random::PhiloxRandom philox(13 + worker_id);
83  random::SimplePhilox rand(&philox);
84  for (int64 i = 0; i < max_elements; ++i) {
85    const double x = rand.RandDouble();
86    stream->PushEntry(x, 1);
87    ++(*total_weight);
88  }
89  stream->Finalize();
90}
91
92void GenerateRandUniformRandWeightsSummary(int32 worker_id, int64 max_elements,
93                                           double *total_weight,
94                                           Stream *stream) {
95  // Simulate uniform distribution stream.
96  random::PhiloxRandom philox(13 + worker_id);
97  random::SimplePhilox rand(&philox);
98  for (int64 i = 0; i < max_elements; ++i) {
99    const double x = rand.RandDouble();
100    const double w = rand.RandDouble();
101    stream->PushEntry(x, w);
102    (*total_weight) += w;
103  }
104  stream->Finalize();
105}
106
107// Single worker tests.
108void TestSingleWorkerStreams(
109    double eps, int64 max_elements,
110    const std::function<void(int32, int64, double *, Stream *)>
111        &worker_summary_generator,
112    std::initializer_list<double> expected_quantiles,
113    double quantiles_matcher_epsilon) {
114  // Generate single stream.
115  double total_weight = 0;
116  Stream stream(eps, max_elements);
117  worker_summary_generator(0, max_elements, &total_weight, &stream);
118
119  // Ensure we didn't lose track of any elements and are
120  // within approximation error bound.
121  EXPECT_LE(stream.ApproximationError(), eps);
122  EXPECT_NEAR(stream.GetFinalSummary().TotalWeight(), total_weight, 1e-6);
123
124  // Verify expected quantiles.
125  int i = 0;
126  auto actuals = stream.GenerateQuantiles(expected_quantiles.size() - 1);
127  for (auto expected_quantile : expected_quantiles) {
128    EXPECT_NEAR(actuals[i], expected_quantile, quantiles_matcher_epsilon);
129    ++i;
130  }
131}
132
133// Stream generators.
134void GenerateOneValue(int32 worker_id, int64 max_elements, double *total_weight,
135                      Stream *stream) {
136  stream->PushEntry(10, 1);
137  ++(*total_weight);
138  stream->Finalize();
139}
140
141TEST(WeightedQuantilesStreamTest, OneValue) {
142  const double eps = 0.01;
143  const int64 max_elements = 1 << 16;
144  TestSingleWorkerStreams(eps, max_elements, GenerateOneValue,
145                          {10.0, 10.0, 10.0, 10.0, 10.0}, 1e-2);
146}
147
148TEST(WeightedQuantilesStreamTest, FixedUniform) {
149  const double eps = 0.01;
150  const int64 max_elements = 1 << 16;
151  TestSingleWorkerStreams(eps, max_elements, GenerateFixedUniformSummary,
152                          {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0},
153                          1e-2);
154}
155
156TEST(WeightedQuantilesStreamTest, FixedNonUniform) {
157  const double eps = 0.01;
158  const int64 max_elements = 1 << 16;
159  TestSingleWorkerStreams(eps, max_elements, GenerateFixedNonUniformSummary,
160                          {0, std::sqrt(0.1), std::sqrt(0.2), std::sqrt(0.3),
161                           std::sqrt(0.4), std::sqrt(0.5), std::sqrt(0.6),
162                           std::sqrt(0.7), std::sqrt(0.8), std::sqrt(0.9), 1.0},
163                          1e-2);
164}
165
166TEST(WeightedQuantilesStreamTest, RandUniformFixedWeights) {
167  const double eps = 0.01;
168  const int64 max_elements = 1 << 16;
169  TestSingleWorkerStreams(
170      eps, max_elements, GenerateRandUniformFixedWeightsSummary,
171      {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2);
172}
173
174TEST(WeightedQuantilesStreamTest, RandUniformRandWeights) {
175  const double eps = 0.01;
176  const int64 max_elements = 1 << 16;
177  TestSingleWorkerStreams(
178      eps, max_elements, GenerateRandUniformRandWeightsSummary,
179      {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2);
180}
181
182// Distributed tests.
183void TestDistributedStreams(
184    int32 num_workers, double eps, int64 max_elements,
185    const std::function<void(int32, int64, double *, Stream *)>
186        &worker_summary_generator,
187    std::initializer_list<double> expected_quantiles,
188    double quantiles_matcher_epsilon) {
189  // Simulate streams on each worker running independently
190  double total_weight = 0;
191  std::vector<std::vector<SummaryEntry>> worker_summaries;
192  for (int32 i = 0; i < num_workers; ++i) {
193    Stream stream(eps / 2, max_elements);
194    worker_summary_generator(i, max_elements / num_workers, &total_weight,
195                             &stream);
196    worker_summaries.push_back(stream.GetFinalSummary().GetEntryList());
197  }
198
199  // In the accumulation phase, we aggregate the summaries from each worker
200  // and build an overall summary while maintaining error bounds by ensuring we
201  // don't increase the error by more than eps / 2.
202  Stream reducer_stream(eps, max_elements);
203  for (const auto &summary : worker_summaries) {
204    reducer_stream.PushSummary(summary);
205  }
206  reducer_stream.Finalize();
207
208  // Ensure we didn't lose track of any elements and are
209  // within approximation error bound.
210  EXPECT_LE(reducer_stream.ApproximationError(), eps);
211  EXPECT_NEAR(reducer_stream.GetFinalSummary().TotalWeight(), total_weight,
212              total_weight);
213
214  // Verify expected quantiles.
215  int i = 0;
216  auto actuals =
217      reducer_stream.GenerateQuantiles(expected_quantiles.size() - 1);
218  for (auto expected_quantile : expected_quantiles) {
219    EXPECT_NEAR(actuals[i], expected_quantile, quantiles_matcher_epsilon);
220    ++i;
221  }
222}
223
224TEST(WeightedQuantilesStreamTest, FixedUniformDistributed) {
225  const int32 num_workers = 10;
226  const double eps = 0.01;
227  const int64 max_elements = num_workers * (1 << 16);
228  TestDistributedStreams(
229      num_workers, eps, max_elements, GenerateFixedUniformSummary,
230      {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2);
231}
232
233TEST(WeightedQuantilesStreamTest, FixedNonUniformDistributed) {
234  const int32 num_workers = 10;
235  const double eps = 0.01;
236  const int64 max_elements = num_workers * (1 << 16);
237  TestDistributedStreams(num_workers, eps, max_elements,
238                         GenerateFixedNonUniformSummary,
239                         {0, std::sqrt(0.1), std::sqrt(0.2), std::sqrt(0.3),
240                          std::sqrt(0.4), std::sqrt(0.5), std::sqrt(0.6),
241                          std::sqrt(0.7), std::sqrt(0.8), std::sqrt(0.9), 1.0},
242                         1e-2);
243}
244
245TEST(WeightedQuantilesStreamTest, RandUniformFixedWeightsDistributed) {
246  const int32 num_workers = 10;
247  const double eps = 0.01;
248  const int64 max_elements = num_workers * (1 << 16);
249  TestDistributedStreams(
250      num_workers, eps, max_elements, GenerateRandUniformFixedWeightsSummary,
251      {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2);
252}
253
254TEST(WeightedQuantilesStreamTest, RandUniformRandWeightsDistributed) {
255  const int32 num_workers = 10;
256  const double eps = 0.01;
257  const int64 max_elements = num_workers * (1 << 16);
258  TestDistributedStreams(
259      num_workers, eps, max_elements, GenerateRandUniformRandWeightsSummary,
260      {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2);
261}
262
263}  // namespace
264}  // namespace tensorflow
265