# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import io
import json
import logging
import time
import uuid

from google.appengine.api import app_identity

from apiclient import http
from apiclient.discovery import build
from oauth2client import client

from base import exceptions


# urlfetch max size is 10 MB. Assume 1000 bytes per row and split the
# insert into chunks of 10,000 rows.
INSERTION_MAX_ROWS = 10000


class BigQuery(object):
  """Methods for interfacing with BigQuery."""

  def __init__(self, project_id=None):
    self._service = _Service()
    if project_id:
      self._project_id = project_id
    else:
      self._project_id = app_identity.get_application_id()

  def InsertRowsAsync(self, dataset_id, table_id, rows,
                      truncate=False, num_retries=5):
    """Starts asynchronous load jobs that insert rows into a table.

    Rows are uploaded in chunks as newline-delimited JSON. If truncate is
    True, the first chunk overwrites the table; subsequent chunks append.

    Returns:
      A list of job resources, one per chunk of rows.
    """
    responses = []
    for i in xrange(0, len(rows), INSERTION_MAX_ROWS):
      rows_chunk = rows[i:i+INSERTION_MAX_ROWS]
      logging.info('Inserting %d rows into %s.%s.',
                   len(rows_chunk), dataset_id, table_id)
      body = {
          'configuration': {
              'jobReference': {
                  'projectId': self._project_id,
                  'jobId': str(uuid.uuid4()),
              },
              'load': {
                  'destinationTable': {
                      'projectId': self._project_id,
                      'datasetId': dataset_id,
                      'tableId': table_id,
                  },
                  'sourceFormat': 'NEWLINE_DELIMITED_JSON',
                  'writeDisposition':
                      'WRITE_TRUNCATE' if truncate else 'WRITE_APPEND',
              }
          }
      }

      # Format rows as newline-delimited JSON.
      media_buffer = io.BytesIO()
      for row in rows_chunk:
        json.dump(row, media_buffer, separators=(',', ':'))
        print >> media_buffer
      media_body = http.MediaIoBaseUpload(
          media_buffer, mimetype='application/octet-stream')

      responses.append(self._service.jobs().insert(
          projectId=self._project_id,
          body=body, media_body=media_body).execute(num_retries=num_retries))

      # Only truncate on the first insert!
      truncate = False

    # TODO(dtu): Return a Job object.
    return responses

  def InsertRowsSync(self, dataset_id, table_id, rows, num_retries=5):
    """Inserts rows into a table using the streaming API, in chunks.

    Raises:
      exceptions.QueryError: If the response reports insert errors.
    """
    for i in xrange(0, len(rows), INSERTION_MAX_ROWS):
      rows_chunk = rows[i:i+INSERTION_MAX_ROWS]
      logging.info('Inserting %d rows into %s.%s.',
                   len(rows_chunk), dataset_id, table_id)
      # insertId lets BigQuery de-duplicate rows on a best-effort basis
      # if the request is retried.
      rows_chunk = [{'insertId': str(uuid.uuid4()), 'json': row}
                    for row in rows_chunk]
      insert_data = {'rows': rows_chunk}
      response = self._service.tabledata().insertAll(
          projectId=self._project_id,
          datasetId=dataset_id,
          tableId=table_id,
          body=insert_data).execute(num_retries=num_retries)

      if 'insertErrors' in response:
        raise exceptions.QueryError(response['insertErrors'])

  def QueryAsync(self, query, num_retries=5):
    """Starts an asynchronous query job and returns the job resource."""
    logging.debug(query)
    body = {
        'jobReference': {
            'projectId': self._project_id,
            'jobId': str(uuid.uuid4()),
        },
        'configuration': {
            'query': {
                'query': query,
                'priority': 'INTERACTIVE',
            }
        }
    }
    return self._service.jobs().insert(
        projectId=self._project_id,
        body=body).execute(num_retries=num_retries)

  def QuerySync(self, query, timeout=60, num_retries=5):
    """Queries BigQuery and returns the result rows.

    Args:
      query: Query string.
      timeout: Timeout in seconds.
      num_retries: Number of times to retry the request.

    Returns:
      Query results. The format is specified in the "rows" field here:
      https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/bigquery_v2.jobs.html#getQueryResults
    """
    logging.debug(query)
    query_data = {
        'query': query,
        'timeoutMs': timeout * 1000,
    }
    start_time = time.time()
    response = self._service.jobs().query(
        projectId=self._project_id,
        body=query_data).execute(num_retries=num_retries)

    if 'errors' in response:
      raise exceptions.QueryError(response['errors'])

    # TODO(dtu): Fetch subsequent pages of rows for big queries.
    # TODO(dtu): Reformat results as dicts.
    result = response.get('rows', [])
    logging.debug('Query fetched %d rows in %fs.',
                  len(result), time.time() - start_time)
    return result

  def IsJobDone(self, job):
    """Returns the job resource if the job has finished, otherwise None."""
    response = self._service.jobs().get(**job['jobReference']).execute()
    if response['status']['state'] == 'DONE':
      return response
    else:
      return None

  def PollJob(self, job, timeout):
    """Polls a job until it finishes or the timeout elapses.

    Raises:
      exceptions.QueryError: If the finished job reports errors.
      exceptions.TimeoutError: If the job isn't done within timeout seconds.
    """
    # TODO(dtu): Take multiple jobs as parameters.
    start_time = time.time()
    iteration = 0

    while True:
      elapsed_time = time.time() - start_time

      response = self.IsJobDone(job)
      if response:
        if 'errors' in response['status']:
          raise exceptions.QueryError(response['status']['errors'])
        logging.debug('Polled job for %d seconds.', int(elapsed_time))
        return response

      if elapsed_time >= timeout:
        break
      # Back off exponentially, but never sleep past the deadline.
      time.sleep(min(1.5 ** iteration, timeout - elapsed_time))
      iteration += 1

    raise exceptions.TimeoutError()


def _Service():
  """Returns an initialized and authorized BigQuery client."""
  # pylint: disable=no-member
  credentials = client.GoogleCredentials.get_application_default()
  if credentials.create_scoped_required():
    credentials = credentials.create_scoped(
        'https://www.googleapis.com/auth/bigquery')
  return build('bigquery', 'v2', credentials=credentials)
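

# ----------------------------------------------------------------------
# Usage sketch (illustrative only; not part of this module's API). A
# minimal example of driving the client above, assuming this file is
# importable as `bigquery` and that a `speed` dataset with a `results`
# table exists; those names, and the row contents, are hypothetical.
#
#   import bigquery
#
#   bq = bigquery.BigQuery()
#
#   # Streaming insert; raises exceptions.QueryError on insert errors.
#   bq.InsertRowsSync(
#       'speed', 'results', [{'revision': 12345, 'value': 42.0}])
#
#   # Synchronous query; returns the "rows" list from the response.
#   rows = bq.QuerySync('SELECT revision, value FROM [speed.results]')
#
#   # Asynchronous query: start the job, then poll with backoff. PollJob
#   # returns the finished job resource; fetching the result rows would
#   # require jobs().getQueryResults(), which this module does not wrap.
#   job = bq.QueryAsync('SELECT COUNT(*) FROM [speed.results]')
#   response = bq.PollJob(job, timeout=60)
# ----------------------------------------------------------------------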