1d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang# Use of this source code is governed by a BSD-style license that can be
3d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang# found in the LICENSE file.
4d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
5d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang# This file defines helper functions for putting entries into elasticsearch.
6d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
7d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang"""Utils for sending metadata to elasticsearch
8d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
9d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangElasticsearch is a key-value store NOSQL database.
10d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangSource is here: https://github.com/elasticsearch/elasticsearch
11d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangWe will be using es to store our metadata.
12d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
13d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangFor example, if we wanted to store the following metadata:
14d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
}
21d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
The following call will send metadata to the configured es server.
    es_utils.ESMetadata(use_http, host, port, index, udp_port).post(
            type_str, metadata)
The transport (HTTP or UDP), host, ports and index are all passed to the
ESMetadata constructor.
25d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
26d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangUsing for testing: Sometimes, when we choose a single index
27d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangto put entries into, we want to clear that index of all
28d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangentries before running our tests. Use clear_index function.
29d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang(see es_utils_functionaltest.py for an example)
30d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
31d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangThis file also contains methods for sending queries to es. Currently,
32d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangthe query (json dict) we send to es is quite complicated (but flexible).
We've included several methods that compose useful queries.
34d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangThese methods are all named create_*_query()
35d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
36d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangFor example, the below query returns job_id, host_id, and job_start
37d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangfor all job_ids in [0, 99999] and host_id matching 10.
38d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
39d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangrange_eq_query = {
40d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang    'fields': ['job_id', 'host_id', 'job_start'],
41d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang    'query': {
42d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        'filtered': {
43d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang            'query': {
44d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                'match': {
45d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                    'host_id': 10,
46d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                }
            },
48d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang            'filter': {
49d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                'range': {
50d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                    'job_id': {
51d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                        'gte': 0,
52d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                        'lte': 99999,
53d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                    }
54d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang                }
55d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang            }
56d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        }
57d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang    }
58d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang}
59d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
60d2d294c83fe8355d432f84771a651b1c99029bcfMichael LiangTo send a query once it is created, call execute_query() to send it to the
61d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangintended elasticsearch server.
62d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
63d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang"""
64d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
65b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Blackimport collections
66d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangimport json
67d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangimport logging
68d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangimport socket
69d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liangimport time
70d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
71d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangtry:
72d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang    import elasticsearch
73cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi    from elasticsearch import helpers as elasticsearch_helpers
74d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangexcept ImportError:
757bdd89d6927be7c0aa4096aaf6f0354a5de38a19Dan Shi    logging.debug('Failed to import elasticsearch. Mock classes will be used '
767bdd89d6927be7c0aa4096aaf6f0354a5de38a19Dan Shi                  'and calls to Elasticsearch server will be no-op. Test run '
777bdd89d6927be7c0aa4096aaf6f0354a5de38a19Dan Shi                  'is not affected by the missing elasticsearch module.')
78d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang    import elasticsearch_mock as elasticsearch
79cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi    elasticsearch_helpers = elasticsearch.Elasticsearch()
80d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
81b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
# Default for ESMetadata's `timeout` argument, which is passed straight through
# to elasticsearch.Elasticsearch(timeout=...) when the client is created.
# Presumably seconds, per the elasticsearch client's convention -- TODO confirm.
DEFAULT_TIMEOUT = 30
833eab70e78b26a4e0de263c7d5ecf4c4a6fef1519Gabe Black
84d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
class EsUtilException(Exception):
    """Raised when a helper in this module cannot complete its operation."""
88d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang
89d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
# Value returned by ESMetadata.execute_query(). `total` and `hits` presumably
# mirror the 'hits.total' / 'hits.hits' entries of the es response -- confirm
# against execute_query.
QueryResult = collections.namedtuple('QueryResult', 'total hits')
91d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
92d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
93d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liangclass ESMetadata(object):
94b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    """Class handling es connection for metadata."""
95b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
96b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    @property
97b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    def es(self):
98b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """Read only property, lazily initialized"""
99b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        if not self._es:
100b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            self._es = elasticsearch.Elasticsearch(host=self.host,
101b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                                   port=self.port,
102b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                                   timeout=self.timeout)
103b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        return self._es
104b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
105d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
106b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    def __init__(self, use_http, host, port, index, udp_port,
107b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                 timeout=DEFAULT_TIMEOUT):
108d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        """Initialize ESMetadata object.
109d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
110b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param use_http: Whether to send data to ES using HTTP.
111b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param host: Elasticsearch host.
112b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param port: Elasticsearch port.
113b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param index: What index the metadata is stored in.
114b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param udp_port: What port to use for UDP data.
115b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param timeout: How long to wait while connecting to es.
116d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        """
117b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        self.use_http = use_http
118d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        self.host = host
119d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        self.port = port
1203eab70e78b26a4e0de263c7d5ecf4c4a6fef1519Gabe Black        self.index = index
121b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        self.udp_port = udp_port
122b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        self.timeout = timeout
123b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        self._es = None
124d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
125d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
126b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    def _send_data_http(self, type_str, metadata):
127b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """Sends data to insert into elasticsearch using HTTP.
128d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
1290d7474640b6a6e4334d16921fe0df79418007af9Michael Liang        @param type_str: sets the _type field in elasticsearch db.
130d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        @param metadata: dictionary object containing metadata
131d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        """
1321057bae99b76a521f491c09a2fc9c3409c0e6010Dan Shi        try:
1331057bae99b76a521f491c09a2fc9c3409c0e6010Dan Shi            self.es.index(index=self.index, doc_type=type_str, body=metadata)
1341057bae99b76a521f491c09a2fc9c3409c0e6010Dan Shi        except elasticsearch.ElasticsearchException as e:
1351057bae99b76a521f491c09a2fc9c3409c0e6010Dan Shi            # Mute exceptions from metadata reporting to prevent meta data
1361057bae99b76a521f491c09a2fc9c3409c0e6010Dan Shi            # reporting errors from killing test.
1371057bae99b76a521f491c09a2fc9c3409c0e6010Dan Shi            logging.error(e)
138b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
139b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
140b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    def _send_data_udp(self, type_str, metadata):
141b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """Sends data to insert into elasticsearch using UDP.
142d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang
1430d7474640b6a6e4334d16921fe0df79418007af9Michael Liang        @param type_str: sets the _type field in elasticsearch db.
144d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        @param metadata: dictionary object containing metadata
145b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """
146b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        try:
147b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            # Header.
148b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            message = json.dumps(
149b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    {'index': {'_index': self.index, '_type': type_str}},
150b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    separators=(', ', ' : '))
151b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            message += '\n'
152b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            # Metadata.
153b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            message += json.dumps(metadata, separators=(', ', ' : '))
154b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            message += '\n'
155b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
156b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
157b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            sock.sendto(message, (self.host, self.udp_port))
158b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        except socket.error as e:
159b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            logging.warn(e)
160b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
161b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
162b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    def post(self, type_str, metadata, log_time_recorded=True, **kwargs):
163b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """Wraps call of send_data, inserts entry into elasticsearch.
164b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
165b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param type_str: Sets the _type field in elasticsearch db.
166b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param metadata: Dictionary object containing metadata
167b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param log_time_recorded: Whether to automatically record the time
168b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                  this metadata is recorded. Default is True.
169b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param kwargs: Additional metadata fields
1700e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi
1710e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi        @return: True if post action succeeded. Otherwise return False.
1720e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi
173d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        """
174d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        if not metadata:
1750e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi            return True
176b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
177b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        metadata = metadata.copy()
178b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        metadata.update(kwargs)
17928a3f36c0c6c8b9d69c9d32a637cac3ca6251560Michael Liang        # metadata should not contain anything with key '_type'
180b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        if '_type' in metadata:
181b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            type_str = metadata['_type']
182b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            del metadata['_type']
18354682e7d8b47ec91788bad71520437e2734082dbDan Shi        if log_time_recorded:
184b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            metadata['time_recorded'] = time.time()
185d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        try:
186b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            if self.use_http:
187b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                self._send_data_http(type_str, metadata)
188b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            else:
189b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                self._send_data_udp(type_str, metadata)
1900e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi            return True
191d2d294c83fe8355d432f84771a651b1c99029bcfMichael Liang        except elasticsearch.ElasticsearchException as e:
192d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang            logging.error(e)
1930e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi            return False
194d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang
1951057bae99b76a521f491c09a2fc9c3409c0e6010Dan Shi
196cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi    def bulk_post(self, data_list, log_time_recorded=True, **kwargs):
197cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        """Wraps call of send_data, inserts entry into elasticsearch.
198cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi
199cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        @param data_list: A list of dictionary objects containing metadata.
200cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        @param log_time_recorded: Whether to automatically record the time
201cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi                                  this metadata is recorded. Default is True.
202cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        @param kwargs: Additional metadata fields
2030e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi
2040e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi        @return: True if post action succeeded. Otherwise return False.
2050e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi
206cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        """
207cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        if not data_list:
2080e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi            return True
209cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi
210cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        actions = []
211cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        for metadata in data_list:
212cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi            metadata = metadata.copy()
213cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi            metadata.update(kwargs)
2145de10171860a2ef3a30aa2ba6a67431673120649Dan Shi            if log_time_recorded and not 'time_recorded' in metadata:
215cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi                metadata['time_recorded'] = time.time()
216cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi            metadata['_index'] = self.index
217cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi            actions.append(metadata)
218cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi
219cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        try:
220cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi            elasticsearch_helpers.bulk(self.es, actions)
2210e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi            return True
222cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi        except elasticsearch.ElasticsearchException as e:
223cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi            logging.error(e)
2240e7d0f5c8298782aecf4db3b0a2bb05b2f201486Dan Shi            return False
225cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi
226cf2e8dd3f81d5eb4c9720db396ebbf64fd7b9ae4Dan Shi
227b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    def _compose_query(self, equality_constraints=[], fields_returned=None,
228b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                       range_constraints=[], size=1000000, sort_specs=None,
229b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                       regex_constraints=[], batch_constraints=[]):
230b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """Creates a dict. representing multple range and/or equality queries.
231b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
232b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        Example input:
233b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        _compose_query(
234b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            fields_returned = ['time_recorded', 'hostname',
235b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                               'status', 'dbg_str'],
236b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            equality_constraints = [
237b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                ('_type', 'host_history'),
238b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                ('hostname', '172.22.169.106'),
239b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            ],
240b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            range_constraints = [
241b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                ('time_recorded', 1405628341.904379, 1405700341.904379)
242b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            ],
243b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            size=20,
244b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            sort_specs=[
245b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                'hostname',
246b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                {'time_recorded': 'asc'},
247b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            ]
248d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang        )
249d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang
250b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        Output:
251b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        {
252b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            'fields': ['time_recorded', 'hostname', 'status', 'dbg_str'],
253b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            'query': {
254b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                'bool': {
255b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    'minimum_should_match': 3,
256b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    'should': [
257b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                        {
258b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                            'term':  {
259b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                '_type': 'host_history'
260b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                            }
261b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                        },
262b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                        {
263b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                            'term': {
264b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                'hostname': '172.22.169.106'
265b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                            }
266b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                        },
267b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                        {
268b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                            'range': {
269b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                'time_recorded': {
270b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                    'gte': 1405628341.904379,
271b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                    'lte': 1405700341.904379
272b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                                }
273d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang                            }
274d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang                        }
275b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    ]
276b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                },
277d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang            },
278b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            'size': 20
279b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            'sort': [
280b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                'hostname',
281b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                { 'time_recorded': 'asc'},
282b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            ]
283b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        }
284d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang
285b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param equality_constraints: list of tuples of (field, value) pairs
286b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            representing what each field should equal to in the query.
287b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            e.g. [ ('field1', 1), ('field2', 'value') ]
288b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param fields_returned: list of fields that we should return when
289b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            the query is executed. Set it to None to return all fields. Note
290b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            that the key/vals will be stored in _source key of the hit object,
291b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            if fields_returned is set to None.
292b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param range_constraints: list of tuples of (field, low, high) pairs
293b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            representing what each field should be between (inclusive).
294b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            e.g. [ ('field1', 2, 10), ('field2', -1, 20) ]
295b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            If you want one side to be unbounded, you can use None.
296b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            e.g. [ ('field1', 2, None) ] means value of field1 >= 2.
297b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param size: max number of entries to return. Default is 1000000.
298b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param sort_specs: A list of fields to sort on, tiebreakers will be
299b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            broken by the next field(s).
300b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param regex_constraints: A list of regex constraints of tuples of
301b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            (field, value) pairs, e.g., [('filed1', '.*value.*')].
302b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param batch_constraints: list of tuples of (field, list) pairs
303b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            representing each field should be equal to one of the values
304b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            in the list.
305b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            e.g., [ ('job_id', [10, 11, 12, 13]) ]
306b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @returns: dictionary object that represents query to es.
307b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                  This will return None if there are no equality constraints
308b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                  and no range constraints.
309b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """
310b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        if not equality_constraints and not range_constraints:
311b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            raise EsUtilException('No range or equality constraints specified.')
312b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
313b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        # Creates list of range dictionaries to put in the 'should' list.
314b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        range_list = []
315b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        if range_constraints:
316b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            for key, low, high in range_constraints:
317b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                if low is None and high is None:
318b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    continue
319b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                temp_dict = {}
320b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                if low is not None:
321b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    temp_dict['gte'] = low
322b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                if high is not None:
323b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    temp_dict['lte'] = high
324b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                range_list.append( {'range': {key: temp_dict}})
325b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
326b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        # Creates the list of term dictionaries to put in the 'should' list.
327b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        eq_list = [{'term': {k: v}} for k, v in equality_constraints if k]
328b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        batch_list = [{'terms': {k: v}} for k, v in batch_constraints if k]
329b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        regex_list = [{'regexp': {k: v}} for k, v in regex_constraints if k]
330b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        constraints = eq_list + batch_list + range_list + regex_list
331b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        query = {
332b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            'query': {
333b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                'bool': {
3347cde53d430e208139638ee65116c0cd1ae7a7a36Dan Shi                    'must': constraints,
335b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                }
336b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            },
337b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        }
338b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        if fields_returned:
339b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            query['fields'] = fields_returned
340b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        query['size'] = size
341b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        if sort_specs:
342b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            query['sort'] = sort_specs
343b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        return query
344b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
345b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
346b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    def execute_query(self, query):
347b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """Makes a query on the given index.
348b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
349b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @param query: query dictionary (see _compose_query)
350b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        @returns: A QueryResult instance describing the result.
351b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
352b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        Example output:
353b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        {
354b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            "took" : 5,
355b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            "timed_out" : false,
356b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            "_shards" : {
357b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "total" : 16,
358b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "successful" : 16,
359b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "failed" : 0
360b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            },
361b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            "hits" : {
362b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "total" : 4,
363b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "max_score" : 1.0,
364b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "hits" : [ {
365b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_index" : "graphite_metrics2",
366b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_type" : "metric",
367b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_id" : "rtntrjgdsafdsfdsfdsfdsfdssssssss",
368b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_score" : 1.0,
369b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_source":{"target_type": "timer",
370b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                               "host_id": 1,
371b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                               "job_id": 22,
372b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                               "time_start": 400}
373b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                }, {
374b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_index" : "graphite_metrics2",
375b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_type" : "metric",
376b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_id" : "dfgfddddddddddddddddddddddhhh",
377b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_score" : 1.0,
378b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_source":{"target_type": "timer",
379b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                        "host_id": 2,
380b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                        "job_id": 23,
381b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                        "time_start": 405}
382b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                }, {
383b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "_index" : "graphite_metrics2",
384b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "_type" : "metric",
385b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "_id" : "erwerwerwewtrewgfednvfngfngfrhfd",
386b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "_score" : 1.0,
387b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                "_source":{"target_type": "timer",
388b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                           "host_id": 3,
389b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                           "job_id": 24,
390b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                           "time_start": 4098}
391b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                }, {
392b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_index" : "graphite_metrics2",
393b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_type" : "metric",
394b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_id" : "dfherjgwetfrsupbretowegoegheorgsa",
395b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_score" : 1.0,
396b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                    "_source":{"target_type": "timer",
397b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                               "host_id": 22,
398b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                               "job_id": 25,
399b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                               "time_start": 4200}
400b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                } ]
401d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang            }
402b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        }
403d9a0924ab5b65852942b1dec5a021d0371dc95e2Michael Liang
404b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """
405b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        if not self.es.indices.exists(index=self.index):
406b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            logging.error('Index (%s) does not exist on %s:%s',
407b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                          self.index, self.host, self.port)
408b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            return None
4097cde53d430e208139638ee65116c0cd1ae7a7a36Dan Shi        result = self.es.search(index=self.index, body=query,
410cae83c7f545ba16316c819ce238cf6a00db4a4c5Dan Shi                                timeout=DEFAULT_TIMEOUT,
411cae83c7f545ba16316c819ce238cf6a00db4a4c5Dan Shi                                request_timeout=DEFAULT_TIMEOUT)
412b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        # Check if all matched records are returned. It could be the size is
413b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        # set too small. Special case for size set to 1, as that means that
414b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        # the query cares about the first matched entry.
415b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        # TODO: Use pagination in Elasticsearch. This needs major change on how
416b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        #       query results are iterated.
417b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        size = query.get('size', 1)
418b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        return_count = len(result['hits']['hits'])
419b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        total_match = result['hits']['total']
420b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        if total_match > return_count and size != 1:
421b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            logging.error('There are %d matched records, only %d entries are '
422b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                          'returned. Query size is set to %d.', total_match,
423b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                          return_count, size)
424b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
425b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        # Extract the actual results from the query.
426b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        output = QueryResult(total_match, [])
427b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        for hit in result['hits']['hits']:
428b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            converted = {}
429b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            if 'fields' in hit:
430b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                for key, value in hit['fields'].items():
43199be217ec6a37e487d3ccaf9f3a9e3dd199acb78Dan Shi                    converted[key] = value[0] if len(value)==1 else value
432b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            else:
433b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black                converted = hit['_source'].copy()
434b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black            output.hits.append(converted)
435b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        return output
436b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
437b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black
438b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black    def query(self, *args, **kwargs):
439b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        """The arguments to this function are the same as _compose_query."""
440b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        query = self._compose_query(*args, **kwargs)
441b72f4fbcf1583da27f09f4abb9d8162530bf4559Gabe Black        return self.execute_query(query)
442