1// Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14// ============================================================================= 15#include "tensorflow/contrib/boosted_trees/lib/quantiles/weighted_quantiles_stream.h" 16 17#include "tensorflow/core/lib/random/philox_random.h" 18#include "tensorflow/core/lib/random/simple_philox.h" 19#include "tensorflow/core/platform/test.h" 20#include "tensorflow/core/platform/test_benchmark.h" 21 22namespace tensorflow { 23namespace { 24using Tuple = std::tuple<int64, int64>; 25 26using Summary = 27 boosted_trees::quantiles::WeightedQuantilesSummary<double, double>; 28using SummaryEntry = 29 boosted_trees::quantiles::WeightedQuantilesSummary<double, 30 double>::SummaryEntry; 31using Stream = 32 boosted_trees::quantiles::WeightedQuantilesStream<double, double>; 33 34TEST(GetQuantileSpecs, InvalidEps) { 35 EXPECT_DEATH({ Stream::GetQuantileSpecs(-0.01, 0L); }, "eps >= 0"); 36 EXPECT_DEATH({ Stream::GetQuantileSpecs(1.01, 0L); }, "eps < 1"); 37} 38 39TEST(GetQuantileSpecs, ZeroEps) { 40 EXPECT_DEATH({ Stream::GetQuantileSpecs(0.0, 0L); }, "max_elements > 0"); 41 EXPECT_EQ(Stream::GetQuantileSpecs(0.0, 1LL), Tuple(1LL, 2LL)); 42 EXPECT_EQ(Stream::GetQuantileSpecs(0.0, 20LL), Tuple(1LL, 20LL)); 43} 44 45TEST(GetQuantileSpecs, NonZeroEps) { 46 EXPECT_DEATH({ Stream::GetQuantileSpecs(0.01, 0L); }, "max_elements > 0"); 47 EXPECT_EQ(Stream::GetQuantileSpecs(0.1, 320LL), Tuple(4LL, 31LL)); 48 EXPECT_EQ(Stream::GetQuantileSpecs(0.01, 25600LL), Tuple(6LL, 501LL)); 49 EXPECT_EQ(Stream::GetQuantileSpecs(0.01, 104857600LL), Tuple(17LL, 1601LL)); 50 EXPECT_EQ(Stream::GetQuantileSpecs(0.1, 104857600LL), Tuple(20LL, 191LL)); 51 EXPECT_EQ(Stream::GetQuantileSpecs(0.01, 1LL << 40), Tuple(29LL, 2801LL)); 52 EXPECT_EQ(Stream::GetQuantileSpecs(0.001, 1LL << 40), Tuple(26LL, 25001LL)); 53} 54 55class WeightedQuantilesStreamTest : public ::testing::Test {}; 56 57// Stream generators. 58void GenerateFixedUniformSummary(int32 worker_id, int64 max_elements, 59 double *total_weight, Stream *stream) { 60 for (int64 i = 0; i < max_elements; ++i) { 61 const double x = static_cast<double>(i) / max_elements; 62 stream->PushEntry(x, 1.0); 63 ++(*total_weight); 64 } 65 stream->Finalize(); 66} 67 68void GenerateFixedNonUniformSummary(int32 worker_id, int64 max_elements, 69 double *total_weight, Stream *stream) { 70 for (int64 i = 0; i < max_elements; ++i) { 71 const double x = static_cast<double>(i) / max_elements; 72 stream->PushEntry(x, x); 73 (*total_weight) += x; 74 } 75 stream->Finalize(); 76} 77 78void GenerateRandUniformFixedWeightsSummary(int32 worker_id, int64 max_elements, 79 double *total_weight, 80 Stream *stream) { 81 // Simulate uniform distribution stream. 82 random::PhiloxRandom philox(13 + worker_id); 83 random::SimplePhilox rand(&philox); 84 for (int64 i = 0; i < max_elements; ++i) { 85 const double x = rand.RandDouble(); 86 stream->PushEntry(x, 1); 87 ++(*total_weight); 88 } 89 stream->Finalize(); 90} 91 92void GenerateRandUniformRandWeightsSummary(int32 worker_id, int64 max_elements, 93 double *total_weight, 94 Stream *stream) { 95 // Simulate uniform distribution stream. 96 random::PhiloxRandom philox(13 + worker_id); 97 random::SimplePhilox rand(&philox); 98 for (int64 i = 0; i < max_elements; ++i) { 99 const double x = rand.RandDouble(); 100 const double w = rand.RandDouble(); 101 stream->PushEntry(x, w); 102 (*total_weight) += w; 103 } 104 stream->Finalize(); 105} 106 107// Single worker tests. 108void TestSingleWorkerStreams( 109 double eps, int64 max_elements, 110 const std::function<void(int32, int64, double *, Stream *)> 111 &worker_summary_generator, 112 std::initializer_list<double> expected_quantiles, 113 double quantiles_matcher_epsilon) { 114 // Generate single stream. 115 double total_weight = 0; 116 Stream stream(eps, max_elements); 117 worker_summary_generator(0, max_elements, &total_weight, &stream); 118 119 // Ensure we didn't lose track of any elements and are 120 // within approximation error bound. 121 EXPECT_LE(stream.ApproximationError(), eps); 122 EXPECT_NEAR(stream.GetFinalSummary().TotalWeight(), total_weight, 1e-6); 123 124 // Verify expected quantiles. 125 int i = 0; 126 auto actuals = stream.GenerateQuantiles(expected_quantiles.size() - 1); 127 for (auto expected_quantile : expected_quantiles) { 128 EXPECT_NEAR(actuals[i], expected_quantile, quantiles_matcher_epsilon); 129 ++i; 130 } 131} 132 133// Stream generators. 134void GenerateOneValue(int32 worker_id, int64 max_elements, double *total_weight, 135 Stream *stream) { 136 stream->PushEntry(10, 1); 137 ++(*total_weight); 138 stream->Finalize(); 139} 140 141TEST(WeightedQuantilesStreamTest, OneValue) { 142 const double eps = 0.01; 143 const int64 max_elements = 1 << 16; 144 TestSingleWorkerStreams(eps, max_elements, GenerateOneValue, 145 {10.0, 10.0, 10.0, 10.0, 10.0}, 1e-2); 146} 147 148TEST(WeightedQuantilesStreamTest, FixedUniform) { 149 const double eps = 0.01; 150 const int64 max_elements = 1 << 16; 151 TestSingleWorkerStreams(eps, max_elements, GenerateFixedUniformSummary, 152 {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 153 1e-2); 154} 155 156TEST(WeightedQuantilesStreamTest, FixedNonUniform) { 157 const double eps = 0.01; 158 const int64 max_elements = 1 << 16; 159 TestSingleWorkerStreams(eps, max_elements, GenerateFixedNonUniformSummary, 160 {0, std::sqrt(0.1), std::sqrt(0.2), std::sqrt(0.3), 161 std::sqrt(0.4), std::sqrt(0.5), std::sqrt(0.6), 162 std::sqrt(0.7), std::sqrt(0.8), std::sqrt(0.9), 1.0}, 163 1e-2); 164} 165 166TEST(WeightedQuantilesStreamTest, RandUniformFixedWeights) { 167 const double eps = 0.01; 168 const int64 max_elements = 1 << 16; 169 TestSingleWorkerStreams( 170 eps, max_elements, GenerateRandUniformFixedWeightsSummary, 171 {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2); 172} 173 174TEST(WeightedQuantilesStreamTest, RandUniformRandWeights) { 175 const double eps = 0.01; 176 const int64 max_elements = 1 << 16; 177 TestSingleWorkerStreams( 178 eps, max_elements, GenerateRandUniformRandWeightsSummary, 179 {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2); 180} 181 182// Distributed tests. 183void TestDistributedStreams( 184 int32 num_workers, double eps, int64 max_elements, 185 const std::function<void(int32, int64, double *, Stream *)> 186 &worker_summary_generator, 187 std::initializer_list<double> expected_quantiles, 188 double quantiles_matcher_epsilon) { 189 // Simulate streams on each worker running independently 190 double total_weight = 0; 191 std::vector<std::vector<SummaryEntry>> worker_summaries; 192 for (int32 i = 0; i < num_workers; ++i) { 193 Stream stream(eps / 2, max_elements); 194 worker_summary_generator(i, max_elements / num_workers, &total_weight, 195 &stream); 196 worker_summaries.push_back(stream.GetFinalSummary().GetEntryList()); 197 } 198 199 // In the accumulation phase, we aggregate the summaries from each worker 200 // and build an overall summary while maintaining error bounds by ensuring we 201 // don't increase the error by more than eps / 2. 202 Stream reducer_stream(eps, max_elements); 203 for (const auto &summary : worker_summaries) { 204 reducer_stream.PushSummary(summary); 205 } 206 reducer_stream.Finalize(); 207 208 // Ensure we didn't lose track of any elements and are 209 // within approximation error bound. 210 EXPECT_LE(reducer_stream.ApproximationError(), eps); 211 EXPECT_NEAR(reducer_stream.GetFinalSummary().TotalWeight(), total_weight, 212 total_weight); 213 214 // Verify expected quantiles. 215 int i = 0; 216 auto actuals = 217 reducer_stream.GenerateQuantiles(expected_quantiles.size() - 1); 218 for (auto expected_quantile : expected_quantiles) { 219 EXPECT_NEAR(actuals[i], expected_quantile, quantiles_matcher_epsilon); 220 ++i; 221 } 222} 223 224TEST(WeightedQuantilesStreamTest, FixedUniformDistributed) { 225 const int32 num_workers = 10; 226 const double eps = 0.01; 227 const int64 max_elements = num_workers * (1 << 16); 228 TestDistributedStreams( 229 num_workers, eps, max_elements, GenerateFixedUniformSummary, 230 {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2); 231} 232 233TEST(WeightedQuantilesStreamTest, FixedNonUniformDistributed) { 234 const int32 num_workers = 10; 235 const double eps = 0.01; 236 const int64 max_elements = num_workers * (1 << 16); 237 TestDistributedStreams(num_workers, eps, max_elements, 238 GenerateFixedNonUniformSummary, 239 {0, std::sqrt(0.1), std::sqrt(0.2), std::sqrt(0.3), 240 std::sqrt(0.4), std::sqrt(0.5), std::sqrt(0.6), 241 std::sqrt(0.7), std::sqrt(0.8), std::sqrt(0.9), 1.0}, 242 1e-2); 243} 244 245TEST(WeightedQuantilesStreamTest, RandUniformFixedWeightsDistributed) { 246 const int32 num_workers = 10; 247 const double eps = 0.01; 248 const int64 max_elements = num_workers * (1 << 16); 249 TestDistributedStreams( 250 num_workers, eps, max_elements, GenerateRandUniformFixedWeightsSummary, 251 {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2); 252} 253 254TEST(WeightedQuantilesStreamTest, RandUniformRandWeightsDistributed) { 255 const int32 num_workers = 10; 256 const double eps = 0.01; 257 const int64 max_elements = num_workers * (1 << 16); 258 TestDistributedStreams( 259 num_workers, eps, max_elements, GenerateRandUniformRandWeightsSummary, 260 {0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, 1e-2); 261} 262 263} // namespace 264} // namespace tensorflow 265