1#!/usr/bin/python
2#
3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Module to upload a MySQL dump file to Cloud SQL.
8
9Usage:
10  dump_to_cloudsql.py [-h] [--resume NUM] [--user USER] [--passwd PASSWD] FILE
11                      [REMOTE]
12
13  Uploads MySQL dump file to a MySQL database or Cloud SQL. With no optional
14  arguments will connect to localhost as root with an empty password.
15
16  positional arguments:
17    FILE             text dump file containing MySQL commands
18    REMOTE           Cloud SQL instance name or MySQL hostname
19
20  optional arguments:
21    -h, --help       show this help message and exit
22    --resume NUM     resume dump at command NUM
23    --user USER      user (ignored for CloudSQL)
24    --passwd PASSWD  passwd (ignored for CloudSQL)
25"""
26
27from __future__ import division
28import argparse
29import collections
30import datetime
31import os
32import re
33import sys
34import time
35
36
37BYTES_PER_GB = 2**30
38
39
40class MySQLConnectionManager(object):
41    """Manages connections to a MySQL database.
42
43    Vars:
44      factory: A *ConnectionFactory.
45      connected: Whether we currently hold a live DB connection.
46      cmd_num: The number of commands executed.
47    """
48    def __init__(self, connection_factory):
49        self.factory = connection_factory
50        self.connected = False
51        self.cmd_num = 0
52
53    def write(self, data, execute_cmd=True, increment_cmd=False):
54        """Buffers writes to command boundaries.
55
56        Args:
57          data: A line of data from the MySQL dump.
58          execute_cmd: Whether to execute the command, defaults to True.
59          increment_cmd: Whether to increment cmd_num, defaults to False.
60          """
61        if not data or not data.strip() or data == '\n' or data[:2] == '--':
62            return
63        self._cmd += data[:-1] if data[-1] == '\n' else data
64        if self._cmd[-1] != ';':
65            return
66        # Execute command.
67        if execute_cmd:
68            self._cursor.execute(self._cmd.decode('utf-8'))
69        self._cmd = ''
70        if increment_cmd:
71            self.cmd_num += 1
72
73    def disconnect(self):
74      """Closes the current database connection."""
75      if self.connected:
76          self.connected = False
77          self._cursor.close()
78          self._db.close()
79
80    def connect(self):
81      """Creates a new database connection."""
82      self.disconnect()
83      self._db = self.factory.connect()
84      self.connected = True
85      self._cursor = self._db.cursor()
86      self._cmd = ''
87
88
89class CloudSQLConnectionFactory(object):
90    """Creates Cloud SQL database connections."""
91    def __init__(self, cloudsql_instance):
92        self._instance = cloudsql_instance
93
94    def connect(self):
95        """Connects to the Cloud SQL database and returns the connection.
96
97        Returns:
98          A MySQLdb compatible database connection to the Cloud SQL instance.
99        """
100        print 'Connecting to Cloud SQL instance %s.' % self._instance
101        try:
102            from google.storage.speckle.python.api import rdbms_googleapi
103        except ImportError:
104            sys.exit('Unable to import rdbms_googleapi. Add the AppEngine SDK '
105                     'directory to your PYTHONPATH. Download the SDK from: '
106                     'https://developers.google.com/appengine/downloads')
107        return rdbms_googleapi.connect(None, instance=self._instance)
108
109
110class LocalSQLConnectionFactory(object):
111    """Creates local MySQL database connections."""
112    def __init__(self, host=None, user='root', passwd=''):
113        if not host:
114          host = 'localhost'
115        self._host = host
116        self._user = user
117        self._passwd = passwd
118
119    def connect(self):
120        """Connects to the local MySQL database and returns the connection.
121
122        Returns:
123          A MySQLdb database connection to the local MySQL database.
124        """
125        print 'Connecting to mysql at localhost as %s.' % self._user
126        try:
127            import MySQLdb
128        except ImportError:
129            sys.exit('Unable to import MySQLdb. To install on Ubuntu: '
130                     'apt-get install python-mysqldb')
131        return MySQLdb.connect(host=self._host, user=self._user,
132                               passwd=self._passwd)
133
134
135class MySQLState(object):
136    """Maintains the MySQL global state.
137
138    This is a hack that keeps record of all MySQL lines that set global state.
139    These are needed to reconstruct the MySQL state on resume.
140    """
141    _set_regex = re.compile('\S*\s*SET(.*)[\s=]')
142
143    def __init__(self):
144        self._db_line = ''
145        self._table_lock = []
146        self._sets = collections.OrderedDict()
147
148    def process(self, line):
149        """Check and save lines that affect the global state.
150
151        Args:
152          line: A line from the MySQL dump file.
153        """
154        # Most recent USE line.
155        if line[:3] == 'USE':
156            self._db_line = line
157        # SET variables.
158        m = self._set_regex.match(line)
159        if m:
160            self._sets[m.group(1).strip()] = line
161        # Maintain LOCK TABLES
162        if (line[:11] == 'LOCK TABLES' or
163            ('ALTER TABLE' in line and 'DISABLE KEYS' in line)):
164            self._table_lock.append(line)
165        if (line[:14] == 'UNLOCK TABLES;'):
166            self._table_lock = []
167
168    def write(self, out):
169        """Print lines to recreate the saved state.
170
171        Args:
172          out: A File-like object to write out saved state.
173        """
174        out.write(self._db_line)
175        for v in self._sets.itervalues():
176            out.write(v)
177        for l in self._table_lock:
178            out.write(l)
179
180    def breakpoint(self, line):
181      """Returns true if we can handle breaking after this line.
182
183      Args:
184        line: A line from the MySQL dump file.
185
186      Returns:
187        Boolean indicating whether we can break after |line|.
188      """
189      return (line[:28] == '-- Table structure for table' or
190              line[:11] == 'INSERT INTO')
191
192
193def dump_to_cloudsql(dumpfile, manager, cmd_offset=0):
194    """Dumps a MySQL dump file to a database through a MySQLConnectionManager.
195
196    Args:
197      dumpfile: Path to a file from which to read the MySQL dump.
198      manager: An instance of MySQLConnectionManager.
199      cmd_offset: No commands will be executed on the database before this count
200        is reached. Used to continue an uncompleted dump. Defaults to 0.
201    """
202    state = MySQLState()
203    total = os.path.getsize(dumpfile)
204    start_time = time.time()
205    line_num = 0
206    with open(dumpfile, 'r') as dump:
207        for line in dump:
208            line_num += 1
209            if not manager.connected:
210                manager.connect()
211            try:
212                # Construct commands from lines and execute them.
213                state.process(line)
214                if manager.cmd_num == cmd_offset and cmd_offset != 0:
215                    print '\nRecreating state at line: %d' % line_num
216                    state.write(manager)
217                manager.write(line, manager.cmd_num >= cmd_offset, True)
218                # Print status.
219                sys.stdout.write(
220                    '\rstatus:  %.3f%%     %0.2f GB     %d commands ' %
221                    (100 * dump.tell() / total, dump.tell() / BYTES_PER_GB,
222                     manager.cmd_num))
223                sys.stdout.flush()
224            # Handle interrupts and connection failures.
225            except KeyboardInterrupt:
226                print ('\nInterrupted while executing command: %d' %
227                       manager.cmd_num)
228                raise
229            except:
230                print '\nFailed while executing command: %d' % manager.cmd_num
231                delta = int(time.time() - start_time)
232                print 'Total time: %s' % str(datetime.timedelta(seconds=delta))
233                if state.breakpoint(line):
234                    # Attempt to resume.
235                    print ('Execution can resume from here (line = %d)' %
236                           line_num)
237                    manager.cmd_num += 1
238                    cmd_offset = manager.cmd_num
239                    print ('Will now attempt to auto-resume at command: %d' %
240                           cmd_offset)
241                    manager.disconnect()
242                else:
243                    print 'Execution may fail to resume correctly from here.'
244                    print ('Use --resume=%d to attempt to resume the dump.' %
245                           manager.cmd_num)
246                    raise
247    print '\nDone.'
248
249
250if __name__ == '__main__':
251    """Imports a MySQL database from a dump file.
252
253    Interprets command line arguments and calls dump_to_cloudsql appropriately.
254    """
255    description = """Uploads MySQL dump file to a MySQL database or Cloud SQL.
256                  With no optional arguments will connect to localhost as root
257                  with an empty password."""
258    parser = argparse.ArgumentParser(description=description)
259    parser.add_argument('mysqldump', metavar='FILE',
260                        help='text dump file containing MySQL commands')
261    parser.add_argument('remote', default=None, nargs='?', metavar='REMOTE',
262        help='either a Cloud SQL account:instance or a hostname')
263    parser.add_argument('--resume', default=0, type=int, metavar='NUM',
264                        help='resume dump at command NUM')
265    parser.add_argument('--user', default='root', metavar='USER',
266                        help='user (ignored for Cloud SQL)')
267    parser.add_argument('--passwd', default='', metavar='PASSWD',
268                        help='passwd (ignored for Cloud SQL)')
269    args = parser.parse_args()
270    if args.remote and ':' in args.remote:
271        connection = CloudSQLConnectionFactory(args.remote)
272    else:
273        connection = LocalSQLConnectionFactory(args.remote, args.user,
274                                               args.passwd)
275    if args.resume:
276        print 'Resuming execution at command: %d' % options.resume
277    dump_to_cloudsql(args.mysqldump, MySQLConnectionManager(connection),
278                     args.resume)
279