# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file defines helper functions for putting entries into elasticsearch.

7"""Utils for sending metadata to elasticsearch
8
9Elasticsearch is a key-value store NOSQL database.
10Source is here: https://github.com/elasticsearch/elasticsearch
11We will be using es to store our metadata.
12
13For example, if we wanted to store the following metadata:
14
15metadata = {
16    'host_id': 1
17    'job_id': 20
18    'time_start': 100000
19    'time_recorded': 100006
20}
21
22The following call will send metadata to the default es server.
23    es_utils.ESMetadata().post(index, metadata)
24We can also specify which port and host to use.
25
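A minimal instantiation sketch (the host/port/index values here are
illustrative, not defaults):

    es = es_utils.ESMetadata(use_http=True, host='localhost', port=9200,
                             index='test_index', udp_port=9700)
    es.post('test_type', metadata)
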
Use for testing: Sometimes, when we choose a single index
to put entries into, we want to clear that index of all
entries before running our tests. Use the clear_index function
(see es_utils_functionaltest.py for an example).

This file also contains methods for sending queries to es. Currently,
the query (a json dict) we send to es is quite complicated (but flexible).
ESMetadata._compose_query() builds such queries from simple lists of
constraints; see its docstring for details.

For example, the query below returns job_id, host_id, and job_start
for all job_ids in [0, 99999] with host_id matching 10.

range_eq_query = {
    'fields': ['job_id', 'host_id', 'job_start'],
    'query': {
        'filtered': {
            'query': {
                'match': {
                    'host_id': 10,
                }
            },
            'filter': {
                'range': {
                    'job_id': {
                        'gte': 0,
                        'lte': 99999,
                    }
                }
            }
        }
    }
}

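Roughly the same query can be composed with the helpers below (a sketch;
the field names follow the example above):

    result = es_utils.ESMetadata(...).query(
        fields_returned=['job_id', 'host_id', 'job_start'],
        equality_constraints=[('host_id', 10)],
        range_constraints=[('job_id', 0, 99999)])
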
Once a query is created, call execute_query() to send it to the
intended elasticsearch server.

"""

import collections
import json
import logging
import socket
import time

try:
    import elasticsearch
    from elasticsearch import helpers as elasticsearch_helpers
except ImportError:
    logging.debug('Failed to import elasticsearch. Mock classes will be used '
                  'and calls to the Elasticsearch server will be no-ops. Test '
                  'runs are not affected by the missing elasticsearch module.')
    import elasticsearch_mock as elasticsearch
    elasticsearch_helpers = elasticsearch.Elasticsearch()


DEFAULT_TIMEOUT = 30


class EsUtilException(Exception):
    """Exception raised when functions here fail."""
    pass


QueryResult = collections.namedtuple('QueryResult', ['total', 'hits'])

class ESMetadata(object):
    """Class handling es connection for metadata."""

    @property
    def es(self):
        """Read-only property, lazily initialized."""
        if not self._es:
            self._es = elasticsearch.Elasticsearch(host=self.host,
                                                   port=self.port,
                                                   timeout=self.timeout)
        return self._es


    def __init__(self, use_http, host, port, index, udp_port,
                 timeout=DEFAULT_TIMEOUT):
        """Initialize ESMetadata object.

        @param use_http: Whether to send data to ES using HTTP.
        @param host: Elasticsearch host.
        @param port: Elasticsearch port.
        @param index: What index the metadata is stored in.
        @param udp_port: What port to use for UDP data.
        @param timeout: How long to wait while connecting to es.
        """
        self.use_http = use_http
        self.host = host
        self.port = port
        self.index = index
        self.udp_port = udp_port
        self.timeout = timeout
        self._es = None


    def _send_data_http(self, type_str, metadata):
        """Sends data to insert into elasticsearch using HTTP.

        @param type_str: sets the _type field in elasticsearch db.
        @param metadata: dictionary object containing metadata
        """
        try:
            self.es.index(index=self.index, doc_type=type_str, body=metadata)
        except elasticsearch.ElasticsearchException as e:
            # Mute exceptions from metadata reporting to prevent metadata
            # reporting errors from killing the test.
            logging.error(e)


    def _send_data_udp(self, type_str, metadata):
        """Sends data to insert into elasticsearch using UDP.

        @param type_str: sets the _type field in elasticsearch db.
        @param metadata: dictionary object containing metadata
        """
        try:
            # Bulk-API action header naming the target index and doc type.
            message = json.dumps(
                    {'index': {'_index': self.index, '_type': type_str}},
                    separators=(', ', ' : '))
            message += '\n'
            # The document itself follows on the next line.
            message += json.dumps(metadata, separators=(', ', ' : '))
            message += '\n'

            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.sendto(message, (self.host, self.udp_port))
        except socket.error as e:
            logging.warning(e)


    def post(self, type_str, metadata, log_time_recorded=True, **kwargs):
        """Wraps the _send_data_* calls; inserts an entry into elasticsearch.

        @param type_str: Sets the _type field in elasticsearch db.
        @param metadata: Dictionary object containing metadata
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
        @param kwargs: Additional metadata fields

        @return: True if the post action succeeded. Otherwise False.

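        Example (a sketch; the type and field names are illustrative):
            es.post('job_time_breakdown', {'hostname': 'host1'},
                    job_id=1234, status='Completed')
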
        """
        if not metadata:
            return True

        metadata = metadata.copy()
        metadata.update(kwargs)
        # If the metadata carries its own '_type' key, use that as the doc
        # type and strip it from the body.
        if '_type' in metadata:
            type_str = metadata['_type']
            del metadata['_type']
        if log_time_recorded:
            metadata['time_recorded'] = time.time()
        try:
            if self.use_http:
                self._send_data_http(type_str, metadata)
            else:
                self._send_data_udp(type_str, metadata)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def bulk_post(self, data_list, log_time_recorded=True, **kwargs):
        """Inserts a list of entries into elasticsearch in one bulk call.

        @param data_list: A list of dictionary objects containing metadata.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
        @param kwargs: Additional metadata fields

        @return: True if the post action succeeded. Otherwise False.

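        Example (a sketch; the field names are illustrative):
            es.bulk_post([{'hostname': 'host1'}, {'hostname': 'host2'}],
                         job_id=1234)
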
        """
        if not data_list:
            return True

        actions = []
        for metadata in data_list:
            metadata = metadata.copy()
            metadata.update(kwargs)
            if log_time_recorded and 'time_recorded' not in metadata:
                metadata['time_recorded'] = time.time()
            metadata['_index'] = self.index
            actions.append(metadata)

        try:
            elasticsearch_helpers.bulk(self.es, actions)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def _compose_query(self, equality_constraints=(), fields_returned=None,
                       range_constraints=(), size=1000000, sort_specs=None,
                       regex_constraints=(), batch_constraints=()):
        """Creates a dict representing multiple range and/or equality queries.

        Example input:
        _compose_query(
            fields_returned = ['time_recorded', 'hostname',
                               'status', 'dbg_str'],
            equality_constraints = [
                ('_type', 'host_history'),
                ('hostname', '172.22.169.106'),
            ],
            range_constraints = [
                ('time_recorded', 1405628341.904379, 1405700341.904379)
            ],
            size=20,
            sort_specs=[
                'hostname',
                {'time_recorded': 'asc'},
            ]
        )

        Output:
        {
            'fields': ['time_recorded', 'hostname', 'status', 'dbg_str'],
            'query': {
                'bool': {
                    'must': [
                        {
                            'term': {
                                '_type': 'host_history'
                            }
                        },
                        {
                            'term': {
                                'hostname': '172.22.169.106'
                            }
                        },
                        {
                            'range': {
                                'time_recorded': {
                                    'gte': 1405628341.904379,
                                    'lte': 1405700341.904379
                                }
                            }
                        }
                    ]
                },
            },
            'size': 20,
            'sort': [
                'hostname',
                {'time_recorded': 'asc'},
            ]
        }

        @param equality_constraints: list of tuples of (field, value) pairs
            representing what each field should equal in the query.
            e.g. [ ('field1', 1), ('field2', 'value') ]
        @param fields_returned: list of fields that we should return when
            the query is executed. Set it to None to return all fields. Note
            that the key/vals will be stored in the _source key of the hit
            object if fields_returned is set to None.
        @param range_constraints: list of tuples of (field, low, high) pairs
            representing what each field should be between (inclusive).
            e.g. [ ('field1', 2, 10), ('field2', -1, 20) ]
            If you want one side to be unbounded, you can use None.
            e.g. [ ('field1', 2, None) ] means value of field1 >= 2.
        @param size: max number of entries to return. Default is 1000000.
        @param sort_specs: A list of fields to sort on; ties are broken by
            the next field(s) in the list.
        @param regex_constraints: A list of tuples of (field, regex) pairs,
            e.g., [('field1', '.*value.*')].
        @param batch_constraints: list of tuples of (field, list) pairs
            representing that each field should equal one of the values
            in the list.
            e.g., [ ('job_id', [10, 11, 12, 13]) ]
        @returns: dictionary object that represents the query to es.
        @raises EsUtilException: if there are no equality constraints
            and no range constraints.
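
        Example with batch and regex constraints (a sketch; the values are
        illustrative):
            _compose_query(
                equality_constraints=[('_type', 'host_history')],
                batch_constraints=[('job_id', [10, 11, 12, 13])],
                regex_constraints=[('hostname', '.*host.*')])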
        """
        if not equality_constraints and not range_constraints:
            raise EsUtilException('No range or equality constraints specified.')

        # Creates the list of range dictionaries to put in the 'must' list.
        range_list = []
        if range_constraints:
            for key, low, high in range_constraints:
                if low is None and high is None:
                    continue
                temp_dict = {}
                if low is not None:
                    temp_dict['gte'] = low
                if high is not None:
                    temp_dict['lte'] = high
                range_list.append({'range': {key: temp_dict}})

        # Creates the lists of term dictionaries to put in the 'must' list.
        eq_list = [{'term': {k: v}} for k, v in equality_constraints if k]
        batch_list = [{'terms': {k: v}} for k, v in batch_constraints if k]
        regex_list = [{'regexp': {k: v}} for k, v in regex_constraints if k]
        constraints = eq_list + batch_list + range_list + regex_list
        query = {
            'query': {
                'bool': {
                    'must': constraints,
                }
            },
        }
        if fields_returned:
            query['fields'] = fields_returned
        query['size'] = size
        if sort_specs:
            query['sort'] = sort_specs
        return query


    def execute_query(self, query):
        """Runs a query against the instance's index.

        @param query: query dictionary (see _compose_query)
        @returns: A QueryResult instance describing the result.

        Example of the raw es response this method parses:
        {
            "took" : 5,
            "timed_out" : false,
            "_shards" : {
                "total" : 16,
                "successful" : 16,
                "failed" : 0
            },
            "hits" : {
                "total" : 4,
                "max_score" : 1.0,
                "hits" : [ {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "rtntrjgdsafdsfdsfdsfdsfdssssssss",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 1,
                               "job_id": 22,
                               "time_start": 400}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfgfddddddddddddddddddddddhhh",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 2,
                               "job_id": 23,
                               "time_start": 405}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "erwerwerwewtrewgfednvfngfngfrhfd",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 3,
                               "job_id": 24,
                               "time_start": 4098}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfherjgwetfrsupbretowegoegheorgsa",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 22,
                               "job_id": 25,
                               "time_start": 4200}
                } ]
            }
        }

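        The response above would be parsed into (a sketch):
            QueryResult(total=4, hits=[
                {'target_type': 'timer', 'host_id': 1, 'job_id': 22,
                 'time_start': 400},
                ...  # one dict per hit, taken from 'fields' or '_source'.
            ])
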
        """
        if not self.es.indices.exists(index=self.index):
            logging.error('Index (%s) does not exist on %s:%s',
                          self.index, self.host, self.port)
            return None
        result = self.es.search(index=self.index, body=query,
                                timeout=DEFAULT_TIMEOUT,
                                request_timeout=DEFAULT_TIMEOUT)
        # Check if all matched records were returned; the size may be set
        # too small. Special-case size set to 1, as that means the query
        # only cares about the first matched entry.
        # TODO: Use pagination in Elasticsearch. This needs a major change
        #       in how query results are iterated.
        size = query.get('size', 1)
        return_count = len(result['hits']['hits'])
        total_match = result['hits']['total']
        if total_match > return_count and size != 1:
            logging.error('There are %d matched records, only %d entries are '
                          'returned. Query size is set to %d.', total_match,
                          return_count, size)

        # Extract the actual results from the query.
        output = QueryResult(total_match, [])
        for hit in result['hits']['hits']:
            converted = {}
            if 'fields' in hit:
                for key, value in hit['fields'].items():
                    converted[key] = value[0] if len(value) == 1 else value
            else:
                converted = hit['_source'].copy()
            output.hits.append(converted)
        return output


    def query(self, *args, **kwargs):
        """Composes and executes a query; takes the same arguments as
        _compose_query.
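
        Example (a sketch; the constraint values are illustrative):
            result = es.query(
                equality_constraints=[('hostname', '172.22.169.106')],
                range_constraints=[('time_recorded', 1405628341, None)])
        """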
        query = self._compose_query(*args, **kwargs)
        return self.execute_query(query)