1#!/usr/bin/python
2
3# Copyright 2016 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Queries a MySQL database and emits status metrics to Monarch.
8
9Note: confusingly, 'Innodb_buffer_pool_reads' is actually the cache-misses, not
10the number of reads to the buffer pool.  'Innodb_buffer_pool_read_requests'
corresponds to the number of reads to the buffer pool.
12"""
13import logging
14import sys
15
16import MySQLdb
17import time
18
19import common
20
21from autotest_lib.client.common_lib import global_config
22from autotest_lib.client.common_lib.cros import retry
23
24from chromite.lib import metrics
25from chromite.lib import ts_mon_config
26
# NOTE(review): presumably the autotest installation directory; it is not
# referenced anywhere else in this file.
AT_DIR='/usr/local/autotest'
# MySQL credentials used by main(). They are looked up in the 'CROS' section
# of the autotest global config, with '' passed as the default value.
DEFAULT_USER = global_config.global_config.get_config_value(
        'CROS', 'db_backup_user', type=str, default='')
DEFAULT_PASSWD = global_config.global_config.get_config_value(
        'CROS', 'db_backup_password', type=str, default='')

# Target number of seconds between the starts of two consecutive query/emit
# passes in QueryLoop(); the time spent querying is subtracted from the sleep.
LOOP_INTERVAL = 60

# Status variables that QueryAndEmit() reports as Counter metrics. Each one is
# emitted as its current value minus the baseline captured by QueryLoop().
EMITTED_STATUSES_COUNTERS = [
        'bytes_received',
        'bytes_sent',
        'connections',
        'Innodb_buffer_pool_read_requests',
        'Innodb_buffer_pool_reads',
        'Innodb_row_lock_waits',
        'questions',
        'slow_queries',
        'threads_created',
]

# Status variables that QueryAndEmit() reports as Gauge metrics, using the
# value read on each pass as-is.
EMITTED_STATUS_GAUGES = [
        'Innodb_row_lock_time_avg',
        'Innodb_row_lock_current_waits',
        'threads_running',
        'threads_connected',
]
53
54
class RetryingConnection(object):
    """Maintains a db connection and a cursor, reconnecting on failure.

    The positional and keyword arguments given to the constructor are stored
    and forwarded unchanged to MySQLdb.connect() on every (re)connection.
    """
    # Passed to retry.retry() as delay_sec.
    INITIAL_SLEEP_SECONDS = 20
    # Total time budget for retrying a failing operation, in seconds (1 hour).
    MAX_TIMEOUT_SECONDS = 60 * 60

    def __init__(self, *args, **kwargs):
        """Stores the connection arguments; no connection is opened yet.

        @param args: Positional arguments for MySQLdb.connect().
        @param kwargs: Keyword arguments for MySQLdb.connect().
        """
        self.args = args
        self.kwargs = kwargs
        self.db = None
        self.cursor = None

    def Connect(self):
        """Establishes a MySQL connection and creates a cursor."""
        self.db = MySQLdb.connect(*self.args, **self.kwargs)
        self.cursor = self.db.cursor()

    def Reconnect(self):
        """Attempts to close the connection, then reconnects.

        Closing is best-effort. A handle that was never opened is skipped, and
        a MySQLdb error while closing one handle does not prevent the other
        from being closed, so a broken cursor cannot leak the connection.
        """
        for handle in (self.cursor, self.db):
            if handle is None:
                # Connect() has not succeeded yet; nothing to close.
                continue
            try:
                handle.close()
            except MySQLdb.Error:
                logging.debug('Ignoring error while closing %r', handle,
                              exc_info=True)
        self.Connect()

    def RetryWith(self, func):
        """Run a function, retrying on OperationalError.

        Reconnect() is registered as the retry callback so that each new
        attempt runs against a fresh connection.

        @param func: Callable taking no arguments.
        @returns Whatever |func| returns.
        """
        return retry.retry(
            MySQLdb.OperationalError,
            delay_sec=self.INITIAL_SLEEP_SECONDS,
            # The keyword is expressed in minutes while the class constant is
            # in seconds; convert so the budget really is one hour.
            timeout_min=self.MAX_TIMEOUT_SECONDS / 60.0,
            callback=self.Reconnect
        )(func)()

    def Execute(self, *args, **kwargs):
        """Runs .execute on the cursor, reconnecting on failure.

        @param args: Positional arguments for cursor.execute().
        @param kwargs: Keyword arguments for cursor.execute().
        @returns The result of cursor.execute().
        """
        def _Execute():
            # Look the cursor up on every attempt: Reconnect() replaces it.
            return self.cursor.execute(*args, **kwargs)
        return self.RetryWith(_Execute)

    def Fetchall(self):
        """Runs .fetchall on the cursor."""
        return self.cursor.fetchall()
98
99
def GetStatus(connection, status):
    """Get the status variable from the database, retrying on failure.

    @param connection: Object providing Execute() and Fetchall(), such as a
                       RetryingConnection, to query with.
    @param status: Name of the status variable.
    @returns The value of the first matching status variable, as an int.
    @raises IndexError: If no status variable matches |status|.
    """
    # Pass the name as a query parameter so the driver does the quoting,
    # instead of splicing it into the SQL text.
    connection.Execute('SHOW GLOBAL STATUS LIKE %s;', (status,))
    rows = connection.Fetchall()

    if not rows:
        # Report the missing variable by name rather than failing with an
        # anonymous "index out of range".
        logging.error('Cannot find any global status like %s', status)
        raise IndexError('No global status matches %r' % (status,))

    # The value is taken from the second column of the first row.
    output = rows[0][1]

    if not output:
        logging.error('Cannot find any global status like %s', status)

    return int(output)
114
115
def QueryAndEmit(baselines, conn):
    """Reads the tracked MySQL status variables and reports them to Monarch.

    @param baselines: A dict holding, for every name in
                      EMITTED_STATUSES_COUNTERS, the value to subtract from
                      the current reading before it is emitted.
    @param conn: The mysql connection object.
    """
    name_template = 'chromeos/autotest/afe_db/%s'

    # Cumulative variables: emit how much each grew past its baseline.
    for counter_name in EMITTED_STATUSES_COUNTERS:
        growth = GetStatus(conn, counter_name) - baselines[counter_name]
        counter = metrics.Counter(name_template % counter_name.lower())
        counter.set(growth)

    # Instantaneous variables: emit the value exactly as read.
    for gauge_name in EMITTED_STATUS_GAUGES:
        gauge = metrics.Gauge(name_template % gauge_name.lower())
        gauge.set(GetStatus(conn, gauge_name))

    free_pages = GetStatus(conn, 'Innodb_buffer_pool_pages_free')
    total_pages = GetStatus(conn, 'Innodb_buffer_pool_pages_total')

    # Buffer pool usage goes out under one metric name, with the 'used' field
    # telling free pages apart from occupied ones.
    for is_used, page_count in ((False, free_pages),
                                (True, total_pages - free_pages)):
        pages_gauge = metrics.Gauge(
            'chromeos/autotest/afe_db/buffer_pool_pages')
        pages_gauge.set(page_count, fields={'used': is_used})
140
141
def main():
    """Connects to the local MySQL server, sets up ts_mon and polls forever."""
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    db_connection = RetryingConnection(
        'localhost', DEFAULT_USER, DEFAULT_PASSWD)
    db_connection.Connect()

    # TODO(crbug.com/803566) Use indirect=False to mitigate orphan mysql_stats
    # processes overwhelming shards.
    ts_mon_state = ts_mon_config.SetupTsMonGlobalState(
        'mysql_stats', indirect=False)
    with ts_mon_state:
        QueryLoop(db_connection)
152
153
def QueryLoop(conn):
    """Emits metrics forever, starting one pass every LOOP_INTERVAL seconds.

    @param conn: The mysql connection object.
    """
    # Record where every cumulative metric stands right now. Without these
    # baselines the first windowed rate would look enormous, because the
    # counter would appear to jump from 0 straight to its current value.
    baselines = {}
    for counter_name in EMITTED_STATUSES_COUNTERS:
        baselines[counter_name] = GetStatus(conn, counter_name)

    while True:
        started_at = time.time()
        QueryAndEmit(baselines, conn)
        elapsed = time.time() - started_at
        # Sleep only for what is left of the interval; never a negative time.
        time.sleep(max(0, LOOP_INTERVAL - elapsed))
171
172
# Start polling only when run as a script, not when imported as a module.
if __name__ == '__main__':
    main()
175