1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7    http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/contrib/cloud/kernels/bigquery_table_accessor.h"
17#include "tensorflow/contrib/cloud/kernels/bigquery_table_accessor_test_data.h"
18#include "tensorflow/core/example/feature.pb.h"
19#include "tensorflow/core/lib/core/status_test_util.h"
20#include "tensorflow/core/lib/gtl/stl_util.h"
21#include "tensorflow/core/platform/cloud/http_request_fake.h"
22#include "tensorflow/core/platform/test.h"
23
24namespace tensorflow {
25namespace {
26
27constexpr char kTestProject[] = "test-project";
28constexpr char kTestDataset[] = "test-dataset";
29constexpr char kTestTable[] = "test-table";
30
31bool HasSubstr(const string& base, const string& substr) {
32  bool ok = StringPiece(base).contains(substr);
33  EXPECT_TRUE(ok) << base << ", expected substring " << substr;
34  return ok;
35}
36
37class FakeAuthProvider : public AuthProvider {
38 public:
39  Status GetToken(string* token) override {
40    *token = "fake_token";
41    return Status::OK();
42  }
43};
44
45string DeterministicSerialization(const tensorflow::Example& example) {
46  const std::size_t size = example.ByteSizeLong();
47  string result(size, '\0');
48  ::tensorflow::protobuf::io::ArrayOutputStream array_stream(
49      gtl::string_as_array(&result), size);
50  ::tensorflow::protobuf::io::CodedOutputStream output_stream(&array_stream);
51
52  output_stream.SetSerializationDeterministic(true);
53  example.SerializeWithCachedSizes(&output_stream);
54  EXPECT_FALSE(output_stream.HadError());
55  EXPECT_EQ(size, output_stream.ByteCount());
56  return result;
57}
58
59}  // namespace
60
61class BigQueryTableAccessorTest : public ::testing::Test {
62 protected:
63  BigQueryTableAccessor::SchemaNode GetSchema() {
64    return accessor_->schema_root_;
65  }
66
67  Status CreateTableAccessor(const string& project_id, const string& dataset_id,
68                             const string& table_id, int64 timestamp_millis,
69                             int64 row_buffer_size,
70                             const std::vector<string>& columns,
71                             const BigQueryTablePartition& partition) {
72    return BigQueryTableAccessor::New(
73        project_id, dataset_id, table_id, timestamp_millis, row_buffer_size, "",
74        columns, partition, std::unique_ptr<AuthProvider>(new FakeAuthProvider),
75        std::unique_ptr<HttpRequest::Factory>(
76            new FakeHttpRequestFactory(&requests_)),
77        &accessor_);
78  }
79
80  std::vector<HttpRequest*> requests_;
81  std::unique_ptr<BigQueryTableAccessor> accessor_;
82};
83
84TEST_F(BigQueryTableAccessorTest, NegativeTimestamp) {
85  const auto status =
86      CreateTableAccessor(kTestProject, kTestDataset, kTestTable, -1, 3, {},
87                          BigQueryTablePartition());
88  EXPECT_TRUE(errors::IsInvalidArgument(status));
89}
90
91TEST_F(BigQueryTableAccessorTest, ZeroTimestamp) {
92  const auto status =
93      CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 0, 3, {},
94                          BigQueryTablePartition());
95  EXPECT_TRUE(errors::IsInvalidArgument(status));
96}
97
98TEST_F(BigQueryTableAccessorTest, RepeatedFieldNoAllowedTest) {
99  requests_.emplace_back(new FakeHttpRequest(
100      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
101      "datasets/test-dataset/tables/test-table/\n"
102      "Auth Token: fake_token\n",
103      R"({
104        "kind": "bigquery#table",
105        "etag": "\"4zcX32ezvFoFzxHoG04qJqKZk6c/MTQ1Nzk3NTgwNzE4Mw\"",
106        "id": "test-project:test-dataset.test-table",
107        "schema": {
108          "fields": [
109          {
110            "name": "int_field",
111            "type": "INTEGER",
112            "mode": "REPEATED"
113          }]
114        },
115        "numRows": "10"
116      })"));
117  const auto status =
118      CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 3, {},
119                          BigQueryTablePartition());
120  EXPECT_TRUE(errors::IsUnimplemented(status));
121  EXPECT_TRUE(HasSubstr(status.error_message(),
122                        "Tables with repeated columns are not supported"));
123}
124
125TEST_F(BigQueryTableAccessorTest, ValidSchemaTest) {
126  requests_.emplace_back(new FakeHttpRequest(
127      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
128      "datasets/test-dataset/tables/test-table/\n"
129      "Auth Token: fake_token\n",
130      kSampleSchema));
131  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 3,
132                                   {}, BigQueryTablePartition()));
133  // Validate total number of rows.
134  EXPECT_EQ(4, accessor_->total_num_rows());
135
136  // Validate the schema.
137  const auto schema_root = GetSchema();
138  EXPECT_EQ(schema_root.name, "");
139  EXPECT_EQ(schema_root.type, BigQueryTableAccessor::ColumnType::kNone);
140  EXPECT_EQ(9, schema_root.schema_nodes.size());
141
142  EXPECT_EQ(schema_root.schema_nodes[0].name, "int_field");
143  EXPECT_EQ(schema_root.schema_nodes[0].type,
144            BigQueryTableAccessor::ColumnType::kInteger);
145
146  EXPECT_EQ(schema_root.schema_nodes[1].name, "str_field");
147  EXPECT_EQ(schema_root.schema_nodes[1].type,
148            BigQueryTableAccessor::ColumnType::kString);
149
150  EXPECT_EQ(1, schema_root.schema_nodes[2].schema_nodes.size());
151  EXPECT_EQ(schema_root.schema_nodes[2].name, "rec_field");
152  EXPECT_EQ(schema_root.schema_nodes[2].type,
153            BigQueryTableAccessor::ColumnType::kRecord);
154
155  EXPECT_EQ(schema_root.schema_nodes[2].schema_nodes[0].name,
156            "rec_field.float_field");
157  EXPECT_EQ(schema_root.schema_nodes[2].schema_nodes[0].type,
158            BigQueryTableAccessor::ColumnType::kFloat);
159
160  EXPECT_EQ(schema_root.schema_nodes[3].name, "bool_field");
161  EXPECT_EQ(schema_root.schema_nodes[3].type,
162            BigQueryTableAccessor::ColumnType::kBoolean);
163
164  EXPECT_EQ(schema_root.schema_nodes[4].name, "bytes_field");
165  EXPECT_EQ(schema_root.schema_nodes[4].type,
166            BigQueryTableAccessor::ColumnType::kBytes);
167
168  EXPECT_EQ(schema_root.schema_nodes[5].name, "timestamp_field");
169  EXPECT_EQ(schema_root.schema_nodes[5].type,
170            BigQueryTableAccessor::ColumnType::kTimestamp);
171
172  EXPECT_EQ(schema_root.schema_nodes[6].name, "date_field");
173  EXPECT_EQ(schema_root.schema_nodes[6].type,
174            BigQueryTableAccessor::ColumnType::kDate);
175
176  EXPECT_EQ(schema_root.schema_nodes[7].name, "time_field");
177  EXPECT_EQ(schema_root.schema_nodes[7].type,
178            BigQueryTableAccessor::ColumnType::kTime);
179
180  EXPECT_EQ(schema_root.schema_nodes[8].name, "datetime_field");
181  EXPECT_EQ(schema_root.schema_nodes[8].type,
182            BigQueryTableAccessor::ColumnType::kDatetime);
183}
184
185TEST_F(BigQueryTableAccessorTest, ReadOneRowTest) {
186  requests_.emplace_back(new FakeHttpRequest(
187      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
188      "datasets/test-dataset/tables/test-table/\n"
189      "Auth Token: fake_token\n",
190      kSampleSchema));
191  requests_.emplace_back(new FakeHttpRequest(
192      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
193      "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n"
194      "Auth Token: fake_token\n",
195      kTestRow));
196  BigQueryTablePartition partition;
197  partition.set_start_index(2);
198  partition.set_end_index(2);
199  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1,
200                                   {}, partition));
201  int64 row_id;
202  Example example;
203  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
204
205  // Validate returned result.
206  Example expected_example;
207  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(kTestExampleProto,
208                                                    &expected_example));
209  EXPECT_EQ(DeterministicSerialization(expected_example),
210            DeterministicSerialization(example));
211  EXPECT_EQ(row_id, 2);
212  EXPECT_TRUE(accessor_->Done());
213}
214
215TEST_F(BigQueryTableAccessorTest, ReadOneRowPartialTest) {
216  requests_.emplace_back(new FakeHttpRequest(
217      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
218      "datasets/test-dataset/tables/test-table/\n"
219      "Auth Token: fake_token\n",
220      kSampleSchema));
221  requests_.emplace_back(new FakeHttpRequest(
222      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
223      "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n"
224      "Auth Token: fake_token\n",
225      kTestRow));
226  BigQueryTablePartition partition;
227  partition.set_start_index(2);
228  partition.set_end_index(2);
229  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1,
230                                   {"bool_field", "rec_field.float_field"},
231                                   partition));
232  int64 row_id;
233  Example example;
234  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
235
236  // Validate returned result.
237  EXPECT_EQ(row_id, 2);
238  EXPECT_TRUE(accessor_->Done());
239  Example expected_example;
240  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(kTestPartialExampleProto,
241                                                    &expected_example));
242  EXPECT_EQ(DeterministicSerialization(expected_example),
243            DeterministicSerialization(example));
244}
245
246TEST_F(BigQueryTableAccessorTest, ReadOneRowWithNullsTest) {
247  requests_.emplace_back(new FakeHttpRequest(
248      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
249      "datasets/test-dataset/tables/test-table/\n"
250      "Auth Token: fake_token\n",
251      kSampleSchema));
252  requests_.emplace_back(new FakeHttpRequest(
253      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
254      "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n"
255      "Auth Token: fake_token\n",
256      kTestRowWithNulls));
257  BigQueryTablePartition partition;
258  partition.set_start_index(2);
259  partition.set_end_index(2);
260  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1,
261                                   {}, partition));
262  int64 row_id;
263  Example example;
264  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
265
266  // Validate returned result.
267  Example expected_example;
268  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(kTestExampleProtoWithNulls,
269                                                    &expected_example));
270  EXPECT_EQ(DeterministicSerialization(expected_example),
271            DeterministicSerialization(example));
272  EXPECT_EQ(row_id, 2);
273  EXPECT_TRUE(accessor_->Done());
274}
275
276TEST_F(BigQueryTableAccessorTest, ReadOneRowTwoRecords) {
277  requests_.emplace_back(new FakeHttpRequest(
278      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
279      "datasets/test-dataset/tables/test-table/\n"
280      "Auth Token: fake_token\n",
281      kSampleSchemaTwoRecords));
282  requests_.emplace_back(new FakeHttpRequest(
283      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
284      "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n"
285      "Auth Token: fake_token\n",
286      kTestRowWithTwoRecords));
287  BigQueryTablePartition partition;
288  partition.set_start_index(2);
289  partition.set_end_index(2);
290  TF_EXPECT_OK(CreateTableAccessor(
291      kTestProject, kTestDataset, kTestTable, 1, 1,
292      {"rec_field2.bool_field", "rec_field1.float_field"}, partition));
293
294  int64 row_id;
295  Example example;
296  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
297
298  // Validate returned result.
299  Example expected_example;
300  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(
301      kTestExampleProtoWithTwoRecords, &expected_example));
302  EXPECT_EQ(DeterministicSerialization(expected_example),
303            DeterministicSerialization(example));
304  EXPECT_EQ(row_id, 2);
305  EXPECT_TRUE(accessor_->Done());
306}
307
308TEST_F(BigQueryTableAccessorTest, NonExistentColumns) {
309  requests_.emplace_back(new FakeHttpRequest(
310      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
311      "datasets/test-dataset/tables/test-table/\n"
312      "Auth Token: fake_token\n",
313      kSampleSchemaTwoRecords));
314  requests_.emplace_back(new FakeHttpRequest(
315      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
316      "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n"
317      "Auth Token: fake_token\n",
318      kTestRowWithTwoRecords));
319  BigQueryTablePartition partition;
320  partition.set_start_index(2);
321  partition.set_end_index(2);
322  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1,
323                                   {"bool_field", "float_field"}, partition));
324  int64 row_id;
325  Example example;
326  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
327
328  // Validate returned result.
329  EXPECT_EQ(row_id, 2);
330  EXPECT_TRUE(accessor_->Done());
331}
332
333TEST_F(BigQueryTableAccessorTest, EmptyRow) {
334  requests_.emplace_back(new FakeHttpRequest(
335      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
336      "datasets/test-dataset/tables/test-table/\n"
337      "Auth Token: fake_token\n",
338      kSampleSchemaTwoRecords));
339  requests_.emplace_back(new FakeHttpRequest(
340      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
341      "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n"
342      "Auth Token: fake_token\n",
343      kTestEmptyRow));
344  BigQueryTablePartition partition;
345  partition.set_start_index(2);
346  partition.set_end_index(2);
347  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1,
348                                   {}, partition));
349  int64 row_id;
350  Example example;
351  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
352
353  // Validate returned result.
354  EXPECT_EQ(row_id, 2);
355  EXPECT_TRUE(accessor_->Done());
356}
357
358TEST_F(BigQueryTableAccessorTest, BrokenRowTest) {
359  requests_.emplace_back(new FakeHttpRequest(
360      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
361      "datasets/test-dataset/tables/test-table/\n"
362      "Auth Token: fake_token\n",
363      kSampleSchema));
364  requests_.emplace_back(new FakeHttpRequest(
365      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
366      "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=2\n"
367      "Auth Token: fake_token\n",
368      kBrokenTestRow));
369  BigQueryTablePartition partition;
370  partition.set_start_index(2);
371  partition.set_end_index(2);
372  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1,
373                                   {}, partition));
374  int64 row_id;
375  Example example;
376  const auto status = accessor_->ReadRow(&row_id, &example);
377  EXPECT_TRUE(errors::IsInternal(status));
378  EXPECT_TRUE(
379      HasSubstr(status.error_message(), "Cannot convert value to integer"));
380}
381
382TEST_F(BigQueryTableAccessorTest, MultiplePagesTest) {
383  requests_.emplace_back(new FakeHttpRequest(
384      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
385      "datasets/test-dataset/tables/test-table/\n"
386      "Auth Token: fake_token\n",
387      kSampleSchema));
388  requests_.emplace_back(new FakeHttpRequest(
389      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
390      "datasets/test-dataset/tables/test-table/data?maxResults=2&startIndex=1\n"
391      "Auth Token: fake_token\n",
392      kTestTwoRows));
393  requests_.emplace_back(new FakeHttpRequest(
394      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
395      "datasets/test-dataset/tables/test-table/"
396      "data?maxResults=2&pageToken=next_page\n"
397      "Auth Token: fake_token\n",
398      kTestRowWithNulls));
399
400  BigQueryTablePartition partition;
401  partition.set_start_index(1);
402  partition.set_end_index(-1);
403  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 2,
404                                   {}, partition));
405
406  int64 row_id;
407  Example example;
408  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
409  EXPECT_EQ(1, row_id);
410  EXPECT_FALSE(accessor_->Done());
411  EXPECT_EQ(
412      (example.features().feature()).at("int_field").int64_list().value(0),
413      1111);
414
415  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
416  EXPECT_EQ(2, row_id);
417  EXPECT_FALSE(accessor_->Done());
418  EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0),
419            2222);
420
421  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
422  EXPECT_EQ(3, row_id);
423  EXPECT_TRUE(accessor_->Done());
424
425  Example expected_example;
426  ASSERT_TRUE(protobuf::TextFormat::ParseFromString(kTestExampleProtoWithNulls,
427                                                    &expected_example));
428  EXPECT_EQ(DeterministicSerialization(expected_example),
429            DeterministicSerialization(example));
430  EXPECT_TRUE(errors::IsOutOfRange(accessor_->ReadRow(&row_id, &example)));
431}
432
433TEST_F(BigQueryTableAccessorTest, SwitchingPartitionsTest) {
434  requests_.emplace_back(new FakeHttpRequest(
435      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
436      "datasets/test-dataset/tables/test-table/\n"
437      "Auth Token: fake_token\n",
438      kSampleSchema));
439  requests_.emplace_back(new FakeHttpRequest(
440      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
441      "datasets/test-dataset/tables/test-table/data?maxResults=1&startIndex=0\n"
442      "Auth Token: fake_token\n",
443      kTestTwoRows));
444  requests_.emplace_back(new FakeHttpRequest(
445      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
446      "datasets/test-dataset/tables/test-table/"
447      "data?maxResults=2&startIndex=3\n"
448      "Auth Token: fake_token\n",
449      kTestRowWithNulls));
450  requests_.emplace_back(new FakeHttpRequest(
451      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
452      "datasets/test-dataset/tables/test-table/data?maxResults=2&startIndex=0\n"
453      "Auth Token: fake_token\n",
454      kTestTwoRows));
455
456  BigQueryTablePartition partition;
457  partition.set_start_index(0);
458  partition.set_end_index(0);
459  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 2,
460                                   {}, partition));
461
462  int64 row_id;
463  Example example;
464  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
465  EXPECT_EQ(0, row_id);
466  EXPECT_TRUE(accessor_->Done());
467  EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0),
468            1111);
469
470  partition.set_start_index(3);
471  partition.set_end_index(-1);
472  TF_EXPECT_OK(accessor_->SetPartition(partition));
473  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
474  EXPECT_EQ(3, row_id);
475  EXPECT_TRUE(accessor_->Done());
476  EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0),
477            1234);
478
479  partition.set_start_index(0);
480  partition.set_end_index(1);
481  TF_EXPECT_OK(accessor_->SetPartition(partition));
482  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
483  EXPECT_EQ(0, row_id);
484  EXPECT_FALSE(accessor_->Done());
485  EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0),
486            1111);
487  TF_EXPECT_OK(accessor_->ReadRow(&row_id, &example));
488  EXPECT_EQ(1, row_id);
489  EXPECT_TRUE(accessor_->Done());
490  EXPECT_EQ(example.features().feature().at("int_field").int64_list().value(0),
491            2222);
492}
493
494TEST_F(BigQueryTableAccessorTest, EmptyPartitionTest) {
495  requests_.emplace_back(new FakeHttpRequest(
496      "Uri: https://www.googleapis.com/bigquery/v2/projects/test-project/"
497      "datasets/test-dataset/tables/test-table/\n"
498      "Auth Token: fake_token\n",
499      kSampleSchema));
500
501  BigQueryTablePartition partition;
502  partition.set_start_index(3);
503  partition.set_end_index(2);
504  TF_EXPECT_OK(CreateTableAccessor(kTestProject, kTestDataset, kTestTable, 1, 1,
505                                   {}, partition));
506  EXPECT_TRUE(accessor_->Done());
507
508  int64 row_id;
509  Example example;
510  EXPECT_TRUE(errors::IsOutOfRange(accessor_->ReadRow(&row_id, &example)));
511}
512
513}  // namespace tensorflow
514