1# Copyright (C) 2010, 2012 Google Inc. All rights reserved.
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are
5# met:
6#
7#     * Redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer.
9#     * Redistributions in binary form must reproduce the above
10# copyright notice, this list of conditions and the following disclaimer
11# in the documentation and/or other materials provided with the
12# distribution.
13#     * Neither the name of Google Inc. nor the names of its
14# contributors may be used to endorse or promote products derived from
15# this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29import logging
30import re
31import StringIO
32import webkitpy.thirdparty.unittest2 as unittest
33
34from webkitpy.layout_tests.views.metered_stream import MeteredStream
35
36
37class RegularTest(unittest.TestCase):
38    verbose = False
39    isatty = False
40
41    def setUp(self):
42        self.stream = StringIO.StringIO()
43        self.buflist = self.stream.buflist
44        self.stream.isatty = lambda: self.isatty
45
46        # configure a logger to test that log calls do normally get included.
47        self.logger = logging.getLogger(__name__)
48        self.logger.setLevel(logging.DEBUG)
49        self.logger.propagate = False
50
51        # add a dummy time counter for a default behavior.
52        self.times = range(10)
53
54        self.meter = MeteredStream(self.stream, self.verbose, self.logger, self.time_fn, 8675)
55
56    def tearDown(self):
57        if self.meter:
58            self.meter.cleanup()
59            self.meter = None
60
61    def time_fn(self):
62        return self.times.pop(0)
63
64    def test_logging_not_included(self):
65        # This tests that if we don't hand a logger to the MeteredStream,
66        # nothing is logged.
67        logging_stream = StringIO.StringIO()
68        handler = logging.StreamHandler(logging_stream)
69        root_logger = logging.getLogger()
70        orig_level = root_logger.level
71        root_logger.addHandler(handler)
72        root_logger.setLevel(logging.DEBUG)
73        try:
74            self.meter = MeteredStream(self.stream, self.verbose, None, self.time_fn, 8675)
75            self.meter.write_throttled_update('foo')
76            self.meter.write_update('bar')
77            self.meter.write('baz')
78            self.assertEqual(logging_stream.buflist, [])
79        finally:
80            root_logger.removeHandler(handler)
81            root_logger.setLevel(orig_level)
82
83    def _basic(self, times):
84        self.times = times
85        self.meter.write_update('foo')
86        self.meter.write_update('bar')
87        self.meter.write_throttled_update('baz')
88        self.meter.write_throttled_update('baz 2')
89        self.meter.writeln('done')
90        self.assertEqual(self.times, [])
91        return self.buflist
92
93    def test_basic(self):
94        buflist = self._basic([0, 1, 2, 13, 14])
95        self.assertEqual(buflist, ['foo\n', 'bar\n', 'baz 2\n', 'done\n'])
96
97    def _log_after_update(self):
98        self.meter.write_update('foo')
99        self.logger.info('bar')
100        return self.buflist
101
102    def test_log_after_update(self):
103        buflist = self._log_after_update()
104        self.assertEqual(buflist, ['foo\n', 'bar\n'])
105
106    def test_log_args(self):
107        self.logger.info('foo %s %d', 'bar', 2)
108        self.assertEqual(self.buflist, ['foo bar 2\n'])
109
class TtyTest(RegularTest):
    """Reruns the RegularTest scenarios with a stream that reports isatty()."""

    verbose = False
    isatty = True

    def test_basic(self):
        # On a tty no newline follows an update; instead the erasure sequence
        # for the previous text is written before the next chunk.
        written = self._basic([0, 1, 1.05, 1.1, 2])
        erase = MeteredStream._erasure
        expected = [
            'foo',
            erase('foo'), 'bar',
            erase('bar'), 'baz 2',
            erase('baz 2'), 'done\n',
        ]
        self.assertEqual(written, expected)

    def test_log_after_update(self):
        # The pending update is erased before the logged line is written.
        written = self._log_after_update()
        expected = ['foo', MeteredStream._erasure('foo'), 'bar\n']
        self.assertEqual(written, expected)
125
126
class VerboseTest(RegularTest):
    """Reruns the RegularTest scenarios with verbose output on a non-tty stream.

    The expected lines carry an ``HH:MM:SS.mmm <pid>`` prefix, so the
    assertions match patterns instead of comparing exact strings.
    """

    isatty = False
    verbose = True

    def _assert_line_matches(self, pattern, line):
        """Fail unless ``line`` matches the regex ``pattern`` from its start."""
        self.assertTrue(re.match(pattern, line),
                        '%r does not match %r' % (line, pattern))

    def test_basic(self):
        buflist = self._basic([0, 1, 2.1, 13, 14.1234])
        # Check the length first so a short buffer fails with a clear
        # assertion instead of an IndexError below.
        self.assertEqual(len(buflist), 4)
        # We don't bother to match the hours and minutes of the timestamp since
        # the local timezone can vary and we can't set that portably and easily.
        # Raw strings keep the regex escapes intact, and the dot is escaped so
        # that only a literal '.' separates seconds from milliseconds.
        self._assert_line_matches(r'\d\d:\d\d:00\.000 8675 foo\n', buflist[0])
        self._assert_line_matches(r'\d\d:\d\d:01\.000 8675 bar\n', buflist[1])
        self._assert_line_matches(r'\d\d:\d\d:13\.000 8675 baz 2\n', buflist[2])
        self._assert_line_matches(r'\d\d:\d\d:14\.123 8675 done\n', buflist[3])

    def test_log_after_update(self):
        buflist = self._log_after_update()
        self.assertEqual(len(buflist), 2)
        self._assert_line_matches(r'\d\d:\d\d:00\.000 8675 foo\n', buflist[0])

        # The second entry should have a real timestamp and pid, so we just check the format.
        self._assert_line_matches(r'\d\d:\d\d:\d\d\.\d\d\d \d+ bar\n', buflist[1])

    def test_log_args(self):
        # Only the message tail is checked; the prefix in front of it varies.
        self.logger.info('foo %s %d', 'bar', 2)
        self.assertEqual(len(self.buflist), 1)
        self.assertTrue(self.buflist[0].endswith('foo bar 2\n'))
154