# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import io
import json
import logging
import time
import uuid

from google.appengine.api import app_identity

from apiclient import http
from apiclient.discovery import build
from oauth2client import client

from base import exceptions


# urlfetch max size is 10 MB. Assume 1000 bytes per row and split the
# insert into chunks of 10,000 rows.
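# For example, 25,000 rows would be uploaded as three chunks of 10,000,
# 10,000 and 5,000 rows.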
INSERTION_MAX_ROWS = 10000


class BigQuery(object):
  """Methods for interfacing with BigQuery."""

  def __init__(self, project_id=None):
    self._service = _Service()
    if project_id:
      self._project_id = project_id
    else:
      self._project_id = app_identity.get_application_id()

  def InsertRowsAsync(self, dataset_id, table_id, rows,
                      truncate=False, num_retries=5):
    """Starts load jobs to insert the given rows, without waiting for them.

    The rows are uploaded in chunks of INSERTION_MAX_ROWS as newline-delimited
    JSON load jobs. If truncate is True, the first chunk replaces the table's
    existing contents; all remaining chunks are appended.

    Returns:
      A list of job resources, one per chunk.
    """
    responses = []
    for i in xrange(0, len(rows), INSERTION_MAX_ROWS):
      rows_chunk = rows[i:i+INSERTION_MAX_ROWS]
      logging.info('Inserting %d rows into %s.%s.',
                   len(rows_chunk), dataset_id, table_id)
      body = {
          'configuration': {
              'jobReference': {
                  'projectId': self._project_id,
                  'jobId': str(uuid.uuid4()),
              },
              'load': {
                  'destinationTable': {
                      'projectId': self._project_id,
                      'datasetId': dataset_id,
                      'tableId': table_id,
                  },
                  'sourceFormat': 'NEWLINE_DELIMITED_JSON',
                  'writeDisposition':
                      'WRITE_TRUNCATE' if truncate else 'WRITE_APPEND',
              }
          }
      }

      # Format rows as newline-delimited JSON.
      media_buffer = io.BytesIO()
      for row in rows_chunk:
        json.dump(row, media_buffer, separators=(',', ':'))
        media_buffer.write('\n')
      media_body = http.MediaIoBaseUpload(
          media_buffer, mimetype='application/octet-stream')

      responses.append(self._service.jobs().insert(
          projectId=self._project_id,
          body=body, media_body=media_body).execute(num_retries=num_retries))

      # Only truncate on the first insert!
      truncate = False

    # TODO(dtu): Return a Job object.
    return responses
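
  # A minimal usage sketch for the load-job path (the dataset and table
  # names below are illustrative only and assumed to already exist):
  #
  #   bq = BigQuery()
  #   jobs = bq.InsertRowsAsync('my_dataset', 'my_table',
  #                             [{'name': 'a', 'value': 1}], truncate=True)
  #   for job in jobs:
  #     bq.PollJob(job, timeout=300)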

  def InsertRowsSync(self, dataset_id, table_id, rows, num_retries=5):
    """Inserts rows with the streaming API and checks each response.

    Raises:
      exceptions.QueryError: If a response contains insert errors.
    """
    for i in xrange(0, len(rows), INSERTION_MAX_ROWS):
      rows_chunk = rows[i:i+INSERTION_MAX_ROWS]
      logging.info('Inserting %d rows into %s.%s.',
                   len(rows_chunk), dataset_id, table_id)
      rows_chunk = [{'insertId': str(uuid.uuid4()), 'json': row}
                    for row in rows_chunk]
      insert_data = {'rows': rows_chunk}
      response = self._service.tabledata().insertAll(
          projectId=self._project_id,
          datasetId=dataset_id,
          tableId=table_id,
          body=insert_data).execute(num_retries=num_retries)

      if 'insertErrors' in response:
        raise exceptions.QueryError(response['insertErrors'])
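
  # Sketch of a synchronous streaming insert (dataset, table and rows are
  # illustrative); raises exceptions.QueryError if any row fails:
  #
  #   bq = BigQuery()
  #   bq.InsertRowsSync('my_dataset', 'my_table', [{'name': 'b', 'value': 2}])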

  def QueryAsync(self, query, num_retries=5):
    """Starts a query job without waiting for its results.

    Returns:
      The inserted job resource. Pass it to PollJob to wait for completion.
    """
    logging.debug(query)
    body = {
        'jobReference': {
            'projectId': self._project_id,
            'jobId': str(uuid.uuid4()),
        },
        'configuration': {
            'query': {
                'query': query,
                'priority': 'INTERACTIVE',
            }
        }
    }
    return self._service.jobs().insert(
        projectId=self._project_id,
        body=body).execute(num_retries=num_retries)
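
  # Sketch: start a query job and wait for it with PollJob (the query text
  # is illustrative; legacy SQL table syntax is assumed):
  #
  #   bq = BigQuery()
  #   job = bq.QueryAsync('SELECT name, value FROM [my_dataset.my_table]')
  #   bq.PollJob(job, timeout=60)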

  def QuerySync(self, query, timeout=60, num_retries=5):
    """Queries BigQuery and waits for the results.

    Args:
      query: Query string.
      timeout: Timeout in seconds.
      num_retries: Number of times to retry the request on error.

    Returns:
      A list of result rows. The row format is specified in the "rows" field
      here:
      https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/bigquery_v2.jobs.html#getQueryResults

    Raises:
      exceptions.QueryError: If the response contains errors.
    """
    logging.debug(query)
    query_data = {
        'query': query,
        'timeoutMs': timeout * 1000,
    }
    start_time = time.time()
    response = self._service.jobs().query(
        projectId=self._project_id,
        body=query_data).execute(num_retries=num_retries)

    if 'errors' in response:
      raise exceptions.QueryError(response['errors'])

    # TODO(dtu): Fetch subsequent pages of rows for big queries.
    # TODO(dtu): Reformat results as dicts.
    result = response.get('rows', [])
    logging.debug('Query fetched %d rows in %fs.',
                  len(result), time.time() - start_time)
    return result
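
  # Sketch of reading QuerySync results. Each row is a dict with an 'f' list
  # of cells, each holding its value under 'v' (see the URL in the docstring
  # above); the query below is illustrative:
  #
  #   bq = BigQuery()
  #   rows = bq.QuerySync('SELECT name, value FROM [my_dataset.my_table]')
  #   for row in rows:
  #     name, value = row['f'][0]['v'], row['f'][1]['v']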

  def IsJobDone(self, job):
    """Checks whether a job has finished.

    Returns:
      The job resource if the job is done, otherwise None.
    """
    response = self._service.jobs().get(**job['jobReference']).execute()
    if response['status']['state'] == 'DONE':
      return response
    else:
      return None

  def PollJob(self, job, timeout):
    """Polls a job with exponential backoff until it finishes or times out.

    Args:
      job: A job resource, as returned by QueryAsync or InsertRowsAsync.
      timeout: Maximum time to poll, in seconds.

    Returns:
      The finished job resource.

    Raises:
      exceptions.QueryError: If the job finished with errors.
      exceptions.TimeoutError: If the job did not finish within the timeout.
    """
    # TODO(dtu): Take multiple jobs as parameters.
    start_time = time.time()
    iteration = 0

    while True:
      elapsed_time = time.time() - start_time

      response = self.IsJobDone(job)
      if response:
        if 'errors' in response['status']:
          raise exceptions.QueryError(response['status']['errors'])
        logging.debug('Polled job for %d seconds.', int(elapsed_time))
        return response

      if elapsed_time >= timeout:
        break
      time.sleep(min(1.5 ** iteration, timeout - elapsed_time))
      iteration += 1

    raise exceptions.TimeoutError()
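
  # Sketch: for a non-blocking check (e.g. from a recurring task), IsJobDone
  # can be called directly instead of PollJob ('job' is any job resource
  # returned by the methods above):
  #
  #   response = bq.IsJobDone(job)
  #   if response is not None and 'errors' not in response['status']:
  #     pass  # Job finished successfully.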


def _Service():
  """Returns an initialized and authorized BigQuery client."""
  # pylint: disable=no-member
  credentials = client.GoogleCredentials.get_application_default()
  if credentials.create_scoped_required():
    credentials = credentials.create_scoped(
        'https://www.googleapis.com/auth/bigquery')
  return build('bigquery', 'v2', credentials=credentials)