# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json
import logging
import socket
import threading
import time

from telemetry.core.backends.chrome import websocket


class InspectorWebsocket(object):
  """Thin JSON request/notification wrapper around one Inspector websocket.

  Requests are sent as JSON objects tagged with an incrementing 'id'. Incoming
  messages that carry a 'method' key are treated as notifications and passed
  to the notification handler; other messages are returned to the caller.
  """

  def __init__(self, notification_handler=None, error_handler=None):
    """Create a websocket handler for communicating with Inspectors.

    Args:
      notification_handler: A callback for notifications received as a result of
          calling DispatchNotifications() or StartAsyncDispatchNotifications().
          Must accept a single JSON object containing the Inspector's
          notification. May return True to stop async dispatching. If None,
          notifications are not dispatched.
      error_handler: A callback for errors in communicating with the Inspector.
          Must accept a single numeric parameter indicating the time elapsed
          before the error. If None, the underlying socket/websocket exception
          is re-raised to the caller instead.
    """
    self._socket = None
    self._thread = None
    self._cur_socket_timeout = 0
    self._next_request_id = 0
    self._notification_handler = notification_handler
    self._error_handler = error_handler

  @property
  def is_dispatching_async_notifications(self):
    """True from StartAsyncDispatchNotifications() until a successful stop."""
    return self._thread is not None

  def Connect(self, url, timeout=10):
    """Open the websocket connection to |url| and reset per-connection state.

    Args:
      url: The websocket URL handed to websocket.create_connection().
      timeout: Timeout passed through to websocket.create_connection().
    """
    assert not self._socket
    self._socket = websocket.create_connection(url, timeout=timeout)
    # Zero forces the first _SetTimeout() call to apply its value to the new
    # socket rather than trusting a timeout cached from a previous connection.
    self._cur_socket_timeout = 0
    self._next_request_id = 0

  def Disconnect(self):
    """Close the connection if one is open; otherwise do nothing."""
    if self._socket:
      self._socket.close()
      self._socket = None

  def SendAndIgnoreResponse(self, req):
    """Send |req| as JSON without waiting for the reply.

    Args:
      req: A JSON-serializable dict. It is modified in place: its 'id' key is
          set to the request id that was assigned to this message.
    """
    req['id'] = self._next_request_id
    self._next_request_id += 1
    data = json.dumps(req)
    self._socket.send(data)
    logging.debug('sent [%s]', data)

  def SyncRequest(self, req, timeout=10):
    """Send |req| and block until the response with the matching 'id' arrives.

    Notifications that arrive in the meantime go to the notification handler.
    If an error handler is installed and it returns instead of raising, this
    method keeps waiting, so such a handler should raise to abort the request.

    Args:
      req: A JSON-serializable dict; its 'id' key is overwritten.
      timeout: Socket timeout applied to each individual receive.

    Returns:
      The decoded response whose 'id' equals the request's id, or None if the
      socket went away before a matching response was received.
    """
    assert not self._thread, 'Cannot be used during async dispatching.'
    self.SendAndIgnoreResponse(req)

    while self._socket:
      res = self._Receive(timeout)
      if res is None:
        # _Receive() yields None when a notification was consumed by the
        # handler or an error was handled; neither is our response.
        continue
      if 'id' in res and res['id'] == req['id']:
        return res
    return None

  def DispatchNotifications(self, timeout=10):
    """Receive a single message, dispatching it if it is a notification.

    Args:
      timeout: Socket timeout applied to the receive.
    """
    assert not self._thread, 'Cannot be used during async dispatching.'
    self._Receive(timeout)

  def StartAsyncDispatchNotifications(self):
    """Start a daemon thread that receives and dispatches messages."""
    assert not self._thread, 'Cannot be started twice.'
    self._thread = threading.Thread(target=self._AsyncDispatcher)
    self._thread.daemon = True
    self._thread.start()

  def StopAsyncDispatchNotifications(self):
    """Wait up to 30 seconds for the dispatch thread to finish.

    Does nothing if async dispatching was never started.

    Raises:
      RuntimeError: The thread was still alive after the wait. The thread
          reference is kept in that case, so the object still reports that it
          is dispatching.
    """
    if self._thread is None:
      return
    self._thread.join(timeout=30)
    if self._thread.is_alive():
      raise RuntimeError('Timed out waiting for async dispatch notifications.')
    self._thread = None

  def _AsyncDispatcher(self):
    """Thread body: keep receiving until _Receive() returns a falsy value."""
    while self._socket:
      try:
        if not self._Receive():
          break
      except websocket.WebSocketTimeoutException:
        # A receive timeout just means nothing arrived yet; keep polling.
        pass

  def _SetTimeout(self, timeout):
    """Apply |timeout| to the socket, skipping the call if it is unchanged."""
    if self._cur_socket_timeout != timeout:
      self._socket.settimeout(timeout)
      self._cur_socket_timeout = timeout

  def _Receive(self, timeout=10):
    """Receive and decode one message from the socket.

    Args:
      timeout: Socket timeout applied to the receive.

    Returns:
      The decoded JSON message, or None if there is no socket, if the message
      was a notification and the handler returned a true value, or if an error
      occurred and the error handler returned normally.

    Raises:
      socket.error, websocket.WebSocketException: A communication error
          occurred and no error handler is installed.
    """
    self._SetTimeout(timeout)
    start_time = time.time()
    try:
      if not self._socket:
        return None
      data = self._socket.recv()
      res = json.loads(data)
      logging.debug('got [%s]', data)
      if ('method' in res and self._notification_handler is not None and
          self._notification_handler(res)):
        return None
      return res
    except (socket.error, websocket.WebSocketException):
      elapsed_time = time.time() - start_time
      if self._error_handler is None:
        # Without a handler, surface the real failure rather than failing on
        # an attempt to call None.
        raise
      self._error_handler(elapsed_time)
      return None