# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file defines helper functions for putting entries into elasticsearch.

"""Utils for sending metadata to elasticsearch

Elasticsearch is a document-oriented NoSQL database.
Source is here: https://github.com/elasticsearch/elasticsearch
We will be using es to store our metadata.

For example, if we wanted to store the following metadata:

metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
}

The following call sends the metadata to the es server at host:port, storing
it in `index` under the _type `type_str`:
    es_utils.ESMetadata(use_http, host, port, index, udp_port).post(
            type_str, metadata)
Set use_http to False to send the data over UDP to udp_port instead.

Using for testing: Sometimes, when we choose a single index to put entries
into, we want to clear that index of all entries before running our tests
(see es_utils_functionaltest.py for an example).

This file also contains methods for sending queries to es. Currently,
the query (json dict) we send to es is quite complicated (but flexible).
ESMetadata._compose_query() builds such a query from simple lists of
equality, range, regex and batch constraints, and ESMetadata.query()
composes and executes one in a single call.

For example, the below query returns job_id, host_id, and job_start
for all job_ids in [0, 99999] and host_id matching 10.

range_eq_query = {
    'fields': ['job_id', 'host_id', 'job_start'],
    'query': {
        'filtered': {
            'query': {
                'match': {
                    'host_id': 10,
                },
            },
            'filter': {
                'range': {
                    'job_id': {
                        'gte': 0,
                        'lte': 99999,
                    },
                },
            },
        },
    },
}

To send a query once it is created, call execute_query() to send it to the
intended elasticsearch server.

"""

import collections
import json
import logging
import socket
import time

try:
    import elasticsearch
    from elasticsearch import helpers as elasticsearch_helpers
except ImportError:
    logging.debug('Failed to import elasticsearch. Mock classes will be used '
                  'and calls to Elasticsearch server will be no-op. Test run '
                  'is not affected by the missing elasticsearch module.')
    import elasticsearch_mock as elasticsearch
    # NOTE(review): the mock client object stands in for the helpers module,
    # presumably because it exposes a no-op bulk(); verify against
    # elasticsearch_mock.
    elasticsearch_helpers = elasticsearch.Elasticsearch()


# Default timeout passed to the elasticsearch client (seconds, as interpreted
# by the elasticsearch client library).
DEFAULT_TIMEOUT = 30


class EsUtilException(Exception):
    """Exception raised when functions here fail.
    """
    pass


# Result of ESMetadata.execute_query(): `total` is the number of matching
# records reported by es, `hits` is the list of converted hit dicts that were
# actually returned (may be fewer than `total` if the query size was too small).
QueryResult = collections.namedtuple('QueryResult', ['total', 'hits'])


class ESMetadata(object):
    """Class handling es connection for metadata."""

    @property
    def es(self):
        """Elasticsearch client for self.host:self.port.

        Read-only; the client is created on first access rather than in
        __init__ and then reused.
        """
        if not self._es:
            self._es = elasticsearch.Elasticsearch(host=self.host,
                                                   port=self.port,
                                                   timeout=self.timeout)
        return self._es


    def __init__(self, use_http, host, port, index, udp_port,
                 timeout=DEFAULT_TIMEOUT):
        """Initialize ESMetadata object.

        @param use_http: Whether to send data to ES using HTTP. If False,
                post() sends a bulk-format message over UDP to udp_port.
        @param host: Elasticsearch host.
        @param port: Elasticsearch port.
        @param index: What index the metadata is stored in.
        @param udp_port: What port to use for UDP data.
        @param timeout: How long to wait while connecting to es. Also used
                as the timeout for search requests in execute_query().
        """
        self.use_http = use_http
        self.host = host
        self.port = port
        self.index = index
        self.udp_port = udp_port
        self.timeout = timeout
        self._es = None


    def _send_data_http(self, type_str, metadata):
        """Sends data to insert into elasticsearch using HTTP.

        Elasticsearch errors are logged and muted so that metadata reporting
        failures never kill a test.

        @param type_str: sets the _type field in elasticsearch db.
        @param metadata: dictionary object containing metadata

        @return: True if the document was indexed, False if elasticsearch
                reported an error.
        """
        try:
            self.es.index(index=self.index, doc_type=type_str, body=metadata)
        except elasticsearch.ElasticsearchException as e:
            # Mute exceptions from metadata reporting to prevent meta data
            # reporting errors from killing test.
            logging.error(e)
            return False
        return True


    def _send_data_udp(self, type_str, metadata):
        """Sends data to insert into elasticsearch using UDP.

        The datagram is a bulk-format message: an 'index' action header line
        followed by the document line, each terminated by a newline. Socket
        errors are logged and muted.

        @param type_str: sets the _type field in elasticsearch db.
        @param metadata: dictionary object containing metadata

        @return: True if the datagram was handed to the socket, False if a
                socket error occurred. (UDP gives no delivery guarantee.)
        """
        try:
            # Header.
            message = json.dumps(
                    {'index': {'_index': self.index, '_type': type_str}},
                    separators=(', ', ' : '))
            message += '\n'
            # Metadata.
            message += json.dumps(metadata, separators=(', ', ' : '))
            message += '\n'

            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                # sendto() requires bytes on Python 3. json.dumps output is
                # ASCII-only (ensure_ascii defaults to True), so encoding is
                # lossless and yields the same bytes on Python 2.
                sock.sendto(message.encode('utf-8'),
                            (self.host, self.udp_port))
            finally:
                # Always release the socket, even if sendto() fails.
                sock.close()
        except socket.error as e:
            logging.warning(e)
            return False
        return True


    def post(self, type_str, metadata, log_time_recorded=True, **kwargs):
        """Inserts one entry into elasticsearch via HTTP or UDP.

        The caller's dict is not modified; a copy is merged with kwargs.

        @param type_str: Sets the _type field in elasticsearch db. A '_type'
                key in metadata/kwargs overrides it and is removed from the
                stored document.
        @param metadata: Dictionary object containing metadata
        @param log_time_recorded: Whether to automatically record the time
                this metadata is recorded. Default is True. When True, any
                existing 'time_recorded' value is overwritten.
        @param kwargs: Additional metadata fields

        @return: True if post action succeeded (or there was nothing to
                post). Otherwise return False.

        """
        if not metadata:
            return True

        metadata = metadata.copy()
        metadata.update(kwargs)
        # metadata should not contain anything with key '_type'; it is moved
        # into type_str instead.
        if '_type' in metadata:
            type_str = metadata.pop('_type')
        if log_time_recorded:
            metadata['time_recorded'] = time.time()
        try:
            if self.use_http:
                return self._send_data_http(type_str, metadata)
            return self._send_data_udp(type_str, metadata)
        except elasticsearch.ElasticsearchException as e:
            # Safety net: errors are muted so reporting cannot kill a test.
            logging.error(e)
            return False


    def bulk_post(self, data_list, log_time_recorded=True, **kwargs):
        """Inserts multiple entries into elasticsearch in one bulk request.

        Each dict is copied, merged with kwargs, tagged with
        '_index': self.index and passed as an action to
        elasticsearch.helpers.bulk(). NOTE(review): a '_type' key in each
        dict is presumably used by helpers.bulk() as the document type —
        confirm against the helpers.bulk documentation.

        @param data_list: A list of dictionary objects containing metadata.
        @param log_time_recorded: Whether to automatically record the time
                this metadata is recorded. Default is True. An existing
                'time_recorded' value in an entry is preserved.
        @param kwargs: Additional metadata fields

        @return: True if post action succeeded. Otherwise return False.

        """
        if not data_list:
            return True

        actions = []
        for metadata in data_list:
            metadata = metadata.copy()
            metadata.update(kwargs)
            if log_time_recorded and 'time_recorded' not in metadata:
                metadata['time_recorded'] = time.time()
            metadata['_index'] = self.index
            actions.append(metadata)

        try:
            elasticsearch_helpers.bulk(self.es, actions)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def _compose_query(self, equality_constraints=None, fields_returned=None,
                       range_constraints=None, size=1000000, sort_specs=None,
                       regex_constraints=None, batch_constraints=None):
        """Creates a dict representing multiple range and/or equality queries.

        All constraints are combined with AND semantics (a bool 'must' list).

        Example input:
        _compose_query(
            fields_returned = ['time_recorded', 'hostname',
                               'status', 'dbg_str'],
            equality_constraints = [
                ('_type', 'host_history'),
                ('hostname', '172.22.169.106'),
            ],
            range_constraints = [
                ('time_recorded', 1405628341.904379, 1405700341.904379)
            ],
            size=20,
            sort_specs=[
                'hostname',
                {'time_recorded': 'asc'},
            ]
        )

        Output:
        {
            'fields': ['time_recorded', 'hostname', 'status', 'dbg_str'],
            'query': {
                'bool': {
                    'must': [
                        {
                            'term': {
                                '_type': 'host_history'
                            }
                        },
                        {
                            'term': {
                                'hostname': '172.22.169.106'
                            }
                        },
                        {
                            'range': {
                                'time_recorded': {
                                    'gte': 1405628341.904379,
                                    'lte': 1405700341.904379
                                }
                            }
                        },
                    ]
                },
            },
            'size': 20,
            'sort': [
                'hostname',
                {'time_recorded': 'asc'},
            ]
        }

        @param equality_constraints: list of tuples of (field, value) pairs
            representing what each field should equal to in the query.
            e.g. [ ('field1', 1), ('field2', 'value') ]
        @param fields_returned: list of fields that we should return when
            the query is executed. Set it to None to return all fields. Note
            that the key/vals will be stored in _source key of the hit object,
            if fields_returned is set to None.
        @param range_constraints: list of tuples of (field, low, high) pairs
            representing what each field should be between (inclusive).
            e.g. [ ('field1', 2, 10), ('field2', -1, 20) ]
            If you want one side to be unbounded, you can use None.
            e.g. [ ('field1', 2, None) ] means value of field1 >= 2.
            A constraint with both bounds None is ignored.
        @param size: max number of entries to return. Default is 1000000.
        @param sort_specs: A list of fields to sort on, tiebreakers will be
            broken by the next field(s).
        @param regex_constraints: A list of regex constraints of tuples of
            (field, value) pairs, e.g., [('field1', '.*value.*')].
        @param batch_constraints: list of tuples of (field, list) pairs
            representing each field should be equal to one of the values
            in the list.
            e.g., [ ('job_id', [10, 11, 12, 13]) ]
        @returns: dictionary object that represents query to es.
        @raises EsUtilException: if there are no equality constraints and
            no range constraints.
        """
        # None defaults avoid sharing one mutable default list across calls.
        equality_constraints = equality_constraints or []
        range_constraints = range_constraints or []
        regex_constraints = regex_constraints or []
        batch_constraints = batch_constraints or []

        if not equality_constraints and not range_constraints:
            raise EsUtilException('No range or equality constraints specified.')

        # Creates list of range dictionaries to put in the 'must' list.
        range_list = []
        for key, low, high in range_constraints:
            if low is None and high is None:
                continue
            temp_dict = {}
            if low is not None:
                temp_dict['gte'] = low
            if high is not None:
                temp_dict['lte'] = high
            range_list.append({'range': {key: temp_dict}})

        # Creates the term/terms/regexp dictionaries for the 'must' list;
        # constraints with an empty field name are skipped.
        eq_list = [{'term': {k: v}} for k, v in equality_constraints if k]
        batch_list = [{'terms': {k: v}} for k, v in batch_constraints if k]
        regex_list = [{'regexp': {k: v}} for k, v in regex_constraints if k]
        constraints = eq_list + batch_list + range_list + regex_list
        query = {
            'query': {
                'bool': {
                    'must': constraints,
                }
            },
        }
        if fields_returned:
            query['fields'] = fields_returned
        query['size'] = size
        if sort_specs:
            query['sort'] = sort_specs
        return query


    def execute_query(self, query):
        """Makes a query on the given index.

        @param query: query dictionary (see _compose_query)
        @returns: A QueryResult instance describing the result, or None if
            self.index does not exist on the es server.

        Example raw elasticsearch response that gets converted into a
        QueryResult (hits are taken from 'fields' if present, otherwise
        from a copy of '_source'):
        {
          "took" : 5,
          "timed_out" : false,
          "_shards" : {
            "total" : 16,
            "successful" : 16,
            "failed" : 0
          },
          "hits" : {
            "total" : 4,
            "max_score" : 1.0,
            "hits" : [ {
              "_index" : "graphite_metrics2",
              "_type" : "metric",
              "_id" : "rtntrjgdsafdsfdsfdsfdsfdssssssss",
              "_score" : 1.0,
              "_source":{"target_type": "timer",
                         "host_id": 1,
                         "job_id": 22,
                         "time_start": 400}
            }, {
              "_index" : "graphite_metrics2",
              "_type" : "metric",
              "_id" : "dfgfddddddddddddddddddddddhhh",
              "_score" : 1.0,
              "_source":{"target_type": "timer",
                         "host_id": 2,
                         "job_id": 23,
                         "time_start": 405}
            }, {
              "_index" : "graphite_metrics2",
              "_type" : "metric",
              "_id" : "erwerwerwewtrewgfednvfngfngfrhfd",
              "_score" : 1.0,
              "_source":{"target_type": "timer",
                         "host_id": 3,
                         "job_id": 24,
                         "time_start": 4098}
            }, {
              "_index" : "graphite_metrics2",
              "_type" : "metric",
              "_id" : "dfherjgwetfrsupbretowegoegheorgsa",
              "_score" : 1.0,
              "_source":{"target_type": "timer",
                         "host_id": 22,
                         "job_id": 25,
                         "time_start": 4200}
            } ]
          }
        }

        """
        if not self.es.indices.exists(index=self.index):
            logging.error('Index (%s) does not exist on %s:%s',
                          self.index, self.host, self.port)
            return None
        # Honor the instance's configured timeout (defaults to
        # DEFAULT_TIMEOUT) for both the server-side and client-side limits.
        result = self.es.search(index=self.index, body=query,
                                timeout=self.timeout,
                                request_timeout=self.timeout)
        # Check if all matched records are returned. It could be the size is
        # set too small. Special case for size set to 1, as that means that
        # the query cares about the first matched entry.
        # TODO: Use pagination in Elasticsearch. This needs major change on how
        #       query results are iterated.
        size = query.get('size', 1)
        return_count = len(result['hits']['hits'])
        total_match = result['hits']['total']
        if total_match > return_count and size != 1:
            logging.error('There are %d matched records, only %d entries are '
                          'returned. Query size is set to %d.', total_match,
                          return_count, size)

        # Extract the actual results from the query.
        output = QueryResult(total_match, [])
        for hit in result['hits']['hits']:
            converted = {}
            if 'fields' in hit:
                # es returns each requested field as a list; unwrap
                # single-element lists to plain values.
                for key, value in hit['fields'].items():
                    converted[key] = value[0] if len(value) == 1 else value
            else:
                converted = hit['_source'].copy()
            output.hits.append(converted)
        return output


    def query(self, *args, **kwargs):
        """Composes and executes a query in one call.

        The arguments to this function are the same as _compose_query.

        @returns: the result of execute_query() for the composed query.
        """
        query = self._compose_query(*args, **kwargs)
        return self.execute_query(query)