1# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose and without fee is hereby granted,
5# provided that the above copyright notice appear in all copies and that
6# both that copyright notice and this permission notice appear in
7# supporting documentation, and that the name of Vinay Sajip
8# not be used in advertising or publicity pertaining to distribution
9# of the software without specific, written prior permission.
10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16
17"""Test harness for the logging module. Run all tests.
18
19Copyright (C) 2001-2017 Vinay Sajip. All Rights Reserved.
20"""
21
22import logging
23import logging.handlers
24import logging.config
25
26import codecs
27import configparser
28import datetime
29import pathlib
30import pickle
31import io
32import gc
33import json
34import os
35import queue
36import random
37import re
38import socket
39import struct
40import sys
41import tempfile
42from test.support.script_helper import assert_python_ok
43from test import support
44import textwrap
45import time
46import unittest
47import warnings
48import weakref
49try:
50    import threading
51    # The following imports are needed only for tests which
52    # require threading
53    import asyncore
54    from http.server import HTTPServer, BaseHTTPRequestHandler
55    import smtpd
56    from urllib.parse import urlparse, parse_qs
57    from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
58                              ThreadingTCPServer, StreamRequestHandler)
59except ImportError:
60    threading = None
61try:
62    import win32evtlog, win32evtlogutil, pywintypes
63except ImportError:
64    win32evtlog = win32evtlogutil = pywintypes = None
65
66try:
67    import zlib
68except ImportError:
69    pass
70
class BaseTest(unittest.TestCase):

    """Shared fixture for the logging tests.

    setUp() snapshots the logging module's global registries and attaches
    a StreamHandler writing to an in-memory StringIO to the root logger;
    tearDown() undoes both.
    """

    log_format = "%(name)s -> %(levelname)s: %(message)s"
    expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$"
    message_num = 0

    def setUp(self):
        """Save the global logging state, then route root-logger output to
        an internal StringIO instance so tests can inspect what was logged."""
        registry = logging.getLogger().manager.loggerDict
        logging._acquireLock()
        try:
            self.saved_handlers = logging._handlers.copy()
            self.saved_handler_list = logging._handlerList[:]
            self.saved_loggers = snapshot = registry.copy()
            self.saved_name_to_level = logging._nameToLevel.copy()
            self.saved_level_to_name = logging._levelToName.copy()
            # Entries without a 'disabled' attribute are recorded as None
            # so that tearDown() can skip them.
            self.logger_states = {
                name: getattr(entry, 'disabled', None)
                for name, entry in snapshot.items()
            }
        finally:
            logging._releaseLock()

        # Set two unused loggers
        self.logger1 = logging.getLogger("\xab\xd7\xbb")
        self.logger2 = logging.getLogger("\u013f\u00d6\u0047")

        root = logging.getLogger("")
        self.root_logger = root
        self.original_logging_level = root.getEffectiveLevel()

        self.stream = io.StringIO()
        root.setLevel(logging.DEBUG)
        self.root_hdlr = logging.StreamHandler(self.stream)
        self.root_formatter = logging.Formatter(self.log_format)
        self.root_hdlr.setFormatter(self.root_formatter)
        # Neither unused logger may reach a handler before ours is added.
        for unused in (self.logger1, self.logger2):
            if unused.hasHandlers():
                hlist = unused.handlers + root.handlers
                raise AssertionError('Unexpected handlers: %s' % hlist)
        root.addHandler(self.root_hdlr)
        self.assertTrue(self.logger1.hasHandlers())
        self.assertTrue(self.logger2.hasHandlers())

    def tearDown(self):
        """Detach and close the root logger's handlers, restore its level,
        and put back the registries saved by setUp()."""
        self.stream.close()
        root = self.root_logger
        root.removeHandler(self.root_hdlr)
        # Drop (and close) any handler a test left on the root logger.
        while root.handlers:
            leftover = root.handlers[0]
            root.removeHandler(leftover)
            leftover.close()
        root.setLevel(self.original_logging_level)
        logging._acquireLock()
        try:
            logging._levelToName.clear()
            logging._levelToName.update(self.saved_level_to_name)
            logging._nameToLevel.clear()
            logging._nameToLevel.update(self.saved_name_to_level)
            logging._handlers.clear()
            logging._handlers.update(self.saved_handlers)
            logging._handlerList[:] = self.saved_handler_list
            registry = logging.getLogger().manager.loggerDict
            registry.clear()
            registry.update(self.saved_loggers)
            for name, was_disabled in self.logger_states.items():
                if was_disabled is not None:
                    self.saved_loggers[name].disabled = was_disabled
        finally:
            logging._releaseLock()

    def assert_log_lines(self, expected_values, stream=None, pat=None):
        """Check every collected log line against a regular expression and
        compare its captured groups with the matching tuple in
        expected_values.

        stream defaults to self.stream and pat to self.expected_log_pat.
        """
        if not stream:
            stream = self.stream
        regex = re.compile(pat if pat else self.expected_log_pat)
        logged = stream.getvalue().splitlines()
        self.assertEqual(len(logged), len(expected_values))
        for line, wanted in zip(logged, expected_values):
            found = regex.search(line)
            if found is None:
                self.fail("Log line does not match expected pattern:\n" +
                            line)
            self.assertEqual(tuple(found.groups()), wanted)
        # Anything still readable from the current position is unexpected.
        remainder = stream.read()
        if remainder:
            self.fail("Remaining output at end of log stream:\n" + remainder)

    def next_message(self):
        """Return the next value of a per-test counter, formatted as a
        decimal string."""
        self.message_num = self.message_num + 1
        return "%d" % self.message_num
171
172
class BuiltinLevelsTest(BaseTest):
    """Check the built-in level thresholds and how loggers inherit them."""

    def test_flat(self):
        # Sibling loggers with no hierarchy between them, one level each.
        msg = self.next_message

        err = logging.getLogger("ERR")
        err.setLevel(logging.ERROR)
        inf = logging.LoggerAdapter(logging.getLogger("INF"), {})
        inf.setLevel(logging.INFO)
        deb = logging.getLogger("DEB")
        deb.setLevel(logging.DEBUG)

        # At or above each logger's level: all of these are emitted.
        err.log(logging.CRITICAL, msg())
        err.error(msg())

        inf.log(logging.CRITICAL, msg())
        for emit in (inf.error, inf.warning, inf.info):
            emit(msg())

        deb.log(logging.CRITICAL, msg())
        for emit in (deb.error, deb.warning, deb.info, deb.debug):
            emit(msg())

        # Below the level: a message number is consumed, nothing is output.
        for emit in (err.warning, err.info, err.debug, inf.debug):
            emit(msg())

        expected = [
            ('ERR', 'CRITICAL', '1'),
            ('ERR', 'ERROR', '2'),
            ('INF', 'CRITICAL', '3'),
            ('INF', 'ERROR', '4'),
            ('INF', 'WARNING', '5'),
            ('INF', 'INFO', '6'),
            ('DEB', 'CRITICAL', '7'),
            ('DEB', 'ERROR', '8'),
            ('DEB', 'WARNING', '9'),
            ('DEB', 'INFO', '10'),
            ('DEB', 'DEBUG', '11'),
        ]
        self.assert_log_lines(expected)

    def test_nested_explicit(self):
        # Parent and child both carry an explicit level; the child's wins.
        msg = self.next_message

        parent = logging.getLogger("INF")
        parent.setLevel(logging.INFO)
        child = logging.getLogger("INF.ERR")
        child.setLevel(logging.ERROR)

        # Emitted.
        child.log(logging.CRITICAL, msg())
        child.error(msg())

        # Suppressed.
        for emit in (child.warning, child.info, child.debug):
            emit(msg())

        expected = [
            ('INF.ERR', 'CRITICAL', '1'),
            ('INF.ERR', 'ERROR', '2'),
        ]
        self.assert_log_lines(expected)

    def test_nested_inherited(self):
        # Loggers without a level of their own use the nearest ancestor's.
        msg = self.next_message

        inf = logging.getLogger("INF")
        inf.setLevel(logging.INFO)
        inf_err = logging.getLogger("INF.ERR")
        inf_err.setLevel(logging.ERROR)
        inf_undef = logging.getLogger("INF.UNDEF")
        inf_err_undef = logging.getLogger("INF.ERR.UNDEF")
        # Looked up but never logged to in this test.
        logging.getLogger("UNDEF")

        # Emitted.
        inf_undef.log(logging.CRITICAL, msg())
        for emit in (inf_undef.error, inf_undef.warning, inf_undef.info):
            emit(msg())
        inf_err_undef.log(logging.CRITICAL, msg())
        inf_err_undef.error(msg())

        # Suppressed.
        suppressed = (inf_undef.debug, inf_err_undef.warning,
                      inf_err_undef.info, inf_err_undef.debug)
        for emit in suppressed:
            emit(msg())

        expected = [
            ('INF.UNDEF', 'CRITICAL', '1'),
            ('INF.UNDEF', 'ERROR', '2'),
            ('INF.UNDEF', 'WARNING', '3'),
            ('INF.UNDEF', 'INFO', '4'),
            ('INF.ERR.UNDEF', 'CRITICAL', '5'),
            ('INF.ERR.UNDEF', 'ERROR', '6'),
        ]
        self.assert_log_lines(expected)

    def test_nested_with_virtual_parent(self):
        # The grandchild is requested before its direct parent exists.
        msg = self.next_message

        top = logging.getLogger("INF")
        grandchild = logging.getLogger("INF.BADPARENT.UNDEF")
        child = logging.getLogger("INF.BADPARENT")
        top.setLevel(logging.INFO)

        # Emitted.
        for logger in (grandchild, child):
            logger.log(logging.FATAL, msg())
            logger.info(msg())

        # Suppressed.
        for logger in (grandchild, child):
            logger.debug(msg())

        expected = [
            ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'),
            ('INF.BADPARENT.UNDEF', 'INFO', '2'),
            ('INF.BADPARENT', 'CRITICAL', '3'),
            ('INF.BADPARENT', 'INFO', '4'),
        ]
        self.assert_log_lines(expected)

    def test_regression_22386(self):
        """See issue #22386 for more information."""
        # getLevelName() maps in both directions: name -> number, number -> name.
        by_name = logging.getLevelName('INFO')
        by_number = logging.getLevelName(logging.INFO)
        self.assertEqual(by_name, logging.INFO)
        self.assertEqual(by_number, 'INFO')

    def test_issue27935(self):
        # 'FATAL' must resolve to the numeric FATAL level.
        self.assertEqual(logging.getLevelName('FATAL'), logging.FATAL)

    def test_regression_29220(self):
        """See issue #29220 for more information."""
        # Give INFO an empty name for this test only; restore it afterwards.
        logging.addLevelName(logging.INFO, '')
        self.addCleanup(logging.addLevelName, logging.INFO, 'INFO')
        checks = (
            (logging.INFO, ''),
            (logging.NOTSET, 'NOTSET'),
            ('NOTSET', logging.NOTSET),
        )
        for query, wanted in checks:
            self.assertEqual(logging.getLevelName(query), wanted)
323
class BasicFilterTest(BaseTest):

    """Exercise logging.Filter and plain-callable filters on a handler."""

    def _log_once_per_spam_logger(self):
        # Look up all four loggers first, then emit one INFO message from
        # each, so the messages are numbered 1-4 in this order.
        names = ("spam", "spam.eggs", "spam.eggs.fish", "spam.bakedbeans")
        loggers = [logging.getLogger(name) for name in names]
        for logger in loggers:
            logger.info(self.next_message())

    def test_filter(self):
        # A Filter built from a name passes that logger and its descendants.
        name_filter = logging.Filter("spam.eggs")
        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(name_filter)
            self._log_once_per_spam_logger()

            # Messages 1 ("spam") and 4 ("spam.bakedbeans") are dropped.
            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(name_filter)

    def test_callable_filter(self):
        # A bare function can stand in for a Filter object.

        def filterfunc(record):
            leading = record.name.split('.')[:2]
            return '.'.join(leading) == 'spam.eggs'

        handler = self.root_logger.handlers[0]
        try:
            handler.addFilter(filterfunc)
            self._log_once_per_spam_logger()

            # Messages 1 ("spam") and 4 ("spam.bakedbeans") are dropped.
            self.assert_log_lines([
                ('spam.eggs', 'INFO', '2'),
                ('spam.eggs.fish', 'INFO', '3'),
            ])
        finally:
            handler.removeFilter(filterfunc)

    def test_empty_filter(self):
        # A Filter created without a name lets a record through.
        record = logging.makeLogRecord({'name': 'spam.eggs'})
        unnamed = logging.Filter()
        self.assertTrue(unnamed.filter(record))
385
#
#   Custom levels used by the filtering tests. Any number of levels may be
#   defined; the only limitations are that they are integers, that the
#   lowest is > 0, and that larger values mean less information is logged.
#   Level values that do not fit these limitations can be handled with a
#   mapping dictionary that converts between application levels and the
#   logging system.
#
SILENT = 120
TACITURN = 119
TERSE = 118
EFFUSIVE = 117
SOCIABLE = 116
VERBOSE = 115
TALKATIVE = 114
GARRULOUS = 113
CHATTERBOX = 112
BORING = 111

# Every custom level, from the lowest (BORING) up to and including SILENT.
LEVEL_RANGE = range(BORING, SILENT + 1)

#
#   Display names for the custom levels. Naming is optional: without it the
#   system uses "Level n" as the text for the level.
#
my_logging_levels = {
    SILENT: 'Silent',
    TACITURN: 'Taciturn',
    TERSE: 'Terse',
    EFFUSIVE: 'Effusive',
    SOCIABLE: 'Sociable',
    VERBOSE: 'Verbose',
    TALKATIVE: 'Talkative',
    GARRULOUS: 'Garrulous',
    CHATTERBOX: 'Chatterbox',
    BORING: 'Boring',
}
423
class GarrulousFilter(logging.Filter):

    """Filter that rejects records logged at the GARRULOUS level."""

    def filter(self, record):
        # False drops the record; every other level passes.
        is_garrulous = record.levelno == GARRULOUS
        return not is_garrulous
430
class VerySpecificFilter(logging.Filter):

    """Filter that rejects records at the SOCIABLE and TACITURN levels."""

    def filter(self, record):
        blocked = (SOCIABLE, TACITURN)
        return record.levelno not in blocked
437
438
class CustomLevelsAndFiltersTest(BaseTest):

    """Check level thresholds and filters using the custom levels."""

    # Skip the logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        # Base fixture first, then register a name for each custom level.
        super().setUp()
        for level, label in my_logging_levels.items():
            logging.addLevelName(level, label)

    def log_at_all_levels(self, logger):
        # One message per custom level, lowest (BORING) first.
        for level in LEVEL_RANGE:
            logger.log(level, self.next_message())

    def test_logger_filter(self):
        # Threshold set on the logger itself.
        root = self.root_logger
        root.setLevel(VERBOSE)
        # Levels >= 'Verbose' are good.
        self.log_at_all_levels(root)
        expected = [
            ('Verbose', '5'),
            ('Sociable', '6'),
            ('Effusive', '7'),
            ('Terse', '8'),
            ('Taciturn', '9'),
            ('Silent', '10'),
        ]
        self.assert_log_lines(expected)

    def test_handler_filter(self):
        # Threshold set on the handler instead of the logger.
        handler = self.root_logger.handlers[0]
        handler.setLevel(SOCIABLE)
        try:
            # Levels >= 'Sociable' are good.
            self.log_at_all_levels(self.root_logger)
            expected = [
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(expected)
        finally:
            self.root_logger.handlers[0].setLevel(logging.NOTSET)

    def test_specific_filters(self):
        # First a filter object on the handler only, then a second filter
        # object on the logger as well.
        handler = self.root_logger.handlers[0]
        handler_filter = GarrulousFilter()
        logger_filter = None
        handler.addFilter(handler_filter)
        try:
            self.log_at_all_levels(self.root_logger)
            first_pass = [
                # 'Garrulous' (message 3) is filtered out by the handler.
                ('Boring', '1'),
                ('Chatterbox', '2'),
                ('Talkative', '4'),
                ('Verbose', '5'),
                ('Sociable', '6'),
                ('Effusive', '7'),
                ('Terse', '8'),
                ('Taciturn', '9'),
                ('Silent', '10'),
            ]
            self.assert_log_lines(first_pass)

            logger_filter = VerySpecificFilter()
            self.root_logger.addFilter(logger_filter)
            self.log_at_all_levels(self.root_logger)
            second_pass = [
                # 'Garrulous' is still missing, and now 'Sociable' and
                # 'Taciturn' are missing too.
                ('Boring', '11'),
                ('Chatterbox', '12'),
                ('Talkative', '14'),
                ('Verbose', '15'),
                ('Effusive', '17'),
                ('Terse', '18'),
                ('Silent', '20'),
            ]
            self.assert_log_lines(first_pass + second_pass)
        finally:
            # The logger filter exists only if the first pass succeeded.
            if logger_filter:
                self.root_logger.removeFilter(logger_filter)
            handler.removeFilter(handler_filter)
526
527
class HandlerTest(BaseTest):
    """Tests for logging.Handler and several logging.handlers classes."""

    def test_name(self):
        # The name property can be set and re-set; the base class's emit()
        # is not implemented.
        h = logging.Handler()
        h.name = 'generic'
        self.assertEqual(h.name, 'generic')
        h.name = 'anothergeneric'
        self.assertEqual(h.name, 'anothergeneric')
        self.assertRaises(NotImplementedError, h.emit, None)

    def test_builtin_handlers(self):
        # We can't actually *use* too many handlers in the tests,
        # but we can try instantiating them with various options
        if sys.platform in ('linux', 'darwin'):
            for existing in (True, False):
                fd, fn = tempfile.mkstemp()
                os.close(fd)
                if not existing:
                    os.unlink(fn)
                h = logging.handlers.WatchedFileHandler(fn, delay=True)
                try:
                    if existing:
                        # With delay=True nothing has been opened yet.
                        dev, ino = h.dev, h.ino
                        self.assertEqual(dev, -1)
                        self.assertEqual(ino, -1)
                        r = logging.makeLogRecord({'msg': 'Test'})
                        h.handle(r)
                        # Now remove the file.
                        os.unlink(fn)
                        self.assertFalse(os.path.exists(fn))
                        # The next call should recreate the file.
                        h.handle(r)
                        self.assertTrue(os.path.exists(fn))
                    else:
                        self.assertEqual(h.dev, -1)
                        self.assertEqual(h.ino, -1)
                finally:
                    # Release the handler and the temporary file even when
                    # one of the assertions above fails.
                    h.close()
                    if os.path.exists(fn):
                        os.unlink(fn)
            if sys.platform == 'darwin':
                sockname = '/var/run/syslog'
            else:
                sockname = '/dev/log'
            try:
                h = logging.handlers.SysLogHandler(sockname)
                self.assertEqual(h.facility, h.LOG_USER)
                self.assertTrue(h.unixsocket)
                h.close()
            except OSError: # syslogd might not be available
                pass
        for method in ('GET', 'POST', 'PUT'):
            if method == 'PUT':
                # Constructing the handler with 'PUT' raises ValueError.
                self.assertRaises(ValueError, logging.handlers.HTTPHandler,
                                  'localhost', '/log', method)
            else:
                h = logging.handlers.HTTPHandler('localhost', '/log', method)
                h.close()
        # A capacity of 0 asks for a flush straight away; a capacity of 1
        # does not while the buffer is still empty.
        h = logging.handlers.BufferingHandler(0)
        r = logging.makeLogRecord({})
        self.assertTrue(h.shouldFlush(r))
        h.close()
        h = logging.handlers.BufferingHandler(1)
        self.assertFalse(h.shouldFlush(r))
        h.close()

    def test_path_objects(self):
        """
        Test that Path objects are accepted as filename arguments to handlers.

        See Issue #27493.
        """
        fd, fn = tempfile.mkstemp()
        os.close(fd)
        os.unlink(fn)
        pfn = pathlib.Path(fn)
        cases = (
                    (logging.FileHandler, (pfn, 'w')),
                    (logging.handlers.RotatingFileHandler, (pfn, 'a')),
                    (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')),
                )
        if sys.platform in ('linux', 'darwin'):
            cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),)
        for cls, args in cases:
            h = cls(*args)
            try:
                # Constructing the handler must have created the file.
                self.assertTrue(os.path.exists(fn))
            finally:
                # Close the handler and remove the file even when the
                # assertion fails; the exists() guard keeps the cleanup
                # from masking that failure with a FileNotFoundError.
                h.close()
                if os.path.exists(fn):
                    os.unlink(fn)

    @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.')
    @unittest.skipUnless(threading, 'Threading required for this test.')
    def test_race(self):
        # Issue #14632 refers.
        # A background thread keeps deleting the log file while this
        # thread keeps writing to it through a WatchedFileHandler.
        def remove_loop(fname, tries):
            for _ in range(tries):
                try:
                    os.unlink(fname)
                    self.deletion_time = time.time()
                except OSError:
                    # Nothing to delete on this attempt.
                    pass
                time.sleep(0.004 * random.randint(0, 4))

        del_count = 500
        log_count = 500

        # Timestamps reported if handling a record raises.
        self.handle_time = None
        self.deletion_time = None

        for delay in (False, True):
            fd, fn = tempfile.mkstemp('.log', 'test_logging-3-')
            os.close(fd)
            remover = threading.Thread(target=remove_loop, args=(fn, del_count))
            remover.daemon = True
            remover.start()
            h = logging.handlers.WatchedFileHandler(fn, delay=delay)
            f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s')
            h.setFormatter(f)
            try:
                for _ in range(log_count):
                    time.sleep(0.005)
                    r = logging.makeLogRecord({'msg': 'testing' })
                    try:
                        self.handle_time = time.time()
                        h.handle(r)
                    except Exception:
                        print('Deleted at %s, '
                              'opened at %s' % (self.deletion_time,
                                                self.handle_time))
                        raise
            finally:
                remover.join()
                h.close()
                if os.path.exists(fn):
                    os.unlink(fn)
659
660
class BadStream:
    """Stream stand-in whose write() always raises RuntimeError."""

    def write(self, data):
        # The data is ignored; the failure is the whole point.
        raise RuntimeError('deliberate mistake')
664
class TestStreamHandler(logging.StreamHandler):
    """StreamHandler that remembers the record passed to handleError()."""

    def handleError(self, record):
        # Store the record on the handler instead of reporting the error.
        self.error_record = record
668
class StreamHandlerTest(BaseTest):
    """Check how StreamHandler deals with a stream whose write() fails."""

    def test_error_handling(self):
        handler = TestStreamHandler(BadStream())
        record = logging.makeLogRecord({})
        saved_flag = logging.raiseExceptions

        try:
            # The subclass's handleError() receives the failing record.
            handler.handle(record)
            self.assertIs(handler.error_record, record)

            # A plain StreamHandler reports the error on stderr.
            handler = logging.StreamHandler(BadStream())
            with support.captured_stderr() as stderr:
                handler.handle(record)
                msg = '\nRuntimeError: deliberate mistake\n'
                self.assertIn(msg, stderr.getvalue())

            # With raiseExceptions off, nothing is written to stderr.
            logging.raiseExceptions = False
            with support.captured_stderr() as stderr:
                handler.handle(record)
                self.assertEqual('', stderr.getvalue())
        finally:
            logging.raiseExceptions = saved_flag
691
692# -- The following section could be moved into a server_helper.py module
693# -- if it proves to be of wider utility than just test_logging
694
695if threading:
696    class TestSMTPServer(smtpd.SMTPServer):
697        """
698        This class implements a test SMTP server.
699
700        :param addr: A (host, port) tuple which the server listens on.
701                     You can specify a port value of zero: the server's
702                     *port* attribute will hold the actual port number
703                     used, which can be used in client connections.
704        :param handler: A callable which will be called to process
705                        incoming messages. The handler will be passed
706                        the client address tuple, who the message is from,
707                        a list of recipients and the message data.
708        :param poll_interval: The interval, in seconds, used in the underlying
709                              :func:`select` or :func:`poll` call by
710                              :func:`asyncore.loop`.
711        :param sockmap: A dictionary which will be used to hold
712                        :class:`asyncore.dispatcher` instances used by
713                        :func:`asyncore.loop`. This avoids changing the
714                        :mod:`asyncore` module's global state.
715        """
716
717        def __init__(self, addr, handler, poll_interval, sockmap):
718            smtpd.SMTPServer.__init__(self, addr, None, map=sockmap,
719                                      decode_data=True)
720            self.port = self.socket.getsockname()[1]
721            self._handler = handler
722            self._thread = None
723            self.poll_interval = poll_interval
724
725        def process_message(self, peer, mailfrom, rcpttos, data):
726            """
727            Delegates to the handler passed in to the server's constructor.
728
729            Typically, this will be a test case method.
730            :param peer: The client (host, port) tuple.
731            :param mailfrom: The address of the sender.
732            :param rcpttos: The addresses of the recipients.
733            :param data: The message.
734            """
735            self._handler(peer, mailfrom, rcpttos, data)
736
737        def start(self):
738            """
739            Start the server running on a separate daemon thread.
740            """
741            self._thread = t = threading.Thread(target=self.serve_forever,
742                                                args=(self.poll_interval,))
743            t.setDaemon(True)
744            t.start()
745
746        def serve_forever(self, poll_interval):
747            """
748            Run the :mod:`asyncore` loop until normal termination
749            conditions arise.
750            :param poll_interval: The interval, in seconds, used in the underlying
751                                  :func:`select` or :func:`poll` call by
752                                  :func:`asyncore.loop`.
753            """
754            try:
755                asyncore.loop(poll_interval, map=self._map)
756            except OSError:
757                # On FreeBSD 8, closing the server repeatably
758                # raises this error. We swallow it if the
759                # server has been closed.
760                if self.connected or self.accepting:
761                    raise
762
763        def stop(self, timeout=None):
764            """
765            Stop the thread by closing the server instance.
766            Wait for the server thread to terminate.
767
768            :param timeout: How long to wait for the server thread
769                            to terminate.
770            """
771            self.close()
772            self._thread.join(timeout)
773            self._thread = None
774
775    class ControlMixin(object):
776        """
777        This mixin is used to start a server on a separate thread, and
778        shut it down programmatically. Request handling is simplified - instead
779        of needing to derive a suitable RequestHandler subclass, you just
780        provide a callable which will be passed each received request to be
781        processed.
782
783        :param handler: A handler callable which will be called with a
784                        single parameter - the request - in order to
785                        process the request. This handler is called on the
786                        server thread, effectively meaning that requests are
787                        processed serially. While not quite Web scale ;-),
788                        this should be fine for testing applications.
789        :param poll_interval: The polling interval in seconds.
790        """
791        def __init__(self, handler, poll_interval):
792            self._thread = None
793            self.poll_interval = poll_interval
794            self._handler = handler
795            self.ready = threading.Event()
796
797        def start(self):
798            """
799            Create a daemon thread to run the server, and start it.
800            """
801            self._thread = t = threading.Thread(target=self.serve_forever,
802                                                args=(self.poll_interval,))
803            t.setDaemon(True)
804            t.start()
805
806        def serve_forever(self, poll_interval):
807            """
808            Run the server. Set the ready flag before entering the
809            service loop.
810            """
811            self.ready.set()
812            super(ControlMixin, self).serve_forever(poll_interval)
813
814        def stop(self, timeout=None):
815            """
816            Tell the server thread to stop, and wait for it to do so.
817
818            :param timeout: How long to wait for the server thread
819                            to terminate.
820            """
821            self.shutdown()
822            if self._thread is not None:
823                self._thread.join(timeout)
824                self._thread = None
825            self.server_close()
826            self.ready.clear()
827
828    class TestHTTPServer(ControlMixin, HTTPServer):
829        """
830        An HTTP server which is controllable using :class:`ControlMixin`.
831
832        :param addr: A tuple with the IP address and port to listen on.
833        :param handler: A handler callable which will be called with a
834                        single parameter - the request - in order to
835                        process the request.
836        :param poll_interval: The polling interval in seconds.
837        :param log: Pass ``True`` to enable log messages.
838        """
839        def __init__(self, addr, handler, poll_interval=0.5,
840                     log=False, sslctx=None):
841            class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler):
842                def __getattr__(self, name, default=None):
843                    if name.startswith('do_'):
844                        return self.process_request
845                    raise AttributeError(name)
846
847                def process_request(self):
848                    self.server._handler(self)
849
850                def log_message(self, format, *args):
851                    if log:
852                        super(DelegatingHTTPRequestHandler,
853                              self).log_message(format, *args)
854            HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler)
855            ControlMixin.__init__(self, handler, poll_interval)
856            self.sslctx = sslctx
857
858        def get_request(self):
859            try:
860                sock, addr = self.socket.accept()
861                if self.sslctx:
862                    sock = self.sslctx.wrap_socket(sock, server_side=True)
863            except OSError as e:
864                # socket errors are silenced by the caller, print them here
865                sys.stderr.write("Got an error:\n%s\n" % e)
866                raise
867            return sock, addr
868
869    class TestTCPServer(ControlMixin, ThreadingTCPServer):
870        """
871        A TCP server which is controllable using :class:`ControlMixin`.
872
873        :param addr: A tuple with the IP address and port to listen on.
874        :param handler: A handler callable which will be called with a single
875                        parameter - the request - in order to process the request.
876        :param poll_interval: The polling interval in seconds.
877        :bind_and_activate: If True (the default), binds the server and starts it
878                            listening. If False, you need to call
879                            :meth:`server_bind` and :meth:`server_activate` at
880                            some later time before calling :meth:`start`, so that
881                            the server will set up the socket and listen on it.
882        """
883
884        allow_reuse_address = True
885
886        def __init__(self, addr, handler, poll_interval=0.5,
887                     bind_and_activate=True):
888            class DelegatingTCPRequestHandler(StreamRequestHandler):
889
890                def handle(self):
891                    self.server._handler(self)
892            ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler,
893                                        bind_and_activate)
894            ControlMixin.__init__(self, handler, poll_interval)
895
896        def server_bind(self):
897            super(TestTCPServer, self).server_bind()
898            self.port = self.socket.getsockname()[1]
899
900    class TestUDPServer(ControlMixin, ThreadingUDPServer):
901        """
902        A UDP server which is controllable using :class:`ControlMixin`.
903
904        :param addr: A tuple with the IP address and port to listen on.
905        :param handler: A handler callable which will be called with a
906                        single parameter - the request - in order to
907                        process the request.
908        :param poll_interval: The polling interval for shutdown requests,
909                              in seconds.
910        :bind_and_activate: If True (the default), binds the server and
911                            starts it listening. If False, you need to
912                            call :meth:`server_bind` and
913                            :meth:`server_activate` at some later time
914                            before calling :meth:`start`, so that the server will
915                            set up the socket and listen on it.
916        """
917        def __init__(self, addr, handler, poll_interval=0.5,
918                     bind_and_activate=True):
919            class DelegatingUDPRequestHandler(DatagramRequestHandler):
920
921                def handle(self):
922                    self.server._handler(self)
923
924                def finish(self):
925                    data = self.wfile.getvalue()
926                    if data:
927                        try:
928                            super(DelegatingUDPRequestHandler, self).finish()
929                        except OSError:
930                            if not self.server._closed:
931                                raise
932
933            ThreadingUDPServer.__init__(self, addr,
934                                        DelegatingUDPRequestHandler,
935                                        bind_and_activate)
936            ControlMixin.__init__(self, handler, poll_interval)
937            self._closed = False
938
939        def server_bind(self):
940            super(TestUDPServer, self).server_bind()
941            self.port = self.socket.getsockname()[1]
942
943        def server_close(self):
944            super(TestUDPServer, self).server_close()
945            self._closed = True
946
    # Unix domain socket variants of the test servers; only defined when the
    # socket module exposes AF_UNIX.
    if hasattr(socket, "AF_UNIX"):
        class TestUnixStreamServer(TestTCPServer):
            # TestTCPServer using the AF_UNIX address family.
            address_family = socket.AF_UNIX

        class TestUnixDatagramServer(TestUDPServer):
            # TestUDPServer using the AF_UNIX address family.
            address_family = socket.AF_UNIX
953
954# - end of server_helper section
955
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPHandlerTest(BaseTest):
    """Test for SMTPHandler, using a local test SMTP server."""

    # Seconds; used both as the SMTPHandler timeout and as the longest time
    # the test waits for the message to arrive.
    TIMEOUT = 8.0

    def test_basic(self):
        """A handled record is delivered to the server as a single mail."""
        sockmap = {}
        server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001,
                                sockmap)
        server.start()
        try:
            addr = (support.HOST, server.port)
            h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log',
                                             timeout=self.TIMEOUT)
            # Close the handler even if an assertion below fails.
            self.addCleanup(h.close)
            self.assertEqual(h.toaddrs, ['you'])
            self.messages = []
            r = logging.makeLogRecord({'msg': 'Hello \u2713'})
            self.handled = threading.Event()
            h.handle(r)
            self.handled.wait(self.TIMEOUT)  # 14314: don't wait forever
        finally:
            # Always stop the server, so a failure above does not leave it
            # running.
            server.stop()
        self.assertTrue(self.handled.is_set())
        self.assertEqual(len(self.messages), 1)
        peer, mailfrom, rcpttos, data = self.messages[0]
        self.assertEqual(mailfrom, 'me')
        self.assertEqual(rcpttos, ['you'])
        self.assertIn('\nSubject: Log\n', data)
        self.assertTrue(data.endswith('\n\nHello \u2713'))

    def process_message(self, *args):
        """Server callback: record the received message and wake the test."""
        self.messages.append(args)
        self.handled.set()
986
class MemoryHandlerTest(BaseTest):

    """Exercise MemoryHandler buffering and flushing."""

    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        super().setUp()
        # Capacity of 10 records, flush level WARNING, flushing into the
        # root handler.
        self.mem_hdlr = logging.handlers.MemoryHandler(
            capacity=10, flushLevel=logging.WARNING, target=self.root_hdlr)
        mem_logger = logging.getLogger('mem')
        mem_logger.propagate = 0
        mem_logger.addHandler(self.mem_hdlr)
        self.mem_logger = mem_logger

    def tearDown(self):
        self.mem_hdlr.close()
        super().tearDown()

    def test_flush(self):
        # Records are passed on to the target either when one at or above
        # the flush level arrives, or when the buffer reaches capacity.
        log = self.mem_logger
        log.debug(self.next_message())
        self.assert_log_lines([])
        log.info(self.next_message())
        self.assert_log_lines([])
        # WARNING meets the flush level, so everything buffered comes out.
        log.warning(self.next_message())
        expected = [('DEBUG', '1'), ('INFO', '2'), ('WARNING', '3')]
        self.assert_log_lines(expected)
        for first in (4, 14):
            # Nine records stay buffered ...
            for _ in range(9):
                log.debug(self.next_message())
            self.assert_log_lines(expected)
            # ... and the tenth since the last flush empties the buffer.
            log.debug(self.next_message())
            flushed = [('DEBUG', str(num)) for num in range(first, first + 10)]
            expected = expected + flushed
            self.assert_log_lines(expected)

        # One more record on its own is merely buffered.
        log.debug(self.next_message())
        self.assert_log_lines(expected)

    def test_flush_on_close(self):
        """
        Check both settings of the flush-on-close option.
        """
        log = self.mem_logger
        log.debug(self.next_message())
        self.assert_log_lines([])
        log.info(self.next_message())
        self.assert_log_lines([])
        log.removeHandler(self.mem_hdlr)
        # By default, closing the handler flushes what is buffered.
        self.mem_hdlr.close()
        expected = [('DEBUG', '1'), ('INFO', '2')]
        self.assert_log_lines(expected)
        # Same again with a handler created with flushing on close
        # switched off (fourth argument False).
        self.mem_hdlr = logging.handlers.MemoryHandler(
            10, logging.WARNING, self.root_hdlr, False)
        log.addHandler(self.mem_hdlr)
        log.debug(self.next_message())
        self.assert_log_lines(expected)  # still unchanged
        log.info(self.next_message())
        self.assert_log_lines(expected)  # still unchanged
        log.removeHandler(self.mem_hdlr)
        self.mem_hdlr.close()
        # Closing must not have pushed the buffered records out.
        self.assert_log_lines(expected)  # still unchanged
1063
1064
class ExceptionFormatter(logging.Formatter):
    """Formatter that reduces exception info to the exception class name."""
    def formatException(self, ei):
        # ei is an exc_info-style sequence; only the exception class in
        # its first slot is used.
        exc_type = ei[0]
        return "Got a [{}]".format(exc_type.__name__)
1069
1070
1071class ConfigFileTest(BaseTest):
1072
1073    """Reading logging config from a .ini-style config file."""
1074
1075    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1076
1077    # config0 is a standard configuration.
1078    config0 = """
1079    [loggers]
1080    keys=root
1081
1082    [handlers]
1083    keys=hand1
1084
1085    [formatters]
1086    keys=form1
1087
1088    [logger_root]
1089    level=WARNING
1090    handlers=hand1
1091
1092    [handler_hand1]
1093    class=StreamHandler
1094    level=NOTSET
1095    formatter=form1
1096    args=(sys.stdout,)
1097
1098    [formatter_form1]
1099    format=%(levelname)s ++ %(message)s
1100    datefmt=
1101    """
1102
1103    # config1 adds a little to the standard configuration.
1104    config1 = """
1105    [loggers]
1106    keys=root,parser
1107
1108    [handlers]
1109    keys=hand1
1110
1111    [formatters]
1112    keys=form1
1113
1114    [logger_root]
1115    level=WARNING
1116    handlers=
1117
1118    [logger_parser]
1119    level=DEBUG
1120    handlers=hand1
1121    propagate=1
1122    qualname=compiler.parser
1123
1124    [handler_hand1]
1125    class=StreamHandler
1126    level=NOTSET
1127    formatter=form1
1128    args=(sys.stdout,)
1129
1130    [formatter_form1]
1131    format=%(levelname)s ++ %(message)s
1132    datefmt=
1133    """
1134
1135    # config1a moves the handler to the root.
1136    config1a = """
1137    [loggers]
1138    keys=root,parser
1139
1140    [handlers]
1141    keys=hand1
1142
1143    [formatters]
1144    keys=form1
1145
1146    [logger_root]
1147    level=WARNING
1148    handlers=hand1
1149
1150    [logger_parser]
1151    level=DEBUG
1152    handlers=
1153    propagate=1
1154    qualname=compiler.parser
1155
1156    [handler_hand1]
1157    class=StreamHandler
1158    level=NOTSET
1159    formatter=form1
1160    args=(sys.stdout,)
1161
1162    [formatter_form1]
1163    format=%(levelname)s ++ %(message)s
1164    datefmt=
1165    """
1166
1167    # config2 has a subtle configuration error that should be reported
1168    config2 = config1.replace("sys.stdout", "sys.stbout")
1169
1170    # config3 has a less subtle configuration error
1171    config3 = config1.replace("formatter=form1", "formatter=misspelled_name")
1172
1173    # config4 specifies a custom formatter class to be loaded
1174    config4 = """
1175    [loggers]
1176    keys=root
1177
1178    [handlers]
1179    keys=hand1
1180
1181    [formatters]
1182    keys=form1
1183
1184    [logger_root]
1185    level=NOTSET
1186    handlers=hand1
1187
1188    [handler_hand1]
1189    class=StreamHandler
1190    level=NOTSET
1191    formatter=form1
1192    args=(sys.stdout,)
1193
1194    [formatter_form1]
1195    class=""" + __name__ + """.ExceptionFormatter
1196    format=%(levelname)s:%(name)s:%(message)s
1197    datefmt=
1198    """
1199
1200    # config5 specifies a custom handler class to be loaded
1201    config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler')
1202
1203    # config6 uses ', ' delimiters in the handlers and formatters sections
1204    config6 = """
1205    [loggers]
1206    keys=root,parser
1207
1208    [handlers]
1209    keys=hand1, hand2
1210
1211    [formatters]
1212    keys=form1, form2
1213
1214    [logger_root]
1215    level=WARNING
1216    handlers=
1217
1218    [logger_parser]
1219    level=DEBUG
1220    handlers=hand1
1221    propagate=1
1222    qualname=compiler.parser
1223
1224    [handler_hand1]
1225    class=StreamHandler
1226    level=NOTSET
1227    formatter=form1
1228    args=(sys.stdout,)
1229
1230    [handler_hand2]
1231    class=StreamHandler
1232    level=NOTSET
1233    formatter=form1
1234    args=(sys.stderr,)
1235
1236    [formatter_form1]
1237    format=%(levelname)s ++ %(message)s
1238    datefmt=
1239
1240    [formatter_form2]
1241    format=%(message)s
1242    datefmt=
1243    """
1244
1245    # config7 adds a compiler logger.
1246    config7 = """
1247    [loggers]
1248    keys=root,parser,compiler
1249
1250    [handlers]
1251    keys=hand1
1252
1253    [formatters]
1254    keys=form1
1255
1256    [logger_root]
1257    level=WARNING
1258    handlers=hand1
1259
1260    [logger_compiler]
1261    level=DEBUG
1262    handlers=
1263    propagate=1
1264    qualname=compiler
1265
1266    [logger_parser]
1267    level=DEBUG
1268    handlers=
1269    propagate=1
1270    qualname=compiler.parser
1271
1272    [handler_hand1]
1273    class=StreamHandler
1274    level=NOTSET
1275    formatter=form1
1276    args=(sys.stdout,)
1277
1278    [formatter_form1]
1279    format=%(levelname)s ++ %(message)s
1280    datefmt=
1281    """
1282
1283    disable_test = """
1284    [loggers]
1285    keys=root
1286
1287    [handlers]
1288    keys=screen
1289
1290    [formatters]
1291    keys=
1292
1293    [logger_root]
1294    level=DEBUG
1295    handlers=screen
1296
1297    [handler_screen]
1298    level=DEBUG
1299    class=StreamHandler
1300    args=(sys.stdout,)
1301    formatter=
1302    """
1303
1304    def apply_config(self, conf, **kwargs):
1305        file = io.StringIO(textwrap.dedent(conf))
1306        logging.config.fileConfig(file, **kwargs)
1307
1308    def test_config0_ok(self):
1309        # A simple config file which overrides the default settings.
1310        with support.captured_stdout() as output:
1311            self.apply_config(self.config0)
1312            logger = logging.getLogger()
1313            # Won't output anything
1314            logger.info(self.next_message())
1315            # Outputs a message
1316            logger.error(self.next_message())
1317            self.assert_log_lines([
1318                ('ERROR', '2'),
1319            ], stream=output)
1320            # Original logger output is empty.
1321            self.assert_log_lines([])
1322
1323    def test_config0_using_cp_ok(self):
1324        # A simple config file which overrides the default settings.
1325        with support.captured_stdout() as output:
1326            file = io.StringIO(textwrap.dedent(self.config0))
1327            cp = configparser.ConfigParser()
1328            cp.read_file(file)
1329            logging.config.fileConfig(cp)
1330            logger = logging.getLogger()
1331            # Won't output anything
1332            logger.info(self.next_message())
1333            # Outputs a message
1334            logger.error(self.next_message())
1335            self.assert_log_lines([
1336                ('ERROR', '2'),
1337            ], stream=output)
1338            # Original logger output is empty.
1339            self.assert_log_lines([])
1340
1341    def test_config1_ok(self, config=config1):
1342        # A config file defining a sub-parser as well.
1343        with support.captured_stdout() as output:
1344            self.apply_config(config)
1345            logger = logging.getLogger("compiler.parser")
1346            # Both will output a message
1347            logger.info(self.next_message())
1348            logger.error(self.next_message())
1349            self.assert_log_lines([
1350                ('INFO', '1'),
1351                ('ERROR', '2'),
1352            ], stream=output)
1353            # Original logger output is empty.
1354            self.assert_log_lines([])
1355
1356    def test_config2_failure(self):
1357        # A simple config file which overrides the default settings.
1358        self.assertRaises(Exception, self.apply_config, self.config2)
1359
1360    def test_config3_failure(self):
1361        # A simple config file which overrides the default settings.
1362        self.assertRaises(Exception, self.apply_config, self.config3)
1363
1364    def test_config4_ok(self):
1365        # A config file specifying a custom formatter class.
1366        with support.captured_stdout() as output:
1367            self.apply_config(self.config4)
1368            logger = logging.getLogger()
1369            try:
1370                raise RuntimeError()
1371            except RuntimeError:
1372                logging.exception("just testing")
1373            sys.stdout.seek(0)
1374            self.assertEqual(output.getvalue(),
1375                "ERROR:root:just testing\nGot a [RuntimeError]\n")
1376            # Original logger output is empty
1377            self.assert_log_lines([])
1378
1379    def test_config5_ok(self):
1380        self.test_config1_ok(config=self.config5)
1381
1382    def test_config6_ok(self):
1383        self.test_config1_ok(config=self.config6)
1384
1385    def test_config7_ok(self):
1386        with support.captured_stdout() as output:
1387            self.apply_config(self.config1a)
1388            logger = logging.getLogger("compiler.parser")
1389            # See issue #11424. compiler-hyphenated sorts
1390            # between compiler and compiler.xyz and this
1391            # was preventing compiler.xyz from being included
1392            # in the child loggers of compiler because of an
1393            # overzealous loop termination condition.
1394            hyphenated = logging.getLogger('compiler-hyphenated')
1395            # All will output a message
1396            logger.info(self.next_message())
1397            logger.error(self.next_message())
1398            hyphenated.critical(self.next_message())
1399            self.assert_log_lines([
1400                ('INFO', '1'),
1401                ('ERROR', '2'),
1402                ('CRITICAL', '3'),
1403            ], stream=output)
1404            # Original logger output is empty.
1405            self.assert_log_lines([])
1406        with support.captured_stdout() as output:
1407            self.apply_config(self.config7)
1408            logger = logging.getLogger("compiler.parser")
1409            self.assertFalse(logger.disabled)
1410            # Both will output a message
1411            logger.info(self.next_message())
1412            logger.error(self.next_message())
1413            logger = logging.getLogger("compiler.lexer")
1414            # Both will output a message
1415            logger.info(self.next_message())
1416            logger.error(self.next_message())
1417            # Will not appear
1418            hyphenated.critical(self.next_message())
1419            self.assert_log_lines([
1420                ('INFO', '4'),
1421                ('ERROR', '5'),
1422                ('INFO', '6'),
1423                ('ERROR', '7'),
1424            ], stream=output)
1425            # Original logger output is empty.
1426            self.assert_log_lines([])
1427
1428    def test_logger_disabling(self):
1429        self.apply_config(self.disable_test)
1430        logger = logging.getLogger('some_pristine_logger')
1431        self.assertFalse(logger.disabled)
1432        self.apply_config(self.disable_test)
1433        self.assertTrue(logger.disabled)
1434        self.apply_config(self.disable_test, disable_existing_loggers=False)
1435        self.assertFalse(logger.disabled)
1436
1437
@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):

    """Test for SocketHandler objects."""

    if threading:
        server_class = TestTCPServer
        address = ('localhost', 0)

    def setUp(self):
        """Set up a TCP server to receive log messages, and a SocketHandler
        pointing to that server's address and port."""
        BaseTest.setUp(self)
        # Issue #29177: deal with errors that happen during setup
        self.server = self.sock_hdlr = self.server_exception = None
        try:
            self.server = server = self.server_class(self.address,
                                                     self.handle_socket, 0.01)
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as e:
            # Remember the failure; the tests skip themselves when it is set.
            self.server_exception = e
            return
        server.ready.wait()
        hcls = logging.handlers.SocketHandler
        if isinstance(server.server_address, tuple):
            self.sock_hdlr = hcls('localhost', server.port)
        else:
            # Non-tuple address: pass it as the host, with no port.
            self.sock_hdlr = hcls(server.server_address, None)
        self.log_output = ''
        # Replace the first root handler with the socket handler.
        self.root_logger.removeHandler(self.root_logger.handlers[0])
        self.root_logger.addHandler(self.sock_hdlr)
        # Released once per record received by handle_socket().
        self.handled = threading.Semaphore(0)

    def tearDown(self):
        """Shutdown the TCP server."""
        try:
            if self.server:
                self.server.stop(2.0)
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
        finally:
            BaseTest.tearDown(self)

    def handle_socket(self, request):
        """Server callback: read records from the connection.

        Each record is a 4-byte big-endian length followed by that many
        bytes of pickled data. The message of every record is appended to
        ``log_output`` and ``handled`` is released once per record. Reading
        stops when the peer closes the connection.
        """
        conn = request.connection
        while True:
            chunk = conn.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L", chunk)[0]
            chunk = conn.recv(slen)
            while len(chunk) < slen:
                more = conn.recv(slen - len(chunk))
                if not more:
                    # The peer closed the connection in the middle of a
                    # record. recv() would keep returning b'' and this loop
                    # would never end, so drop the truncated record.
                    return
                chunk = chunk + more
            # NOTE: unpickling is only acceptable here because the data is
            # produced by this test's own SocketHandler.
            obj = pickle.loads(chunk)
            record = logging.makeLogRecord(obj)
            self.log_output += record.msg + '\n'
            self.handled.release()

    def test_output(self):
        # The log message sent to the SocketHandler is properly received.
        if self.server_exception:
            self.skipTest(self.server_exception)
        logger = logging.getLogger("tcp")
        logger.error("spam")
        self.handled.acquire()
        logger.debug("eggs")
        self.handled.acquire()
        self.assertEqual(self.log_output, "spam\neggs\n")

    def test_noserver(self):
        # Logging after the server has gone away must not raise, and must
        # schedule a reconnection attempt.
        if self.server_exception:
            self.skipTest(self.server_exception)
        # Avoid timing-related failures due to SocketHandler's own hard-wired
        # one-second timeout on socket.create_connection() (issue #16264).
        self.sock_hdlr.retryStart = 2.5
        # Kill the server
        self.server.stop(2.0)
        # The logging call should try to connect, which should fail
        try:
            raise RuntimeError('Deliberate mistake')
        except RuntimeError:
            self.root_logger.exception('Never sent')
        self.root_logger.error('Never sent, either')
        now = time.time()
        self.assertGreater(self.sock_hdlr.retryTime, now)
        # Wait until just past the scheduled retry time, then log again.
        time.sleep(self.sock_hdlr.retryTime - now + 0.001)
        self.root_logger.error('Nor this')
1528
def _get_temp_domain_socket():
    """Return a fresh temporary path to use as a Unix domain socket address.

    The file created to reserve the name is deleted again before returning.
    """
    handle, path = tempfile.mkstemp(suffix='.sock', prefix='test_logging_')
    os.close(handle)
    # Only the name is wanted: the file itself must not exist, or binding
    # to it fails with an 'address already in use' error.
    os.remove(path)
    return path
1536
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixSocketHandlerTest(SocketHandlerTest):

    """Run the SocketHandler tests over a Unix domain socket."""

    if threading and hasattr(socket, "AF_UNIX"):
        server_class = TestUnixStreamServer

    def setUp(self):
        # Use a fresh socket path per test in place of the inherited
        # (host, port) address.
        self.address = _get_temp_domain_socket()
        super().setUp()

    def tearDown(self):
        super().tearDown()
        # Remove the socket file for this test's address.
        support.unlink(self.address)
1554
@unittest.skipUnless(threading, 'Threading required for this test.')
class DatagramHandlerTest(BaseTest):

    """Check that DatagramHandler delivers records to a UDP server."""

    if threading:
        server_class = TestUDPServer
        address = ('localhost', 0)

    def setUp(self):
        """Start a UDP server to receive log messages and attach a
        DatagramHandler aimed at it to the root logger."""
        super().setUp()
        # Issue #29177: deal with errors that happen during setup
        self.server = None
        self.sock_hdlr = None
        self.server_exception = None
        try:
            server = self.server_class(self.address,
                                       self.handle_datagram, 0.01)
            self.server = server
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as exc:
            # Remember the failure; test_output skips when it is set.
            self.server_exception = exc
            return
        server.ready.wait()
        handler_class = logging.handlers.DatagramHandler
        if isinstance(server.server_address, tuple):
            self.sock_hdlr = handler_class('localhost', server.port)
        else:
            # Non-tuple address: pass it as the host, with no port.
            self.sock_hdlr = handler_class(server.server_address, None)
        self.log_output = ''
        # Replace the first root handler with the datagram handler.
        root = self.root_logger
        root.removeHandler(root.handlers[0])
        root.addHandler(self.sock_hdlr)
        # Set by handle_datagram() whenever a record has been received.
        self.handled = threading.Event()

    def tearDown(self):
        """Stop the UDP server and detach the handler."""
        try:
            if self.server:
                self.server.stop(2.0)
            if self.sock_hdlr:
                self.root_logger.removeHandler(self.sock_hdlr)
                self.sock_hdlr.close()
        finally:
            super().tearDown()

    def handle_datagram(self, request):
        # Skip the length prefix (as many bytes as a packed '>L'), then
        # unpickle the rest of the packet into a record.
        prefix_len = len(struct.pack('>L', 0))
        payload = request.packet[prefix_len:]
        record = logging.makeLogRecord(pickle.loads(payload))
        self.log_output += record.msg + '\n'
        self.handled.set()

    def test_output(self):
        # Two records logged through the DatagramHandler both arrive.
        if self.server_exception:
            self.skipTest(self.server_exception)
        udp_logger = logging.getLogger("udp")
        received = self.handled
        udp_logger.error("spam")
        received.wait()
        received.clear()
        udp_logger.error("eggs")
        received.wait()
        self.assertEqual(self.log_output, "spam\neggs\n")
1620
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixDatagramHandlerTest(DatagramHandlerTest):

    """Run the DatagramHandler tests over a Unix domain socket."""

    if threading and hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        # Use a fresh socket path per test in place of the inherited
        # (host, port) address.
        self.address = _get_temp_domain_socket()
        super().setUp()

    def tearDown(self):
        super().tearDown()
        # Remove the socket file for this test's address.
        support.unlink(self.address)
1638
@unittest.skipUnless(threading, 'Threading required for this test.')
class SysLogHandlerTest(BaseTest):

    """Check the packets SysLogHandler sends, using a UDP server."""

    if threading:
        server_class = TestUDPServer
        address = ('localhost', 0)

    def setUp(self):
        """Start a UDP server to receive log messages and attach a
        SysLogHandler aimed at it to the root logger."""
        super().setUp()
        # Issue #29177: deal with errors that happen during setup
        self.server = None
        self.sl_hdlr = None
        self.server_exception = None
        try:
            server = self.server_class(self.address,
                                       self.handle_datagram, 0.01)
            self.server = server
            server.start()
            # Uncomment next line to test error recovery in setUp()
            # raise OSError('dummy error raised')
        except OSError as exc:
            # Remember the failure; test_output skips when it is set.
            self.server_exception = exc
            return
        server.ready.wait()
        handler_class = logging.handlers.SysLogHandler
        if isinstance(server.server_address, tuple):
            self.sl_hdlr = handler_class(('localhost', server.port))
        else:
            # Non-tuple address: pass it through unchanged.
            self.sl_hdlr = handler_class(server.server_address)
        self.log_output = ''
        # Replace the first root handler with the syslog handler.
        root = self.root_logger
        root.removeHandler(root.handlers[0])
        root.addHandler(self.sl_hdlr)
        # Set by handle_datagram() whenever a packet has been received.
        self.handled = threading.Event()

    def tearDown(self):
        """Stop the server and detach the handler."""
        try:
            if self.server:
                self.server.stop(2.0)
            if self.sl_hdlr:
                self.root_logger.removeHandler(self.sl_hdlr)
                self.sl_hdlr.close()
        finally:
            super().tearDown()

    def handle_datagram(self, request):
        # Keep only the most recent raw packet, then wake the test.
        self.log_output = request.packet
        self.handled.set()

    def test_output(self):
        if self.server_exception:
            self.skipTest(self.server_exception)
        # Each message logged through the SysLogHandler arrives as expected.
        slh_logger = logging.getLogger("slh")
        received = self.handled
        # Default settings: a NUL byte terminates the message.
        slh_logger.error("sp\xe4m")
        received.wait()
        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00')
        received.clear()
        # With append_nul switched off the terminator disappears.
        self.sl_hdlr.append_nul = False
        slh_logger.error("sp\xe4m")
        received.wait()
        self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m')
        received.clear()
        # An ident is placed in front of the message.
        self.sl_hdlr.ident = "h\xe4m-"
        slh_logger.error("sp\xe4m")
        received.wait()
        self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m')
1707
@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required")
@unittest.skipUnless(threading, 'Threading required for this test.')
class UnixSysLogHandlerTest(SysLogHandlerTest):

    """Test for SysLogHandler with Unix sockets."""

    if threading and hasattr(socket, "AF_UNIX"):
        server_class = TestUnixDatagramServer

    def setUp(self):
        """Pick a temporary socket path, then run the inherited set-up."""
        # override the definition in the base class
        self.address = _get_temp_domain_socket()
        SysLogHandlerTest.setUp(self)

    def tearDown(self):
        """Run the inherited tearDown, then delete the socket path.

        The unlink is in a ``finally`` clause so the path stored in
        ``self.address`` is removed even if the inherited tearDown raises.
        """
        try:
            SysLogHandlerTest.tearDown(self)
        finally:
            support.unlink(self.address)
1725
@unittest.skipUnless(threading, 'Threading required for this test.')
class HTTPHandlerTest(BaseTest):
    """Test for HTTPHandler."""

    def setUp(self):
        """Prepare the event used to signal that a request was handled.

        The HTTP server and the HTTPHandler themselves are created inside
        test_output(), once per round of its loop.
        """
        BaseTest.setUp(self)
        self.handled = threading.Event()

    def handle_request(self, request):
        """Record what the server received and acknowledge the request.

        Stores the HTTP command and the parsed request path; for POST it also
        stores the raw body (None when it cannot be read). It then replies
        200 and sets ``self.handled``.
        """
        self.command = request.command
        self.log_data = urlparse(request.path)
        if self.command == 'POST':
            try:
                rlen = int(request.headers['Content-Length'])
                self.post_data = request.rfile.read(rlen)
            except Exception:
                # Best effort: an unreadable body is recorded as None.
                # Unlike a bare except, this lets KeyboardInterrupt and
                # SystemExit propagate.
                self.post_data = None
        request.send_response(200)
        request.end_headers()
        self.handled.set()

    def test_output(self):
        # The log message sent to the HTTPHandler is properly received.
        logger = logging.getLogger("http")
        root_logger = self.root_logger
        root_logger.removeHandler(self.root_logger.handlers[0])
        for secure in (False, True):
            addr = ('localhost', 0)
            # Both stay None for the plain round and when ssl is missing,
            # instead of relying on values left over from an earlier round.
            sslctx = context = None
            if secure:
                try:
                    import ssl
                except ImportError:
                    pass
                else:
                    here = os.path.dirname(__file__)
                    localhost_cert = os.path.join(here, "keycert.pem")
                    sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                    sslctx.load_cert_chain(localhost_cert)

                    context = ssl.create_default_context(cafile=localhost_cert)
            self.server = server = TestHTTPServer(addr, self.handle_request,
                                                    0.01, sslctx=sslctx)
            server.start()
            self.h_hdlr = None
            try:
                server.ready.wait()
                host = 'localhost:%d' % server.server_port
                secure_client = secure and sslctx
                self.h_hdlr = logging.handlers.HTTPHandler(
                    host, '/frob',
                    secure=secure_client,
                    context=context,
                    credentials=('foo', 'bar'))
                self.log_data = None
                root_logger.addHandler(self.h_hdlr)

                for method in ('GET', 'POST'):
                    self.h_hdlr.method = method
                    self.handled.clear()
                    msg = "sp\xe4m"
                    # Must be logged from this method: funcName is checked
                    # against 'test_output' below.
                    logger.error(msg)
                    self.handled.wait()
                    self.assertEqual(self.log_data.path, '/frob')
                    self.assertEqual(self.command, method)
                    if method == 'GET':
                        d = parse_qs(self.log_data.query)
                    else:
                        d = parse_qs(self.post_data.decode('utf-8'))
                    self.assertEqual(d['name'], ['http'])
                    self.assertEqual(d['funcName'], ['test_output'])
                    self.assertEqual(d['msg'], [msg])
            finally:
                # Always release the server thread and the handler, even
                # when an assertion above failed.
                try:
                    server.stop(2.0)
                finally:
                    if self.h_hdlr is not None:
                        root_logger.removeHandler(self.h_hdlr)
                        self.h_hdlr.close()
1803
class MemoryTest(BaseTest):

    """Checks that logger objects outlive the references held by callers."""

    def setUp(self):
        """Start each test with an empty registry of watched objects."""
        BaseTest.setUp(self)
        self._survivors = {}

    def _watch_for_survival(self, *args):
        """Register a weak reference to every object in *args*.

        Entries are keyed by the object's id() and repr().
        """
        self._survivors.update(
            ((id(watched), repr(watched)), weakref.ref(watched))
            for watched in args
        )

    def _assertTruesurvival(self):
        """Fail if any watched object is no longer alive."""
        # Run a collection first so that cycles get broken.
        gc.collect()
        dead = [
            description
            for (_, description), ref in self._survivors.items()
            if ref() is None
        ]
        if not dead:
            return
        self.fail("%d objects should have survived "
            "but have been destroyed: %s" % (len(dead), ", ".join(dead)))

    def test_persistent_loggers(self):
        # A logger keeps existing, with its configuration, after the last
        # visible reference to it has been dropped.
        self.root_logger.setLevel(logging.INFO)
        first_ref = logging.getLogger("foo")
        self._watch_for_survival(first_ref)
        first_ref.setLevel(logging.DEBUG)
        self.root_logger.debug(self.next_message())
        first_ref.debug(self.next_message())
        self.assert_log_lines([
            ('foo', 'DEBUG', '2'),
        ])
        del first_ref
        # The logger must still be alive...
        self._assertTruesurvival()
        # ...and still be at DEBUG when looked up again by name.
        second_ref = logging.getLogger("foo")
        second_ref.debug(self.next_message())
        self.assert_log_lines([
            ('foo', 'DEBUG', '2'),
            ('foo', 'DEBUG', '3'),
        ])
1854
1855
class EncodingTest(BaseTest):
    def test_encoding_plain_file(self):
        # Non-ASCII text logged through a UTF-8 FileHandler must read back
        # unchanged from the file.
        log = logging.getLogger("test")
        fd, path = tempfile.mkstemp(".log", "test_logging-1-")
        os.close(fd)
        payload = "foo\x80"
        try:
            file_handler = logging.FileHandler(path, encoding="utf-8")
            log.addHandler(file_handler)
            try:
                log.warning(payload)
            finally:
                log.removeHandler(file_handler)
                file_handler.close()
            # Compare the decoded contents, ignoring trailing whitespace
            # such as the final newline.
            with open(path, encoding="utf-8") as logfile:
                self.assertEqual(logfile.read().rstrip(), payload)
        finally:
            if os.path.isfile(path):
                os.remove(path)

    def test_encoding_cyrillic_unicode(self):
        log = logging.getLogger("test")
        # "Do svidanya" (goodbye) spelled in Cyrillic.
        text = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f'
        # Route the handler through a cp1251 stream writer so the bytes that
        # land in the buffer are CP-1251 encoded.
        writer_class = codecs.getwriter('cp1251')
        writer_class.encoding = 'cp1251'
        raw = io.BytesIO()
        stream_handler = logging.StreamHandler(writer_class(raw, 'strict'))
        log.addHandler(stream_handler)
        try:
            log.warning(text)
        finally:
            log.removeHandler(stream_handler)
            stream_handler.close()
        # The buffer must hold exactly the CP-1251 bytes plus the newline.
        expected = b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n'
        self.assertEqual(raw.getvalue(), expected)
1903
1904
class WarningsTest(BaseTest):

    def test_warnings(self):
        # With captureWarnings(True), warnings reach the "py.warnings"
        # logger, while showwarning() with an explicit file still writes
        # to that file.
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            self.addCleanup(logging.captureWarnings, False)
            warnings.filterwarnings("always", category=UserWarning)
            captured = io.StringIO()
            handler = logging.StreamHandler(captured)
            warn_logger = logging.getLogger("py.warnings")
            warn_logger.addHandler(handler)
            warnings.warn("I'm warning you...")
            warn_logger.removeHandler(handler)
            logged = captured.getvalue()
            handler.close()
            position = logged.find("UserWarning: I'm warning you...\n")
            self.assertGreater(position, 0)

            # An explicit file argument gets the regular formatted warning.
            explicit_out = io.StringIO()
            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42,
                                 explicit_out, "Dummy line")
            written = explicit_out.getvalue()
            explicit_out.close()
            self.assertEqual(written,
                "dummy.py:42: UserWarning: Explicit\n  Dummy line\n")

    def test_warnings_no_handlers(self):
        # Showing a captured warning while "py.warnings" has no handlers
        # leaves that logger with exactly one NullHandler.
        with warnings.catch_warnings():
            logging.captureWarnings(True)
            self.addCleanup(logging.captureWarnings, False)

            warn_logger = logging.getLogger("py.warnings")
            # Precondition: nothing is attached yet.
            self.assertEqual(warn_logger.handlers, [])

            warnings.showwarning("Explicit", UserWarning, "dummy.py", 42)
            attached = warn_logger.handlers
            self.assertEqual(len(attached), 1)
            self.assertIsInstance(attached[0], logging.NullHandler)
1943
1944
def formatFunc(format, datefmt=None):
    """Build a logging.Formatter from *format* and an optional *datefmt*."""
    formatter = logging.Formatter(fmt=format, datefmt=datefmt)
    return formatter
1947
def handlerFunc():
    """Return a new StreamHandler created with default arguments."""
    handler = logging.StreamHandler()
    return handler
1950
class CustomHandler(logging.StreamHandler):
    """StreamHandler subclass that adds no behaviour of its own."""
1953
1954class ConfigDictTest(BaseTest):
1955
1956    """Reading logging config from a dictionary."""
1957
1958    expected_log_pat = r"^(\w+) \+\+ (\w+)$"
1959
1960    # config0 is a standard configuration.
1961    config0 = {
1962        'version': 1,
1963        'formatters': {
1964            'form1' : {
1965                'format' : '%(levelname)s ++ %(message)s',
1966            },
1967        },
1968        'handlers' : {
1969            'hand1' : {
1970                'class' : 'logging.StreamHandler',
1971                'formatter' : 'form1',
1972                'level' : 'NOTSET',
1973                'stream'  : 'ext://sys.stdout',
1974            },
1975        },
1976        'root' : {
1977            'level' : 'WARNING',
1978            'handlers' : ['hand1'],
1979        },
1980    }
1981
1982    # config1 adds a little to the standard configuration.
1983    config1 = {
1984        'version': 1,
1985        'formatters': {
1986            'form1' : {
1987                'format' : '%(levelname)s ++ %(message)s',
1988            },
1989        },
1990        'handlers' : {
1991            'hand1' : {
1992                'class' : 'logging.StreamHandler',
1993                'formatter' : 'form1',
1994                'level' : 'NOTSET',
1995                'stream'  : 'ext://sys.stdout',
1996            },
1997        },
1998        'loggers' : {
1999            'compiler.parser' : {
2000                'level' : 'DEBUG',
2001                'handlers' : ['hand1'],
2002            },
2003        },
2004        'root' : {
2005            'level' : 'WARNING',
2006        },
2007    }
2008
2009    # config1a moves the handler to the root. Used with config8a
2010    config1a = {
2011        'version': 1,
2012        'formatters': {
2013            'form1' : {
2014                'format' : '%(levelname)s ++ %(message)s',
2015            },
2016        },
2017        'handlers' : {
2018            'hand1' : {
2019                'class' : 'logging.StreamHandler',
2020                'formatter' : 'form1',
2021                'level' : 'NOTSET',
2022                'stream'  : 'ext://sys.stdout',
2023            },
2024        },
2025        'loggers' : {
2026            'compiler.parser' : {
2027                'level' : 'DEBUG',
2028            },
2029        },
2030        'root' : {
2031            'level' : 'WARNING',
2032            'handlers' : ['hand1'],
2033        },
2034    }
2035
    # config2 has a subtle configuration error that should be reported:
    # the handler's stream is spelt 'ext://sys.stdbout' instead of
    # 'ext://sys.stdout'.
2037    config2 = {
2038        'version': 1,
2039        'formatters': {
2040            'form1' : {
2041                'format' : '%(levelname)s ++ %(message)s',
2042            },
2043        },
2044        'handlers' : {
2045            'hand1' : {
2046                'class' : 'logging.StreamHandler',
2047                'formatter' : 'form1',
2048                'level' : 'NOTSET',
2049                'stream'  : 'ext://sys.stdbout',
2050            },
2051        },
2052        'loggers' : {
2053            'compiler.parser' : {
2054                'level' : 'DEBUG',
2055                'handlers' : ['hand1'],
2056            },
2057        },
2058        'root' : {
2059            'level' : 'WARNING',
2060        },
2061    }
2062
    # As config1 but with a misspelt level ('NTOSET') on a handler
2064    config2a = {
2065        'version': 1,
2066        'formatters': {
2067            'form1' : {
2068                'format' : '%(levelname)s ++ %(message)s',
2069            },
2070        },
2071        'handlers' : {
2072            'hand1' : {
2073                'class' : 'logging.StreamHandler',
2074                'formatter' : 'form1',
2075                'level' : 'NTOSET',
2076                'stream'  : 'ext://sys.stdout',
2077            },
2078        },
2079        'loggers' : {
2080            'compiler.parser' : {
2081                'level' : 'DEBUG',
2082                'handlers' : ['hand1'],
2083            },
2084        },
2085        'root' : {
2086            'level' : 'WARNING',
2087        },
2088    }
2089
2090
    # As config1 but with a misspelt level ('WRANING') on the root logger
2092    config2b = {
2093        'version': 1,
2094        'formatters': {
2095            'form1' : {
2096                'format' : '%(levelname)s ++ %(message)s',
2097            },
2098        },
2099        'handlers' : {
2100            'hand1' : {
2101                'class' : 'logging.StreamHandler',
2102                'formatter' : 'form1',
2103                'level' : 'NOTSET',
2104                'stream'  : 'ext://sys.stdout',
2105            },
2106        },
2107        'loggers' : {
2108            'compiler.parser' : {
2109                'level' : 'DEBUG',
2110                'handlers' : ['hand1'],
2111            },
2112        },
2113        'root' : {
2114            'level' : 'WRANING',
2115        },
2116    }
2117
    # config3 has a less subtle configuration error: the handler refers to
    # a formatter name ('misspelled_name') that is not defined.
2119    config3 = {
2120        'version': 1,
2121        'formatters': {
2122            'form1' : {
2123                'format' : '%(levelname)s ++ %(message)s',
2124            },
2125        },
2126        'handlers' : {
2127            'hand1' : {
2128                'class' : 'logging.StreamHandler',
2129                'formatter' : 'misspelled_name',
2130                'level' : 'NOTSET',
2131                'stream'  : 'ext://sys.stdout',
2132            },
2133        },
2134        'loggers' : {
2135            'compiler.parser' : {
2136                'level' : 'DEBUG',
2137                'handlers' : ['hand1'],
2138            },
2139        },
2140        'root' : {
2141            'level' : 'WARNING',
2142        },
2143    }
2144
2145    # config4 specifies a custom formatter class to be loaded
2146    config4 = {
2147        'version': 1,
2148        'formatters': {
2149            'form1' : {
2150                '()' : __name__ + '.ExceptionFormatter',
2151                'format' : '%(levelname)s:%(name)s:%(message)s',
2152            },
2153        },
2154        'handlers' : {
2155            'hand1' : {
2156                'class' : 'logging.StreamHandler',
2157                'formatter' : 'form1',
2158                'level' : 'NOTSET',
2159                'stream'  : 'ext://sys.stdout',
2160            },
2161        },
2162        'root' : {
2163            'level' : 'NOTSET',
2164                'handlers' : ['hand1'],
2165        },
2166    }
2167
2168    # As config4 but using an actual callable rather than a string
2169    config4a = {
2170        'version': 1,
2171        'formatters': {
2172            'form1' : {
2173                '()' : ExceptionFormatter,
2174                'format' : '%(levelname)s:%(name)s:%(message)s',
2175            },
2176            'form2' : {
2177                '()' : __name__ + '.formatFunc',
2178                'format' : '%(levelname)s:%(name)s:%(message)s',
2179            },
2180            'form3' : {
2181                '()' : formatFunc,
2182                'format' : '%(levelname)s:%(name)s:%(message)s',
2183            },
2184        },
2185        'handlers' : {
2186            'hand1' : {
2187                'class' : 'logging.StreamHandler',
2188                'formatter' : 'form1',
2189                'level' : 'NOTSET',
2190                'stream'  : 'ext://sys.stdout',
2191            },
2192            'hand2' : {
2193                '()' : handlerFunc,
2194            },
2195        },
2196        'root' : {
2197            'level' : 'NOTSET',
2198                'handlers' : ['hand1'],
2199        },
2200    }
2201
2202    # config5 specifies a custom handler class to be loaded
2203    config5 = {
2204        'version': 1,
2205        'formatters': {
2206            'form1' : {
2207                'format' : '%(levelname)s ++ %(message)s',
2208            },
2209        },
2210        'handlers' : {
2211            'hand1' : {
2212                'class' : __name__ + '.CustomHandler',
2213                'formatter' : 'form1',
2214                'level' : 'NOTSET',
2215                'stream'  : 'ext://sys.stdout',
2216            },
2217        },
2218        'loggers' : {
2219            'compiler.parser' : {
2220                'level' : 'DEBUG',
2221                'handlers' : ['hand1'],
2222            },
2223        },
2224        'root' : {
2225            'level' : 'WARNING',
2226        },
2227    }
2228
2229    # config6 specifies a custom handler class to be loaded
2230    # but has bad arguments
2231    config6 = {
2232        'version': 1,
2233        'formatters': {
2234            'form1' : {
2235                'format' : '%(levelname)s ++ %(message)s',
2236            },
2237        },
2238        'handlers' : {
2239            'hand1' : {
2240                'class' : __name__ + '.CustomHandler',
2241                'formatter' : 'form1',
2242                'level' : 'NOTSET',
2243                'stream'  : 'ext://sys.stdout',
2244                '9' : 'invalid parameter name',
2245            },
2246        },
2247        'loggers' : {
2248            'compiler.parser' : {
2249                'level' : 'DEBUG',
2250                'handlers' : ['hand1'],
2251            },
2252        },
2253        'root' : {
2254            'level' : 'WARNING',
2255        },
2256    }
2257
    # config7 does not define compiler.parser but defines compiler.lexer,
    # so compiler.parser should be disabled after applying it.
2260    config7 = {
2261        'version': 1,
2262        'formatters': {
2263            'form1' : {
2264                'format' : '%(levelname)s ++ %(message)s',
2265            },
2266        },
2267        'handlers' : {
2268            'hand1' : {
2269                'class' : 'logging.StreamHandler',
2270                'formatter' : 'form1',
2271                'level' : 'NOTSET',
2272                'stream'  : 'ext://sys.stdout',
2273            },
2274        },
2275        'loggers' : {
2276            'compiler.lexer' : {
2277                'level' : 'DEBUG',
2278                'handlers' : ['hand1'],
2279            },
2280        },
2281        'root' : {
2282            'level' : 'WARNING',
2283        },
2284    }
2285
2286    # config8 defines both compiler and compiler.lexer
2287    # so compiler.parser should not be disabled (since
2288    # compiler is defined)
2289    config8 = {
2290        'version': 1,
2291        'disable_existing_loggers' : False,
2292        'formatters': {
2293            'form1' : {
2294                'format' : '%(levelname)s ++ %(message)s',
2295            },
2296        },
2297        'handlers' : {
2298            'hand1' : {
2299                'class' : 'logging.StreamHandler',
2300                'formatter' : 'form1',
2301                'level' : 'NOTSET',
2302                'stream'  : 'ext://sys.stdout',
2303            },
2304        },
2305        'loggers' : {
2306            'compiler' : {
2307                'level' : 'DEBUG',
2308                'handlers' : ['hand1'],
2309            },
2310            'compiler.lexer' : {
2311            },
2312        },
2313        'root' : {
2314            'level' : 'WARNING',
2315        },
2316    }
2317
2318    # config8a disables existing loggers
2319    config8a = {
2320        'version': 1,
2321        'disable_existing_loggers' : True,
2322        'formatters': {
2323            'form1' : {
2324                'format' : '%(levelname)s ++ %(message)s',
2325            },
2326        },
2327        'handlers' : {
2328            'hand1' : {
2329                'class' : 'logging.StreamHandler',
2330                'formatter' : 'form1',
2331                'level' : 'NOTSET',
2332                'stream'  : 'ext://sys.stdout',
2333            },
2334        },
2335        'loggers' : {
2336            'compiler' : {
2337                'level' : 'DEBUG',
2338                'handlers' : ['hand1'],
2339            },
2340            'compiler.lexer' : {
2341            },
2342        },
2343        'root' : {
2344            'level' : 'WARNING',
2345        },
2346    }
2347
    # config9 sets both the 'hand1' handler and the 'compiler.parser' logger
    # to WARNING and leaves the root logger at NOTSET.
    config9 = {
        'version': 1,
        'formatters': {
            'form1' : {
                'format' : '%(levelname)s ++ %(message)s',
            },
        },
        'handlers' : {
            'hand1' : {
                'class' : 'logging.StreamHandler',
                'formatter' : 'form1',
                'level' : 'WARNING',
                'stream'  : 'ext://sys.stdout',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'WARNING',
                'handlers' : ['hand1'],
            },
        },
        'root' : {
            'level' : 'NOTSET',
        },
    }
2373
    # config9a is an incremental config that only carries levels: WARNING
    # for the 'hand1' handler and INFO for the 'compiler.parser' logger.
    config9a = {
        'version': 1,
        'incremental' : True,
        'handlers' : {
            'hand1' : {
                'level' : 'WARNING',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'INFO',
            },
        },
    }
2388
    # config9b is an incremental config that only carries levels: INFO for
    # both the 'hand1' handler and the 'compiler.parser' logger.
    config9b = {
        'version': 1,
        'incremental' : True,
        'handlers' : {
            'hand1' : {
                'level' : 'INFO',
            },
        },
        'loggers' : {
            'compiler.parser' : {
                'level' : 'INFO',
            },
        },
    }
2403
2404    #As config1 but with a filter added
2405    config10 = {
2406        'version': 1,
2407        'formatters': {
2408            'form1' : {
2409                'format' : '%(levelname)s ++ %(message)s',
2410            },
2411        },
2412        'filters' : {
2413            'filt1' : {
2414                'name' : 'compiler.parser',
2415            },
2416        },
2417        'handlers' : {
2418            'hand1' : {
2419                'class' : 'logging.StreamHandler',
2420                'formatter' : 'form1',
2421                'level' : 'NOTSET',
2422                'stream'  : 'ext://sys.stdout',
2423                'filters' : ['filt1'],
2424            },
2425        },
2426        'loggers' : {
2427            'compiler.parser' : {
2428                'level' : 'DEBUG',
2429                'filters' : ['filt1'],
2430            },
2431        },
2432        'root' : {
2433            'level' : 'WARNING',
2434            'handlers' : ['hand1'],
2435        },
2436    }
2437
2438    #As config1 but using cfg:// references
2439    config11 = {
2440        'version': 1,
2441        'true_formatters': {
2442            'form1' : {
2443                'format' : '%(levelname)s ++ %(message)s',
2444            },
2445        },
2446        'handler_configs': {
2447            'hand1' : {
2448                'class' : 'logging.StreamHandler',
2449                'formatter' : 'form1',
2450                'level' : 'NOTSET',
2451                'stream'  : 'ext://sys.stdout',
2452            },
2453        },
2454        'formatters' : 'cfg://true_formatters',
2455        'handlers' : {
2456            'hand1' : 'cfg://handler_configs[hand1]',
2457        },
2458        'loggers' : {
2459            'compiler.parser' : {
2460                'level' : 'DEBUG',
2461                'handlers' : ['hand1'],
2462            },
2463        },
2464        'root' : {
2465            'level' : 'WARNING',
2466        },
2467    }
2468
2469    #As config11 but missing the version key
2470    config12 = {
2471        'true_formatters': {
2472            'form1' : {
2473                'format' : '%(levelname)s ++ %(message)s',
2474            },
2475        },
2476        'handler_configs': {
2477            'hand1' : {
2478                'class' : 'logging.StreamHandler',
2479                'formatter' : 'form1',
2480                'level' : 'NOTSET',
2481                'stream'  : 'ext://sys.stdout',
2482            },
2483        },
2484        'formatters' : 'cfg://true_formatters',
2485        'handlers' : {
2486            'hand1' : 'cfg://handler_configs[hand1]',
2487        },
2488        'loggers' : {
2489            'compiler.parser' : {
2490                'level' : 'DEBUG',
2491                'handlers' : ['hand1'],
2492            },
2493        },
2494        'root' : {
2495            'level' : 'WARNING',
2496        },
2497    }
2498
2499    #As config11 but using an unsupported version
2500    config13 = {
2501        'version': 2,
2502        'true_formatters': {
2503            'form1' : {
2504                'format' : '%(levelname)s ++ %(message)s',
2505            },
2506        },
2507        'handler_configs': {
2508            'hand1' : {
2509                'class' : 'logging.StreamHandler',
2510                'formatter' : 'form1',
2511                'level' : 'NOTSET',
2512                'stream'  : 'ext://sys.stdout',
2513            },
2514        },
2515        'formatters' : 'cfg://true_formatters',
2516        'handlers' : {
2517            'hand1' : 'cfg://handler_configs[hand1]',
2518        },
2519        'loggers' : {
2520            'compiler.parser' : {
2521                'level' : 'DEBUG',
2522                'handlers' : ['hand1'],
2523            },
2524        },
2525        'root' : {
2526            'level' : 'WARNING',
2527        },
2528    }
2529
2530    # As config0, but with properties
2531    config14 = {
2532        'version': 1,
2533        'formatters': {
2534            'form1' : {
2535                'format' : '%(levelname)s ++ %(message)s',
2536            },
2537        },
2538        'handlers' : {
2539            'hand1' : {
2540                'class' : 'logging.StreamHandler',
2541                'formatter' : 'form1',
2542                'level' : 'NOTSET',
2543                'stream'  : 'ext://sys.stdout',
2544                '.': {
2545                    'foo': 'bar',
2546                    'terminator': '!\n',
2547                }
2548            },
2549        },
2550        'root' : {
2551            'level' : 'WARNING',
2552            'handlers' : ['hand1'],
2553        },
2554    }
2555
    # out_of_order defines a MemoryHandler ('bufferGlobal') whose 'target'
    # names another handler ('fileGlobal') from the same 'handlers' section.
    # NOTE(review): the formatter pairs a %-style format string with
    # style "$" -- presumably deliberate; confirm against the test using it.
    out_of_order = {
        "version": 1,
        "formatters": {
            "mySimpleFormatter": {
                "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s",
                "style": "$"
            }
        },
        "handlers": {
            "fileGlobal": {
                "class": "logging.StreamHandler",
                "level": "DEBUG",
                "formatter": "mySimpleFormatter"
            },
            "bufferGlobal": {
                "class": "logging.handlers.MemoryHandler",
                "capacity": 5,
                "formatter": "mySimpleFormatter",
                "target": "fileGlobal",
                "level": "DEBUG"
                }
        },
        "loggers": {
            "mymodule": {
                "level": "DEBUG",
                "handlers": ["bufferGlobal"],
                "propagate": "true"
            }
        }
    }
2586
2587    def apply_config(self, conf):
2588        logging.config.dictConfig(conf)
2589
2590    def test_config0_ok(self):
2591        # A simple config which overrides the default settings.
2592        with support.captured_stdout() as output:
2593            self.apply_config(self.config0)
2594            logger = logging.getLogger()
2595            # Won't output anything
2596            logger.info(self.next_message())
2597            # Outputs a message
2598            logger.error(self.next_message())
2599            self.assert_log_lines([
2600                ('ERROR', '2'),
2601            ], stream=output)
2602            # Original logger output is empty.
2603            self.assert_log_lines([])
2604
2605    def test_config1_ok(self, config=config1):
2606        # A config defining a sub-parser as well.
2607        with support.captured_stdout() as output:
2608            self.apply_config(config)
2609            logger = logging.getLogger("compiler.parser")
2610            # Both will output a message
2611            logger.info(self.next_message())
2612            logger.error(self.next_message())
2613            self.assert_log_lines([
2614                ('INFO', '1'),
2615                ('ERROR', '2'),
2616            ], stream=output)
2617            # Original logger output is empty.
2618            self.assert_log_lines([])
2619
2620    def test_config2_failure(self):
2621        # A simple config which overrides the default settings.
2622        self.assertRaises(Exception, self.apply_config, self.config2)
2623
2624    def test_config2a_failure(self):
2625        # A simple config which overrides the default settings.
2626        self.assertRaises(Exception, self.apply_config, self.config2a)
2627
2628    def test_config2b_failure(self):
2629        # A simple config which overrides the default settings.
2630        self.assertRaises(Exception, self.apply_config, self.config2b)
2631
2632    def test_config3_failure(self):
2633        # A simple config which overrides the default settings.
2634        self.assertRaises(Exception, self.apply_config, self.config3)
2635
2636    def test_config4_ok(self):
2637        # A config specifying a custom formatter class.
2638        with support.captured_stdout() as output:
2639            self.apply_config(self.config4)
2640            #logger = logging.getLogger()
2641            try:
2642                raise RuntimeError()
2643            except RuntimeError:
2644                logging.exception("just testing")
2645            sys.stdout.seek(0)
2646            self.assertEqual(output.getvalue(),
2647                "ERROR:root:just testing\nGot a [RuntimeError]\n")
2648            # Original logger output is empty
2649            self.assert_log_lines([])
2650
2651    def test_config4a_ok(self):
2652        # A config specifying a custom formatter class.
2653        with support.captured_stdout() as output:
2654            self.apply_config(self.config4a)
2655            #logger = logging.getLogger()
2656            try:
2657                raise RuntimeError()
2658            except RuntimeError:
2659                logging.exception("just testing")
2660            sys.stdout.seek(0)
2661            self.assertEqual(output.getvalue(),
2662                "ERROR:root:just testing\nGot a [RuntimeError]\n")
2663            # Original logger output is empty
2664            self.assert_log_lines([])
2665
2666    def test_config5_ok(self):
2667        self.test_config1_ok(config=self.config5)
2668
2669    def test_config6_failure(self):
2670        self.assertRaises(Exception, self.apply_config, self.config6)
2671
2672    def test_config7_ok(self):
2673        with support.captured_stdout() as output:
2674            self.apply_config(self.config1)
2675            logger = logging.getLogger("compiler.parser")
2676            # Both will output a message
2677            logger.info(self.next_message())
2678            logger.error(self.next_message())
2679            self.assert_log_lines([
2680                ('INFO', '1'),
2681                ('ERROR', '2'),
2682            ], stream=output)
2683            # Original logger output is empty.
2684            self.assert_log_lines([])
2685        with support.captured_stdout() as output:
2686            self.apply_config(self.config7)
2687            logger = logging.getLogger("compiler.parser")
2688            self.assertTrue(logger.disabled)
2689            logger = logging.getLogger("compiler.lexer")
2690            # Both will output a message
2691            logger.info(self.next_message())
2692            logger.error(self.next_message())
2693            self.assert_log_lines([
2694                ('INFO', '3'),
2695                ('ERROR', '4'),
2696            ], stream=output)
2697            # Original logger output is empty.
2698            self.assert_log_lines([])
2699
2700    #Same as test_config_7_ok but don't disable old loggers.
2701    def test_config_8_ok(self):
2702        with support.captured_stdout() as output:
2703            self.apply_config(self.config1)
2704            logger = logging.getLogger("compiler.parser")
2705            # All will output a message
2706            logger.info(self.next_message())
2707            logger.error(self.next_message())
2708            self.assert_log_lines([
2709                ('INFO', '1'),
2710                ('ERROR', '2'),
2711            ], stream=output)
2712            # Original logger output is empty.
2713            self.assert_log_lines([])
2714        with support.captured_stdout() as output:
2715            self.apply_config(self.config8)
2716            logger = logging.getLogger("compiler.parser")
2717            self.assertFalse(logger.disabled)
2718            # Both will output a message
2719            logger.info(self.next_message())
2720            logger.error(self.next_message())
2721            logger = logging.getLogger("compiler.lexer")
2722            # Both will output a message
2723            logger.info(self.next_message())
2724            logger.error(self.next_message())
2725            self.assert_log_lines([
2726                ('INFO', '3'),
2727                ('ERROR', '4'),
2728                ('INFO', '5'),
2729                ('ERROR', '6'),
2730            ], stream=output)
2731            # Original logger output is empty.
2732            self.assert_log_lines([])
2733
2734    def test_config_8a_ok(self):
2735        with support.captured_stdout() as output:
2736            self.apply_config(self.config1a)
2737            logger = logging.getLogger("compiler.parser")
2738            # See issue #11424. compiler-hyphenated sorts
2739            # between compiler and compiler.xyz and this
2740            # was preventing compiler.xyz from being included
2741            # in the child loggers of compiler because of an
2742            # overzealous loop termination condition.
2743            hyphenated = logging.getLogger('compiler-hyphenated')
2744            # All will output a message
2745            logger.info(self.next_message())
2746            logger.error(self.next_message())
2747            hyphenated.critical(self.next_message())
2748            self.assert_log_lines([
2749                ('INFO', '1'),
2750                ('ERROR', '2'),
2751                ('CRITICAL', '3'),
2752            ], stream=output)
2753            # Original logger output is empty.
2754            self.assert_log_lines([])
2755        with support.captured_stdout() as output:
2756            self.apply_config(self.config8a)
2757            logger = logging.getLogger("compiler.parser")
2758            self.assertFalse(logger.disabled)
2759            # Both will output a message
2760            logger.info(self.next_message())
2761            logger.error(self.next_message())
2762            logger = logging.getLogger("compiler.lexer")
2763            # Both will output a message
2764            logger.info(self.next_message())
2765            logger.error(self.next_message())
2766            # Will not appear
2767            hyphenated.critical(self.next_message())
2768            self.assert_log_lines([
2769                ('INFO', '4'),
2770                ('ERROR', '5'),
2771                ('INFO', '6'),
2772                ('ERROR', '7'),
2773            ], stream=output)
2774            # Original logger output is empty.
2775            self.assert_log_lines([])
2776
2777    def test_config_9_ok(self):
2778        with support.captured_stdout() as output:
2779            self.apply_config(self.config9)
2780            logger = logging.getLogger("compiler.parser")
2781            #Nothing will be output since both handler and logger are set to WARNING
2782            logger.info(self.next_message())
2783            self.assert_log_lines([], stream=output)
2784            self.apply_config(self.config9a)
2785            #Nothing will be output since both handler is still set to WARNING
2786            logger.info(self.next_message())
2787            self.assert_log_lines([], stream=output)
2788            self.apply_config(self.config9b)
2789            #Message should now be output
2790            logger.info(self.next_message())
2791            self.assert_log_lines([
2792                ('INFO', '3'),
2793            ], stream=output)
2794
2795    def test_config_10_ok(self):
2796        with support.captured_stdout() as output:
2797            self.apply_config(self.config10)
2798            logger = logging.getLogger("compiler.parser")
2799            logger.warning(self.next_message())
2800            logger = logging.getLogger('compiler')
2801            #Not output, because filtered
2802            logger.warning(self.next_message())
2803            logger = logging.getLogger('compiler.lexer')
2804            #Not output, because filtered
2805            logger.warning(self.next_message())
2806            logger = logging.getLogger("compiler.parser.codegen")
2807            #Output, as not filtered
2808            logger.error(self.next_message())
2809            self.assert_log_lines([
2810                ('WARNING', '1'),
2811                ('ERROR', '4'),
2812            ], stream=output)
2813
2814    def test_config11_ok(self):
2815        self.test_config1_ok(self.config11)
2816
2817    def test_config12_failure(self):
2818        self.assertRaises(Exception, self.apply_config, self.config12)
2819
2820    def test_config13_failure(self):
2821        self.assertRaises(Exception, self.apply_config, self.config13)
2822
2823    def test_config14_ok(self):
2824        with support.captured_stdout() as output:
2825            self.apply_config(self.config14)
2826            h = logging._handlers['hand1']
2827            self.assertEqual(h.foo, 'bar')
2828            self.assertEqual(h.terminator, '!\n')
2829            logging.warning('Exclamation')
2830            self.assertTrue(output.getvalue().endswith('Exclamation!\n'))
2831
2832    @unittest.skipUnless(threading, 'listen() needs threading to work')
2833    def setup_via_listener(self, text, verify=None):
2834        text = text.encode("utf-8")
2835        # Ask for a randomly assigned port (by using port 0)
2836        t = logging.config.listen(0, verify)
2837        t.start()
2838        t.ready.wait()
2839        # Now get the port allocated
2840        port = t.port
2841        t.ready.clear()
2842        try:
2843            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2844            sock.settimeout(2.0)
2845            sock.connect(('localhost', port))
2846
2847            slen = struct.pack('>L', len(text))
2848            s = slen + text
2849            sentsofar = 0
2850            left = len(s)
2851            while left > 0:
2852                sent = sock.send(s[sentsofar:])
2853                sentsofar += sent
2854                left -= sent
2855            sock.close()
2856        finally:
2857            t.ready.wait(2.0)
2858            logging.config.stopListening()
2859            t.join(2.0)
2860
2861    @unittest.skipUnless(threading, 'Threading required for this test.')
2862    def test_listen_config_10_ok(self):
2863        with support.captured_stdout() as output:
2864            self.setup_via_listener(json.dumps(self.config10))
2865            logger = logging.getLogger("compiler.parser")
2866            logger.warning(self.next_message())
2867            logger = logging.getLogger('compiler')
2868            #Not output, because filtered
2869            logger.warning(self.next_message())
2870            logger = logging.getLogger('compiler.lexer')
2871            #Not output, because filtered
2872            logger.warning(self.next_message())
2873            logger = logging.getLogger("compiler.parser.codegen")
2874            #Output, as not filtered
2875            logger.error(self.next_message())
2876            self.assert_log_lines([
2877                ('WARNING', '1'),
2878                ('ERROR', '4'),
2879            ], stream=output)
2880
2881    @unittest.skipUnless(threading, 'Threading required for this test.')
2882    def test_listen_config_1_ok(self):
2883        with support.captured_stdout() as output:
2884            self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
2885            logger = logging.getLogger("compiler.parser")
2886            # Both will output a message
2887            logger.info(self.next_message())
2888            logger.error(self.next_message())
2889            self.assert_log_lines([
2890                ('INFO', '1'),
2891                ('ERROR', '2'),
2892            ], stream=output)
2893            # Original logger output is empty.
2894            self.assert_log_lines([])
2895
2896    @unittest.skipUnless(threading, 'Threading required for this test.')
2897    def test_listen_verify(self):
2898
2899        def verify_fail(stuff):
2900            return None
2901
2902        def verify_reverse(stuff):
2903            return stuff[::-1]
2904
2905        logger = logging.getLogger("compiler.parser")
2906        to_send = textwrap.dedent(ConfigFileTest.config1)
2907        # First, specify a verification function that will fail.
2908        # We expect to see no output, since our configuration
2909        # never took effect.
2910        with support.captured_stdout() as output:
2911            self.setup_via_listener(to_send, verify_fail)
2912            # Both will output a message
2913            logger.info(self.next_message())
2914            logger.error(self.next_message())
2915        self.assert_log_lines([], stream=output)
2916        # Original logger output has the stuff we logged.
2917        self.assert_log_lines([
2918            ('INFO', '1'),
2919            ('ERROR', '2'),
2920        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
2921
2922        # Now, perform no verification. Our configuration
2923        # should take effect.
2924
2925        with support.captured_stdout() as output:
2926            self.setup_via_listener(to_send)    # no verify callable specified
2927            logger = logging.getLogger("compiler.parser")
2928            # Both will output a message
2929            logger.info(self.next_message())
2930            logger.error(self.next_message())
2931        self.assert_log_lines([
2932            ('INFO', '3'),
2933            ('ERROR', '4'),
2934        ], stream=output)
2935        # Original logger output still has the stuff we logged before.
2936        self.assert_log_lines([
2937            ('INFO', '1'),
2938            ('ERROR', '2'),
2939        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
2940
2941        # Now, perform verification which transforms the bytes.
2942
2943        with support.captured_stdout() as output:
2944            self.setup_via_listener(to_send[::-1], verify_reverse)
2945            logger = logging.getLogger("compiler.parser")
2946            # Both will output a message
2947            logger.info(self.next_message())
2948            logger.error(self.next_message())
2949        self.assert_log_lines([
2950            ('INFO', '5'),
2951            ('ERROR', '6'),
2952        ], stream=output)
2953        # Original logger output still has the stuff we logged before.
2954        self.assert_log_lines([
2955            ('INFO', '1'),
2956            ('ERROR', '2'),
2957        ], pat=r"^[\w.]+ -> (\w+): (\d+)$")
2958
2959    def test_out_of_order(self):
2960        self.apply_config(self.out_of_order)
2961        handler = logging.getLogger('mymodule').handlers[0]
2962        self.assertIsInstance(handler.target, logging.Handler)
2963        self.assertIsInstance(handler.formatter._style,
2964                              logging.StringTemplateStyle)
2965
2966    def test_baseconfig(self):
2967        d = {
2968            'atuple': (1, 2, 3),
2969            'alist': ['a', 'b', 'c'],
2970            'adict': {'d': 'e', 'f': 3 },
2971            'nest1': ('g', ('h', 'i'), 'j'),
2972            'nest2': ['k', ['l', 'm'], 'n'],
2973            'nest3': ['o', 'cfg://alist', 'p'],
2974        }
2975        bc = logging.config.BaseConfigurator(d)
2976        self.assertEqual(bc.convert('cfg://atuple[1]'), 2)
2977        self.assertEqual(bc.convert('cfg://alist[1]'), 'b')
2978        self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h')
2979        self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm')
2980        self.assertEqual(bc.convert('cfg://adict.d'), 'e')
2981        self.assertEqual(bc.convert('cfg://adict[f]'), 3)
2982        v = bc.convert('cfg://nest3')
2983        self.assertEqual(v.pop(1), ['a', 'b', 'c'])
2984        self.assertRaises(KeyError, bc.convert, 'cfg://nosuch')
2985        self.assertRaises(ValueError, bc.convert, 'cfg://!')
2986        self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]')
2987
class ManagerTest(BaseTest):
    def test_manager_loggerclass(self):
        # Messages seen by loggers created through the private Manager.
        seen = []

        class MyLogger(logging.Logger):
            # Record the message instead of dispatching it to handlers.
            def _log(self, level, msg, args, exc_info=None, extra=None):
                seen.append(msg)

        manager = logging.Manager(None)
        # Only Logger subclasses are acceptable as the logger class.
        with self.assertRaises(TypeError):
            manager.setLoggerClass(int)
        manager.setLoggerClass(MyLogger)
        custom = manager.getLogger('test')
        custom.warning('should appear in logged')
        # The module-level function does not go through `manager`.
        logging.warning('should not appear in logged')

        self.assertEqual(seen, ['should appear in logged'])

    def test_set_log_record_factory(self):
        # The factory handed to the manager must be stored on it.
        manager = logging.Manager(None)
        factory = object()
        manager.setLogRecordFactory(factory)
        self.assertEqual(manager.logRecordFactory, factory)
3010
class ChildLoggerTest(BaseTest):
    def test_child_loggers(self):
        # getChild() must return the very same object that getLogger()
        # returns for the corresponding dotted name.
        root = logging.getLogger()
        abc = logging.getLogger('abc')
        # Created for its side effect only; the object is not used.
        logging.getLogger('def.ghi')
        # Children of the root logger use the suffix as the full name.
        child = root.getChild('xyz')
        grandchild = root.getChild('uvw.xyz')
        self.assertIs(child, logging.getLogger('xyz'))
        self.assertIs(grandchild, logging.getLogger('uvw.xyz'))
        # Children of a named logger are prefixed with the parent name;
        # a dotted suffix reaches the same logger as two single steps.
        abc_def = abc.getChild('def')
        abc_def_ghi = abc_def.getChild('ghi')
        direct = abc.getChild('def.ghi')
        self.assertIs(abc_def, logging.getLogger('abc.def'))
        self.assertIs(abc_def_ghi, logging.getLogger('abc.def.ghi'))
        self.assertIs(abc_def_ghi, direct)
3026
3027
class DerivedLogRecord(logging.LogRecord):
    """LogRecord subclass that adds nothing; only its type differs."""
3030
class LogRecordFactoryTest(BaseTest):

    def setUp(self):
        class CheckingFilter(logging.Filter):
            # Filter that raises TypeError unless every record it sees
            # is exactly of the expected class (subclasses do not count).
            def __init__(self, cls):
                self.cls = cls

            def filter(self, record):
                actual = type(record)
                if actual is self.cls:
                    return True
                raise TypeError(
                    'Unexpected LogRecord type %s, expected %s' % (
                        actual, self.cls))

        BaseTest.setUp(self)
        self.filter = CheckingFilter(DerivedLogRecord)
        self.root_logger.addFilter(self.filter)
        # Remembered so tearDown() can restore the global factory.
        self.orig_factory = logging.getLogRecordFactory()

    def tearDown(self):
        self.root_logger.removeFilter(self.filter)
        BaseTest.tearDown(self)
        logging.setLogRecordFactory(self.orig_factory)

    def test_logrecord_class(self):
        # With the factory in place at this point, the checking filter
        # must reject the record.
        with self.assertRaises(TypeError):
            self.root_logger.warning(self.next_message())
        # Once DerivedLogRecord is the factory, logging goes through.
        logging.setLogRecordFactory(DerivedLogRecord)
        self.root_logger.error(self.next_message())
        expected = [('root', 'ERROR', '2')]
        self.assert_log_lines(expected)
3064
3065
class QueueHandlerTest(BaseTest):
    # Do not bother with a logger name group.
    expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$"

    def setUp(self):
        BaseTest.setUp(self)
        # Unbounded queue fed by a QueueHandler that is attached to a
        # non-propagating logger with a WARNING threshold.
        self.queue = queue.Queue(-1)
        self.que_hdlr = logging.handlers.QueueHandler(self.queue)
        self.que_logger = logging.getLogger('que')
        self.que_logger.propagate = False
        self.que_logger.setLevel(logging.WARNING)
        self.que_logger.addHandler(self.que_hdlr)

    def tearDown(self):
        self.que_hdlr.close()
        BaseTest.tearDown(self)

    def test_queue_handler(self):
        # DEBUG and INFO are below the logger's WARNING threshold, so
        # nothing may be enqueued for them.
        self.que_logger.debug(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        self.que_logger.info(self.next_message())
        self.assertRaises(queue.Empty, self.queue.get_nowait)
        # A WARNING record must arrive on the queue as a LogRecord.
        msg = self.next_message()
        self.que_logger.warning(msg)
        data = self.queue.get_nowait()
        self.assertIsInstance(data, logging.LogRecord)
        self.assertEqual(data.name, self.que_logger.name)
        self.assertEqual((data.msg, data.args), (msg, None))

    @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'),
                         'logging.handlers.QueueListener required for this test')
    def test_queue_listener(self):
        handler = support.TestHandler(support.Matcher())
        listener = logging.handlers.QueueListener(self.queue, handler)
        listener.start()
        try:
            self.que_logger.warning(self.next_message())
            self.que_logger.error(self.next_message())
            self.que_logger.critical(self.next_message())
        finally:
            listener.stop()
        self.assertTrue(handler.matches(levelno=logging.WARNING, message='1'))
        self.assertTrue(handler.matches(levelno=logging.ERROR, message='2'))
        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3'))
        handler.close()

        # Now test with respect_handler_level set

        handler = support.TestHandler(support.Matcher())
        handler.setLevel(logging.CRITICAL)
        listener = logging.handlers.QueueListener(self.queue, handler,
                                                  respect_handler_level=True)
        listener.start()
        try:
            self.que_logger.warning(self.next_message())
            self.que_logger.error(self.next_message())
            self.que_logger.critical(self.next_message())
        finally:
            listener.stop()
        # Only the CRITICAL record may have been passed to the handler.
        self.assertFalse(handler.matches(levelno=logging.WARNING, message='4'))
        self.assertFalse(handler.matches(levelno=logging.ERROR, message='5'))
        self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6'))
        # Close the second handler too, mirroring the first half.
        handler.close()
3128
if hasattr(logging.handlers, 'QueueListener'):
    import multiprocessing
    from unittest.mock import patch

    class QueueListenerTest(BaseTest):
        """
        Tests based on patch submitted for issue #27930. Ensure that
        QueueListener handles all log messages.
        """

        # Number of times each scenario is repeated.
        repeat = 20

        @staticmethod
        def setup_and_log(log_queue, ident):
            """
            Creates a logger with a QueueHandler that logs to a queue read by a
            QueueListener. Starts the listener, logs five messages, and stops
            the listener.
            """
            logger = logging.getLogger('test_logger_with_id_%s' % ident)
            logger.setLevel(logging.DEBUG)
            handler = logging.handlers.QueueHandler(log_queue)
            logger.addHandler(handler)
            listener = logging.handlers.QueueListener(log_queue)
            listener.start()

            logger.info('one')
            logger.info('two')
            logger.info('three')
            logger.info('four')
            logger.info('five')

            listener.stop()
            logger.removeHandler(handler)
            handler.close()

        @patch.object(logging.handlers.QueueListener, 'handle')
        def test_handle_called_with_queue_queue(self, mock_handle):
            for i in range(self.repeat):
                log_queue = queue.Queue()
                self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
                             'correct number of handled log messages')

        @support.requires_multiprocessing_queue
        @patch.object(logging.handlers.QueueListener, 'handle')
        def test_handle_called_with_mp_queue(self, mock_handle):
            for i in range(self.repeat):
                log_queue = multiprocessing.Queue()
                try:
                    self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
                finally:
                    # Release the queue and wait for its feeder thread so
                    # nothing is left dangling after the test.
                    log_queue.close()
                    log_queue.join_thread()
            self.assertEqual(mock_handle.call_count, 5 * self.repeat,
                             'correct number of handled log messages')

        @staticmethod
        def get_all_from_queue(log_queue):
            """
            Generator yielding items from log_queue until get_nowait()
            reports that it is empty.
            """
            try:
                while True:
                    yield log_queue.get_nowait()
            except queue.Empty:
                # Simply end the iteration; a generator's return value is
                # not seen by list() or a for loop.
                return

        @support.requires_multiprocessing_queue
        def test_no_messages_in_queue_after_stop(self):
            """
            Five messages are logged then the QueueListener is stopped. This
            test then gets everything off the queue. Failure of this test
            indicates that messages were not registered on the queue until
            _after_ the QueueListener stopped.
            """
            # Either nothing is left, or only the listener's sentinel.
            expected = [[], [logging.handlers.QueueListener._sentinel]]
            for i in range(self.repeat):
                # Named log_queue so the `queue` module is not shadowed.
                log_queue = multiprocessing.Queue()
                try:
                    self.setup_and_log(log_queue, '%s_%s' % (self.id(), i))
                    items = list(self.get_all_from_queue(log_queue))
                finally:
                    log_queue.close()
                    log_queue.join_thread()
                self.assertIn(items, expected,
                              'Found unexpected messages in queue: %s' % (
                                    [m.msg if isinstance(m, logging.LogRecord)
                                     else m for m in items]))
3208
3209
ZERO = datetime.timedelta(0)

class UTC(datetime.tzinfo):
    """tzinfo with a zero UTC offset, zero DST and the name 'UTC'."""

    def utcoffset(self, dt):
        return ZERO

    def dst(self, dt):
        # Same answer as utcoffset(): no daylight-saving adjustment.
        return ZERO

    def tzname(self, dt):
        return 'UTC'

# Shared instance used by the tests.
utc = UTC()
3222
class FormatterTest(unittest.TestCase):
    def setUp(self):
        # Attributes shared by every record built through get_record().
        self.common = {
            'name': 'formatter.test',
            'level': logging.DEBUG,
            'pathname': os.path.join('path', 'to', 'dummy.ext'),
            'lineno': 42,
            'exc_info': None,
            'func': None,
            'msg': 'Message with %d %s',
            'args': (2, 'placeholders'),
        }
        # Named overrides applied on top of `common`; none defined here.
        self.variants = {
        }

    def get_record(self, name=None):
        # Build a LogRecord from `common`, optionally updated with the
        # variant registered under `name`.
        fields = dict(self.common)
        if name is not None:
            fields.update(self.variants[name])
        return logging.makeLogRecord(fields)

    def test_percent(self):
        # Test %-formatting
        record = self.get_record()
        fmt = logging.Formatter('${%(message)s}')
        self.assertEqual(fmt.format(record),
                         '${Message with 2 placeholders}')
        fmt = logging.Formatter('%(random)s')
        with self.assertRaises(KeyError):
            fmt.format(record)
        self.assertFalse(fmt.usesTime())
        # (format string, assertion for usesTime()) pairs.
        cases = (
            ('%(asctime)s', self.assertTrue),
            ('%(asctime)-15s', self.assertTrue),
            ('asctime', self.assertFalse),
        )
        for spec, check in cases:
            check(logging.Formatter(spec).usesTime())

    def test_braces(self):
        # Test {}-formatting
        record = self.get_record()
        fmt = logging.Formatter('$%{message}%$', style='{')
        self.assertEqual(fmt.format(record),
                         '$%Message with 2 placeholders%$')
        fmt = logging.Formatter('{random}', style='{')
        with self.assertRaises(KeyError):
            fmt.format(record)
        self.assertFalse(fmt.usesTime())
        # (format string, assertion for usesTime()) pairs.
        cases = (
            ('{asctime}', self.assertTrue),
            ('{asctime!s:15}', self.assertTrue),
            ('{asctime:15}', self.assertTrue),
            ('asctime', self.assertFalse),
        )
        for spec, check in cases:
            check(logging.Formatter(spec, style='{').usesTime())

    def test_dollars(self):
        # Test $-formatting
        record = self.get_record()
        fmt = logging.Formatter('$message', style='$')
        self.assertEqual(fmt.format(record), 'Message with 2 placeholders')
        fmt = logging.Formatter('$$%${message}%$$', style='$')
        self.assertEqual(fmt.format(record),
                         '$%Message with 2 placeholders%$')
        fmt = logging.Formatter('${random}', style='$')
        with self.assertRaises(KeyError):
            fmt.format(record)
        self.assertFalse(fmt.usesTime())
        # (format string, assertion for usesTime()) pairs.
        cases = (
            ('${asctime}', self.assertTrue),
            ('${asctime', self.assertFalse),
            ('$asctime', self.assertTrue),
            ('asctime', self.assertFalse),
        )
        for spec, check in cases:
            check(logging.Formatter(spec, style='$').usesTime())

    def test_invalid_style(self):
        # 'x' is passed as the third positional argument (the style).
        with self.assertRaises(ValueError):
            logging.Formatter(None, None, 'x')

    def test_time(self):
        record = self.get_record()
        moment = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc)
        # We use None to indicate we want the local timezone
        # We're essentially converting a UTC time to local time
        local_tuple = moment.astimezone(None).timetuple()
        record.created = time.mktime(local_tuple)
        record.msecs = 123
        fmt = logging.Formatter('%(asctime)s %(message)s')
        fmt.converter = time.gmtime
        self.assertEqual(fmt.formatTime(record), '1993-04-21 08:03:00,123')
        self.assertEqual(fmt.formatTime(record, '%Y:%d'), '1993:21')
        # format() must store the formatted time on the record.
        fmt.format(record)
        self.assertEqual(record.asctime, '1993-04-21 08:03:00,123')
3311
class TestBufferingFormatter(logging.BufferingFormatter):
    # Wraps the formatted records in a header and a footer that both
    # carry the number of records.
    def formatHeader(self, records):
        count = len(records)
        return '[({:d})'.format(count)

    def formatFooter(self, records):
        count = len(records)
        return '({:d})]'.format(count)
3318
class BufferingFormatterTest(unittest.TestCase):
    def setUp(self):
        # Two minimal records with the messages 'one' and 'two'.
        self.records = [
            logging.makeLogRecord({'msg': text})
            for text in ('one', 'two')
        ]

    def test_default(self):
        # The stock formatter concatenates the messages and adds nothing.
        formatter = logging.BufferingFormatter()
        self.assertEqual('', formatter.format([]))
        self.assertEqual('onetwo', formatter.format(self.records))

    def test_custom(self):
        # Header and footer come from the subclass ...
        formatter = TestBufferingFormatter()
        self.assertEqual('[(2)onetwo(2)]', formatter.format(self.records))
        # ... and each record goes through the supplied line formatter.
        line_formatter = logging.Formatter('<%(message)s>')
        formatter = TestBufferingFormatter(line_formatter)
        self.assertEqual('[(2)<one><two>(2)]',
                         formatter.format(self.records))
3337
class ExceptionTest(BaseTest):
    def test_formatting(self):
        # Log from inside an exception handler with stack_info=True and
        # check the exception text and stack text kept on the record.
        r = self.root_logger
        h = RecordingHandler()
        r.addHandler(h)
        try:
            raise RuntimeError('deliberate mistake')
        except RuntimeError:
            # Catch only the exception raised above, so anything else
            # (including KeyboardInterrupt) propagates.
            # NOTE: keep this call on one line; the last assertion
            # below matches its source text.
            logging.exception('failed', stack_info=True)
        r.removeHandler(h)
        h.close()
        r = h.records[0]
        self.assertTrue(r.exc_text.startswith('Traceback (most recent '
                                              'call last):\n'))
        self.assertTrue(r.exc_text.endswith('\nRuntimeError: '
                                            'deliberate mistake'))
        self.assertTrue(r.stack_info.startswith('Stack (most recent '
                                              'call last):\n'))
        self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', '
                                            'stack_info=True)'))
3358
3359
class LastResortTest(BaseTest):
    def test_last_resort(self):
        # Test the last resort handler
        root = self.root_logger
        root.removeHandler(self.root_hdlr)
        # Module-level state changed below; restored in the finally.
        saved_last_resort = logging.lastResort
        saved_raise_exceptions = logging.raiseExceptions

        try:
            with support.captured_stderr() as err:
                root.debug('This should not appear')
                self.assertEqual(err.getvalue(), '')
                root.warning('Final chance!')
                self.assertEqual(err.getvalue(), 'Final chance!\n')

            # No handlers and no last resort, so 'No handlers' message
            logging.lastResort = None
            with support.captured_stderr() as err:
                root.warning('Final chance!')
                expected = 'No handlers could be found for logger "root"\n'
                self.assertEqual(err.getvalue(), expected)

            # 'No handlers' message only printed once
            with support.captured_stderr() as err:
                root.warning('Final chance!')
                self.assertEqual(err.getvalue(), '')

            # If raiseExceptions is False, no message is printed
            root.manager.emittedNoHandlerWarning = False
            logging.raiseExceptions = False
            with support.captured_stderr() as err:
                root.warning('Final chance!')
                self.assertEqual(err.getvalue(), '')
        finally:
            root.addHandler(self.root_hdlr)
            logging.lastResort = saved_last_resort
            logging.raiseExceptions = saved_raise_exceptions
3397
3398
class FakeHandler:
    """Stand-in handler whose lifecycle hooks note their own invocation.

    ``acquire``, ``flush``, ``close`` and ``release`` are set as instance
    attributes; calling one appends ``'<identifier> - <name>'`` to *called*.
    """

    def __init__(self, identifier, called):
        hook_names = ('acquire', 'flush', 'close', 'release')
        for hook in hook_names:
            recorder = self.record_call(identifier, hook, called)
            setattr(self, hook, recorder)

    def record_call(self, identifier, method_name, called):
        """Return a no-argument callable that logs the call into *called*."""
        def inner():
            entry = '{} - {}'.format(identifier, method_name)
            called.append(entry)
        return inner
3409
3410
class RecordingHandler(logging.NullHandler):
    """NullHandler variant that remembers every record given to handle()."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Records are kept in the order they arrive.
        self.records = []

    def handle(self, record):
        """Store *record* instead of processing it."""
        seen = self.records
        seen.append(record)
3420
3421
class ShutdownTest(BaseTest):

    """Tests for logging.shutdown() driven through fake handlers."""

    def setUp(self):
        super().setUp()
        # Shared journal that every FakeHandler appends its calls to.
        self.called = []
        # Several tests flip logging.raiseExceptions; put it back afterwards.
        self.addCleanup(setattr, logging, 'raiseExceptions',
                        logging.raiseExceptions)

    def raise_error(self, error):
        """Return a no-argument callable that raises an instance of *error*."""
        def inner():
            raise error()
        return inner

    def test_no_failure(self):
        """Handlers are visited last-to-first, each hook called in turn."""
        # Keep strong references in a local so the weak references stay live.
        fakes = [FakeHandler(index, self.called) for index in range(3)]
        live_refs = [logging.weakref.ref(fake) for fake in fakes]

        logging.shutdown(handlerList=live_refs)

        expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release',
                    '1 - acquire', '1 - flush', '1 - close', '1 - release',
                    '0 - acquire', '0 - flush', '0 - close', '0 - release']
        self.assertEqual(expected, self.called)

    def _test_with_failure_in_method(self, method, error):
        """Make *method* raise *error*, then check release was called last."""
        broken = FakeHandler(0, self.called)
        setattr(broken, method, self.raise_error(error))

        logging.shutdown(handlerList=[logging.weakref.ref(broken)])

        last_call = self.called[-1]
        self.assertEqual('0 - release', last_call)

    # OSError raised from a hook does not stop release from being called.

    def test_with_ioerror_in_acquire(self):
        self._test_with_failure_in_method('acquire', OSError)

    def test_with_ioerror_in_flush(self):
        self._test_with_failure_in_method('flush', OSError)

    def test_with_ioerror_in_close(self):
        self._test_with_failure_in_method('close', OSError)

    # Likewise for ValueError.

    def test_with_valueerror_in_acquire(self):
        self._test_with_failure_in_method('acquire', ValueError)

    def test_with_valueerror_in_flush(self):
        self._test_with_failure_in_method('flush', ValueError)

    def test_with_valueerror_in_close(self):
        self._test_with_failure_in_method('close', ValueError)

    # Any other error is tolerated when raiseExceptions is off ...

    def test_with_other_error_in_acquire_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method('acquire', IndexError)

    def test_with_other_error_in_flush_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method('flush', IndexError)

    def test_with_other_error_in_close_without_raise(self):
        logging.raiseExceptions = False
        self._test_with_failure_in_method('close', IndexError)

    # ... and propagates to the caller when it is on.

    def test_with_other_error_in_acquire_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('acquire', IndexError)

    def test_with_other_error_in_flush_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('flush', IndexError)

    def test_with_other_error_in_close_with_raise(self):
        logging.raiseExceptions = True
        with self.assertRaises(IndexError):
            self._test_with_failure_in_method('close', IndexError)
3507
3508
class ModuleLevelMiscTest(BaseTest):

    """Test suite for some module level methods."""

    def test_disable(self):
        """logging.disable() stores the level on the root logger's manager."""
        old_disable = logging.root.manager.disable
        # confirm our assumptions are correct
        self.assertEqual(old_disable, 0)
        self.addCleanup(logging.disable, old_disable)

        logging.disable(83)
        self.assertEqual(logging.root.manager.disable, 83)

    def _test_log(self, method, level=None):
        """Call logging.<method> and check the single record it produces.

        *level* is passed as the first argument when given (for
        ``logging.log``); otherwise the expected level is looked up from the
        upper-cased method name.
        """
        called = []
        support.patch(self, logging, 'basicConfig',
                      lambda *a, **kw: called.append((a, kw)))

        recording = RecordingHandler()
        logging.root.addHandler(recording)
        # Detach explicitly so the handler never outlives this test on the
        # shared root logger, whatever the base class tear-down does.
        self.addCleanup(logging.root.removeHandler, recording)

        log_method = getattr(logging, method)
        if level is not None:
            log_method(level, "test me: %r", recording)
        else:
            log_method("test me: %r", recording)

        self.assertEqual(len(recording.records), 1)
        record = recording.records[0]
        self.assertEqual(record.getMessage(), "test me: %r" % recording)

        expected_level = level if level is not None else getattr(logging, method.upper())
        self.assertEqual(record.levelno, expected_level)

        # basicConfig was not called!
        self.assertEqual(called, [])

    def test_log(self):
        self._test_log('log', logging.ERROR)

    def test_debug(self):
        self._test_log('debug')

    def test_info(self):
        self._test_log('info')

    def test_warning(self):
        self._test_log('warning')

    def test_error(self):
        self._test_log('error')

    def test_critical(self):
        self._test_log('critical')

    def test_set_logger_class(self):
        """setLoggerClass() rejects non-Logger types and accepts subclasses."""
        # Restore the entry-time class even if an assertion below fails, so
        # a failure here cannot leave MyLogger installed for later tests.
        self.addCleanup(logging.setLoggerClass, logging.getLoggerClass())

        self.assertRaises(TypeError, logging.setLoggerClass, object)

        class MyLogger(logging.Logger):
            pass

        logging.setLoggerClass(MyLogger)
        self.assertEqual(logging.getLoggerClass(), MyLogger)

        logging.setLoggerClass(logging.Logger)
        self.assertEqual(logging.getLoggerClass(), logging.Logger)

    @support.requires_type_collecting
    def test_logging_at_shutdown(self):
        """logging.exception() from __del__ in a child interpreter reaches stderr."""
        # Issue #20037
        code = """if 1:
            import logging

            class A:
                def __del__(self):
                    try:
                        raise ValueError("some error")
                    except Exception:
                        logging.exception("exception in __del__")

            a = A()"""
        rc, out, err = assert_python_ok("-c", code)
        err = err.decode()
        self.assertIn("exception in __del__", err)
        self.assertIn("ValueError: some error", err)
3594
3595
class LogRecordTest(BaseTest):
    """Checks on LogRecord construction and its optional attributes."""

    def test_str_rep(self):
        """str() of a record is wrapped as '<LogRecord: ...>'."""
        text = str(logging.makeLogRecord({}))
        self.assertTrue(text.startswith('<LogRecord: '))
        self.assertTrue(text.endswith('>'))

    def test_dict_arg(self):
        """A lone dict argument is stored as-is and feeds %(name)s fields."""
        recorder = RecordingHandler()
        root = logging.getLogger()
        root.addHandler(recorder)
        mapping = {'less' : 'more' }
        logging.warning('less is %(less)s', mapping)
        logged = recorder.records[0]
        self.assertIs(logged.args, mapping)
        self.assertEqual(logged.message, 'less is more')
        root.removeHandler(recorder)
        recorder.close()

    def test_multiprocessing(self):
        """processName matches the current multiprocessing process name."""
        before_import = logging.makeLogRecord({})
        self.assertEqual(before_import.processName, 'MainProcess')
        try:
            import multiprocessing as mp
            after_import = logging.makeLogRecord({})
            self.assertEqual(after_import.processName,
                             mp.current_process().name)
        except ImportError:
            pass

    def test_optional(self):
        """Thread/process fields are filled in unless the log* flags are off."""
        record = logging.makeLogRecord({})
        if threading:
            for field in ('thread', 'threadName'):
                self.assertIsNotNone(getattr(record, field))
        for field in ('process', 'processName'):
            self.assertIsNotNone(getattr(record, field))

        # Remember the module-level switches so they can be put back.
        switches = ('logThreads', 'logProcesses', 'logMultiprocessing')
        saved = [(name, getattr(logging, name)) for name in switches]
        try:
            for name in switches:
                setattr(logging, name, False)
            record = logging.makeLogRecord({})
            for field in ('thread', 'threadName', 'process', 'processName'):
                self.assertIsNone(getattr(record, field))
        finally:
            for name, value in saved:
                setattr(logging, name, value)
3649
3650class BasicConfigTest(unittest.TestCase):
3651
3652    """Test suite for logging.basicConfig."""
3653
3654    def setUp(self):
3655        super(BasicConfigTest, self).setUp()
3656        self.handlers = logging.root.handlers
3657        self.saved_handlers = logging._handlers.copy()
3658        self.saved_handler_list = logging._handlerList[:]
3659        self.original_logging_level = logging.root.level
3660        self.addCleanup(self.cleanup)
3661        logging.root.handlers = []
3662
3663    def tearDown(self):
3664        for h in logging.root.handlers[:]:
3665            logging.root.removeHandler(h)
3666            h.close()
3667        super(BasicConfigTest, self).tearDown()
3668
3669    def cleanup(self):
3670        setattr(logging.root, 'handlers', self.handlers)
3671        logging._handlers.clear()
3672        logging._handlers.update(self.saved_handlers)
3673        logging._handlerList[:] = self.saved_handler_list
3674        logging.root.level = self.original_logging_level
3675
3676    def test_no_kwargs(self):
3677        logging.basicConfig()
3678
3679        # handler defaults to a StreamHandler to sys.stderr
3680        self.assertEqual(len(logging.root.handlers), 1)
3681        handler = logging.root.handlers[0]
3682        self.assertIsInstance(handler, logging.StreamHandler)
3683        self.assertEqual(handler.stream, sys.stderr)
3684
3685        formatter = handler.formatter
3686        # format defaults to logging.BASIC_FORMAT
3687        self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT)
3688        # datefmt defaults to None
3689        self.assertIsNone(formatter.datefmt)
3690        # style defaults to %
3691        self.assertIsInstance(formatter._style, logging.PercentStyle)
3692
3693        # level is not explicitly set
3694        self.assertEqual(logging.root.level, self.original_logging_level)
3695
3696    def test_strformatstyle(self):
3697        with support.captured_stdout() as output:
3698            logging.basicConfig(stream=sys.stdout, style="{")
3699            logging.error("Log an error")
3700            sys.stdout.seek(0)
3701            self.assertEqual(output.getvalue().strip(),
3702                "ERROR:root:Log an error")
3703
3704    def test_stringtemplatestyle(self):
3705        with support.captured_stdout() as output:
3706            logging.basicConfig(stream=sys.stdout, style="$")
3707            logging.error("Log an error")
3708            sys.stdout.seek(0)
3709            self.assertEqual(output.getvalue().strip(),
3710                "ERROR:root:Log an error")
3711
3712    def test_filename(self):
3713
3714        def cleanup(h1, h2, fn):
3715            h1.close()
3716            h2.close()
3717            os.remove(fn)
3718
3719        logging.basicConfig(filename='test.log')
3720
3721        self.assertEqual(len(logging.root.handlers), 1)
3722        handler = logging.root.handlers[0]
3723        self.assertIsInstance(handler, logging.FileHandler)
3724
3725        expected = logging.FileHandler('test.log', 'a')
3726        self.assertEqual(handler.stream.mode, expected.stream.mode)
3727        self.assertEqual(handler.stream.name, expected.stream.name)
3728        self.addCleanup(cleanup, handler, expected, 'test.log')
3729
3730    def test_filemode(self):
3731
3732        def cleanup(h1, h2, fn):
3733            h1.close()
3734            h2.close()
3735            os.remove(fn)
3736
3737        logging.basicConfig(filename='test.log', filemode='wb')
3738
3739        handler = logging.root.handlers[0]
3740        expected = logging.FileHandler('test.log', 'wb')
3741        self.assertEqual(handler.stream.mode, expected.stream.mode)
3742        self.addCleanup(cleanup, handler, expected, 'test.log')
3743
3744    def test_stream(self):
3745        stream = io.StringIO()
3746        self.addCleanup(stream.close)
3747        logging.basicConfig(stream=stream)
3748
3749        self.assertEqual(len(logging.root.handlers), 1)
3750        handler = logging.root.handlers[0]
3751        self.assertIsInstance(handler, logging.StreamHandler)
3752        self.assertEqual(handler.stream, stream)
3753
3754    def test_format(self):
3755        logging.basicConfig(format='foo')
3756
3757        formatter = logging.root.handlers[0].formatter
3758        self.assertEqual(formatter._style._fmt, 'foo')
3759
3760    def test_datefmt(self):
3761        logging.basicConfig(datefmt='bar')
3762
3763        formatter = logging.root.handlers[0].formatter
3764        self.assertEqual(formatter.datefmt, 'bar')
3765
3766    def test_style(self):
3767        logging.basicConfig(style='$')
3768
3769        formatter = logging.root.handlers[0].formatter
3770        self.assertIsInstance(formatter._style, logging.StringTemplateStyle)
3771
3772    def test_level(self):
3773        old_level = logging.root.level
3774        self.addCleanup(logging.root.setLevel, old_level)
3775
3776        logging.basicConfig(level=57)
3777        self.assertEqual(logging.root.level, 57)
3778        # Test that second call has no effect
3779        logging.basicConfig(level=58)
3780        self.assertEqual(logging.root.level, 57)
3781
3782    def test_incompatible(self):
3783        assertRaises = self.assertRaises
3784        handlers = [logging.StreamHandler()]
3785        stream = sys.stderr
3786        assertRaises(ValueError, logging.basicConfig, filename='test.log',
3787                                                     stream=stream)
3788        assertRaises(ValueError, logging.basicConfig, filename='test.log',
3789                                                     handlers=handlers)
3790        assertRaises(ValueError, logging.basicConfig, stream=stream,
3791                                                     handlers=handlers)
3792        # Issue 23207: test for invalid kwargs
3793        assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO)
3794        # Should pop both filename and filemode even if filename is None
3795        logging.basicConfig(filename=None, filemode='a')
3796
3797    def test_handlers(self):
3798        handlers = [
3799            logging.StreamHandler(),
3800            logging.StreamHandler(sys.stdout),
3801            logging.StreamHandler(),
3802        ]
3803        f = logging.Formatter()
3804        handlers[2].setFormatter(f)
3805        logging.basicConfig(handlers=handlers)
3806        self.assertIs(handlers[0], logging.root.handlers[0])
3807        self.assertIs(handlers[1], logging.root.handlers[1])
3808        self.assertIs(handlers[2], logging.root.handlers[2])
3809        self.assertIsNotNone(handlers[0].formatter)
3810        self.assertIsNotNone(handlers[1].formatter)
3811        self.assertIs(handlers[2].formatter, f)
3812        self.assertIs(handlers[0].formatter, handlers[1].formatter)
3813
3814    def _test_log(self, method, level=None):
3815        # logging.root has no handlers so basicConfig should be called
3816        called = []
3817
3818        old_basic_config = logging.basicConfig
3819        def my_basic_config(*a, **kw):
3820            old_basic_config()
3821            old_level = logging.root.level
3822            logging.root.setLevel(100)  # avoid having messages in stderr
3823            self.addCleanup(logging.root.setLevel, old_level)
3824            called.append((a, kw))
3825
3826        support.patch(self, logging, 'basicConfig', my_basic_config)
3827
3828        log_method = getattr(logging, method)
3829        if level is not None:
3830            log_method(level, "test me")
3831        else:
3832            log_method("test me")
3833
3834        # basicConfig was called with no arguments
3835        self.assertEqual(called, [((), {})])
3836
3837    def test_log(self):
3838        self._test_log('log', logging.WARNING)
3839
3840    def test_debug(self):
3841        self._test_log('debug')
3842
3843    def test_info(self):
3844        self._test_log('info')
3845
3846    def test_warning(self):
3847        self._test_log('warning')
3848
3849    def test_error(self):
3850        self._test_log('error')
3851
3852    def test_critical(self):
3853        self._test_log('critical')
3854
3855
class LoggerAdapterTest(unittest.TestCase):
    """Tests for logging.LoggerAdapter wrapped around the root logger."""

    def setUp(self):
        super(LoggerAdapterTest, self).setUp()
        old_handler_list = logging._handlerList[:]

        self.recording = RecordingHandler()
        self.logger = logging.root
        self.logger.addHandler(self.recording)
        self.addCleanup(self.logger.removeHandler, self.recording)
        self.addCleanup(self.recording.close)

        def cleanup():
            logging._handlerList[:] = old_handler_list

        self.addCleanup(cleanup)
        self.addCleanup(logging.shutdown)
        self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None)

    def test_exception(self):
        """adapter.exception() logs at ERROR with the active exception."""
        msg = 'testing exception: %r'
        exc = None
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.adapter.exception(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.ERROR)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_exception_excinfo(self):
        """An exception instance passed as exc_info is expanded to a triple."""
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e

        self.adapter.exception('exc_info test', exc_info=exc)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_critical(self):
        msg = 'critical test! %r'
        self.adapter.critical(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.CRITICAL)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))

    def test_is_enabled_for(self):
        old_disable = self.adapter.logger.manager.disable
        self.adapter.logger.manager.disable = 33
        self.addCleanup(setattr, self.adapter.logger.manager, 'disable',
                        old_disable)
        self.assertFalse(self.adapter.isEnabledFor(32))

    def test_has_handlers(self):
        """hasHandlers() on the adapter mirrors the wrapped logger."""
        self.assertTrue(self.adapter.hasHandlers())

        # Work from a copy: removeHandler() mutates logger.handlers, and
        # removing from a list while iterating over it skips elements.
        removed = list(self.logger.handlers)

        def restore():
            # The logger is the shared root logger, so put back everything
            # that was detached; the setUp cleanup registered earlier then
            # removes the recording handler again.
            for handler in removed:
                self.logger.addHandler(handler)

        self.addCleanup(restore)
        for handler in removed:
            self.logger.removeHandler(handler)

        self.assertFalse(self.logger.hasHandlers())
        self.assertFalse(self.adapter.hasHandlers())
3930
3931
class LoggerTest(BaseTest):
    """Tests for logging.Logger using a standalone logger named 'blah'."""

    def setUp(self):
        super(LoggerTest, self).setUp()
        self.recording = RecordingHandler()
        self.logger = logging.Logger(name='blah')
        self.logger.addHandler(self.recording)
        self.addCleanup(self.logger.removeHandler, self.recording)
        self.addCleanup(self.recording.close)
        self.addCleanup(logging.shutdown)

    def test_set_invalid_level(self):
        self.assertRaises(TypeError, self.logger.setLevel, object())

    def test_exception(self):
        """logger.exception() logs at ERROR with the active exception."""
        msg = 'testing exception: %r'
        exc = None
        try:
            1 / 0
        except ZeroDivisionError as e:
            exc = e
            self.logger.exception(msg, self.recording)

        self.assertEqual(len(self.recording.records), 1)
        record = self.recording.records[0]
        self.assertEqual(record.levelno, logging.ERROR)
        self.assertEqual(record.msg, msg)
        self.assertEqual(record.args, (self.recording,))
        self.assertEqual(record.exc_info,
                         (exc.__class__, exc, exc.__traceback__))

    def test_log_invalid_level_with_raise(self):
        with support.swap_attr(logging, 'raiseExceptions', True):
            self.assertRaises(TypeError, self.logger.log, '10', 'test message')

    def test_log_invalid_level_no_raise(self):
        with support.swap_attr(logging, 'raiseExceptions', False):
            self.logger.log('10', 'test message')  # no exception happens

    def test_find_caller_with_stack_info(self):
        """findCaller(stack_info=True) writes the stack header, then prints."""
        called = []
        # The stand-in captures what has been written to the file object at
        # the moment print_stack is invoked.
        support.patch(self, logging.traceback, 'print_stack',
                      lambda f, file: called.append(file.getvalue()))

        self.logger.findCaller(stack_info=True)

        self.assertEqual(len(called), 1)
        self.assertEqual('Stack (most recent call last):\n', called[0])

    def test_make_record_with_extra_overwrite(self):
        """extra= keys that clash with record attributes raise KeyError."""
        name = 'my record'
        level = 13
        fn = lno = msg = args = exc_info = func = sinfo = None
        rv = logging._logRecordFactory(name, level, fn, lno, msg, args,
                                       exc_info, func, sinfo)

        for key in ('message', 'asctime') + tuple(rv.__dict__.keys()):
            extra = {key: 'some value'}
            self.assertRaises(KeyError, self.logger.makeRecord, name, level,
                              fn, lno, msg, args, exc_info,
                              extra=extra, sinfo=sinfo)

    def test_make_record_with_extra_no_overwrite(self):
        """A non-clashing extra= key ends up in the record's __dict__."""
        name = 'my record'
        level = 13
        fn = lno = msg = args = exc_info = func = sinfo = None
        extra = {'valid_key': 'some value'}
        result = self.logger.makeRecord(name, level, fn, lno, msg, args,
                                        exc_info, extra=extra, sinfo=sinfo)
        self.assertIn('valid_key', result.__dict__)

    def test_has_handlers(self):
        self.assertTrue(self.logger.hasHandlers())

        # Work from a copy: removeHandler() mutates logger.handlers, and
        # removing from a list while iterating over it skips elements.
        for handler in list(self.logger.handlers):
            self.logger.removeHandler(handler)
        self.assertFalse(self.logger.hasHandlers())

    def test_has_handlers_no_propagate(self):
        child_logger = logging.getLogger('blah.child')
        child_logger.propagate = False
        self.assertFalse(child_logger.hasHandlers())

    def test_is_enabled_for(self):
        old_disable = self.logger.manager.disable
        self.logger.manager.disable = 23
        self.addCleanup(setattr, self.logger.manager, 'disable', old_disable)
        self.assertFalse(self.logger.isEnabledFor(22))

    def test_root_logger_aliases(self):
        """Every spelling of 'the root logger' yields the same object."""
        root = logging.getLogger()
        self.assertIs(root, logging.root)
        self.assertIs(root, logging.getLogger(None))
        self.assertIs(root, logging.getLogger(''))
        self.assertIs(root, logging.getLogger('foo').root)
        self.assertIs(root, logging.getLogger('foo.bar').root)
        self.assertIs(root, logging.getLogger('foo').parent)

        self.assertIsNot(root, logging.getLogger('\0'))
        self.assertIsNot(root, logging.getLogger('foo.bar').parent)

    def test_invalid_names(self):
        self.assertRaises(TypeError, logging.getLogger, any)
        self.assertRaises(TypeError, logging.getLogger, b'foo')
4036
4037
class BaseFileTest(BaseTest):
    """Common fixture for handler tests that write to log files.

    setUp() creates an empty temporary file whose path is stored in
    ``self.fn``; files registered through assertLogFile() are deleted in
    tearDown().
    """

    def setUp(self):
        BaseTest.setUp(self)
        descriptor, self.fn = tempfile.mkstemp(suffix=".log",
                                               prefix="test_logging-2-")
        # Only the path is needed; the tests open the file themselves.
        os.close(descriptor)
        self.rmfiles = []

    def tearDown(self):
        for registered in self.rmfiles:
            os.unlink(registered)
        # The main log file may already be gone (registered above, or
        # removed by the test itself).
        main_log = self.fn
        if os.path.exists(main_log):
            os.unlink(main_log)
        BaseTest.tearDown(self)

    def assertLogFile(self, filename):
        """Assert that *filename* exists and register it for deletion."""
        present = os.path.exists(filename)
        self.assertTrue(present,
                        msg="Log file %r does not exist" % filename)
        self.rmfiles.append(filename)
4059
4060
class FileHandlerTest(BaseFileTest):
    """Checks for logging.FileHandler."""

    def test_delay(self):
        """With delay=True the file appears only once a record is handled."""
        path = self.fn
        os.unlink(path)
        handler = logging.FileHandler(path, delay=True)
        # Nothing has been opened or created yet.
        self.assertIsNone(handler.stream)
        self.assertFalse(os.path.exists(path))
        blank_record = logging.makeLogRecord({})
        handler.handle(blank_record)
        # Handling a record opens the stream and creates the file.
        self.assertIsNotNone(handler.stream)
        self.assertTrue(os.path.exists(path))
        handler.close()
4071
class RotatingFileHandlerTest(BaseFileTest):
    """Checks for logging.handlers.RotatingFileHandler."""

    def next_rec(self):
        """Build a DEBUG record whose message comes from next_message()."""
        message = self.next_message()
        return logging.LogRecord('n', logging.DEBUG, 'p', 1,
                                 message, None, None, None)

    def test_should_not_rollover(self):
        # If maxbytes is zero rollover never occurs
        handler = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0)
        wants_rollover = handler.shouldRollover(None)
        self.assertFalse(wants_rollover)
        handler.close()

    def test_should_rollover(self):
        handler = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1)
        wants_rollover = handler.shouldRollover(self.next_rec())
        self.assertTrue(wants_rollover)
        handler.close()

    def test_file_created(self):
        # checks that the file is created and assumes it was created
        # by us
        handler = logging.handlers.RotatingFileHandler(self.fn)
        handler.emit(self.next_rec())
        self.assertLogFile(self.fn)
        handler.close()

    def test_rollover_filenames(self):
        """A custom namer is applied to every rotated file name."""
        def namer(name):
            return name + ".test"

        handler = logging.handlers.RotatingFileHandler(
            self.fn, backupCount=2, maxBytes=1)
        handler.namer = namer
        # First emit creates the base file; each later one adds a backup.
        handler.emit(self.next_rec())
        self.assertLogFile(self.fn)
        for suffix in (".1", ".2"):
            handler.emit(self.next_rec())
            self.assertLogFile(namer(self.fn + suffix))
        # backupCount=2, so a third backup is not expected.
        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
        handler.close()

    @support.requires_zlib
    def test_rotator(self):
        """A custom rotator/namer pair produces zlib-compressed backups."""
        def namer(name):
            return name + ".gz"

        def rotator(source, dest):
            # Compress the source into dest, then drop the source.
            with open(source, "rb") as src:
                packed = zlib.compress(src.read(), 9)
                with open(dest, "wb") as dst:
                    dst.write(packed)
            os.remove(source)

        def stored_text(path):
            # Inflate a rotated file and decode it back to text.
            with open(path, "rb") as packed_file:
                return zlib.decompress(packed_file.read()).decode("ascii")

        handler = logging.handlers.RotatingFileHandler(
            self.fn, backupCount=2, maxBytes=1)
        handler.rotator = rotator
        handler.namer = namer

        first = self.next_rec()
        handler.emit(first)
        self.assertLogFile(self.fn)

        second = self.next_rec()
        handler.emit(second)
        backup_one = namer(self.fn + ".1")
        self.assertLogFile(backup_one)
        eol = os.linesep
        self.assertEqual(stored_text(backup_one), first.msg + eol)

        # The next rollover shifts the first message into the second backup.
        handler.emit(self.next_rec())
        backup_two = namer(self.fn + ".2")
        self.assertLogFile(backup_two)
        self.assertEqual(stored_text(backup_two), first.msg + eol)

        # One more rollover: the second message takes its place.
        handler.emit(self.next_rec())
        self.assertEqual(stored_text(backup_two), second.msg + eol)
        self.assertFalse(os.path.exists(namer(self.fn + ".3")))
        handler.close()
4155
class TimedRotatingFileHandlerTest(BaseFileTest):
    """Tests for logging.handlers.TimedRotatingFileHandler.

    Other test methods (the generated ``test_compute_rollover_*`` cases)
    are attached to this class by the module-level loop that follows it.
    """

    def test_rollover(self):
        # Emit two records a little over a second apart into a handler that
        # rotates every second, then look for the rotated file on disk.
        fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S',
                                                       backupCount=1)
        # Close the handler even when an assertion in between fails, so the
        # log file is never left open by a failing run.
        try:
            fmt = logging.Formatter('%(asctime)s %(message)s')
            fh.setFormatter(fmt)
            r1 = logging.makeLogRecord({'msg': 'testing - initial'})
            fh.emit(r1)
            self.assertLogFile(self.fn)
            time.sleep(1.1)    # a little over a second ...
            r2 = logging.makeLogRecord({'msg': 'testing - after delay'})
            fh.emit(r2)
        finally:
            fh.close()
        # At this point, we should have a recent rotated file which we
        # can test for the existence of. However, in practice, on some
        # machines which run really slowly, we don't know how far back
        # in time to go to look for the log file. So, we go back a fair
        # bit, and stop as soon as we see a rotated file. In theory this
        # could of course still fail, but the chances are lower.
        found = False
        now = datetime.datetime.now()
        GO_BACK = 5 * 60 # seconds
        for delta in range(GO_BACK):
            prev = now - datetime.timedelta(seconds=delta)
            fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S")
            found = os.path.exists(fn)
            if found:
                # Register the rotated file so it is cleaned up with the
                # other files tracked in self.rmfiles.
                self.rmfiles.append(fn)
                break
        msg = 'No rotated files found, went back %d seconds' % GO_BACK
        if not found:
            # Print additional diagnostics.  Everything goes to stderr so
            # the lines stay together and in order.
            dn, fn = os.path.split(self.fn)
            files = [f for f in os.listdir(dn) if f.startswith(fn)]
            print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr)
            print('The only matching files are: %s' % files, file=sys.stderr)
            for f in files:
                print('Contents of %s:' % f, file=sys.stderr)
                path = os.path.join(dn, f)
                with open(path, 'r') as tf:
                    print(tf.read(), file=sys.stderr)
        self.assertTrue(found, msg=msg)

    def test_invalid(self):
        # Each of these 'when' values must be rejected with ValueError.
        # delay=True is passed to every constructor call.
        for bad_when in ('X', 'W', 'W7'):
            self.assertRaises(ValueError,
                              logging.handlers.TimedRotatingFileHandler,
                              self.fn, bad_when, delay=True)

    def test_compute_rollover_daily_attime(self):
        # With when='MIDNIGHT', utc=True and atTime=12:00, computeRollover()
        # is expected to return the next 12:00 after the given time.
        currentTime = 0
        atTime = datetime.time(12, 0, 0)
        rh = logging.handlers.TimedRotatingFileHandler(
            self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True,
            atTime=atTime)
        try:
            # From 00:00 the rollover is due 12 hours later.
            actual = rh.computeRollover(currentTime)
            self.assertEqual(actual, currentTime + 12 * 60 * 60)

            # From 13:00 today's slot has passed, so it is due at 12:00
            # on the following day (36 hours after currentTime).
            actual = rh.computeRollover(currentTime + 13 * 60 * 60)
            self.assertEqual(actual, currentTime + 36 * 60 * 60)
        finally:
            rh.close()

    #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.')
    def test_compute_rollover_weekly_attime(self):
        # For every weekday W0..W6, check the rollover computed from the
        # start of the current UTC day and from 13:00 on that same day.
        currentTime = int(time.time())
        today = currentTime - currentTime % 86400

        atTime = datetime.time(12, 0, 0)

        wday = time.gmtime(today).tm_wday
        for day in range(7):
            rh = logging.handlers.TimedRotatingFileHandler(
                self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True,
                atTime=atTime)
            try:
                if wday > day:
                    # The rollover day has already passed this week, so we
                    # go over into next week
                    expected = (7 - wday + day)
                else:
                    expected = (day - wday)
                # At this point expected is in days from now, convert to seconds
                expected *= 24 * 60 * 60
                # Add in the rollover time
                expected += 12 * 60 * 60
                # Add in adjustment for today
                expected += today
                actual = rh.computeRollover(today)
                if actual != expected:
                    # Diagnostics go to stderr, like the other rollover tests.
                    print('failed in timezone: %d' % time.timezone, file=sys.stderr)
                    print('local vars: %s' % locals(), file=sys.stderr)
                self.assertEqual(actual, expected)
                if day == wday:
                    # goes into following week
                    expected += 7 * 24 * 60 * 60
                actual = rh.computeRollover(today + 13 * 60 * 60)
                if actual != expected:
                    print('failed in timezone: %d' % time.timezone, file=sys.stderr)
                    print('local vars: %s' % locals(), file=sys.stderr)
                self.assertEqual(actual, expected)
            finally:
                rh.close()
4264
4265
def secs(**kw):
    """Return the whole number of seconds in ``datetime.timedelta(**kw)``.

    The keyword arguments are passed straight to ``datetime.timedelta``.
    The result is an int, rounded down (floor) when the span is not a
    whole number of seconds.
    """
    span = datetime.timedelta(**kw)
    one_second = datetime.timedelta(seconds=1)
    # divmod on two timedeltas gives (int quotient, timedelta remainder);
    # the quotient is the same value as ``span // one_second``.
    whole_seconds, _remainder = divmod(span, one_second)
    return whole_seconds
4268
# Generate one test_compute_rollover_<when> method per rollover unit and
# attach it to TimedRotatingFileHandlerTest.  Each entry pairs a 'when'
# value with the rollover time expected for currentTime == 0 (UTC).
for when, exp in (('S', 1),
                  ('M', 60),
                  ('H', 60 * 60),
                  ('D', 60 * 60 * 24),
                  ('MIDNIGHT', 60 * 60 * 24),
                  # current time (epoch start) is a Thursday, W0 means Monday
                  ('W0', secs(days=4, hours=24)),
                 ):
    # 'when' and 'exp' are bound as default arguments so that every
    # generated method keeps the values of its own loop iteration.
    def test_compute_rollover(self, when=when, exp=exp):
        rh = logging.handlers.TimedRotatingFileHandler(
            self.fn, when=when, interval=1, backupCount=0, utc=True)
        # Close the handler even when the assertion fails, so that a
        # failing run does not leave the handler open.
        try:
            currentTime = 0.0
            actual = rh.computeRollover(currentTime)
            if exp != actual:
                # Failures occur on some systems for MIDNIGHT and W0.
                # Print detailed calculation for MIDNIGHT so we can try to see
                # what's going on
                if when == 'MIDNIGHT':
                    try:
                        if rh.utc:
                            t = time.gmtime(currentTime)
                        else:
                            t = time.localtime(currentTime)
                        currentHour = t[3]
                        currentMinute = t[4]
                        currentSecond = t[5]
                        # r is the number of seconds left between now and midnight
                        r = logging.handlers._MIDNIGHT - ((currentHour * 60 +
                                                           currentMinute) * 60 +
                                currentSecond)
                        result = currentTime + r
                        print('t: %s (%s)' % (t, rh.utc), file=sys.stderr)
                        print('currentHour: %s' % currentHour, file=sys.stderr)
                        print('currentMinute: %s' % currentMinute, file=sys.stderr)
                        print('currentSecond: %s' % currentSecond, file=sys.stderr)
                        print('r: %s' % r, file=sys.stderr)
                        print('result: %s' % result, file=sys.stderr)
                    except Exception:
                        # Best effort only: the diagnostics must never mask
                        # the real assertion failure below.
                        print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr)
            self.assertEqual(exp, actual)
        finally:
            rh.close()
    setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover)
4311
4312
@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.')
class NTEventLogHandlerTest(BaseTest):
    """Test for logging.handlers.NTEventLogHandler (needs the win32 modules)."""

    def test_basic(self):
        # Log one record through NTEventLogHandler and then look for it in
        # the 'Application' event log.
        log_name = 'Application'
        log_handle = win32evtlog.OpenEventLog(None, log_name)
        count_before = win32evtlog.GetNumberOfEventLogRecords(log_handle)

        try:
            handler = logging.handlers.NTEventLogHandler('test_logging')
        except pywintypes.error as exc:
            # Only winerror 5 (access denied) is turned into a skip; any
            # other error is propagated unchanged.
            if exc.winerror != 5:
                raise
            raise unittest.SkipTest('Insufficient privileges to run test')

        record = logging.makeLogRecord({'msg': 'Test Log Message'})
        handler.handle(record)
        handler.close()

        # Now see if the event is recorded
        count_after = win32evtlog.GetNumberOfEventLogRecords(log_handle)
        self.assertLess(count_before, count_after)

        read_flags = (win32evtlog.EVENTLOG_BACKWARDS_READ |
                      win32evtlog.EVENTLOG_SEQUENTIAL_READ)
        GO_BACK = 100
        recent_events = win32evtlog.ReadEventLog(log_handle, read_flags, GO_BACK)
        # Stop at the first event that has our source name and whose
        # formatted message is the text logged above.
        found = any(
            event.SourceName == 'test_logging' and
            win32evtlogutil.SafeFormatMessage(event, log_name) == 'Test Log Message\r\n'
            for event in recent_events)
        self.assertTrue(
            found,
            msg='Record not found in event log, went back %d records' % GO_BACK)
4347
4348
class MiscTestCase(unittest.TestCase):
    """Miscellaneous checks on the logging package."""

    def test__all__(self):
        # Names handed to support.check__all__() as its blacklist when
        # it checks logging.__all__.
        excluded_names = {
            'logThreads',
            'logMultiprocessing',
            'logProcesses',
            'currentframe',
            'PercentStyle',
            'StrFormatStyle',
            'StringTemplateStyle',
            'Filterer',
            'PlaceHolder',
            'Manager',
            'RootLogger',
            'root',
            'threading',
        }
        support.check__all__(self, logging, blacklist=excluded_names)
4357
4358
# Run the whole suite with the locale set to the platform-dependent
# default.  The reason for doing so is not recorded; in any case the
# current locale is saved first and restored at the end.
@support.run_with_locale('LC_ALL', '')
def test_main():
    # The order below is the order handed to support.run_unittest().
    always_run = [
        BuiltinLevelsTest,
        BasicFilterTest,
        CustomLevelsAndFiltersTest,
        HandlerTest,
        MemoryHandlerTest,
        ConfigFileTest,
        SocketHandlerTest,
        DatagramHandlerTest,
        MemoryTest,
        EncodingTest,
        WarningsTest,
        ConfigDictTest,
        ManagerTest,
        FormatterTest,
        BufferingFormatterTest,
        StreamHandlerTest,
        LogRecordFactoryTest,
        ChildLoggerTest,
        QueueHandlerTest,
        ShutdownTest,
        ModuleLevelMiscTest,
        BasicConfigTest,
        LoggerAdapterTest,
        LoggerTest,
        SMTPHandlerTest,
        FileHandlerTest,
        RotatingFileHandlerTest,
        LastResortTest,
        LogRecordTest,
        ExceptionTest,
        SysLogHandlerTest,
        HTTPHandlerTest,
        NTEventLogHandlerTest,
        TimedRotatingFileHandlerTest,
        UnixSocketHandlerTest,
        UnixDatagramHandlerTest,
        UnixSysLogHandlerTest,
        MiscTestCase,
    ]
    # QueueListenerTest is run last, and only when logging.handlers
    # actually provides QueueListener.
    if hasattr(logging.handlers, 'QueueListener'):
        when_available = [QueueListenerTest]
    else:
        when_available = []
    support.run_unittest(*always_run, *when_available)


# Allow the module to be executed directly as a script.
if __name__ == "__main__":
    test_main()
4384