1# Copyright 2014 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import json
6import logging
7import socket
8import threading
9import time
10
11from telemetry.core.backends.chrome import websocket
12
13
class InspectorWebsocket(object):
  """Sends JSON requests to an Inspector over a websocket and reads replies.

  Every outgoing request is tagged with an incrementing 'id'. Incoming
  messages that contain a 'method' key are treated as notifications and are
  passed to the notification handler, when one was supplied.
  """

  def __init__(self, notification_handler=None, error_handler=None):
    """Create a websocket handler for communicating with Inspectors.

    Args:
      notification_handler: A callback for notifications received as a result of
        calling DispatchNotifications() or StartAsyncDispatchNotifications().
        Must accept a single JSON object containing the Inspector's
        notification. May return True to indicate that async dispatching
        should stop. If None, notifications are not dispatched.
      error_handler: A callback for errors in communicating with the Inspector.
        Must accept a single numeric parameter indicating the time elapsed
        before the error. If None, the underlying exception is re-raised.
    """
    self._socket = None
    self._thread = None
    self._cur_socket_timeout = 0
    self._next_request_id = 0
    self._notification_handler = notification_handler
    self._error_handler = error_handler

  @property
  def is_dispatching_async_notifications(self):
    """True from StartAsyncDispatchNotifications() until the matching Stop."""
    return self._thread is not None

  def Connect(self, url, timeout=10):
    """Open the websocket to |url| and reset the request id counter.

    Args:
      url: Address passed to websocket.create_connection().
      timeout: Timeout passed to websocket.create_connection().
    """
    assert not self._socket
    self._socket = websocket.create_connection(url, timeout=timeout)
    # Reset the cached value so the next _SetTimeout() call with a non-zero
    # timeout is applied to the new socket.
    self._cur_socket_timeout = 0
    self._next_request_id = 0

  def Disconnect(self):
    """Close the websocket if one is open; otherwise do nothing."""
    if self._socket:
      self._socket.close()
      self._socket = None

  def SendAndIgnoreResponse(self, req):
    """Assign the next request id to |req| and send it as JSON.

    Args:
      req: A JSON-serializable dict. Its 'id' key is set (overwritten) in
        place.
    """
    req['id'] = self._next_request_id
    self._next_request_id += 1
    data = json.dumps(req)
    self._socket.send(data)
    logging.debug('sent [%s]', data)

  def SyncRequest(self, req, timeout=10):
    """Send |req| and block until the response with the same id arrives.

    Notifications received while waiting are passed to the notification
    handler; a stop request returned by the handler is ignored here because
    it only applies to async dispatching.

    Args:
      req: A JSON-serializable dict; see SendAndIgnoreResponse().
      timeout: Socket timeout, in seconds, applied to each receive.

    Returns:
      The decoded response whose 'id' equals the id assigned to |req|, or
      None if the socket went away or a receive error was consumed by the
      error handler before the response arrived.
    """
    assert not self._thread, 'Cannot be used during async dispatching.'
    self.SendAndIgnoreResponse(req)

    while self._socket:
      res, _ = self._ReceiveAndDispatch(timeout)
      if res is None:
        # The error handler chose not to raise. Give up instead of retrying
        # forever on a connection that may be permanently broken.
        return None
      if 'id' in res and res['id'] == req['id']:
        return res
    return None

  def DispatchNotifications(self, timeout=10):
    """Receive one message, dispatching it if it is a notification.

    Args:
      timeout: Socket timeout, in seconds, for the receive.
    """
    assert not self._thread, 'Cannot be used during async dispatching.'
    self._Receive(timeout)

  def StartAsyncDispatchNotifications(self):
    """Start a daemon thread that receives and dispatches messages."""
    assert not self._thread, 'Cannot be started twice.'
    self._thread = threading.Thread(target=self._AsyncDispatcher)
    self._thread.daemon = True
    self._thread.start()

  def StopAsyncDispatchNotifications(self):
    """Wait up to 30 seconds for the dispatcher thread to finish.

    This only joins the thread; the thread ends by itself once _Receive()
    returns a falsy value or the socket is gone.

    Raises:
      RuntimeError: The thread was still running after the join timeout.
    """
    self._thread.join(timeout=30)
    if self._thread.is_alive():
      raise RuntimeError('Timed out waiting for async dispatch notifications.')
    self._thread = None

  def _AsyncDispatcher(self):
    """Thread body: keep receiving until _Receive() returns a falsy value."""
    while self._socket:
      try:
        if not self._Receive():
          break
      except websocket.WebSocketTimeoutException:
        # A timeout that propagates out of _Receive() is not fatal for the
        # polling loop; try again.
        pass

  def _SetTimeout(self, timeout):
    """Apply |timeout| to the socket unless it is already the cached value."""
    if self._cur_socket_timeout != timeout:
      self._socket.settimeout(timeout)
      self._cur_socket_timeout = timeout

  def _Receive(self, timeout=10):
    """Receive one message and dispatch it if it is a notification.

    Args:
      timeout: Socket timeout, in seconds, for the receive.

    Returns:
      The decoded message, or None when there is no socket, when the
      notification handler returned a true value, or when a receive error
      was consumed by the error handler.
    """
    res, stop_requested = self._ReceiveAndDispatch(timeout)
    if stop_requested:
      return None
    return res

  def _ReceiveAndDispatch(self, timeout):
    """Receive one message, reporting the message and stop request separately.

    Args:
      timeout: Socket timeout, in seconds, for the receive.

    Returns:
      A (message, stop_requested) tuple. |message| is the decoded JSON, or
      None when there is no socket or the error handler consumed a receive
      error. |stop_requested| is True only when the message was a
      notification and the notification handler returned a true value.

    Raises:
      socket.error, websocket.WebSocketException: When receiving fails and
        no error handler was supplied.
    """
    if not self._socket:
      return None, False
    self._SetTimeout(timeout)
    start_time = time.time()
    try:
      data = self._socket.recv()
      res = json.loads(data)
      logging.debug('got [%s]', data)
      stop_requested = False
      if 'method' in res and self._notification_handler is not None:
        stop_requested = bool(self._notification_handler(res))
      return res, stop_requested
    except (socket.error, websocket.WebSocketException):
      elapsed_time = time.time() - start_time
      if self._error_handler is None:
        # Nobody to report to: surface the original failure rather than a
        # confusing "NoneType is not callable".
        raise
      self._error_handler(elapsed_time)
      return None, False
109