# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file defines helper functions for putting entries into elasticsearch.

"""Utils for sending metadata to elasticsearch

Elasticsearch is a document-oriented NoSQL database and search engine.
Source is here: https://github.com/elasticsearch/elasticsearch
We will be using es to store our metadata.

For example, if we wanted to store the following metadata:

metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
}

the following call sends it to the es server configured on the ESMetadata
instance (host, port, index and udp_port are all set in the constructor):
    es_utils.ESMetadata(use_http=True, host=host, port=port, index=index,
                        udp_port=udp_port).post(type_str, metadata)

For testing: sometimes, when we choose a single index to put entries into,
we want to clear that index of all entries before running our tests. Use
the clear_index function (see es_utils_functionaltest.py for an example).

This file also contains methods for sending queries to es. Currently,
the query (json dict) we send to es is quite complicated (but flexible).
_compose_query() builds the common forms of such queries.

For example, the query below returns job_id, host_id, and job_start
for all job_ids in [0, 99999] and host_id matching 10.

range_eq_query = {
    'fields': ['job_id', 'host_id', 'job_start'],
    'query': {
        'filtered': {
            'query': {
                'match': {
                    'host_id': 10,
                }
            },
            'filter': {
                'range': {
                    'job_id': {
                        'gte': 0,
                        'lte': 99999,
                    }
                }
            }
        }
    }
}

To send a query once it is created, call execute_query() to send it to the
intended elasticsearch server.
"""

import collections
import json
import logging
import socket
import time

try:
    import elasticsearch
    from elasticsearch import helpers as elasticsearch_helpers
except ImportError:
    logging.debug('Failed to import elasticsearch. Mock classes will be used '
                  'and calls to the Elasticsearch server will be no-ops. Test '
                  'runs are not affected by the missing elasticsearch module.')
    import elasticsearch_mock as elasticsearch
    elasticsearch_helpers = elasticsearch.Elasticsearch()


DEFAULT_TIMEOUT = 30


class EsUtilException(Exception):
    """Exception raised when functions here fail."""
    pass


QueryResult = collections.namedtuple('QueryResult', ['total', 'hits'])


class ESMetadata(object):
    """Class handling the es connection for metadata."""

    @property
    def es(self):
        """Read-only property, lazily initialized."""
        if not self._es:
            self._es = elasticsearch.Elasticsearch(host=self.host,
                                                   port=self.port,
                                                   timeout=self.timeout)
        return self._es


    def __init__(self, use_http, host, port, index, udp_port,
                 timeout=DEFAULT_TIMEOUT):
        """Initialize ESMetadata object.

        @param use_http: Whether to send data to ES using HTTP.
        @param host: Elasticsearch host.
        @param port: Elasticsearch port.
        @param index: What index the metadata is stored in.
        @param udp_port: What port to use for UDP data.
        @param timeout: How long to wait while connecting to es.
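
        Example (a sketch only; the host, ports and index name below are
        illustrative placeholders, not defaults of this module):
            es = ESMetadata(use_http=True, host='localhost', port=9200,
                            index='host_history', udp_port=9700)
            es.post('host_history', {'hostname': 'foo', 'status': 'Ready'})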
        """
        self.use_http = use_http
        self.host = host
        self.port = port
        self.index = index
        self.udp_port = udp_port
        self.timeout = timeout
        self._es = None


    def _send_data_http(self, type_str, metadata):
        """Sends data to insert into elasticsearch using HTTP.

        @param type_str: sets the _type field in the elasticsearch db.
        @param metadata: dictionary object containing metadata.
        """
        try:
            self.es.index(index=self.index, doc_type=type_str, body=metadata)
        except elasticsearch.ElasticsearchException as e:
            # Mute exceptions from metadata reporting to prevent metadata
            # reporting errors from killing the test.
            logging.error(e)


    def _send_data_udp(self, type_str, metadata):
        """Sends data to insert into elasticsearch using UDP.

        @param type_str: sets the _type field in the elasticsearch db.
        @param metadata: dictionary object containing metadata.
        """
        try:
            # Header line of the bulk request, naming the index and type.
            message = json.dumps(
                    {'index': {'_index': self.index, '_type': type_str}},
                    separators=(', ', ' : '))
            message += '\n'
            # The metadata document itself.
            message += json.dumps(metadata, separators=(', ', ' : '))
            message += '\n'

            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.sendto(message, (self.host, self.udp_port))
        except socket.error as e:
            logging.warning(e)


    def post(self, type_str, metadata, log_time_recorded=True, **kwargs):
        """Wraps call of send_data, inserts entry into elasticsearch.

        @param type_str: Sets the _type field in the elasticsearch db.
        @param metadata: Dictionary object containing metadata.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
        @param kwargs: Additional metadata fields.

        @return: True if the post action succeeded. Otherwise return False.
        """
        if not metadata:
            return True

        metadata = metadata.copy()
        metadata.update(kwargs)
        # The metadata dict should not contain a '_type' key; if it does, use
        # that value as the type and strip it from the document.
        if '_type' in metadata:
            type_str = metadata['_type']
            del metadata['_type']
        if log_time_recorded:
            metadata['time_recorded'] = time.time()
        try:
            if self.use_http:
                self._send_data_http(type_str, metadata)
            else:
                self._send_data_udp(type_str, metadata)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def bulk_post(self, data_list, log_time_recorded=True, **kwargs):
        """Wraps elasticsearch's bulk helper, inserts entries in one request.

        @param data_list: A list of dictionary objects containing metadata.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
        @param kwargs: Additional metadata fields.

        @return: True if the post action succeeded. Otherwise return False.
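
        Example (illustrative; the field names and values are placeholders):
            es.bulk_post([{'job_id': 10}, {'job_id': 11}], board='lumpy')
        Each entry is sent with the extra 'board' field, a 'time_recorded'
        timestamp, and the configured index added to it.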
        """
        if not data_list:
            return True

        actions = []
        for metadata in data_list:
            metadata = metadata.copy()
            metadata.update(kwargs)
            if log_time_recorded and 'time_recorded' not in metadata:
                metadata['time_recorded'] = time.time()
            metadata['_index'] = self.index
            actions.append(metadata)

        try:
            elasticsearch_helpers.bulk(self.es, actions)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def _compose_query(self, equality_constraints=(), fields_returned=None,
                       range_constraints=(), size=1000000, sort_specs=None,
                       regex_constraints=(), batch_constraints=()):
        """Creates a dict representing multiple range and/or equality queries.

        Example input:
        _compose_query(
            fields_returned = ['time_recorded', 'hostname',
                               'status', 'dbg_str'],
            equality_constraints = [
                ('_type', 'host_history'),
                ('hostname', '172.22.169.106'),
            ],
            range_constraints = [
                ('time_recorded', 1405628341.904379, 1405700341.904379)
            ],
            size=20,
            sort_specs=[
                'hostname',
                {'time_recorded': 'asc'},
            ]
        )

        Output:
        {
            'fields': ['time_recorded', 'hostname', 'status', 'dbg_str'],
            'query': {
                'bool': {
                    'must': [
                        {
                            'term': {
                                '_type': 'host_history'
                            }
                        },
                        {
                            'term': {
                                'hostname': '172.22.169.106'
                            }
                        },
                        {
                            'range': {
                                'time_recorded': {
                                    'gte': 1405628341.904379,
                                    'lte': 1405700341.904379
                                }
                            }
                        }
                    ]
                }
            },
            'size': 20,
            'sort': [
                'hostname',
                {'time_recorded': 'asc'},
            ]
        }

        @param equality_constraints: list of tuples of (field, value) pairs
            representing what each field should equal in the query.
            e.g. [ ('field1', 1), ('field2', 'value') ]
        @param fields_returned: list of fields that we should return when
            the query is executed. Set it to None to return all fields. Note
            that the key/vals will be stored in the _source key of the hit
            object if fields_returned is set to None.
        @param range_constraints: list of tuples of (field, low, high) pairs
            representing what each field should be between (inclusive).
            e.g. [ ('field1', 2, 10), ('field2', -1, 20) ]
            If you want one side to be unbounded, you can use None.
            e.g. [ ('field1', 2, None) ] means value of field1 >= 2.
        @param size: max number of entries to return. Default is 1000000.
        @param sort_specs: A list of fields to sort on; ties are broken by
            the next field(s) in the list.
        @param regex_constraints: A list of (field, regex) tuples,
            e.g., [('field1', '.*value.*')].
        @param batch_constraints: list of tuples of (field, list) pairs
            representing that each field should be equal to one of the
            values in the list.
            e.g., [ ('job_id', [10, 11, 12, 13]) ]
        @returns: dictionary object that represents the query to es.
        @raises EsUtilException: if neither equality nor range constraints
            are specified.
        """
        if not equality_constraints and not range_constraints:
            raise EsUtilException('No range or equality constraints '
                                  'specified.')

        # Create the list of range dictionaries to put in the 'must' list.
        range_list = []
        if range_constraints:
            for key, low, high in range_constraints:
                if low is None and high is None:
                    continue
                temp_dict = {}
                if low is not None:
                    temp_dict['gte'] = low
                if high is not None:
                    temp_dict['lte'] = high
                range_list.append({'range': {key: temp_dict}})

        # Create the lists of term dictionaries to put in the 'must' list.
        eq_list = [{'term': {k: v}} for k, v in equality_constraints if k]
        batch_list = [{'terms': {k: v}} for k, v in batch_constraints if k]
        regex_list = [{'regexp': {k: v}} for k, v in regex_constraints if k]
        constraints = eq_list + batch_list + range_list + regex_list
        query = {
            'query': {
                'bool': {
                    'must': constraints,
                }
            },
        }
        if fields_returned:
            query['fields'] = fields_returned
        query['size'] = size
        if sort_specs:
            query['sort'] = sort_specs
        return query


    def execute_query(self, query):
        """Makes a query on the given index.

        @param query: query dictionary (see _compose_query)
        @returns: A QueryResult instance describing the result, or None if
            the index does not exist on the configured server.
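
        Example (a sketch; assumes an ESMetadata instance named es, and uses
        the standard match_all query with an arbitrary size cap):
            result = es.execute_query({'query': {'match_all': {}}, 'size': 5})
            if result:
                logging.info('%d total matches', result.total)
                for hit in result.hits:
                    logging.info(hit)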

        Example of the raw response returned by es.search(), which this
        method converts into the QueryResult described above:
        {
            "took" : 5,
            "timed_out" : false,
            "_shards" : {
                "total" : 16,
                "successful" : 16,
                "failed" : 0
            },
            "hits" : {
                "total" : 4,
                "max_score" : 1.0,
                "hits" : [ {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "rtntrjgdsafdsfdsfdsfdsfdssssssss",
                    "_score" : 1.0,
                    "_source" : {"target_type": "timer",
                                 "host_id": 1,
                                 "job_id": 22,
                                 "time_start": 400}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfgfddddddddddddddddddddddhhh",
                    "_score" : 1.0,
                    "_source" : {"target_type": "timer",
                                 "host_id": 2,
                                 "job_id": 23,
                                 "time_start": 405}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "erwerwerwewtrewgfednvfngfngfrhfd",
                    "_score" : 1.0,
                    "_source" : {"target_type": "timer",
                                 "host_id": 3,
                                 "job_id": 24,
                                 "time_start": 4098}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfherjgwetfrsupbretowegoegheorgsa",
                    "_score" : 1.0,
                    "_source" : {"target_type": "timer",
                                 "host_id": 22,
                                 "job_id": 25,
                                 "time_start": 4200}
                } ]
            }
        }
        """
        if not self.es.indices.exists(index=self.index):
            logging.error('Index (%s) does not exist on %s:%s',
                          self.index, self.host, self.port)
            return None
        result = self.es.search(index=self.index, body=query,
                                timeout=DEFAULT_TIMEOUT,
                                request_timeout=DEFAULT_TIMEOUT)
        # Check whether all matched records were returned; the requested size
        # may have been set too small. Size 1 is special-cased, as it means
        # the query only cares about the first matched entry.
        # TODO: Use pagination in Elasticsearch. This needs a major change in
        # how query results are iterated.
        size = query.get('size', 1)
        return_count = len(result['hits']['hits'])
        total_match = result['hits']['total']
        if total_match > return_count and size != 1:
            logging.error('There are %d matched records, only %d entries are '
                          'returned. Query size is set to %d.', total_match,
                          return_count, size)

        # Extract the actual results from the query.
        output = QueryResult(total_match, [])
        for hit in result['hits']['hits']:
            converted = {}
            if 'fields' in hit:
                for key, value in hit['fields'].items():
                    converted[key] = value[0] if len(value) == 1 else value
            else:
                converted = hit['_source'].copy()
            output.hits.append(converted)
        return output


    def query(self, *args, **kwargs):
        """The arguments to this function are the same as _compose_query.
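
        Example (illustrative; assumes an ESMetadata instance named es):
            result = es.query(
                    equality_constraints=[('_type', 'host_history')],
                    size=10)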
        """
        query = self._compose_query(*args, **kwargs)
        return self.execute_query(query)
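

if __name__ == '__main__':
    # A minimal smoke-test sketch, not part of the original module's API. It
    # assumes an Elasticsearch server reachable at localhost:9200 with an
    # existing 'test_index' index; adjust the host, ports and index to match
    # your deployment.
    logging.basicConfig(level=logging.INFO)
    es = ESMetadata(use_http=True, host='localhost', port=9200,
                    index='test_index', udp_port=9700)
    es.post('test_type', {'job_id': 1, 'host_id': 10, 'time_start': 100000})
    result = es.query(equality_constraints=[('job_id', 1)], size=10)
    if result:
        logging.info('%d matching entries found.', result.total)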