1# Copyright 2001-2017 by Vinay Sajip. All Rights Reserved. 2# 3# Permission to use, copy, modify, and distribute this software and its 4# documentation for any purpose and without fee is hereby granted, 5# provided that the above copyright notice appear in all copies and that 6# both that copyright notice and this permission notice appear in 7# supporting documentation, and that the name of Vinay Sajip 8# not be used in advertising or publicity pertaining to distribution 9# of the software without specific, written prior permission. 10# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING 11# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL 12# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR 13# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER 14# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 15# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 17"""Test harness for the logging module. Run all tests. 18 19Copyright (C) 2001-2017 Vinay Sajip. All Rights Reserved. 20""" 21 22import logging 23import logging.handlers 24import logging.config 25 26import codecs 27import configparser 28import datetime 29import pathlib 30import pickle 31import io 32import gc 33import json 34import os 35import queue 36import random 37import re 38import socket 39import struct 40import sys 41import tempfile 42from test.support.script_helper import assert_python_ok 43from test import support 44import textwrap 45import time 46import unittest 47import warnings 48import weakref 49try: 50 import threading 51 # The following imports are needed only for tests which 52 # require threading 53 import asyncore 54 from http.server import HTTPServer, BaseHTTPRequestHandler 55 import smtpd 56 from urllib.parse import urlparse, parse_qs 57 from socketserver import (ThreadingUDPServer, DatagramRequestHandler, 58 ThreadingTCPServer, StreamRequestHandler) 59except ImportError: 60 threading = None 61try: 62 import win32evtlog, win32evtlogutil, pywintypes 63except ImportError: 64 win32evtlog = win32evtlogutil = pywintypes = None 65 66try: 67 import zlib 68except ImportError: 69 pass 70 71class BaseTest(unittest.TestCase): 72 73 """Base class for logging tests.""" 74 75 log_format = "%(name)s -> %(levelname)s: %(message)s" 76 expected_log_pat = r"^([\w.]+) -> (\w+): (\d+)$" 77 message_num = 0 78 79 def setUp(self): 80 """Setup the default logging stream to an internal StringIO instance, 81 so that we can examine log output as we want.""" 82 logger_dict = logging.getLogger().manager.loggerDict 83 logging._acquireLock() 84 try: 85 self.saved_handlers = logging._handlers.copy() 86 self.saved_handler_list = logging._handlerList[:] 87 self.saved_loggers = saved_loggers = logger_dict.copy() 88 self.saved_name_to_level = logging._nameToLevel.copy() 89 self.saved_level_to_name = logging._levelToName.copy() 90 self.logger_states = logger_states = {} 91 for name in saved_loggers: 92 logger_states[name] = getattr(saved_loggers[name], 93 'disabled', None) 94 finally: 95 logging._releaseLock() 96 97 # Set two unused loggers 98 self.logger1 = logging.getLogger("\xab\xd7\xbb") 99 self.logger2 = logging.getLogger("\u013f\u00d6\u0047") 100 101 self.root_logger = logging.getLogger("") 102 self.original_logging_level = self.root_logger.getEffectiveLevel() 103 104 self.stream = io.StringIO() 105 self.root_logger.setLevel(logging.DEBUG) 106 self.root_hdlr = logging.StreamHandler(self.stream) 107 self.root_formatter = logging.Formatter(self.log_format) 108 self.root_hdlr.setFormatter(self.root_formatter) 109 if self.logger1.hasHandlers(): 110 hlist = self.logger1.handlers + self.root_logger.handlers 111 raise AssertionError('Unexpected handlers: %s' % hlist) 112 if self.logger2.hasHandlers(): 113 hlist = self.logger2.handlers + self.root_logger.handlers 114 raise AssertionError('Unexpected handlers: %s' % hlist) 115 self.root_logger.addHandler(self.root_hdlr) 116 self.assertTrue(self.logger1.hasHandlers()) 117 self.assertTrue(self.logger2.hasHandlers()) 118 119 def tearDown(self): 120 """Remove our logging stream, and restore the original logging 121 level.""" 122 self.stream.close() 123 self.root_logger.removeHandler(self.root_hdlr) 124 while self.root_logger.handlers: 125 h = self.root_logger.handlers[0] 126 self.root_logger.removeHandler(h) 127 h.close() 128 self.root_logger.setLevel(self.original_logging_level) 129 logging._acquireLock() 130 try: 131 logging._levelToName.clear() 132 logging._levelToName.update(self.saved_level_to_name) 133 logging._nameToLevel.clear() 134 logging._nameToLevel.update(self.saved_name_to_level) 135 logging._handlers.clear() 136 logging._handlers.update(self.saved_handlers) 137 logging._handlerList[:] = self.saved_handler_list 138 loggerDict = logging.getLogger().manager.loggerDict 139 loggerDict.clear() 140 loggerDict.update(self.saved_loggers) 141 logger_states = self.logger_states 142 for name in self.logger_states: 143 if logger_states[name] is not None: 144 self.saved_loggers[name].disabled = logger_states[name] 145 finally: 146 logging._releaseLock() 147 148 def assert_log_lines(self, expected_values, stream=None, pat=None): 149 """Match the collected log lines against the regular expression 150 self.expected_log_pat, and compare the extracted group values to 151 the expected_values list of tuples.""" 152 stream = stream or self.stream 153 pat = re.compile(pat or self.expected_log_pat) 154 actual_lines = stream.getvalue().splitlines() 155 self.assertEqual(len(actual_lines), len(expected_values)) 156 for actual, expected in zip(actual_lines, expected_values): 157 match = pat.search(actual) 158 if not match: 159 self.fail("Log line does not match expected pattern:\n" + 160 actual) 161 self.assertEqual(tuple(match.groups()), expected) 162 s = stream.read() 163 if s: 164 self.fail("Remaining output at end of log stream:\n" + s) 165 166 def next_message(self): 167 """Generate a message consisting solely of an auto-incrementing 168 integer.""" 169 self.message_num += 1 170 return "%d" % self.message_num 171 172 173class BuiltinLevelsTest(BaseTest): 174 """Test builtin levels and their inheritance.""" 175 176 def test_flat(self): 177 #Logging levels in a flat logger namespace. 178 m = self.next_message 179 180 ERR = logging.getLogger("ERR") 181 ERR.setLevel(logging.ERROR) 182 INF = logging.LoggerAdapter(logging.getLogger("INF"), {}) 183 INF.setLevel(logging.INFO) 184 DEB = logging.getLogger("DEB") 185 DEB.setLevel(logging.DEBUG) 186 187 # These should log. 188 ERR.log(logging.CRITICAL, m()) 189 ERR.error(m()) 190 191 INF.log(logging.CRITICAL, m()) 192 INF.error(m()) 193 INF.warning(m()) 194 INF.info(m()) 195 196 DEB.log(logging.CRITICAL, m()) 197 DEB.error(m()) 198 DEB.warning(m()) 199 DEB.info(m()) 200 DEB.debug(m()) 201 202 # These should not log. 203 ERR.warning(m()) 204 ERR.info(m()) 205 ERR.debug(m()) 206 207 INF.debug(m()) 208 209 self.assert_log_lines([ 210 ('ERR', 'CRITICAL', '1'), 211 ('ERR', 'ERROR', '2'), 212 ('INF', 'CRITICAL', '3'), 213 ('INF', 'ERROR', '4'), 214 ('INF', 'WARNING', '5'), 215 ('INF', 'INFO', '6'), 216 ('DEB', 'CRITICAL', '7'), 217 ('DEB', 'ERROR', '8'), 218 ('DEB', 'WARNING', '9'), 219 ('DEB', 'INFO', '10'), 220 ('DEB', 'DEBUG', '11'), 221 ]) 222 223 def test_nested_explicit(self): 224 # Logging levels in a nested namespace, all explicitly set. 225 m = self.next_message 226 227 INF = logging.getLogger("INF") 228 INF.setLevel(logging.INFO) 229 INF_ERR = logging.getLogger("INF.ERR") 230 INF_ERR.setLevel(logging.ERROR) 231 232 # These should log. 233 INF_ERR.log(logging.CRITICAL, m()) 234 INF_ERR.error(m()) 235 236 # These should not log. 237 INF_ERR.warning(m()) 238 INF_ERR.info(m()) 239 INF_ERR.debug(m()) 240 241 self.assert_log_lines([ 242 ('INF.ERR', 'CRITICAL', '1'), 243 ('INF.ERR', 'ERROR', '2'), 244 ]) 245 246 def test_nested_inherited(self): 247 #Logging levels in a nested namespace, inherited from parent loggers. 248 m = self.next_message 249 250 INF = logging.getLogger("INF") 251 INF.setLevel(logging.INFO) 252 INF_ERR = logging.getLogger("INF.ERR") 253 INF_ERR.setLevel(logging.ERROR) 254 INF_UNDEF = logging.getLogger("INF.UNDEF") 255 INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") 256 UNDEF = logging.getLogger("UNDEF") 257 258 # These should log. 259 INF_UNDEF.log(logging.CRITICAL, m()) 260 INF_UNDEF.error(m()) 261 INF_UNDEF.warning(m()) 262 INF_UNDEF.info(m()) 263 INF_ERR_UNDEF.log(logging.CRITICAL, m()) 264 INF_ERR_UNDEF.error(m()) 265 266 # These should not log. 267 INF_UNDEF.debug(m()) 268 INF_ERR_UNDEF.warning(m()) 269 INF_ERR_UNDEF.info(m()) 270 INF_ERR_UNDEF.debug(m()) 271 272 self.assert_log_lines([ 273 ('INF.UNDEF', 'CRITICAL', '1'), 274 ('INF.UNDEF', 'ERROR', '2'), 275 ('INF.UNDEF', 'WARNING', '3'), 276 ('INF.UNDEF', 'INFO', '4'), 277 ('INF.ERR.UNDEF', 'CRITICAL', '5'), 278 ('INF.ERR.UNDEF', 'ERROR', '6'), 279 ]) 280 281 def test_nested_with_virtual_parent(self): 282 # Logging levels when some parent does not exist yet. 283 m = self.next_message 284 285 INF = logging.getLogger("INF") 286 GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") 287 CHILD = logging.getLogger("INF.BADPARENT") 288 INF.setLevel(logging.INFO) 289 290 # These should log. 291 GRANDCHILD.log(logging.FATAL, m()) 292 GRANDCHILD.info(m()) 293 CHILD.log(logging.FATAL, m()) 294 CHILD.info(m()) 295 296 # These should not log. 297 GRANDCHILD.debug(m()) 298 CHILD.debug(m()) 299 300 self.assert_log_lines([ 301 ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), 302 ('INF.BADPARENT.UNDEF', 'INFO', '2'), 303 ('INF.BADPARENT', 'CRITICAL', '3'), 304 ('INF.BADPARENT', 'INFO', '4'), 305 ]) 306 307 def test_regression_22386(self): 308 """See issue #22386 for more information.""" 309 self.assertEqual(logging.getLevelName('INFO'), logging.INFO) 310 self.assertEqual(logging.getLevelName(logging.INFO), 'INFO') 311 312 def test_issue27935(self): 313 fatal = logging.getLevelName('FATAL') 314 self.assertEqual(fatal, logging.FATAL) 315 316 def test_regression_29220(self): 317 """See issue #29220 for more information.""" 318 logging.addLevelName(logging.INFO, '') 319 self.addCleanup(logging.addLevelName, logging.INFO, 'INFO') 320 self.assertEqual(logging.getLevelName(logging.INFO), '') 321 self.assertEqual(logging.getLevelName(logging.NOTSET), 'NOTSET') 322 self.assertEqual(logging.getLevelName('NOTSET'), logging.NOTSET) 323 324class BasicFilterTest(BaseTest): 325 326 """Test the bundled Filter class.""" 327 328 def test_filter(self): 329 # Only messages satisfying the specified criteria pass through the 330 # filter. 331 filter_ = logging.Filter("spam.eggs") 332 handler = self.root_logger.handlers[0] 333 try: 334 handler.addFilter(filter_) 335 spam = logging.getLogger("spam") 336 spam_eggs = logging.getLogger("spam.eggs") 337 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 338 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 339 340 spam.info(self.next_message()) 341 spam_eggs.info(self.next_message()) # Good. 342 spam_eggs_fish.info(self.next_message()) # Good. 343 spam_bakedbeans.info(self.next_message()) 344 345 self.assert_log_lines([ 346 ('spam.eggs', 'INFO', '2'), 347 ('spam.eggs.fish', 'INFO', '3'), 348 ]) 349 finally: 350 handler.removeFilter(filter_) 351 352 def test_callable_filter(self): 353 # Only messages satisfying the specified criteria pass through the 354 # filter. 355 356 def filterfunc(record): 357 parts = record.name.split('.') 358 prefix = '.'.join(parts[:2]) 359 return prefix == 'spam.eggs' 360 361 handler = self.root_logger.handlers[0] 362 try: 363 handler.addFilter(filterfunc) 364 spam = logging.getLogger("spam") 365 spam_eggs = logging.getLogger("spam.eggs") 366 spam_eggs_fish = logging.getLogger("spam.eggs.fish") 367 spam_bakedbeans = logging.getLogger("spam.bakedbeans") 368 369 spam.info(self.next_message()) 370 spam_eggs.info(self.next_message()) # Good. 371 spam_eggs_fish.info(self.next_message()) # Good. 372 spam_bakedbeans.info(self.next_message()) 373 374 self.assert_log_lines([ 375 ('spam.eggs', 'INFO', '2'), 376 ('spam.eggs.fish', 'INFO', '3'), 377 ]) 378 finally: 379 handler.removeFilter(filterfunc) 380 381 def test_empty_filter(self): 382 f = logging.Filter() 383 r = logging.makeLogRecord({'name': 'spam.eggs'}) 384 self.assertTrue(f.filter(r)) 385 386# 387# First, we define our levels. There can be as many as you want - the only 388# limitations are that they should be integers, the lowest should be > 0 and 389# larger values mean less information being logged. If you need specific 390# level values which do not fit into these limitations, you can use a 391# mapping dictionary to convert between your application levels and the 392# logging system. 393# 394SILENT = 120 395TACITURN = 119 396TERSE = 118 397EFFUSIVE = 117 398SOCIABLE = 116 399VERBOSE = 115 400TALKATIVE = 114 401GARRULOUS = 113 402CHATTERBOX = 112 403BORING = 111 404 405LEVEL_RANGE = range(BORING, SILENT + 1) 406 407# 408# Next, we define names for our levels. You don't need to do this - in which 409# case the system will use "Level n" to denote the text for the level. 410# 411my_logging_levels = { 412 SILENT : 'Silent', 413 TACITURN : 'Taciturn', 414 TERSE : 'Terse', 415 EFFUSIVE : 'Effusive', 416 SOCIABLE : 'Sociable', 417 VERBOSE : 'Verbose', 418 TALKATIVE : 'Talkative', 419 GARRULOUS : 'Garrulous', 420 CHATTERBOX : 'Chatterbox', 421 BORING : 'Boring', 422} 423 424class GarrulousFilter(logging.Filter): 425 426 """A filter which blocks garrulous messages.""" 427 428 def filter(self, record): 429 return record.levelno != GARRULOUS 430 431class VerySpecificFilter(logging.Filter): 432 433 """A filter which blocks sociable and taciturn messages.""" 434 435 def filter(self, record): 436 return record.levelno not in [SOCIABLE, TACITURN] 437 438 439class CustomLevelsAndFiltersTest(BaseTest): 440 441 """Test various filtering possibilities with custom logging levels.""" 442 443 # Skip the logger name group. 444 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 445 446 def setUp(self): 447 BaseTest.setUp(self) 448 for k, v in my_logging_levels.items(): 449 logging.addLevelName(k, v) 450 451 def log_at_all_levels(self, logger): 452 for lvl in LEVEL_RANGE: 453 logger.log(lvl, self.next_message()) 454 455 def test_logger_filter(self): 456 # Filter at logger level. 457 self.root_logger.setLevel(VERBOSE) 458 # Levels >= 'Verbose' are good. 459 self.log_at_all_levels(self.root_logger) 460 self.assert_log_lines([ 461 ('Verbose', '5'), 462 ('Sociable', '6'), 463 ('Effusive', '7'), 464 ('Terse', '8'), 465 ('Taciturn', '9'), 466 ('Silent', '10'), 467 ]) 468 469 def test_handler_filter(self): 470 # Filter at handler level. 471 self.root_logger.handlers[0].setLevel(SOCIABLE) 472 try: 473 # Levels >= 'Sociable' are good. 474 self.log_at_all_levels(self.root_logger) 475 self.assert_log_lines([ 476 ('Sociable', '6'), 477 ('Effusive', '7'), 478 ('Terse', '8'), 479 ('Taciturn', '9'), 480 ('Silent', '10'), 481 ]) 482 finally: 483 self.root_logger.handlers[0].setLevel(logging.NOTSET) 484 485 def test_specific_filters(self): 486 # Set a specific filter object on the handler, and then add another 487 # filter object on the logger itself. 488 handler = self.root_logger.handlers[0] 489 specific_filter = None 490 garr = GarrulousFilter() 491 handler.addFilter(garr) 492 try: 493 self.log_at_all_levels(self.root_logger) 494 first_lines = [ 495 # Notice how 'Garrulous' is missing 496 ('Boring', '1'), 497 ('Chatterbox', '2'), 498 ('Talkative', '4'), 499 ('Verbose', '5'), 500 ('Sociable', '6'), 501 ('Effusive', '7'), 502 ('Terse', '8'), 503 ('Taciturn', '9'), 504 ('Silent', '10'), 505 ] 506 self.assert_log_lines(first_lines) 507 508 specific_filter = VerySpecificFilter() 509 self.root_logger.addFilter(specific_filter) 510 self.log_at_all_levels(self.root_logger) 511 self.assert_log_lines(first_lines + [ 512 # Not only 'Garrulous' is still missing, but also 'Sociable' 513 # and 'Taciturn' 514 ('Boring', '11'), 515 ('Chatterbox', '12'), 516 ('Talkative', '14'), 517 ('Verbose', '15'), 518 ('Effusive', '17'), 519 ('Terse', '18'), 520 ('Silent', '20'), 521 ]) 522 finally: 523 if specific_filter: 524 self.root_logger.removeFilter(specific_filter) 525 handler.removeFilter(garr) 526 527 528class HandlerTest(BaseTest): 529 def test_name(self): 530 h = logging.Handler() 531 h.name = 'generic' 532 self.assertEqual(h.name, 'generic') 533 h.name = 'anothergeneric' 534 self.assertEqual(h.name, 'anothergeneric') 535 self.assertRaises(NotImplementedError, h.emit, None) 536 537 def test_builtin_handlers(self): 538 # We can't actually *use* too many handlers in the tests, 539 # but we can try instantiating them with various options 540 if sys.platform in ('linux', 'darwin'): 541 for existing in (True, False): 542 fd, fn = tempfile.mkstemp() 543 os.close(fd) 544 if not existing: 545 os.unlink(fn) 546 h = logging.handlers.WatchedFileHandler(fn, delay=True) 547 if existing: 548 dev, ino = h.dev, h.ino 549 self.assertEqual(dev, -1) 550 self.assertEqual(ino, -1) 551 r = logging.makeLogRecord({'msg': 'Test'}) 552 h.handle(r) 553 # Now remove the file. 554 os.unlink(fn) 555 self.assertFalse(os.path.exists(fn)) 556 # The next call should recreate the file. 557 h.handle(r) 558 self.assertTrue(os.path.exists(fn)) 559 else: 560 self.assertEqual(h.dev, -1) 561 self.assertEqual(h.ino, -1) 562 h.close() 563 if existing: 564 os.unlink(fn) 565 if sys.platform == 'darwin': 566 sockname = '/var/run/syslog' 567 else: 568 sockname = '/dev/log' 569 try: 570 h = logging.handlers.SysLogHandler(sockname) 571 self.assertEqual(h.facility, h.LOG_USER) 572 self.assertTrue(h.unixsocket) 573 h.close() 574 except OSError: # syslogd might not be available 575 pass 576 for method in ('GET', 'POST', 'PUT'): 577 if method == 'PUT': 578 self.assertRaises(ValueError, logging.handlers.HTTPHandler, 579 'localhost', '/log', method) 580 else: 581 h = logging.handlers.HTTPHandler('localhost', '/log', method) 582 h.close() 583 h = logging.handlers.BufferingHandler(0) 584 r = logging.makeLogRecord({}) 585 self.assertTrue(h.shouldFlush(r)) 586 h.close() 587 h = logging.handlers.BufferingHandler(1) 588 self.assertFalse(h.shouldFlush(r)) 589 h.close() 590 591 def test_path_objects(self): 592 """ 593 Test that Path objects are accepted as filename arguments to handlers. 594 595 See Issue #27493. 596 """ 597 fd, fn = tempfile.mkstemp() 598 os.close(fd) 599 os.unlink(fn) 600 pfn = pathlib.Path(fn) 601 cases = ( 602 (logging.FileHandler, (pfn, 'w')), 603 (logging.handlers.RotatingFileHandler, (pfn, 'a')), 604 (logging.handlers.TimedRotatingFileHandler, (pfn, 'h')), 605 ) 606 if sys.platform in ('linux', 'darwin'): 607 cases += ((logging.handlers.WatchedFileHandler, (pfn, 'w')),) 608 for cls, args in cases: 609 h = cls(*args) 610 self.assertTrue(os.path.exists(fn)) 611 h.close() 612 os.unlink(fn) 613 614 @unittest.skipIf(os.name == 'nt', 'WatchedFileHandler not appropriate for Windows.') 615 @unittest.skipUnless(threading, 'Threading required for this test.') 616 def test_race(self): 617 # Issue #14632 refers. 618 def remove_loop(fname, tries): 619 for _ in range(tries): 620 try: 621 os.unlink(fname) 622 self.deletion_time = time.time() 623 except OSError: 624 pass 625 time.sleep(0.004 * random.randint(0, 4)) 626 627 del_count = 500 628 log_count = 500 629 630 self.handle_time = None 631 self.deletion_time = None 632 633 for delay in (False, True): 634 fd, fn = tempfile.mkstemp('.log', 'test_logging-3-') 635 os.close(fd) 636 remover = threading.Thread(target=remove_loop, args=(fn, del_count)) 637 remover.daemon = True 638 remover.start() 639 h = logging.handlers.WatchedFileHandler(fn, delay=delay) 640 f = logging.Formatter('%(asctime)s: %(levelname)s: %(message)s') 641 h.setFormatter(f) 642 try: 643 for _ in range(log_count): 644 time.sleep(0.005) 645 r = logging.makeLogRecord({'msg': 'testing' }) 646 try: 647 self.handle_time = time.time() 648 h.handle(r) 649 except Exception: 650 print('Deleted at %s, ' 651 'opened at %s' % (self.deletion_time, 652 self.handle_time)) 653 raise 654 finally: 655 remover.join() 656 h.close() 657 if os.path.exists(fn): 658 os.unlink(fn) 659 660 661class BadStream(object): 662 def write(self, data): 663 raise RuntimeError('deliberate mistake') 664 665class TestStreamHandler(logging.StreamHandler): 666 def handleError(self, record): 667 self.error_record = record 668 669class StreamHandlerTest(BaseTest): 670 def test_error_handling(self): 671 h = TestStreamHandler(BadStream()) 672 r = logging.makeLogRecord({}) 673 old_raise = logging.raiseExceptions 674 675 try: 676 h.handle(r) 677 self.assertIs(h.error_record, r) 678 679 h = logging.StreamHandler(BadStream()) 680 with support.captured_stderr() as stderr: 681 h.handle(r) 682 msg = '\nRuntimeError: deliberate mistake\n' 683 self.assertIn(msg, stderr.getvalue()) 684 685 logging.raiseExceptions = False 686 with support.captured_stderr() as stderr: 687 h.handle(r) 688 self.assertEqual('', stderr.getvalue()) 689 finally: 690 logging.raiseExceptions = old_raise 691 692# -- The following section could be moved into a server_helper.py module 693# -- if it proves to be of wider utility than just test_logging 694 695if threading: 696 class TestSMTPServer(smtpd.SMTPServer): 697 """ 698 This class implements a test SMTP server. 699 700 :param addr: A (host, port) tuple which the server listens on. 701 You can specify a port value of zero: the server's 702 *port* attribute will hold the actual port number 703 used, which can be used in client connections. 704 :param handler: A callable which will be called to process 705 incoming messages. The handler will be passed 706 the client address tuple, who the message is from, 707 a list of recipients and the message data. 708 :param poll_interval: The interval, in seconds, used in the underlying 709 :func:`select` or :func:`poll` call by 710 :func:`asyncore.loop`. 711 :param sockmap: A dictionary which will be used to hold 712 :class:`asyncore.dispatcher` instances used by 713 :func:`asyncore.loop`. This avoids changing the 714 :mod:`asyncore` module's global state. 715 """ 716 717 def __init__(self, addr, handler, poll_interval, sockmap): 718 smtpd.SMTPServer.__init__(self, addr, None, map=sockmap, 719 decode_data=True) 720 self.port = self.socket.getsockname()[1] 721 self._handler = handler 722 self._thread = None 723 self.poll_interval = poll_interval 724 725 def process_message(self, peer, mailfrom, rcpttos, data): 726 """ 727 Delegates to the handler passed in to the server's constructor. 728 729 Typically, this will be a test case method. 730 :param peer: The client (host, port) tuple. 731 :param mailfrom: The address of the sender. 732 :param rcpttos: The addresses of the recipients. 733 :param data: The message. 734 """ 735 self._handler(peer, mailfrom, rcpttos, data) 736 737 def start(self): 738 """ 739 Start the server running on a separate daemon thread. 740 """ 741 self._thread = t = threading.Thread(target=self.serve_forever, 742 args=(self.poll_interval,)) 743 t.setDaemon(True) 744 t.start() 745 746 def serve_forever(self, poll_interval): 747 """ 748 Run the :mod:`asyncore` loop until normal termination 749 conditions arise. 750 :param poll_interval: The interval, in seconds, used in the underlying 751 :func:`select` or :func:`poll` call by 752 :func:`asyncore.loop`. 753 """ 754 try: 755 asyncore.loop(poll_interval, map=self._map) 756 except OSError: 757 # On FreeBSD 8, closing the server repeatably 758 # raises this error. We swallow it if the 759 # server has been closed. 760 if self.connected or self.accepting: 761 raise 762 763 def stop(self, timeout=None): 764 """ 765 Stop the thread by closing the server instance. 766 Wait for the server thread to terminate. 767 768 :param timeout: How long to wait for the server thread 769 to terminate. 770 """ 771 self.close() 772 self._thread.join(timeout) 773 self._thread = None 774 775 class ControlMixin(object): 776 """ 777 This mixin is used to start a server on a separate thread, and 778 shut it down programmatically. Request handling is simplified - instead 779 of needing to derive a suitable RequestHandler subclass, you just 780 provide a callable which will be passed each received request to be 781 processed. 782 783 :param handler: A handler callable which will be called with a 784 single parameter - the request - in order to 785 process the request. This handler is called on the 786 server thread, effectively meaning that requests are 787 processed serially. While not quite Web scale ;-), 788 this should be fine for testing applications. 789 :param poll_interval: The polling interval in seconds. 790 """ 791 def __init__(self, handler, poll_interval): 792 self._thread = None 793 self.poll_interval = poll_interval 794 self._handler = handler 795 self.ready = threading.Event() 796 797 def start(self): 798 """ 799 Create a daemon thread to run the server, and start it. 800 """ 801 self._thread = t = threading.Thread(target=self.serve_forever, 802 args=(self.poll_interval,)) 803 t.setDaemon(True) 804 t.start() 805 806 def serve_forever(self, poll_interval): 807 """ 808 Run the server. Set the ready flag before entering the 809 service loop. 810 """ 811 self.ready.set() 812 super(ControlMixin, self).serve_forever(poll_interval) 813 814 def stop(self, timeout=None): 815 """ 816 Tell the server thread to stop, and wait for it to do so. 817 818 :param timeout: How long to wait for the server thread 819 to terminate. 820 """ 821 self.shutdown() 822 if self._thread is not None: 823 self._thread.join(timeout) 824 self._thread = None 825 self.server_close() 826 self.ready.clear() 827 828 class TestHTTPServer(ControlMixin, HTTPServer): 829 """ 830 An HTTP server which is controllable using :class:`ControlMixin`. 831 832 :param addr: A tuple with the IP address and port to listen on. 833 :param handler: A handler callable which will be called with a 834 single parameter - the request - in order to 835 process the request. 836 :param poll_interval: The polling interval in seconds. 837 :param log: Pass ``True`` to enable log messages. 838 """ 839 def __init__(self, addr, handler, poll_interval=0.5, 840 log=False, sslctx=None): 841 class DelegatingHTTPRequestHandler(BaseHTTPRequestHandler): 842 def __getattr__(self, name, default=None): 843 if name.startswith('do_'): 844 return self.process_request 845 raise AttributeError(name) 846 847 def process_request(self): 848 self.server._handler(self) 849 850 def log_message(self, format, *args): 851 if log: 852 super(DelegatingHTTPRequestHandler, 853 self).log_message(format, *args) 854 HTTPServer.__init__(self, addr, DelegatingHTTPRequestHandler) 855 ControlMixin.__init__(self, handler, poll_interval) 856 self.sslctx = sslctx 857 858 def get_request(self): 859 try: 860 sock, addr = self.socket.accept() 861 if self.sslctx: 862 sock = self.sslctx.wrap_socket(sock, server_side=True) 863 except OSError as e: 864 # socket errors are silenced by the caller, print them here 865 sys.stderr.write("Got an error:\n%s\n" % e) 866 raise 867 return sock, addr 868 869 class TestTCPServer(ControlMixin, ThreadingTCPServer): 870 """ 871 A TCP server which is controllable using :class:`ControlMixin`. 872 873 :param addr: A tuple with the IP address and port to listen on. 874 :param handler: A handler callable which will be called with a single 875 parameter - the request - in order to process the request. 876 :param poll_interval: The polling interval in seconds. 877 :bind_and_activate: If True (the default), binds the server and starts it 878 listening. If False, you need to call 879 :meth:`server_bind` and :meth:`server_activate` at 880 some later time before calling :meth:`start`, so that 881 the server will set up the socket and listen on it. 882 """ 883 884 allow_reuse_address = True 885 886 def __init__(self, addr, handler, poll_interval=0.5, 887 bind_and_activate=True): 888 class DelegatingTCPRequestHandler(StreamRequestHandler): 889 890 def handle(self): 891 self.server._handler(self) 892 ThreadingTCPServer.__init__(self, addr, DelegatingTCPRequestHandler, 893 bind_and_activate) 894 ControlMixin.__init__(self, handler, poll_interval) 895 896 def server_bind(self): 897 super(TestTCPServer, self).server_bind() 898 self.port = self.socket.getsockname()[1] 899 900 class TestUDPServer(ControlMixin, ThreadingUDPServer): 901 """ 902 A UDP server which is controllable using :class:`ControlMixin`. 903 904 :param addr: A tuple with the IP address and port to listen on. 905 :param handler: A handler callable which will be called with a 906 single parameter - the request - in order to 907 process the request. 908 :param poll_interval: The polling interval for shutdown requests, 909 in seconds. 910 :bind_and_activate: If True (the default), binds the server and 911 starts it listening. If False, you need to 912 call :meth:`server_bind` and 913 :meth:`server_activate` at some later time 914 before calling :meth:`start`, so that the server will 915 set up the socket and listen on it. 916 """ 917 def __init__(self, addr, handler, poll_interval=0.5, 918 bind_and_activate=True): 919 class DelegatingUDPRequestHandler(DatagramRequestHandler): 920 921 def handle(self): 922 self.server._handler(self) 923 924 def finish(self): 925 data = self.wfile.getvalue() 926 if data: 927 try: 928 super(DelegatingUDPRequestHandler, self).finish() 929 except OSError: 930 if not self.server._closed: 931 raise 932 933 ThreadingUDPServer.__init__(self, addr, 934 DelegatingUDPRequestHandler, 935 bind_and_activate) 936 ControlMixin.__init__(self, handler, poll_interval) 937 self._closed = False 938 939 def server_bind(self): 940 super(TestUDPServer, self).server_bind() 941 self.port = self.socket.getsockname()[1] 942 943 def server_close(self): 944 super(TestUDPServer, self).server_close() 945 self._closed = True 946 947 if hasattr(socket, "AF_UNIX"): 948 class TestUnixStreamServer(TestTCPServer): 949 address_family = socket.AF_UNIX 950 951 class TestUnixDatagramServer(TestUDPServer): 952 address_family = socket.AF_UNIX 953 954# - end of server_helper section 955 956@unittest.skipUnless(threading, 'Threading required for this test.') 957class SMTPHandlerTest(BaseTest): 958 TIMEOUT = 8.0 959 def test_basic(self): 960 sockmap = {} 961 server = TestSMTPServer((support.HOST, 0), self.process_message, 0.001, 962 sockmap) 963 server.start() 964 addr = (support.HOST, server.port) 965 h = logging.handlers.SMTPHandler(addr, 'me', 'you', 'Log', 966 timeout=self.TIMEOUT) 967 self.assertEqual(h.toaddrs, ['you']) 968 self.messages = [] 969 r = logging.makeLogRecord({'msg': 'Hello \u2713'}) 970 self.handled = threading.Event() 971 h.handle(r) 972 self.handled.wait(self.TIMEOUT) # 14314: don't wait forever 973 server.stop() 974 self.assertTrue(self.handled.is_set()) 975 self.assertEqual(len(self.messages), 1) 976 peer, mailfrom, rcpttos, data = self.messages[0] 977 self.assertEqual(mailfrom, 'me') 978 self.assertEqual(rcpttos, ['you']) 979 self.assertIn('\nSubject: Log\n', data) 980 self.assertTrue(data.endswith('\n\nHello \u2713')) 981 h.close() 982 983 def process_message(self, *args): 984 self.messages.append(args) 985 self.handled.set() 986 987class MemoryHandlerTest(BaseTest): 988 989 """Tests for the MemoryHandler.""" 990 991 # Do not bother with a logger name group. 992 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 993 994 def setUp(self): 995 BaseTest.setUp(self) 996 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 997 self.root_hdlr) 998 self.mem_logger = logging.getLogger('mem') 999 self.mem_logger.propagate = 0 1000 self.mem_logger.addHandler(self.mem_hdlr) 1001 1002 def tearDown(self): 1003 self.mem_hdlr.close() 1004 BaseTest.tearDown(self) 1005 1006 def test_flush(self): 1007 # The memory handler flushes to its target handler based on specific 1008 # criteria (message count and message level). 1009 self.mem_logger.debug(self.next_message()) 1010 self.assert_log_lines([]) 1011 self.mem_logger.info(self.next_message()) 1012 self.assert_log_lines([]) 1013 # This will flush because the level is >= logging.WARNING 1014 self.mem_logger.warning(self.next_message()) 1015 lines = [ 1016 ('DEBUG', '1'), 1017 ('INFO', '2'), 1018 ('WARNING', '3'), 1019 ] 1020 self.assert_log_lines(lines) 1021 for n in (4, 14): 1022 for i in range(9): 1023 self.mem_logger.debug(self.next_message()) 1024 self.assert_log_lines(lines) 1025 # This will flush because it's the 10th message since the last 1026 # flush. 1027 self.mem_logger.debug(self.next_message()) 1028 lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] 1029 self.assert_log_lines(lines) 1030 1031 self.mem_logger.debug(self.next_message()) 1032 self.assert_log_lines(lines) 1033 1034 def test_flush_on_close(self): 1035 """ 1036 Test that the flush-on-close configuration works as expected. 1037 """ 1038 self.mem_logger.debug(self.next_message()) 1039 self.assert_log_lines([]) 1040 self.mem_logger.info(self.next_message()) 1041 self.assert_log_lines([]) 1042 self.mem_logger.removeHandler(self.mem_hdlr) 1043 # Default behaviour is to flush on close. Check that it happens. 1044 self.mem_hdlr.close() 1045 lines = [ 1046 ('DEBUG', '1'), 1047 ('INFO', '2'), 1048 ] 1049 self.assert_log_lines(lines) 1050 # Now configure for flushing not to be done on close. 1051 self.mem_hdlr = logging.handlers.MemoryHandler(10, logging.WARNING, 1052 self.root_hdlr, 1053 False) 1054 self.mem_logger.addHandler(self.mem_hdlr) 1055 self.mem_logger.debug(self.next_message()) 1056 self.assert_log_lines(lines) # no change 1057 self.mem_logger.info(self.next_message()) 1058 self.assert_log_lines(lines) # no change 1059 self.mem_logger.removeHandler(self.mem_hdlr) 1060 self.mem_hdlr.close() 1061 # assert that no new lines have been added 1062 self.assert_log_lines(lines) # no change 1063 1064 1065class ExceptionFormatter(logging.Formatter): 1066 """A special exception formatter.""" 1067 def formatException(self, ei): 1068 return "Got a [%s]" % ei[0].__name__ 1069 1070 1071class ConfigFileTest(BaseTest): 1072 1073 """Reading logging config from a .ini-style config file.""" 1074 1075 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 1076 1077 # config0 is a standard configuration. 1078 config0 = """ 1079 [loggers] 1080 keys=root 1081 1082 [handlers] 1083 keys=hand1 1084 1085 [formatters] 1086 keys=form1 1087 1088 [logger_root] 1089 level=WARNING 1090 handlers=hand1 1091 1092 [handler_hand1] 1093 class=StreamHandler 1094 level=NOTSET 1095 formatter=form1 1096 args=(sys.stdout,) 1097 1098 [formatter_form1] 1099 format=%(levelname)s ++ %(message)s 1100 datefmt= 1101 """ 1102 1103 # config1 adds a little to the standard configuration. 1104 config1 = """ 1105 [loggers] 1106 keys=root,parser 1107 1108 [handlers] 1109 keys=hand1 1110 1111 [formatters] 1112 keys=form1 1113 1114 [logger_root] 1115 level=WARNING 1116 handlers= 1117 1118 [logger_parser] 1119 level=DEBUG 1120 handlers=hand1 1121 propagate=1 1122 qualname=compiler.parser 1123 1124 [handler_hand1] 1125 class=StreamHandler 1126 level=NOTSET 1127 formatter=form1 1128 args=(sys.stdout,) 1129 1130 [formatter_form1] 1131 format=%(levelname)s ++ %(message)s 1132 datefmt= 1133 """ 1134 1135 # config1a moves the handler to the root. 1136 config1a = """ 1137 [loggers] 1138 keys=root,parser 1139 1140 [handlers] 1141 keys=hand1 1142 1143 [formatters] 1144 keys=form1 1145 1146 [logger_root] 1147 level=WARNING 1148 handlers=hand1 1149 1150 [logger_parser] 1151 level=DEBUG 1152 handlers= 1153 propagate=1 1154 qualname=compiler.parser 1155 1156 [handler_hand1] 1157 class=StreamHandler 1158 level=NOTSET 1159 formatter=form1 1160 args=(sys.stdout,) 1161 1162 [formatter_form1] 1163 format=%(levelname)s ++ %(message)s 1164 datefmt= 1165 """ 1166 1167 # config2 has a subtle configuration error that should be reported 1168 config2 = config1.replace("sys.stdout", "sys.stbout") 1169 1170 # config3 has a less subtle configuration error 1171 config3 = config1.replace("formatter=form1", "formatter=misspelled_name") 1172 1173 # config4 specifies a custom formatter class to be loaded 1174 config4 = """ 1175 [loggers] 1176 keys=root 1177 1178 [handlers] 1179 keys=hand1 1180 1181 [formatters] 1182 keys=form1 1183 1184 [logger_root] 1185 level=NOTSET 1186 handlers=hand1 1187 1188 [handler_hand1] 1189 class=StreamHandler 1190 level=NOTSET 1191 formatter=form1 1192 args=(sys.stdout,) 1193 1194 [formatter_form1] 1195 class=""" + __name__ + """.ExceptionFormatter 1196 format=%(levelname)s:%(name)s:%(message)s 1197 datefmt= 1198 """ 1199 1200 # config5 specifies a custom handler class to be loaded 1201 config5 = config1.replace('class=StreamHandler', 'class=logging.StreamHandler') 1202 1203 # config6 uses ', ' delimiters in the handlers and formatters sections 1204 config6 = """ 1205 [loggers] 1206 keys=root,parser 1207 1208 [handlers] 1209 keys=hand1, hand2 1210 1211 [formatters] 1212 keys=form1, form2 1213 1214 [logger_root] 1215 level=WARNING 1216 handlers= 1217 1218 [logger_parser] 1219 level=DEBUG 1220 handlers=hand1 1221 propagate=1 1222 qualname=compiler.parser 1223 1224 [handler_hand1] 1225 class=StreamHandler 1226 level=NOTSET 1227 formatter=form1 1228 args=(sys.stdout,) 1229 1230 [handler_hand2] 1231 class=StreamHandler 1232 level=NOTSET 1233 formatter=form1 1234 args=(sys.stderr,) 1235 1236 [formatter_form1] 1237 format=%(levelname)s ++ %(message)s 1238 datefmt= 1239 1240 [formatter_form2] 1241 format=%(message)s 1242 datefmt= 1243 """ 1244 1245 # config7 adds a compiler logger. 1246 config7 = """ 1247 [loggers] 1248 keys=root,parser,compiler 1249 1250 [handlers] 1251 keys=hand1 1252 1253 [formatters] 1254 keys=form1 1255 1256 [logger_root] 1257 level=WARNING 1258 handlers=hand1 1259 1260 [logger_compiler] 1261 level=DEBUG 1262 handlers= 1263 propagate=1 1264 qualname=compiler 1265 1266 [logger_parser] 1267 level=DEBUG 1268 handlers= 1269 propagate=1 1270 qualname=compiler.parser 1271 1272 [handler_hand1] 1273 class=StreamHandler 1274 level=NOTSET 1275 formatter=form1 1276 args=(sys.stdout,) 1277 1278 [formatter_form1] 1279 format=%(levelname)s ++ %(message)s 1280 datefmt= 1281 """ 1282 1283 disable_test = """ 1284 [loggers] 1285 keys=root 1286 1287 [handlers] 1288 keys=screen 1289 1290 [formatters] 1291 keys= 1292 1293 [logger_root] 1294 level=DEBUG 1295 handlers=screen 1296 1297 [handler_screen] 1298 level=DEBUG 1299 class=StreamHandler 1300 args=(sys.stdout,) 1301 formatter= 1302 """ 1303 1304 def apply_config(self, conf, **kwargs): 1305 file = io.StringIO(textwrap.dedent(conf)) 1306 logging.config.fileConfig(file, **kwargs) 1307 1308 def test_config0_ok(self): 1309 # A simple config file which overrides the default settings. 1310 with support.captured_stdout() as output: 1311 self.apply_config(self.config0) 1312 logger = logging.getLogger() 1313 # Won't output anything 1314 logger.info(self.next_message()) 1315 # Outputs a message 1316 logger.error(self.next_message()) 1317 self.assert_log_lines([ 1318 ('ERROR', '2'), 1319 ], stream=output) 1320 # Original logger output is empty. 1321 self.assert_log_lines([]) 1322 1323 def test_config0_using_cp_ok(self): 1324 # A simple config file which overrides the default settings. 1325 with support.captured_stdout() as output: 1326 file = io.StringIO(textwrap.dedent(self.config0)) 1327 cp = configparser.ConfigParser() 1328 cp.read_file(file) 1329 logging.config.fileConfig(cp) 1330 logger = logging.getLogger() 1331 # Won't output anything 1332 logger.info(self.next_message()) 1333 # Outputs a message 1334 logger.error(self.next_message()) 1335 self.assert_log_lines([ 1336 ('ERROR', '2'), 1337 ], stream=output) 1338 # Original logger output is empty. 1339 self.assert_log_lines([]) 1340 1341 def test_config1_ok(self, config=config1): 1342 # A config file defining a sub-parser as well. 1343 with support.captured_stdout() as output: 1344 self.apply_config(config) 1345 logger = logging.getLogger("compiler.parser") 1346 # Both will output a message 1347 logger.info(self.next_message()) 1348 logger.error(self.next_message()) 1349 self.assert_log_lines([ 1350 ('INFO', '1'), 1351 ('ERROR', '2'), 1352 ], stream=output) 1353 # Original logger output is empty. 1354 self.assert_log_lines([]) 1355 1356 def test_config2_failure(self): 1357 # A simple config file which overrides the default settings. 1358 self.assertRaises(Exception, self.apply_config, self.config2) 1359 1360 def test_config3_failure(self): 1361 # A simple config file which overrides the default settings. 1362 self.assertRaises(Exception, self.apply_config, self.config3) 1363 1364 def test_config4_ok(self): 1365 # A config file specifying a custom formatter class. 1366 with support.captured_stdout() as output: 1367 self.apply_config(self.config4) 1368 logger = logging.getLogger() 1369 try: 1370 raise RuntimeError() 1371 except RuntimeError: 1372 logging.exception("just testing") 1373 sys.stdout.seek(0) 1374 self.assertEqual(output.getvalue(), 1375 "ERROR:root:just testing\nGot a [RuntimeError]\n") 1376 # Original logger output is empty 1377 self.assert_log_lines([]) 1378 1379 def test_config5_ok(self): 1380 self.test_config1_ok(config=self.config5) 1381 1382 def test_config6_ok(self): 1383 self.test_config1_ok(config=self.config6) 1384 1385 def test_config7_ok(self): 1386 with support.captured_stdout() as output: 1387 self.apply_config(self.config1a) 1388 logger = logging.getLogger("compiler.parser") 1389 # See issue #11424. compiler-hyphenated sorts 1390 # between compiler and compiler.xyz and this 1391 # was preventing compiler.xyz from being included 1392 # in the child loggers of compiler because of an 1393 # overzealous loop termination condition. 1394 hyphenated = logging.getLogger('compiler-hyphenated') 1395 # All will output a message 1396 logger.info(self.next_message()) 1397 logger.error(self.next_message()) 1398 hyphenated.critical(self.next_message()) 1399 self.assert_log_lines([ 1400 ('INFO', '1'), 1401 ('ERROR', '2'), 1402 ('CRITICAL', '3'), 1403 ], stream=output) 1404 # Original logger output is empty. 1405 self.assert_log_lines([]) 1406 with support.captured_stdout() as output: 1407 self.apply_config(self.config7) 1408 logger = logging.getLogger("compiler.parser") 1409 self.assertFalse(logger.disabled) 1410 # Both will output a message 1411 logger.info(self.next_message()) 1412 logger.error(self.next_message()) 1413 logger = logging.getLogger("compiler.lexer") 1414 # Both will output a message 1415 logger.info(self.next_message()) 1416 logger.error(self.next_message()) 1417 # Will not appear 1418 hyphenated.critical(self.next_message()) 1419 self.assert_log_lines([ 1420 ('INFO', '4'), 1421 ('ERROR', '5'), 1422 ('INFO', '6'), 1423 ('ERROR', '7'), 1424 ], stream=output) 1425 # Original logger output is empty. 1426 self.assert_log_lines([]) 1427 1428 def test_logger_disabling(self): 1429 self.apply_config(self.disable_test) 1430 logger = logging.getLogger('some_pristine_logger') 1431 self.assertFalse(logger.disabled) 1432 self.apply_config(self.disable_test) 1433 self.assertTrue(logger.disabled) 1434 self.apply_config(self.disable_test, disable_existing_loggers=False) 1435 self.assertFalse(logger.disabled) 1436 1437 1438@unittest.skipUnless(threading, 'Threading required for this test.') 1439class SocketHandlerTest(BaseTest): 1440 1441 """Test for SocketHandler objects.""" 1442 1443 if threading: 1444 server_class = TestTCPServer 1445 address = ('localhost', 0) 1446 1447 def setUp(self): 1448 """Set up a TCP server to receive log messages, and a SocketHandler 1449 pointing to that server's address and port.""" 1450 BaseTest.setUp(self) 1451 # Issue #29177: deal with errors that happen during setup 1452 self.server = self.sock_hdlr = self.server_exception = None 1453 try: 1454 self.server = server = self.server_class(self.address, 1455 self.handle_socket, 0.01) 1456 server.start() 1457 # Uncomment next line to test error recovery in setUp() 1458 # raise OSError('dummy error raised') 1459 except OSError as e: 1460 self.server_exception = e 1461 return 1462 server.ready.wait() 1463 hcls = logging.handlers.SocketHandler 1464 if isinstance(server.server_address, tuple): 1465 self.sock_hdlr = hcls('localhost', server.port) 1466 else: 1467 self.sock_hdlr = hcls(server.server_address, None) 1468 self.log_output = '' 1469 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1470 self.root_logger.addHandler(self.sock_hdlr) 1471 self.handled = threading.Semaphore(0) 1472 1473 def tearDown(self): 1474 """Shutdown the TCP server.""" 1475 try: 1476 if self.server: 1477 self.server.stop(2.0) 1478 if self.sock_hdlr: 1479 self.root_logger.removeHandler(self.sock_hdlr) 1480 self.sock_hdlr.close() 1481 finally: 1482 BaseTest.tearDown(self) 1483 1484 def handle_socket(self, request): 1485 conn = request.connection 1486 while True: 1487 chunk = conn.recv(4) 1488 if len(chunk) < 4: 1489 break 1490 slen = struct.unpack(">L", chunk)[0] 1491 chunk = conn.recv(slen) 1492 while len(chunk) < slen: 1493 chunk = chunk + conn.recv(slen - len(chunk)) 1494 obj = pickle.loads(chunk) 1495 record = logging.makeLogRecord(obj) 1496 self.log_output += record.msg + '\n' 1497 self.handled.release() 1498 1499 def test_output(self): 1500 # The log message sent to the SocketHandler is properly received. 1501 if self.server_exception: 1502 self.skipTest(self.server_exception) 1503 logger = logging.getLogger("tcp") 1504 logger.error("spam") 1505 self.handled.acquire() 1506 logger.debug("eggs") 1507 self.handled.acquire() 1508 self.assertEqual(self.log_output, "spam\neggs\n") 1509 1510 def test_noserver(self): 1511 if self.server_exception: 1512 self.skipTest(self.server_exception) 1513 # Avoid timing-related failures due to SocketHandler's own hard-wired 1514 # one-second timeout on socket.create_connection() (issue #16264). 1515 self.sock_hdlr.retryStart = 2.5 1516 # Kill the server 1517 self.server.stop(2.0) 1518 # The logging call should try to connect, which should fail 1519 try: 1520 raise RuntimeError('Deliberate mistake') 1521 except RuntimeError: 1522 self.root_logger.exception('Never sent') 1523 self.root_logger.error('Never sent, either') 1524 now = time.time() 1525 self.assertGreater(self.sock_hdlr.retryTime, now) 1526 time.sleep(self.sock_hdlr.retryTime - now + 0.001) 1527 self.root_logger.error('Nor this') 1528 1529def _get_temp_domain_socket(): 1530 fd, fn = tempfile.mkstemp(prefix='test_logging_', suffix='.sock') 1531 os.close(fd) 1532 # just need a name - file can't be present, or we'll get an 1533 # 'address already in use' error. 1534 os.remove(fn) 1535 return fn 1536 1537@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1538@unittest.skipUnless(threading, 'Threading required for this test.') 1539class UnixSocketHandlerTest(SocketHandlerTest): 1540 1541 """Test for SocketHandler with unix sockets.""" 1542 1543 if threading and hasattr(socket, "AF_UNIX"): 1544 server_class = TestUnixStreamServer 1545 1546 def setUp(self): 1547 # override the definition in the base class 1548 self.address = _get_temp_domain_socket() 1549 SocketHandlerTest.setUp(self) 1550 1551 def tearDown(self): 1552 SocketHandlerTest.tearDown(self) 1553 support.unlink(self.address) 1554 1555@unittest.skipUnless(threading, 'Threading required for this test.') 1556class DatagramHandlerTest(BaseTest): 1557 1558 """Test for DatagramHandler.""" 1559 1560 if threading: 1561 server_class = TestUDPServer 1562 address = ('localhost', 0) 1563 1564 def setUp(self): 1565 """Set up a UDP server to receive log messages, and a DatagramHandler 1566 pointing to that server's address and port.""" 1567 BaseTest.setUp(self) 1568 # Issue #29177: deal with errors that happen during setup 1569 self.server = self.sock_hdlr = self.server_exception = None 1570 try: 1571 self.server = server = self.server_class(self.address, 1572 self.handle_datagram, 0.01) 1573 server.start() 1574 # Uncomment next line to test error recovery in setUp() 1575 # raise OSError('dummy error raised') 1576 except OSError as e: 1577 self.server_exception = e 1578 return 1579 server.ready.wait() 1580 hcls = logging.handlers.DatagramHandler 1581 if isinstance(server.server_address, tuple): 1582 self.sock_hdlr = hcls('localhost', server.port) 1583 else: 1584 self.sock_hdlr = hcls(server.server_address, None) 1585 self.log_output = '' 1586 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1587 self.root_logger.addHandler(self.sock_hdlr) 1588 self.handled = threading.Event() 1589 1590 def tearDown(self): 1591 """Shutdown the UDP server.""" 1592 try: 1593 if self.server: 1594 self.server.stop(2.0) 1595 if self.sock_hdlr: 1596 self.root_logger.removeHandler(self.sock_hdlr) 1597 self.sock_hdlr.close() 1598 finally: 1599 BaseTest.tearDown(self) 1600 1601 def handle_datagram(self, request): 1602 slen = struct.pack('>L', 0) # length of prefix 1603 packet = request.packet[len(slen):] 1604 obj = pickle.loads(packet) 1605 record = logging.makeLogRecord(obj) 1606 self.log_output += record.msg + '\n' 1607 self.handled.set() 1608 1609 def test_output(self): 1610 # The log message sent to the DatagramHandler is properly received. 1611 if self.server_exception: 1612 self.skipTest(self.server_exception) 1613 logger = logging.getLogger("udp") 1614 logger.error("spam") 1615 self.handled.wait() 1616 self.handled.clear() 1617 logger.error("eggs") 1618 self.handled.wait() 1619 self.assertEqual(self.log_output, "spam\neggs\n") 1620 1621@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1622@unittest.skipUnless(threading, 'Threading required for this test.') 1623class UnixDatagramHandlerTest(DatagramHandlerTest): 1624 1625 """Test for DatagramHandler using Unix sockets.""" 1626 1627 if threading and hasattr(socket, "AF_UNIX"): 1628 server_class = TestUnixDatagramServer 1629 1630 def setUp(self): 1631 # override the definition in the base class 1632 self.address = _get_temp_domain_socket() 1633 DatagramHandlerTest.setUp(self) 1634 1635 def tearDown(self): 1636 DatagramHandlerTest.tearDown(self) 1637 support.unlink(self.address) 1638 1639@unittest.skipUnless(threading, 'Threading required for this test.') 1640class SysLogHandlerTest(BaseTest): 1641 1642 """Test for SysLogHandler using UDP.""" 1643 1644 if threading: 1645 server_class = TestUDPServer 1646 address = ('localhost', 0) 1647 1648 def setUp(self): 1649 """Set up a UDP server to receive log messages, and a SysLogHandler 1650 pointing to that server's address and port.""" 1651 BaseTest.setUp(self) 1652 # Issue #29177: deal with errors that happen during setup 1653 self.server = self.sl_hdlr = self.server_exception = None 1654 try: 1655 self.server = server = self.server_class(self.address, 1656 self.handle_datagram, 0.01) 1657 server.start() 1658 # Uncomment next line to test error recovery in setUp() 1659 # raise OSError('dummy error raised') 1660 except OSError as e: 1661 self.server_exception = e 1662 return 1663 server.ready.wait() 1664 hcls = logging.handlers.SysLogHandler 1665 if isinstance(server.server_address, tuple): 1666 self.sl_hdlr = hcls(('localhost', server.port)) 1667 else: 1668 self.sl_hdlr = hcls(server.server_address) 1669 self.log_output = '' 1670 self.root_logger.removeHandler(self.root_logger.handlers[0]) 1671 self.root_logger.addHandler(self.sl_hdlr) 1672 self.handled = threading.Event() 1673 1674 def tearDown(self): 1675 """Shutdown the server.""" 1676 try: 1677 if self.server: 1678 self.server.stop(2.0) 1679 if self.sl_hdlr: 1680 self.root_logger.removeHandler(self.sl_hdlr) 1681 self.sl_hdlr.close() 1682 finally: 1683 BaseTest.tearDown(self) 1684 1685 def handle_datagram(self, request): 1686 self.log_output = request.packet 1687 self.handled.set() 1688 1689 def test_output(self): 1690 if self.server_exception: 1691 self.skipTest(self.server_exception) 1692 # The log message sent to the SysLogHandler is properly received. 1693 logger = logging.getLogger("slh") 1694 logger.error("sp\xe4m") 1695 self.handled.wait() 1696 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m\x00') 1697 self.handled.clear() 1698 self.sl_hdlr.append_nul = False 1699 logger.error("sp\xe4m") 1700 self.handled.wait() 1701 self.assertEqual(self.log_output, b'<11>sp\xc3\xa4m') 1702 self.handled.clear() 1703 self.sl_hdlr.ident = "h\xe4m-" 1704 logger.error("sp\xe4m") 1705 self.handled.wait() 1706 self.assertEqual(self.log_output, b'<11>h\xc3\xa4m-sp\xc3\xa4m') 1707 1708@unittest.skipUnless(hasattr(socket, "AF_UNIX"), "Unix sockets required") 1709@unittest.skipUnless(threading, 'Threading required for this test.') 1710class UnixSysLogHandlerTest(SysLogHandlerTest): 1711 1712 """Test for SysLogHandler with Unix sockets.""" 1713 1714 if threading and hasattr(socket, "AF_UNIX"): 1715 server_class = TestUnixDatagramServer 1716 1717 def setUp(self): 1718 # override the definition in the base class 1719 self.address = _get_temp_domain_socket() 1720 SysLogHandlerTest.setUp(self) 1721 1722 def tearDown(self): 1723 SysLogHandlerTest.tearDown(self) 1724 support.unlink(self.address) 1725 1726@unittest.skipUnless(threading, 'Threading required for this test.') 1727class HTTPHandlerTest(BaseTest): 1728 """Test for HTTPHandler.""" 1729 1730 def setUp(self): 1731 """Set up an HTTP server to receive log messages, and a HTTPHandler 1732 pointing to that server's address and port.""" 1733 BaseTest.setUp(self) 1734 self.handled = threading.Event() 1735 1736 def handle_request(self, request): 1737 self.command = request.command 1738 self.log_data = urlparse(request.path) 1739 if self.command == 'POST': 1740 try: 1741 rlen = int(request.headers['Content-Length']) 1742 self.post_data = request.rfile.read(rlen) 1743 except: 1744 self.post_data = None 1745 request.send_response(200) 1746 request.end_headers() 1747 self.handled.set() 1748 1749 def test_output(self): 1750 # The log message sent to the HTTPHandler is properly received. 1751 logger = logging.getLogger("http") 1752 root_logger = self.root_logger 1753 root_logger.removeHandler(self.root_logger.handlers[0]) 1754 for secure in (False, True): 1755 addr = ('localhost', 0) 1756 if secure: 1757 try: 1758 import ssl 1759 except ImportError: 1760 sslctx = None 1761 else: 1762 here = os.path.dirname(__file__) 1763 localhost_cert = os.path.join(here, "keycert.pem") 1764 sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1765 sslctx.load_cert_chain(localhost_cert) 1766 1767 context = ssl.create_default_context(cafile=localhost_cert) 1768 else: 1769 sslctx = None 1770 context = None 1771 self.server = server = TestHTTPServer(addr, self.handle_request, 1772 0.01, sslctx=sslctx) 1773 server.start() 1774 server.ready.wait() 1775 host = 'localhost:%d' % server.server_port 1776 secure_client = secure and sslctx 1777 self.h_hdlr = logging.handlers.HTTPHandler(host, '/frob', 1778 secure=secure_client, 1779 context=context, 1780 credentials=('foo', 'bar')) 1781 self.log_data = None 1782 root_logger.addHandler(self.h_hdlr) 1783 1784 for method in ('GET', 'POST'): 1785 self.h_hdlr.method = method 1786 self.handled.clear() 1787 msg = "sp\xe4m" 1788 logger.error(msg) 1789 self.handled.wait() 1790 self.assertEqual(self.log_data.path, '/frob') 1791 self.assertEqual(self.command, method) 1792 if method == 'GET': 1793 d = parse_qs(self.log_data.query) 1794 else: 1795 d = parse_qs(self.post_data.decode('utf-8')) 1796 self.assertEqual(d['name'], ['http']) 1797 self.assertEqual(d['funcName'], ['test_output']) 1798 self.assertEqual(d['msg'], [msg]) 1799 1800 self.server.stop(2.0) 1801 self.root_logger.removeHandler(self.h_hdlr) 1802 self.h_hdlr.close() 1803 1804class MemoryTest(BaseTest): 1805 1806 """Test memory persistence of logger objects.""" 1807 1808 def setUp(self): 1809 """Create a dict to remember potentially destroyed objects.""" 1810 BaseTest.setUp(self) 1811 self._survivors = {} 1812 1813 def _watch_for_survival(self, *args): 1814 """Watch the given objects for survival, by creating weakrefs to 1815 them.""" 1816 for obj in args: 1817 key = id(obj), repr(obj) 1818 self._survivors[key] = weakref.ref(obj) 1819 1820 def _assertTruesurvival(self): 1821 """Assert that all objects watched for survival have survived.""" 1822 # Trigger cycle breaking. 1823 gc.collect() 1824 dead = [] 1825 for (id_, repr_), ref in self._survivors.items(): 1826 if ref() is None: 1827 dead.append(repr_) 1828 if dead: 1829 self.fail("%d objects should have survived " 1830 "but have been destroyed: %s" % (len(dead), ", ".join(dead))) 1831 1832 def test_persistent_loggers(self): 1833 # Logger objects are persistent and retain their configuration, even 1834 # if visible references are destroyed. 1835 self.root_logger.setLevel(logging.INFO) 1836 foo = logging.getLogger("foo") 1837 self._watch_for_survival(foo) 1838 foo.setLevel(logging.DEBUG) 1839 self.root_logger.debug(self.next_message()) 1840 foo.debug(self.next_message()) 1841 self.assert_log_lines([ 1842 ('foo', 'DEBUG', '2'), 1843 ]) 1844 del foo 1845 # foo has survived. 1846 self._assertTruesurvival() 1847 # foo has retained its settings. 1848 bar = logging.getLogger("foo") 1849 bar.debug(self.next_message()) 1850 self.assert_log_lines([ 1851 ('foo', 'DEBUG', '2'), 1852 ('foo', 'DEBUG', '3'), 1853 ]) 1854 1855 1856class EncodingTest(BaseTest): 1857 def test_encoding_plain_file(self): 1858 # In Python 2.x, a plain file object is treated as having no encoding. 1859 log = logging.getLogger("test") 1860 fd, fn = tempfile.mkstemp(".log", "test_logging-1-") 1861 os.close(fd) 1862 # the non-ascii data we write to the log. 1863 data = "foo\x80" 1864 try: 1865 handler = logging.FileHandler(fn, encoding="utf-8") 1866 log.addHandler(handler) 1867 try: 1868 # write non-ascii data to the log. 1869 log.warning(data) 1870 finally: 1871 log.removeHandler(handler) 1872 handler.close() 1873 # check we wrote exactly those bytes, ignoring trailing \n etc 1874 f = open(fn, encoding="utf-8") 1875 try: 1876 self.assertEqual(f.read().rstrip(), data) 1877 finally: 1878 f.close() 1879 finally: 1880 if os.path.isfile(fn): 1881 os.remove(fn) 1882 1883 def test_encoding_cyrillic_unicode(self): 1884 log = logging.getLogger("test") 1885 #Get a message in Unicode: Do svidanya in Cyrillic (meaning goodbye) 1886 message = '\u0434\u043e \u0441\u0432\u0438\u0434\u0430\u043d\u0438\u044f' 1887 #Ensure it's written in a Cyrillic encoding 1888 writer_class = codecs.getwriter('cp1251') 1889 writer_class.encoding = 'cp1251' 1890 stream = io.BytesIO() 1891 writer = writer_class(stream, 'strict') 1892 handler = logging.StreamHandler(writer) 1893 log.addHandler(handler) 1894 try: 1895 log.warning(message) 1896 finally: 1897 log.removeHandler(handler) 1898 handler.close() 1899 # check we wrote exactly those bytes, ignoring trailing \n etc 1900 s = stream.getvalue() 1901 #Compare against what the data should be when encoded in CP-1251 1902 self.assertEqual(s, b'\xe4\xee \xf1\xe2\xe8\xe4\xe0\xed\xe8\xff\n') 1903 1904 1905class WarningsTest(BaseTest): 1906 1907 def test_warnings(self): 1908 with warnings.catch_warnings(): 1909 logging.captureWarnings(True) 1910 self.addCleanup(logging.captureWarnings, False) 1911 warnings.filterwarnings("always", category=UserWarning) 1912 stream = io.StringIO() 1913 h = logging.StreamHandler(stream) 1914 logger = logging.getLogger("py.warnings") 1915 logger.addHandler(h) 1916 warnings.warn("I'm warning you...") 1917 logger.removeHandler(h) 1918 s = stream.getvalue() 1919 h.close() 1920 self.assertGreater(s.find("UserWarning: I'm warning you...\n"), 0) 1921 1922 #See if an explicit file uses the original implementation 1923 a_file = io.StringIO() 1924 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42, 1925 a_file, "Dummy line") 1926 s = a_file.getvalue() 1927 a_file.close() 1928 self.assertEqual(s, 1929 "dummy.py:42: UserWarning: Explicit\n Dummy line\n") 1930 1931 def test_warnings_no_handlers(self): 1932 with warnings.catch_warnings(): 1933 logging.captureWarnings(True) 1934 self.addCleanup(logging.captureWarnings, False) 1935 1936 # confirm our assumption: no loggers are set 1937 logger = logging.getLogger("py.warnings") 1938 self.assertEqual(logger.handlers, []) 1939 1940 warnings.showwarning("Explicit", UserWarning, "dummy.py", 42) 1941 self.assertEqual(len(logger.handlers), 1) 1942 self.assertIsInstance(logger.handlers[0], logging.NullHandler) 1943 1944 1945def formatFunc(format, datefmt=None): 1946 return logging.Formatter(format, datefmt) 1947 1948def handlerFunc(): 1949 return logging.StreamHandler() 1950 1951class CustomHandler(logging.StreamHandler): 1952 pass 1953 1954class ConfigDictTest(BaseTest): 1955 1956 """Reading logging config from a dictionary.""" 1957 1958 expected_log_pat = r"^(\w+) \+\+ (\w+)$" 1959 1960 # config0 is a standard configuration. 1961 config0 = { 1962 'version': 1, 1963 'formatters': { 1964 'form1' : { 1965 'format' : '%(levelname)s ++ %(message)s', 1966 }, 1967 }, 1968 'handlers' : { 1969 'hand1' : { 1970 'class' : 'logging.StreamHandler', 1971 'formatter' : 'form1', 1972 'level' : 'NOTSET', 1973 'stream' : 'ext://sys.stdout', 1974 }, 1975 }, 1976 'root' : { 1977 'level' : 'WARNING', 1978 'handlers' : ['hand1'], 1979 }, 1980 } 1981 1982 # config1 adds a little to the standard configuration. 1983 config1 = { 1984 'version': 1, 1985 'formatters': { 1986 'form1' : { 1987 'format' : '%(levelname)s ++ %(message)s', 1988 }, 1989 }, 1990 'handlers' : { 1991 'hand1' : { 1992 'class' : 'logging.StreamHandler', 1993 'formatter' : 'form1', 1994 'level' : 'NOTSET', 1995 'stream' : 'ext://sys.stdout', 1996 }, 1997 }, 1998 'loggers' : { 1999 'compiler.parser' : { 2000 'level' : 'DEBUG', 2001 'handlers' : ['hand1'], 2002 }, 2003 }, 2004 'root' : { 2005 'level' : 'WARNING', 2006 }, 2007 } 2008 2009 # config1a moves the handler to the root. Used with config8a 2010 config1a = { 2011 'version': 1, 2012 'formatters': { 2013 'form1' : { 2014 'format' : '%(levelname)s ++ %(message)s', 2015 }, 2016 }, 2017 'handlers' : { 2018 'hand1' : { 2019 'class' : 'logging.StreamHandler', 2020 'formatter' : 'form1', 2021 'level' : 'NOTSET', 2022 'stream' : 'ext://sys.stdout', 2023 }, 2024 }, 2025 'loggers' : { 2026 'compiler.parser' : { 2027 'level' : 'DEBUG', 2028 }, 2029 }, 2030 'root' : { 2031 'level' : 'WARNING', 2032 'handlers' : ['hand1'], 2033 }, 2034 } 2035 2036 # config2 has a subtle configuration error that should be reported 2037 config2 = { 2038 'version': 1, 2039 'formatters': { 2040 'form1' : { 2041 'format' : '%(levelname)s ++ %(message)s', 2042 }, 2043 }, 2044 'handlers' : { 2045 'hand1' : { 2046 'class' : 'logging.StreamHandler', 2047 'formatter' : 'form1', 2048 'level' : 'NOTSET', 2049 'stream' : 'ext://sys.stdbout', 2050 }, 2051 }, 2052 'loggers' : { 2053 'compiler.parser' : { 2054 'level' : 'DEBUG', 2055 'handlers' : ['hand1'], 2056 }, 2057 }, 2058 'root' : { 2059 'level' : 'WARNING', 2060 }, 2061 } 2062 2063 #As config1 but with a misspelt level on a handler 2064 config2a = { 2065 'version': 1, 2066 'formatters': { 2067 'form1' : { 2068 'format' : '%(levelname)s ++ %(message)s', 2069 }, 2070 }, 2071 'handlers' : { 2072 'hand1' : { 2073 'class' : 'logging.StreamHandler', 2074 'formatter' : 'form1', 2075 'level' : 'NTOSET', 2076 'stream' : 'ext://sys.stdout', 2077 }, 2078 }, 2079 'loggers' : { 2080 'compiler.parser' : { 2081 'level' : 'DEBUG', 2082 'handlers' : ['hand1'], 2083 }, 2084 }, 2085 'root' : { 2086 'level' : 'WARNING', 2087 }, 2088 } 2089 2090 2091 #As config1 but with a misspelt level on a logger 2092 config2b = { 2093 'version': 1, 2094 'formatters': { 2095 'form1' : { 2096 'format' : '%(levelname)s ++ %(message)s', 2097 }, 2098 }, 2099 'handlers' : { 2100 'hand1' : { 2101 'class' : 'logging.StreamHandler', 2102 'formatter' : 'form1', 2103 'level' : 'NOTSET', 2104 'stream' : 'ext://sys.stdout', 2105 }, 2106 }, 2107 'loggers' : { 2108 'compiler.parser' : { 2109 'level' : 'DEBUG', 2110 'handlers' : ['hand1'], 2111 }, 2112 }, 2113 'root' : { 2114 'level' : 'WRANING', 2115 }, 2116 } 2117 2118 # config3 has a less subtle configuration error 2119 config3 = { 2120 'version': 1, 2121 'formatters': { 2122 'form1' : { 2123 'format' : '%(levelname)s ++ %(message)s', 2124 }, 2125 }, 2126 'handlers' : { 2127 'hand1' : { 2128 'class' : 'logging.StreamHandler', 2129 'formatter' : 'misspelled_name', 2130 'level' : 'NOTSET', 2131 'stream' : 'ext://sys.stdout', 2132 }, 2133 }, 2134 'loggers' : { 2135 'compiler.parser' : { 2136 'level' : 'DEBUG', 2137 'handlers' : ['hand1'], 2138 }, 2139 }, 2140 'root' : { 2141 'level' : 'WARNING', 2142 }, 2143 } 2144 2145 # config4 specifies a custom formatter class to be loaded 2146 config4 = { 2147 'version': 1, 2148 'formatters': { 2149 'form1' : { 2150 '()' : __name__ + '.ExceptionFormatter', 2151 'format' : '%(levelname)s:%(name)s:%(message)s', 2152 }, 2153 }, 2154 'handlers' : { 2155 'hand1' : { 2156 'class' : 'logging.StreamHandler', 2157 'formatter' : 'form1', 2158 'level' : 'NOTSET', 2159 'stream' : 'ext://sys.stdout', 2160 }, 2161 }, 2162 'root' : { 2163 'level' : 'NOTSET', 2164 'handlers' : ['hand1'], 2165 }, 2166 } 2167 2168 # As config4 but using an actual callable rather than a string 2169 config4a = { 2170 'version': 1, 2171 'formatters': { 2172 'form1' : { 2173 '()' : ExceptionFormatter, 2174 'format' : '%(levelname)s:%(name)s:%(message)s', 2175 }, 2176 'form2' : { 2177 '()' : __name__ + '.formatFunc', 2178 'format' : '%(levelname)s:%(name)s:%(message)s', 2179 }, 2180 'form3' : { 2181 '()' : formatFunc, 2182 'format' : '%(levelname)s:%(name)s:%(message)s', 2183 }, 2184 }, 2185 'handlers' : { 2186 'hand1' : { 2187 'class' : 'logging.StreamHandler', 2188 'formatter' : 'form1', 2189 'level' : 'NOTSET', 2190 'stream' : 'ext://sys.stdout', 2191 }, 2192 'hand2' : { 2193 '()' : handlerFunc, 2194 }, 2195 }, 2196 'root' : { 2197 'level' : 'NOTSET', 2198 'handlers' : ['hand1'], 2199 }, 2200 } 2201 2202 # config5 specifies a custom handler class to be loaded 2203 config5 = { 2204 'version': 1, 2205 'formatters': { 2206 'form1' : { 2207 'format' : '%(levelname)s ++ %(message)s', 2208 }, 2209 }, 2210 'handlers' : { 2211 'hand1' : { 2212 'class' : __name__ + '.CustomHandler', 2213 'formatter' : 'form1', 2214 'level' : 'NOTSET', 2215 'stream' : 'ext://sys.stdout', 2216 }, 2217 }, 2218 'loggers' : { 2219 'compiler.parser' : { 2220 'level' : 'DEBUG', 2221 'handlers' : ['hand1'], 2222 }, 2223 }, 2224 'root' : { 2225 'level' : 'WARNING', 2226 }, 2227 } 2228 2229 # config6 specifies a custom handler class to be loaded 2230 # but has bad arguments 2231 config6 = { 2232 'version': 1, 2233 'formatters': { 2234 'form1' : { 2235 'format' : '%(levelname)s ++ %(message)s', 2236 }, 2237 }, 2238 'handlers' : { 2239 'hand1' : { 2240 'class' : __name__ + '.CustomHandler', 2241 'formatter' : 'form1', 2242 'level' : 'NOTSET', 2243 'stream' : 'ext://sys.stdout', 2244 '9' : 'invalid parameter name', 2245 }, 2246 }, 2247 'loggers' : { 2248 'compiler.parser' : { 2249 'level' : 'DEBUG', 2250 'handlers' : ['hand1'], 2251 }, 2252 }, 2253 'root' : { 2254 'level' : 'WARNING', 2255 }, 2256 } 2257 2258 #config 7 does not define compiler.parser but defines compiler.lexer 2259 #so compiler.parser should be disabled after applying it 2260 config7 = { 2261 'version': 1, 2262 'formatters': { 2263 'form1' : { 2264 'format' : '%(levelname)s ++ %(message)s', 2265 }, 2266 }, 2267 'handlers' : { 2268 'hand1' : { 2269 'class' : 'logging.StreamHandler', 2270 'formatter' : 'form1', 2271 'level' : 'NOTSET', 2272 'stream' : 'ext://sys.stdout', 2273 }, 2274 }, 2275 'loggers' : { 2276 'compiler.lexer' : { 2277 'level' : 'DEBUG', 2278 'handlers' : ['hand1'], 2279 }, 2280 }, 2281 'root' : { 2282 'level' : 'WARNING', 2283 }, 2284 } 2285 2286 # config8 defines both compiler and compiler.lexer 2287 # so compiler.parser should not be disabled (since 2288 # compiler is defined) 2289 config8 = { 2290 'version': 1, 2291 'disable_existing_loggers' : False, 2292 'formatters': { 2293 'form1' : { 2294 'format' : '%(levelname)s ++ %(message)s', 2295 }, 2296 }, 2297 'handlers' : { 2298 'hand1' : { 2299 'class' : 'logging.StreamHandler', 2300 'formatter' : 'form1', 2301 'level' : 'NOTSET', 2302 'stream' : 'ext://sys.stdout', 2303 }, 2304 }, 2305 'loggers' : { 2306 'compiler' : { 2307 'level' : 'DEBUG', 2308 'handlers' : ['hand1'], 2309 }, 2310 'compiler.lexer' : { 2311 }, 2312 }, 2313 'root' : { 2314 'level' : 'WARNING', 2315 }, 2316 } 2317 2318 # config8a disables existing loggers 2319 config8a = { 2320 'version': 1, 2321 'disable_existing_loggers' : True, 2322 'formatters': { 2323 'form1' : { 2324 'format' : '%(levelname)s ++ %(message)s', 2325 }, 2326 }, 2327 'handlers' : { 2328 'hand1' : { 2329 'class' : 'logging.StreamHandler', 2330 'formatter' : 'form1', 2331 'level' : 'NOTSET', 2332 'stream' : 'ext://sys.stdout', 2333 }, 2334 }, 2335 'loggers' : { 2336 'compiler' : { 2337 'level' : 'DEBUG', 2338 'handlers' : ['hand1'], 2339 }, 2340 'compiler.lexer' : { 2341 }, 2342 }, 2343 'root' : { 2344 'level' : 'WARNING', 2345 }, 2346 } 2347 2348 config9 = { 2349 'version': 1, 2350 'formatters': { 2351 'form1' : { 2352 'format' : '%(levelname)s ++ %(message)s', 2353 }, 2354 }, 2355 'handlers' : { 2356 'hand1' : { 2357 'class' : 'logging.StreamHandler', 2358 'formatter' : 'form1', 2359 'level' : 'WARNING', 2360 'stream' : 'ext://sys.stdout', 2361 }, 2362 }, 2363 'loggers' : { 2364 'compiler.parser' : { 2365 'level' : 'WARNING', 2366 'handlers' : ['hand1'], 2367 }, 2368 }, 2369 'root' : { 2370 'level' : 'NOTSET', 2371 }, 2372 } 2373 2374 config9a = { 2375 'version': 1, 2376 'incremental' : True, 2377 'handlers' : { 2378 'hand1' : { 2379 'level' : 'WARNING', 2380 }, 2381 }, 2382 'loggers' : { 2383 'compiler.parser' : { 2384 'level' : 'INFO', 2385 }, 2386 }, 2387 } 2388 2389 config9b = { 2390 'version': 1, 2391 'incremental' : True, 2392 'handlers' : { 2393 'hand1' : { 2394 'level' : 'INFO', 2395 }, 2396 }, 2397 'loggers' : { 2398 'compiler.parser' : { 2399 'level' : 'INFO', 2400 }, 2401 }, 2402 } 2403 2404 #As config1 but with a filter added 2405 config10 = { 2406 'version': 1, 2407 'formatters': { 2408 'form1' : { 2409 'format' : '%(levelname)s ++ %(message)s', 2410 }, 2411 }, 2412 'filters' : { 2413 'filt1' : { 2414 'name' : 'compiler.parser', 2415 }, 2416 }, 2417 'handlers' : { 2418 'hand1' : { 2419 'class' : 'logging.StreamHandler', 2420 'formatter' : 'form1', 2421 'level' : 'NOTSET', 2422 'stream' : 'ext://sys.stdout', 2423 'filters' : ['filt1'], 2424 }, 2425 }, 2426 'loggers' : { 2427 'compiler.parser' : { 2428 'level' : 'DEBUG', 2429 'filters' : ['filt1'], 2430 }, 2431 }, 2432 'root' : { 2433 'level' : 'WARNING', 2434 'handlers' : ['hand1'], 2435 }, 2436 } 2437 2438 #As config1 but using cfg:// references 2439 config11 = { 2440 'version': 1, 2441 'true_formatters': { 2442 'form1' : { 2443 'format' : '%(levelname)s ++ %(message)s', 2444 }, 2445 }, 2446 'handler_configs': { 2447 'hand1' : { 2448 'class' : 'logging.StreamHandler', 2449 'formatter' : 'form1', 2450 'level' : 'NOTSET', 2451 'stream' : 'ext://sys.stdout', 2452 }, 2453 }, 2454 'formatters' : 'cfg://true_formatters', 2455 'handlers' : { 2456 'hand1' : 'cfg://handler_configs[hand1]', 2457 }, 2458 'loggers' : { 2459 'compiler.parser' : { 2460 'level' : 'DEBUG', 2461 'handlers' : ['hand1'], 2462 }, 2463 }, 2464 'root' : { 2465 'level' : 'WARNING', 2466 }, 2467 } 2468 2469 #As config11 but missing the version key 2470 config12 = { 2471 'true_formatters': { 2472 'form1' : { 2473 'format' : '%(levelname)s ++ %(message)s', 2474 }, 2475 }, 2476 'handler_configs': { 2477 'hand1' : { 2478 'class' : 'logging.StreamHandler', 2479 'formatter' : 'form1', 2480 'level' : 'NOTSET', 2481 'stream' : 'ext://sys.stdout', 2482 }, 2483 }, 2484 'formatters' : 'cfg://true_formatters', 2485 'handlers' : { 2486 'hand1' : 'cfg://handler_configs[hand1]', 2487 }, 2488 'loggers' : { 2489 'compiler.parser' : { 2490 'level' : 'DEBUG', 2491 'handlers' : ['hand1'], 2492 }, 2493 }, 2494 'root' : { 2495 'level' : 'WARNING', 2496 }, 2497 } 2498 2499 #As config11 but using an unsupported version 2500 config13 = { 2501 'version': 2, 2502 'true_formatters': { 2503 'form1' : { 2504 'format' : '%(levelname)s ++ %(message)s', 2505 }, 2506 }, 2507 'handler_configs': { 2508 'hand1' : { 2509 'class' : 'logging.StreamHandler', 2510 'formatter' : 'form1', 2511 'level' : 'NOTSET', 2512 'stream' : 'ext://sys.stdout', 2513 }, 2514 }, 2515 'formatters' : 'cfg://true_formatters', 2516 'handlers' : { 2517 'hand1' : 'cfg://handler_configs[hand1]', 2518 }, 2519 'loggers' : { 2520 'compiler.parser' : { 2521 'level' : 'DEBUG', 2522 'handlers' : ['hand1'], 2523 }, 2524 }, 2525 'root' : { 2526 'level' : 'WARNING', 2527 }, 2528 } 2529 2530 # As config0, but with properties 2531 config14 = { 2532 'version': 1, 2533 'formatters': { 2534 'form1' : { 2535 'format' : '%(levelname)s ++ %(message)s', 2536 }, 2537 }, 2538 'handlers' : { 2539 'hand1' : { 2540 'class' : 'logging.StreamHandler', 2541 'formatter' : 'form1', 2542 'level' : 'NOTSET', 2543 'stream' : 'ext://sys.stdout', 2544 '.': { 2545 'foo': 'bar', 2546 'terminator': '!\n', 2547 } 2548 }, 2549 }, 2550 'root' : { 2551 'level' : 'WARNING', 2552 'handlers' : ['hand1'], 2553 }, 2554 } 2555 2556 out_of_order = { 2557 "version": 1, 2558 "formatters": { 2559 "mySimpleFormatter": { 2560 "format": "%(asctime)s (%(name)s) %(levelname)s: %(message)s", 2561 "style": "$" 2562 } 2563 }, 2564 "handlers": { 2565 "fileGlobal": { 2566 "class": "logging.StreamHandler", 2567 "level": "DEBUG", 2568 "formatter": "mySimpleFormatter" 2569 }, 2570 "bufferGlobal": { 2571 "class": "logging.handlers.MemoryHandler", 2572 "capacity": 5, 2573 "formatter": "mySimpleFormatter", 2574 "target": "fileGlobal", 2575 "level": "DEBUG" 2576 } 2577 }, 2578 "loggers": { 2579 "mymodule": { 2580 "level": "DEBUG", 2581 "handlers": ["bufferGlobal"], 2582 "propagate": "true" 2583 } 2584 } 2585 } 2586 2587 def apply_config(self, conf): 2588 logging.config.dictConfig(conf) 2589 2590 def test_config0_ok(self): 2591 # A simple config which overrides the default settings. 2592 with support.captured_stdout() as output: 2593 self.apply_config(self.config0) 2594 logger = logging.getLogger() 2595 # Won't output anything 2596 logger.info(self.next_message()) 2597 # Outputs a message 2598 logger.error(self.next_message()) 2599 self.assert_log_lines([ 2600 ('ERROR', '2'), 2601 ], stream=output) 2602 # Original logger output is empty. 2603 self.assert_log_lines([]) 2604 2605 def test_config1_ok(self, config=config1): 2606 # A config defining a sub-parser as well. 2607 with support.captured_stdout() as output: 2608 self.apply_config(config) 2609 logger = logging.getLogger("compiler.parser") 2610 # Both will output a message 2611 logger.info(self.next_message()) 2612 logger.error(self.next_message()) 2613 self.assert_log_lines([ 2614 ('INFO', '1'), 2615 ('ERROR', '2'), 2616 ], stream=output) 2617 # Original logger output is empty. 2618 self.assert_log_lines([]) 2619 2620 def test_config2_failure(self): 2621 # A simple config which overrides the default settings. 2622 self.assertRaises(Exception, self.apply_config, self.config2) 2623 2624 def test_config2a_failure(self): 2625 # A simple config which overrides the default settings. 2626 self.assertRaises(Exception, self.apply_config, self.config2a) 2627 2628 def test_config2b_failure(self): 2629 # A simple config which overrides the default settings. 2630 self.assertRaises(Exception, self.apply_config, self.config2b) 2631 2632 def test_config3_failure(self): 2633 # A simple config which overrides the default settings. 2634 self.assertRaises(Exception, self.apply_config, self.config3) 2635 2636 def test_config4_ok(self): 2637 # A config specifying a custom formatter class. 2638 with support.captured_stdout() as output: 2639 self.apply_config(self.config4) 2640 #logger = logging.getLogger() 2641 try: 2642 raise RuntimeError() 2643 except RuntimeError: 2644 logging.exception("just testing") 2645 sys.stdout.seek(0) 2646 self.assertEqual(output.getvalue(), 2647 "ERROR:root:just testing\nGot a [RuntimeError]\n") 2648 # Original logger output is empty 2649 self.assert_log_lines([]) 2650 2651 def test_config4a_ok(self): 2652 # A config specifying a custom formatter class. 2653 with support.captured_stdout() as output: 2654 self.apply_config(self.config4a) 2655 #logger = logging.getLogger() 2656 try: 2657 raise RuntimeError() 2658 except RuntimeError: 2659 logging.exception("just testing") 2660 sys.stdout.seek(0) 2661 self.assertEqual(output.getvalue(), 2662 "ERROR:root:just testing\nGot a [RuntimeError]\n") 2663 # Original logger output is empty 2664 self.assert_log_lines([]) 2665 2666 def test_config5_ok(self): 2667 self.test_config1_ok(config=self.config5) 2668 2669 def test_config6_failure(self): 2670 self.assertRaises(Exception, self.apply_config, self.config6) 2671 2672 def test_config7_ok(self): 2673 with support.captured_stdout() as output: 2674 self.apply_config(self.config1) 2675 logger = logging.getLogger("compiler.parser") 2676 # Both will output a message 2677 logger.info(self.next_message()) 2678 logger.error(self.next_message()) 2679 self.assert_log_lines([ 2680 ('INFO', '1'), 2681 ('ERROR', '2'), 2682 ], stream=output) 2683 # Original logger output is empty. 2684 self.assert_log_lines([]) 2685 with support.captured_stdout() as output: 2686 self.apply_config(self.config7) 2687 logger = logging.getLogger("compiler.parser") 2688 self.assertTrue(logger.disabled) 2689 logger = logging.getLogger("compiler.lexer") 2690 # Both will output a message 2691 logger.info(self.next_message()) 2692 logger.error(self.next_message()) 2693 self.assert_log_lines([ 2694 ('INFO', '3'), 2695 ('ERROR', '4'), 2696 ], stream=output) 2697 # Original logger output is empty. 2698 self.assert_log_lines([]) 2699 2700 #Same as test_config_7_ok but don't disable old loggers. 2701 def test_config_8_ok(self): 2702 with support.captured_stdout() as output: 2703 self.apply_config(self.config1) 2704 logger = logging.getLogger("compiler.parser") 2705 # All will output a message 2706 logger.info(self.next_message()) 2707 logger.error(self.next_message()) 2708 self.assert_log_lines([ 2709 ('INFO', '1'), 2710 ('ERROR', '2'), 2711 ], stream=output) 2712 # Original logger output is empty. 2713 self.assert_log_lines([]) 2714 with support.captured_stdout() as output: 2715 self.apply_config(self.config8) 2716 logger = logging.getLogger("compiler.parser") 2717 self.assertFalse(logger.disabled) 2718 # Both will output a message 2719 logger.info(self.next_message()) 2720 logger.error(self.next_message()) 2721 logger = logging.getLogger("compiler.lexer") 2722 # Both will output a message 2723 logger.info(self.next_message()) 2724 logger.error(self.next_message()) 2725 self.assert_log_lines([ 2726 ('INFO', '3'), 2727 ('ERROR', '4'), 2728 ('INFO', '5'), 2729 ('ERROR', '6'), 2730 ], stream=output) 2731 # Original logger output is empty. 2732 self.assert_log_lines([]) 2733 2734 def test_config_8a_ok(self): 2735 with support.captured_stdout() as output: 2736 self.apply_config(self.config1a) 2737 logger = logging.getLogger("compiler.parser") 2738 # See issue #11424. compiler-hyphenated sorts 2739 # between compiler and compiler.xyz and this 2740 # was preventing compiler.xyz from being included 2741 # in the child loggers of compiler because of an 2742 # overzealous loop termination condition. 2743 hyphenated = logging.getLogger('compiler-hyphenated') 2744 # All will output a message 2745 logger.info(self.next_message()) 2746 logger.error(self.next_message()) 2747 hyphenated.critical(self.next_message()) 2748 self.assert_log_lines([ 2749 ('INFO', '1'), 2750 ('ERROR', '2'), 2751 ('CRITICAL', '3'), 2752 ], stream=output) 2753 # Original logger output is empty. 2754 self.assert_log_lines([]) 2755 with support.captured_stdout() as output: 2756 self.apply_config(self.config8a) 2757 logger = logging.getLogger("compiler.parser") 2758 self.assertFalse(logger.disabled) 2759 # Both will output a message 2760 logger.info(self.next_message()) 2761 logger.error(self.next_message()) 2762 logger = logging.getLogger("compiler.lexer") 2763 # Both will output a message 2764 logger.info(self.next_message()) 2765 logger.error(self.next_message()) 2766 # Will not appear 2767 hyphenated.critical(self.next_message()) 2768 self.assert_log_lines([ 2769 ('INFO', '4'), 2770 ('ERROR', '5'), 2771 ('INFO', '6'), 2772 ('ERROR', '7'), 2773 ], stream=output) 2774 # Original logger output is empty. 2775 self.assert_log_lines([]) 2776 2777 def test_config_9_ok(self): 2778 with support.captured_stdout() as output: 2779 self.apply_config(self.config9) 2780 logger = logging.getLogger("compiler.parser") 2781 #Nothing will be output since both handler and logger are set to WARNING 2782 logger.info(self.next_message()) 2783 self.assert_log_lines([], stream=output) 2784 self.apply_config(self.config9a) 2785 #Nothing will be output since both handler is still set to WARNING 2786 logger.info(self.next_message()) 2787 self.assert_log_lines([], stream=output) 2788 self.apply_config(self.config9b) 2789 #Message should now be output 2790 logger.info(self.next_message()) 2791 self.assert_log_lines([ 2792 ('INFO', '3'), 2793 ], stream=output) 2794 2795 def test_config_10_ok(self): 2796 with support.captured_stdout() as output: 2797 self.apply_config(self.config10) 2798 logger = logging.getLogger("compiler.parser") 2799 logger.warning(self.next_message()) 2800 logger = logging.getLogger('compiler') 2801 #Not output, because filtered 2802 logger.warning(self.next_message()) 2803 logger = logging.getLogger('compiler.lexer') 2804 #Not output, because filtered 2805 logger.warning(self.next_message()) 2806 logger = logging.getLogger("compiler.parser.codegen") 2807 #Output, as not filtered 2808 logger.error(self.next_message()) 2809 self.assert_log_lines([ 2810 ('WARNING', '1'), 2811 ('ERROR', '4'), 2812 ], stream=output) 2813 2814 def test_config11_ok(self): 2815 self.test_config1_ok(self.config11) 2816 2817 def test_config12_failure(self): 2818 self.assertRaises(Exception, self.apply_config, self.config12) 2819 2820 def test_config13_failure(self): 2821 self.assertRaises(Exception, self.apply_config, self.config13) 2822 2823 def test_config14_ok(self): 2824 with support.captured_stdout() as output: 2825 self.apply_config(self.config14) 2826 h = logging._handlers['hand1'] 2827 self.assertEqual(h.foo, 'bar') 2828 self.assertEqual(h.terminator, '!\n') 2829 logging.warning('Exclamation') 2830 self.assertTrue(output.getvalue().endswith('Exclamation!\n')) 2831 2832 @unittest.skipUnless(threading, 'listen() needs threading to work') 2833 def setup_via_listener(self, text, verify=None): 2834 text = text.encode("utf-8") 2835 # Ask for a randomly assigned port (by using port 0) 2836 t = logging.config.listen(0, verify) 2837 t.start() 2838 t.ready.wait() 2839 # Now get the port allocated 2840 port = t.port 2841 t.ready.clear() 2842 try: 2843 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 2844 sock.settimeout(2.0) 2845 sock.connect(('localhost', port)) 2846 2847 slen = struct.pack('>L', len(text)) 2848 s = slen + text 2849 sentsofar = 0 2850 left = len(s) 2851 while left > 0: 2852 sent = sock.send(s[sentsofar:]) 2853 sentsofar += sent 2854 left -= sent 2855 sock.close() 2856 finally: 2857 t.ready.wait(2.0) 2858 logging.config.stopListening() 2859 t.join(2.0) 2860 2861 @unittest.skipUnless(threading, 'Threading required for this test.') 2862 def test_listen_config_10_ok(self): 2863 with support.captured_stdout() as output: 2864 self.setup_via_listener(json.dumps(self.config10)) 2865 logger = logging.getLogger("compiler.parser") 2866 logger.warning(self.next_message()) 2867 logger = logging.getLogger('compiler') 2868 #Not output, because filtered 2869 logger.warning(self.next_message()) 2870 logger = logging.getLogger('compiler.lexer') 2871 #Not output, because filtered 2872 logger.warning(self.next_message()) 2873 logger = logging.getLogger("compiler.parser.codegen") 2874 #Output, as not filtered 2875 logger.error(self.next_message()) 2876 self.assert_log_lines([ 2877 ('WARNING', '1'), 2878 ('ERROR', '4'), 2879 ], stream=output) 2880 2881 @unittest.skipUnless(threading, 'Threading required for this test.') 2882 def test_listen_config_1_ok(self): 2883 with support.captured_stdout() as output: 2884 self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1)) 2885 logger = logging.getLogger("compiler.parser") 2886 # Both will output a message 2887 logger.info(self.next_message()) 2888 logger.error(self.next_message()) 2889 self.assert_log_lines([ 2890 ('INFO', '1'), 2891 ('ERROR', '2'), 2892 ], stream=output) 2893 # Original logger output is empty. 2894 self.assert_log_lines([]) 2895 2896 @unittest.skipUnless(threading, 'Threading required for this test.') 2897 def test_listen_verify(self): 2898 2899 def verify_fail(stuff): 2900 return None 2901 2902 def verify_reverse(stuff): 2903 return stuff[::-1] 2904 2905 logger = logging.getLogger("compiler.parser") 2906 to_send = textwrap.dedent(ConfigFileTest.config1) 2907 # First, specify a verification function that will fail. 2908 # We expect to see no output, since our configuration 2909 # never took effect. 2910 with support.captured_stdout() as output: 2911 self.setup_via_listener(to_send, verify_fail) 2912 # Both will output a message 2913 logger.info(self.next_message()) 2914 logger.error(self.next_message()) 2915 self.assert_log_lines([], stream=output) 2916 # Original logger output has the stuff we logged. 2917 self.assert_log_lines([ 2918 ('INFO', '1'), 2919 ('ERROR', '2'), 2920 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 2921 2922 # Now, perform no verification. Our configuration 2923 # should take effect. 2924 2925 with support.captured_stdout() as output: 2926 self.setup_via_listener(to_send) # no verify callable specified 2927 logger = logging.getLogger("compiler.parser") 2928 # Both will output a message 2929 logger.info(self.next_message()) 2930 logger.error(self.next_message()) 2931 self.assert_log_lines([ 2932 ('INFO', '3'), 2933 ('ERROR', '4'), 2934 ], stream=output) 2935 # Original logger output still has the stuff we logged before. 2936 self.assert_log_lines([ 2937 ('INFO', '1'), 2938 ('ERROR', '2'), 2939 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 2940 2941 # Now, perform verification which transforms the bytes. 2942 2943 with support.captured_stdout() as output: 2944 self.setup_via_listener(to_send[::-1], verify_reverse) 2945 logger = logging.getLogger("compiler.parser") 2946 # Both will output a message 2947 logger.info(self.next_message()) 2948 logger.error(self.next_message()) 2949 self.assert_log_lines([ 2950 ('INFO', '5'), 2951 ('ERROR', '6'), 2952 ], stream=output) 2953 # Original logger output still has the stuff we logged before. 2954 self.assert_log_lines([ 2955 ('INFO', '1'), 2956 ('ERROR', '2'), 2957 ], pat=r"^[\w.]+ -> (\w+): (\d+)$") 2958 2959 def test_out_of_order(self): 2960 self.apply_config(self.out_of_order) 2961 handler = logging.getLogger('mymodule').handlers[0] 2962 self.assertIsInstance(handler.target, logging.Handler) 2963 self.assertIsInstance(handler.formatter._style, 2964 logging.StringTemplateStyle) 2965 2966 def test_baseconfig(self): 2967 d = { 2968 'atuple': (1, 2, 3), 2969 'alist': ['a', 'b', 'c'], 2970 'adict': {'d': 'e', 'f': 3 }, 2971 'nest1': ('g', ('h', 'i'), 'j'), 2972 'nest2': ['k', ['l', 'm'], 'n'], 2973 'nest3': ['o', 'cfg://alist', 'p'], 2974 } 2975 bc = logging.config.BaseConfigurator(d) 2976 self.assertEqual(bc.convert('cfg://atuple[1]'), 2) 2977 self.assertEqual(bc.convert('cfg://alist[1]'), 'b') 2978 self.assertEqual(bc.convert('cfg://nest1[1][0]'), 'h') 2979 self.assertEqual(bc.convert('cfg://nest2[1][1]'), 'm') 2980 self.assertEqual(bc.convert('cfg://adict.d'), 'e') 2981 self.assertEqual(bc.convert('cfg://adict[f]'), 3) 2982 v = bc.convert('cfg://nest3') 2983 self.assertEqual(v.pop(1), ['a', 'b', 'c']) 2984 self.assertRaises(KeyError, bc.convert, 'cfg://nosuch') 2985 self.assertRaises(ValueError, bc.convert, 'cfg://!') 2986 self.assertRaises(KeyError, bc.convert, 'cfg://adict[2]') 2987 2988class ManagerTest(BaseTest): 2989 def test_manager_loggerclass(self): 2990 logged = [] 2991 2992 class MyLogger(logging.Logger): 2993 def _log(self, level, msg, args, exc_info=None, extra=None): 2994 logged.append(msg) 2995 2996 man = logging.Manager(None) 2997 self.assertRaises(TypeError, man.setLoggerClass, int) 2998 man.setLoggerClass(MyLogger) 2999 logger = man.getLogger('test') 3000 logger.warning('should appear in logged') 3001 logging.warning('should not appear in logged') 3002 3003 self.assertEqual(logged, ['should appear in logged']) 3004 3005 def test_set_log_record_factory(self): 3006 man = logging.Manager(None) 3007 expected = object() 3008 man.setLogRecordFactory(expected) 3009 self.assertEqual(man.logRecordFactory, expected) 3010 3011class ChildLoggerTest(BaseTest): 3012 def test_child_loggers(self): 3013 r = logging.getLogger() 3014 l1 = logging.getLogger('abc') 3015 l2 = logging.getLogger('def.ghi') 3016 c1 = r.getChild('xyz') 3017 c2 = r.getChild('uvw.xyz') 3018 self.assertIs(c1, logging.getLogger('xyz')) 3019 self.assertIs(c2, logging.getLogger('uvw.xyz')) 3020 c1 = l1.getChild('def') 3021 c2 = c1.getChild('ghi') 3022 c3 = l1.getChild('def.ghi') 3023 self.assertIs(c1, logging.getLogger('abc.def')) 3024 self.assertIs(c2, logging.getLogger('abc.def.ghi')) 3025 self.assertIs(c2, c3) 3026 3027 3028class DerivedLogRecord(logging.LogRecord): 3029 pass 3030 3031class LogRecordFactoryTest(BaseTest): 3032 3033 def setUp(self): 3034 class CheckingFilter(logging.Filter): 3035 def __init__(self, cls): 3036 self.cls = cls 3037 3038 def filter(self, record): 3039 t = type(record) 3040 if t is not self.cls: 3041 msg = 'Unexpected LogRecord type %s, expected %s' % (t, 3042 self.cls) 3043 raise TypeError(msg) 3044 return True 3045 3046 BaseTest.setUp(self) 3047 self.filter = CheckingFilter(DerivedLogRecord) 3048 self.root_logger.addFilter(self.filter) 3049 self.orig_factory = logging.getLogRecordFactory() 3050 3051 def tearDown(self): 3052 self.root_logger.removeFilter(self.filter) 3053 BaseTest.tearDown(self) 3054 logging.setLogRecordFactory(self.orig_factory) 3055 3056 def test_logrecord_class(self): 3057 self.assertRaises(TypeError, self.root_logger.warning, 3058 self.next_message()) 3059 logging.setLogRecordFactory(DerivedLogRecord) 3060 self.root_logger.error(self.next_message()) 3061 self.assert_log_lines([ 3062 ('root', 'ERROR', '2'), 3063 ]) 3064 3065 3066class QueueHandlerTest(BaseTest): 3067 # Do not bother with a logger name group. 3068 expected_log_pat = r"^[\w.]+ -> (\w+): (\d+)$" 3069 3070 def setUp(self): 3071 BaseTest.setUp(self) 3072 self.queue = queue.Queue(-1) 3073 self.que_hdlr = logging.handlers.QueueHandler(self.queue) 3074 self.que_logger = logging.getLogger('que') 3075 self.que_logger.propagate = False 3076 self.que_logger.setLevel(logging.WARNING) 3077 self.que_logger.addHandler(self.que_hdlr) 3078 3079 def tearDown(self): 3080 self.que_hdlr.close() 3081 BaseTest.tearDown(self) 3082 3083 def test_queue_handler(self): 3084 self.que_logger.debug(self.next_message()) 3085 self.assertRaises(queue.Empty, self.queue.get_nowait) 3086 self.que_logger.info(self.next_message()) 3087 self.assertRaises(queue.Empty, self.queue.get_nowait) 3088 msg = self.next_message() 3089 self.que_logger.warning(msg) 3090 data = self.queue.get_nowait() 3091 self.assertTrue(isinstance(data, logging.LogRecord)) 3092 self.assertEqual(data.name, self.que_logger.name) 3093 self.assertEqual((data.msg, data.args), (msg, None)) 3094 3095 @unittest.skipUnless(hasattr(logging.handlers, 'QueueListener'), 3096 'logging.handlers.QueueListener required for this test') 3097 def test_queue_listener(self): 3098 handler = support.TestHandler(support.Matcher()) 3099 listener = logging.handlers.QueueListener(self.queue, handler) 3100 listener.start() 3101 try: 3102 self.que_logger.warning(self.next_message()) 3103 self.que_logger.error(self.next_message()) 3104 self.que_logger.critical(self.next_message()) 3105 finally: 3106 listener.stop() 3107 self.assertTrue(handler.matches(levelno=logging.WARNING, message='1')) 3108 self.assertTrue(handler.matches(levelno=logging.ERROR, message='2')) 3109 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='3')) 3110 handler.close() 3111 3112 # Now test with respect_handler_level set 3113 3114 handler = support.TestHandler(support.Matcher()) 3115 handler.setLevel(logging.CRITICAL) 3116 listener = logging.handlers.QueueListener(self.queue, handler, 3117 respect_handler_level=True) 3118 listener.start() 3119 try: 3120 self.que_logger.warning(self.next_message()) 3121 self.que_logger.error(self.next_message()) 3122 self.que_logger.critical(self.next_message()) 3123 finally: 3124 listener.stop() 3125 self.assertFalse(handler.matches(levelno=logging.WARNING, message='4')) 3126 self.assertFalse(handler.matches(levelno=logging.ERROR, message='5')) 3127 self.assertTrue(handler.matches(levelno=logging.CRITICAL, message='6')) 3128 3129if hasattr(logging.handlers, 'QueueListener'): 3130 import multiprocessing 3131 from unittest.mock import patch 3132 3133 class QueueListenerTest(BaseTest): 3134 """ 3135 Tests based on patch submitted for issue #27930. Ensure that 3136 QueueListener handles all log messages. 3137 """ 3138 3139 repeat = 20 3140 3141 @staticmethod 3142 def setup_and_log(log_queue, ident): 3143 """ 3144 Creates a logger with a QueueHandler that logs to a queue read by a 3145 QueueListener. Starts the listener, logs five messages, and stops 3146 the listener. 3147 """ 3148 logger = logging.getLogger('test_logger_with_id_%s' % ident) 3149 logger.setLevel(logging.DEBUG) 3150 handler = logging.handlers.QueueHandler(log_queue) 3151 logger.addHandler(handler) 3152 listener = logging.handlers.QueueListener(log_queue) 3153 listener.start() 3154 3155 logger.info('one') 3156 logger.info('two') 3157 logger.info('three') 3158 logger.info('four') 3159 logger.info('five') 3160 3161 listener.stop() 3162 logger.removeHandler(handler) 3163 handler.close() 3164 3165 @patch.object(logging.handlers.QueueListener, 'handle') 3166 def test_handle_called_with_queue_queue(self, mock_handle): 3167 for i in range(self.repeat): 3168 log_queue = queue.Queue() 3169 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3170 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3171 'correct number of handled log messages') 3172 3173 @support.requires_multiprocessing_queue 3174 @patch.object(logging.handlers.QueueListener, 'handle') 3175 def test_handle_called_with_mp_queue(self, mock_handle): 3176 for i in range(self.repeat): 3177 log_queue = multiprocessing.Queue() 3178 self.setup_and_log(log_queue, '%s_%s' % (self.id(), i)) 3179 self.assertEqual(mock_handle.call_count, 5 * self.repeat, 3180 'correct number of handled log messages') 3181 3182 @staticmethod 3183 def get_all_from_queue(log_queue): 3184 try: 3185 while True: 3186 yield log_queue.get_nowait() 3187 except queue.Empty: 3188 return [] 3189 3190 @support.requires_multiprocessing_queue 3191 def test_no_messages_in_queue_after_stop(self): 3192 """ 3193 Five messages are logged then the QueueListener is stopped. This 3194 test then gets everything off the queue. Failure of this test 3195 indicates that messages were not registered on the queue until 3196 _after_ the QueueListener stopped. 3197 """ 3198 for i in range(self.repeat): 3199 queue = multiprocessing.Queue() 3200 self.setup_and_log(queue, '%s_%s' %(self.id(), i)) 3201 # time.sleep(1) 3202 items = list(self.get_all_from_queue(queue)) 3203 expected = [[], [logging.handlers.QueueListener._sentinel]] 3204 self.assertIn(items, expected, 3205 'Found unexpected messages in queue: %s' % ( 3206 [m.msg if isinstance(m, logging.LogRecord) 3207 else m for m in items])) 3208 3209 3210ZERO = datetime.timedelta(0) 3211 3212class UTC(datetime.tzinfo): 3213 def utcoffset(self, dt): 3214 return ZERO 3215 3216 dst = utcoffset 3217 3218 def tzname(self, dt): 3219 return 'UTC' 3220 3221utc = UTC() 3222 3223class FormatterTest(unittest.TestCase): 3224 def setUp(self): 3225 self.common = { 3226 'name': 'formatter.test', 3227 'level': logging.DEBUG, 3228 'pathname': os.path.join('path', 'to', 'dummy.ext'), 3229 'lineno': 42, 3230 'exc_info': None, 3231 'func': None, 3232 'msg': 'Message with %d %s', 3233 'args': (2, 'placeholders'), 3234 } 3235 self.variants = { 3236 } 3237 3238 def get_record(self, name=None): 3239 result = dict(self.common) 3240 if name is not None: 3241 result.update(self.variants[name]) 3242 return logging.makeLogRecord(result) 3243 3244 def test_percent(self): 3245 # Test %-formatting 3246 r = self.get_record() 3247 f = logging.Formatter('${%(message)s}') 3248 self.assertEqual(f.format(r), '${Message with 2 placeholders}') 3249 f = logging.Formatter('%(random)s') 3250 self.assertRaises(KeyError, f.format, r) 3251 self.assertFalse(f.usesTime()) 3252 f = logging.Formatter('%(asctime)s') 3253 self.assertTrue(f.usesTime()) 3254 f = logging.Formatter('%(asctime)-15s') 3255 self.assertTrue(f.usesTime()) 3256 f = logging.Formatter('asctime') 3257 self.assertFalse(f.usesTime()) 3258 3259 def test_braces(self): 3260 # Test {}-formatting 3261 r = self.get_record() 3262 f = logging.Formatter('$%{message}%$', style='{') 3263 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3264 f = logging.Formatter('{random}', style='{') 3265 self.assertRaises(KeyError, f.format, r) 3266 self.assertFalse(f.usesTime()) 3267 f = logging.Formatter('{asctime}', style='{') 3268 self.assertTrue(f.usesTime()) 3269 f = logging.Formatter('{asctime!s:15}', style='{') 3270 self.assertTrue(f.usesTime()) 3271 f = logging.Formatter('{asctime:15}', style='{') 3272 self.assertTrue(f.usesTime()) 3273 f = logging.Formatter('asctime', style='{') 3274 self.assertFalse(f.usesTime()) 3275 3276 def test_dollars(self): 3277 # Test $-formatting 3278 r = self.get_record() 3279 f = logging.Formatter('$message', style='$') 3280 self.assertEqual(f.format(r), 'Message with 2 placeholders') 3281 f = logging.Formatter('$$%${message}%$$', style='$') 3282 self.assertEqual(f.format(r), '$%Message with 2 placeholders%$') 3283 f = logging.Formatter('${random}', style='$') 3284 self.assertRaises(KeyError, f.format, r) 3285 self.assertFalse(f.usesTime()) 3286 f = logging.Formatter('${asctime}', style='$') 3287 self.assertTrue(f.usesTime()) 3288 f = logging.Formatter('${asctime', style='$') 3289 self.assertFalse(f.usesTime()) 3290 f = logging.Formatter('$asctime', style='$') 3291 self.assertTrue(f.usesTime()) 3292 f = logging.Formatter('asctime', style='$') 3293 self.assertFalse(f.usesTime()) 3294 3295 def test_invalid_style(self): 3296 self.assertRaises(ValueError, logging.Formatter, None, None, 'x') 3297 3298 def test_time(self): 3299 r = self.get_record() 3300 dt = datetime.datetime(1993, 4, 21, 8, 3, 0, 0, utc) 3301 # We use None to indicate we want the local timezone 3302 # We're essentially converting a UTC time to local time 3303 r.created = time.mktime(dt.astimezone(None).timetuple()) 3304 r.msecs = 123 3305 f = logging.Formatter('%(asctime)s %(message)s') 3306 f.converter = time.gmtime 3307 self.assertEqual(f.formatTime(r), '1993-04-21 08:03:00,123') 3308 self.assertEqual(f.formatTime(r, '%Y:%d'), '1993:21') 3309 f.format(r) 3310 self.assertEqual(r.asctime, '1993-04-21 08:03:00,123') 3311 3312class TestBufferingFormatter(logging.BufferingFormatter): 3313 def formatHeader(self, records): 3314 return '[(%d)' % len(records) 3315 3316 def formatFooter(self, records): 3317 return '(%d)]' % len(records) 3318 3319class BufferingFormatterTest(unittest.TestCase): 3320 def setUp(self): 3321 self.records = [ 3322 logging.makeLogRecord({'msg': 'one'}), 3323 logging.makeLogRecord({'msg': 'two'}), 3324 ] 3325 3326 def test_default(self): 3327 f = logging.BufferingFormatter() 3328 self.assertEqual('', f.format([])) 3329 self.assertEqual('onetwo', f.format(self.records)) 3330 3331 def test_custom(self): 3332 f = TestBufferingFormatter() 3333 self.assertEqual('[(2)onetwo(2)]', f.format(self.records)) 3334 lf = logging.Formatter('<%(message)s>') 3335 f = TestBufferingFormatter(lf) 3336 self.assertEqual('[(2)<one><two>(2)]', f.format(self.records)) 3337 3338class ExceptionTest(BaseTest): 3339 def test_formatting(self): 3340 r = self.root_logger 3341 h = RecordingHandler() 3342 r.addHandler(h) 3343 try: 3344 raise RuntimeError('deliberate mistake') 3345 except: 3346 logging.exception('failed', stack_info=True) 3347 r.removeHandler(h) 3348 h.close() 3349 r = h.records[0] 3350 self.assertTrue(r.exc_text.startswith('Traceback (most recent ' 3351 'call last):\n')) 3352 self.assertTrue(r.exc_text.endswith('\nRuntimeError: ' 3353 'deliberate mistake')) 3354 self.assertTrue(r.stack_info.startswith('Stack (most recent ' 3355 'call last):\n')) 3356 self.assertTrue(r.stack_info.endswith('logging.exception(\'failed\', ' 3357 'stack_info=True)')) 3358 3359 3360class LastResortTest(BaseTest): 3361 def test_last_resort(self): 3362 # Test the last resort handler 3363 root = self.root_logger 3364 root.removeHandler(self.root_hdlr) 3365 old_lastresort = logging.lastResort 3366 old_raise_exceptions = logging.raiseExceptions 3367 3368 try: 3369 with support.captured_stderr() as stderr: 3370 root.debug('This should not appear') 3371 self.assertEqual(stderr.getvalue(), '') 3372 root.warning('Final chance!') 3373 self.assertEqual(stderr.getvalue(), 'Final chance!\n') 3374 3375 # No handlers and no last resort, so 'No handlers' message 3376 logging.lastResort = None 3377 with support.captured_stderr() as stderr: 3378 root.warning('Final chance!') 3379 msg = 'No handlers could be found for logger "root"\n' 3380 self.assertEqual(stderr.getvalue(), msg) 3381 3382 # 'No handlers' message only printed once 3383 with support.captured_stderr() as stderr: 3384 root.warning('Final chance!') 3385 self.assertEqual(stderr.getvalue(), '') 3386 3387 # If raiseExceptions is False, no message is printed 3388 root.manager.emittedNoHandlerWarning = False 3389 logging.raiseExceptions = False 3390 with support.captured_stderr() as stderr: 3391 root.warning('Final chance!') 3392 self.assertEqual(stderr.getvalue(), '') 3393 finally: 3394 root.addHandler(self.root_hdlr) 3395 logging.lastResort = old_lastresort 3396 logging.raiseExceptions = old_raise_exceptions 3397 3398 3399class FakeHandler: 3400 3401 def __init__(self, identifier, called): 3402 for method in ('acquire', 'flush', 'close', 'release'): 3403 setattr(self, method, self.record_call(identifier, method, called)) 3404 3405 def record_call(self, identifier, method_name, called): 3406 def inner(): 3407 called.append('{} - {}'.format(identifier, method_name)) 3408 return inner 3409 3410 3411class RecordingHandler(logging.NullHandler): 3412 3413 def __init__(self, *args, **kwargs): 3414 super(RecordingHandler, self).__init__(*args, **kwargs) 3415 self.records = [] 3416 3417 def handle(self, record): 3418 """Keep track of all the emitted records.""" 3419 self.records.append(record) 3420 3421 3422class ShutdownTest(BaseTest): 3423 3424 """Test suite for the shutdown method.""" 3425 3426 def setUp(self): 3427 super(ShutdownTest, self).setUp() 3428 self.called = [] 3429 3430 raise_exceptions = logging.raiseExceptions 3431 self.addCleanup(setattr, logging, 'raiseExceptions', raise_exceptions) 3432 3433 def raise_error(self, error): 3434 def inner(): 3435 raise error() 3436 return inner 3437 3438 def test_no_failure(self): 3439 # create some fake handlers 3440 handler0 = FakeHandler(0, self.called) 3441 handler1 = FakeHandler(1, self.called) 3442 handler2 = FakeHandler(2, self.called) 3443 3444 # create live weakref to those handlers 3445 handlers = map(logging.weakref.ref, [handler0, handler1, handler2]) 3446 3447 logging.shutdown(handlerList=list(handlers)) 3448 3449 expected = ['2 - acquire', '2 - flush', '2 - close', '2 - release', 3450 '1 - acquire', '1 - flush', '1 - close', '1 - release', 3451 '0 - acquire', '0 - flush', '0 - close', '0 - release'] 3452 self.assertEqual(expected, self.called) 3453 3454 def _test_with_failure_in_method(self, method, error): 3455 handler = FakeHandler(0, self.called) 3456 setattr(handler, method, self.raise_error(error)) 3457 handlers = [logging.weakref.ref(handler)] 3458 3459 logging.shutdown(handlerList=list(handlers)) 3460 3461 self.assertEqual('0 - release', self.called[-1]) 3462 3463 def test_with_ioerror_in_acquire(self): 3464 self._test_with_failure_in_method('acquire', OSError) 3465 3466 def test_with_ioerror_in_flush(self): 3467 self._test_with_failure_in_method('flush', OSError) 3468 3469 def test_with_ioerror_in_close(self): 3470 self._test_with_failure_in_method('close', OSError) 3471 3472 def test_with_valueerror_in_acquire(self): 3473 self._test_with_failure_in_method('acquire', ValueError) 3474 3475 def test_with_valueerror_in_flush(self): 3476 self._test_with_failure_in_method('flush', ValueError) 3477 3478 def test_with_valueerror_in_close(self): 3479 self._test_with_failure_in_method('close', ValueError) 3480 3481 def test_with_other_error_in_acquire_without_raise(self): 3482 logging.raiseExceptions = False 3483 self._test_with_failure_in_method('acquire', IndexError) 3484 3485 def test_with_other_error_in_flush_without_raise(self): 3486 logging.raiseExceptions = False 3487 self._test_with_failure_in_method('flush', IndexError) 3488 3489 def test_with_other_error_in_close_without_raise(self): 3490 logging.raiseExceptions = False 3491 self._test_with_failure_in_method('close', IndexError) 3492 3493 def test_with_other_error_in_acquire_with_raise(self): 3494 logging.raiseExceptions = True 3495 self.assertRaises(IndexError, self._test_with_failure_in_method, 3496 'acquire', IndexError) 3497 3498 def test_with_other_error_in_flush_with_raise(self): 3499 logging.raiseExceptions = True 3500 self.assertRaises(IndexError, self._test_with_failure_in_method, 3501 'flush', IndexError) 3502 3503 def test_with_other_error_in_close_with_raise(self): 3504 logging.raiseExceptions = True 3505 self.assertRaises(IndexError, self._test_with_failure_in_method, 3506 'close', IndexError) 3507 3508 3509class ModuleLevelMiscTest(BaseTest): 3510 3511 """Test suite for some module level methods.""" 3512 3513 def test_disable(self): 3514 old_disable = logging.root.manager.disable 3515 # confirm our assumptions are correct 3516 self.assertEqual(old_disable, 0) 3517 self.addCleanup(logging.disable, old_disable) 3518 3519 logging.disable(83) 3520 self.assertEqual(logging.root.manager.disable, 83) 3521 3522 def _test_log(self, method, level=None): 3523 called = [] 3524 support.patch(self, logging, 'basicConfig', 3525 lambda *a, **kw: called.append((a, kw))) 3526 3527 recording = RecordingHandler() 3528 logging.root.addHandler(recording) 3529 3530 log_method = getattr(logging, method) 3531 if level is not None: 3532 log_method(level, "test me: %r", recording) 3533 else: 3534 log_method("test me: %r", recording) 3535 3536 self.assertEqual(len(recording.records), 1) 3537 record = recording.records[0] 3538 self.assertEqual(record.getMessage(), "test me: %r" % recording) 3539 3540 expected_level = level if level is not None else getattr(logging, method.upper()) 3541 self.assertEqual(record.levelno, expected_level) 3542 3543 # basicConfig was not called! 3544 self.assertEqual(called, []) 3545 3546 def test_log(self): 3547 self._test_log('log', logging.ERROR) 3548 3549 def test_debug(self): 3550 self._test_log('debug') 3551 3552 def test_info(self): 3553 self._test_log('info') 3554 3555 def test_warning(self): 3556 self._test_log('warning') 3557 3558 def test_error(self): 3559 self._test_log('error') 3560 3561 def test_critical(self): 3562 self._test_log('critical') 3563 3564 def test_set_logger_class(self): 3565 self.assertRaises(TypeError, logging.setLoggerClass, object) 3566 3567 class MyLogger(logging.Logger): 3568 pass 3569 3570 logging.setLoggerClass(MyLogger) 3571 self.assertEqual(logging.getLoggerClass(), MyLogger) 3572 3573 logging.setLoggerClass(logging.Logger) 3574 self.assertEqual(logging.getLoggerClass(), logging.Logger) 3575 3576 @support.requires_type_collecting 3577 def test_logging_at_shutdown(self): 3578 # Issue #20037 3579 code = """if 1: 3580 import logging 3581 3582 class A: 3583 def __del__(self): 3584 try: 3585 raise ValueError("some error") 3586 except Exception: 3587 logging.exception("exception in __del__") 3588 3589 a = A()""" 3590 rc, out, err = assert_python_ok("-c", code) 3591 err = err.decode() 3592 self.assertIn("exception in __del__", err) 3593 self.assertIn("ValueError: some error", err) 3594 3595 3596class LogRecordTest(BaseTest): 3597 def test_str_rep(self): 3598 r = logging.makeLogRecord({}) 3599 s = str(r) 3600 self.assertTrue(s.startswith('<LogRecord: ')) 3601 self.assertTrue(s.endswith('>')) 3602 3603 def test_dict_arg(self): 3604 h = RecordingHandler() 3605 r = logging.getLogger() 3606 r.addHandler(h) 3607 d = {'less' : 'more' } 3608 logging.warning('less is %(less)s', d) 3609 self.assertIs(h.records[0].args, d) 3610 self.assertEqual(h.records[0].message, 'less is more') 3611 r.removeHandler(h) 3612 h.close() 3613 3614 def test_multiprocessing(self): 3615 r = logging.makeLogRecord({}) 3616 self.assertEqual(r.processName, 'MainProcess') 3617 try: 3618 import multiprocessing as mp 3619 r = logging.makeLogRecord({}) 3620 self.assertEqual(r.processName, mp.current_process().name) 3621 except ImportError: 3622 pass 3623 3624 def test_optional(self): 3625 r = logging.makeLogRecord({}) 3626 NOT_NONE = self.assertIsNotNone 3627 if threading: 3628 NOT_NONE(r.thread) 3629 NOT_NONE(r.threadName) 3630 NOT_NONE(r.process) 3631 NOT_NONE(r.processName) 3632 log_threads = logging.logThreads 3633 log_processes = logging.logProcesses 3634 log_multiprocessing = logging.logMultiprocessing 3635 try: 3636 logging.logThreads = False 3637 logging.logProcesses = False 3638 logging.logMultiprocessing = False 3639 r = logging.makeLogRecord({}) 3640 NONE = self.assertIsNone 3641 NONE(r.thread) 3642 NONE(r.threadName) 3643 NONE(r.process) 3644 NONE(r.processName) 3645 finally: 3646 logging.logThreads = log_threads 3647 logging.logProcesses = log_processes 3648 logging.logMultiprocessing = log_multiprocessing 3649 3650class BasicConfigTest(unittest.TestCase): 3651 3652 """Test suite for logging.basicConfig.""" 3653 3654 def setUp(self): 3655 super(BasicConfigTest, self).setUp() 3656 self.handlers = logging.root.handlers 3657 self.saved_handlers = logging._handlers.copy() 3658 self.saved_handler_list = logging._handlerList[:] 3659 self.original_logging_level = logging.root.level 3660 self.addCleanup(self.cleanup) 3661 logging.root.handlers = [] 3662 3663 def tearDown(self): 3664 for h in logging.root.handlers[:]: 3665 logging.root.removeHandler(h) 3666 h.close() 3667 super(BasicConfigTest, self).tearDown() 3668 3669 def cleanup(self): 3670 setattr(logging.root, 'handlers', self.handlers) 3671 logging._handlers.clear() 3672 logging._handlers.update(self.saved_handlers) 3673 logging._handlerList[:] = self.saved_handler_list 3674 logging.root.level = self.original_logging_level 3675 3676 def test_no_kwargs(self): 3677 logging.basicConfig() 3678 3679 # handler defaults to a StreamHandler to sys.stderr 3680 self.assertEqual(len(logging.root.handlers), 1) 3681 handler = logging.root.handlers[0] 3682 self.assertIsInstance(handler, logging.StreamHandler) 3683 self.assertEqual(handler.stream, sys.stderr) 3684 3685 formatter = handler.formatter 3686 # format defaults to logging.BASIC_FORMAT 3687 self.assertEqual(formatter._style._fmt, logging.BASIC_FORMAT) 3688 # datefmt defaults to None 3689 self.assertIsNone(formatter.datefmt) 3690 # style defaults to % 3691 self.assertIsInstance(formatter._style, logging.PercentStyle) 3692 3693 # level is not explicitly set 3694 self.assertEqual(logging.root.level, self.original_logging_level) 3695 3696 def test_strformatstyle(self): 3697 with support.captured_stdout() as output: 3698 logging.basicConfig(stream=sys.stdout, style="{") 3699 logging.error("Log an error") 3700 sys.stdout.seek(0) 3701 self.assertEqual(output.getvalue().strip(), 3702 "ERROR:root:Log an error") 3703 3704 def test_stringtemplatestyle(self): 3705 with support.captured_stdout() as output: 3706 logging.basicConfig(stream=sys.stdout, style="$") 3707 logging.error("Log an error") 3708 sys.stdout.seek(0) 3709 self.assertEqual(output.getvalue().strip(), 3710 "ERROR:root:Log an error") 3711 3712 def test_filename(self): 3713 3714 def cleanup(h1, h2, fn): 3715 h1.close() 3716 h2.close() 3717 os.remove(fn) 3718 3719 logging.basicConfig(filename='test.log') 3720 3721 self.assertEqual(len(logging.root.handlers), 1) 3722 handler = logging.root.handlers[0] 3723 self.assertIsInstance(handler, logging.FileHandler) 3724 3725 expected = logging.FileHandler('test.log', 'a') 3726 self.assertEqual(handler.stream.mode, expected.stream.mode) 3727 self.assertEqual(handler.stream.name, expected.stream.name) 3728 self.addCleanup(cleanup, handler, expected, 'test.log') 3729 3730 def test_filemode(self): 3731 3732 def cleanup(h1, h2, fn): 3733 h1.close() 3734 h2.close() 3735 os.remove(fn) 3736 3737 logging.basicConfig(filename='test.log', filemode='wb') 3738 3739 handler = logging.root.handlers[0] 3740 expected = logging.FileHandler('test.log', 'wb') 3741 self.assertEqual(handler.stream.mode, expected.stream.mode) 3742 self.addCleanup(cleanup, handler, expected, 'test.log') 3743 3744 def test_stream(self): 3745 stream = io.StringIO() 3746 self.addCleanup(stream.close) 3747 logging.basicConfig(stream=stream) 3748 3749 self.assertEqual(len(logging.root.handlers), 1) 3750 handler = logging.root.handlers[0] 3751 self.assertIsInstance(handler, logging.StreamHandler) 3752 self.assertEqual(handler.stream, stream) 3753 3754 def test_format(self): 3755 logging.basicConfig(format='foo') 3756 3757 formatter = logging.root.handlers[0].formatter 3758 self.assertEqual(formatter._style._fmt, 'foo') 3759 3760 def test_datefmt(self): 3761 logging.basicConfig(datefmt='bar') 3762 3763 formatter = logging.root.handlers[0].formatter 3764 self.assertEqual(formatter.datefmt, 'bar') 3765 3766 def test_style(self): 3767 logging.basicConfig(style='$') 3768 3769 formatter = logging.root.handlers[0].formatter 3770 self.assertIsInstance(formatter._style, logging.StringTemplateStyle) 3771 3772 def test_level(self): 3773 old_level = logging.root.level 3774 self.addCleanup(logging.root.setLevel, old_level) 3775 3776 logging.basicConfig(level=57) 3777 self.assertEqual(logging.root.level, 57) 3778 # Test that second call has no effect 3779 logging.basicConfig(level=58) 3780 self.assertEqual(logging.root.level, 57) 3781 3782 def test_incompatible(self): 3783 assertRaises = self.assertRaises 3784 handlers = [logging.StreamHandler()] 3785 stream = sys.stderr 3786 assertRaises(ValueError, logging.basicConfig, filename='test.log', 3787 stream=stream) 3788 assertRaises(ValueError, logging.basicConfig, filename='test.log', 3789 handlers=handlers) 3790 assertRaises(ValueError, logging.basicConfig, stream=stream, 3791 handlers=handlers) 3792 # Issue 23207: test for invalid kwargs 3793 assertRaises(ValueError, logging.basicConfig, loglevel=logging.INFO) 3794 # Should pop both filename and filemode even if filename is None 3795 logging.basicConfig(filename=None, filemode='a') 3796 3797 def test_handlers(self): 3798 handlers = [ 3799 logging.StreamHandler(), 3800 logging.StreamHandler(sys.stdout), 3801 logging.StreamHandler(), 3802 ] 3803 f = logging.Formatter() 3804 handlers[2].setFormatter(f) 3805 logging.basicConfig(handlers=handlers) 3806 self.assertIs(handlers[0], logging.root.handlers[0]) 3807 self.assertIs(handlers[1], logging.root.handlers[1]) 3808 self.assertIs(handlers[2], logging.root.handlers[2]) 3809 self.assertIsNotNone(handlers[0].formatter) 3810 self.assertIsNotNone(handlers[1].formatter) 3811 self.assertIs(handlers[2].formatter, f) 3812 self.assertIs(handlers[0].formatter, handlers[1].formatter) 3813 3814 def _test_log(self, method, level=None): 3815 # logging.root has no handlers so basicConfig should be called 3816 called = [] 3817 3818 old_basic_config = logging.basicConfig 3819 def my_basic_config(*a, **kw): 3820 old_basic_config() 3821 old_level = logging.root.level 3822 logging.root.setLevel(100) # avoid having messages in stderr 3823 self.addCleanup(logging.root.setLevel, old_level) 3824 called.append((a, kw)) 3825 3826 support.patch(self, logging, 'basicConfig', my_basic_config) 3827 3828 log_method = getattr(logging, method) 3829 if level is not None: 3830 log_method(level, "test me") 3831 else: 3832 log_method("test me") 3833 3834 # basicConfig was called with no arguments 3835 self.assertEqual(called, [((), {})]) 3836 3837 def test_log(self): 3838 self._test_log('log', logging.WARNING) 3839 3840 def test_debug(self): 3841 self._test_log('debug') 3842 3843 def test_info(self): 3844 self._test_log('info') 3845 3846 def test_warning(self): 3847 self._test_log('warning') 3848 3849 def test_error(self): 3850 self._test_log('error') 3851 3852 def test_critical(self): 3853 self._test_log('critical') 3854 3855 3856class LoggerAdapterTest(unittest.TestCase): 3857 3858 def setUp(self): 3859 super(LoggerAdapterTest, self).setUp() 3860 old_handler_list = logging._handlerList[:] 3861 3862 self.recording = RecordingHandler() 3863 self.logger = logging.root 3864 self.logger.addHandler(self.recording) 3865 self.addCleanup(self.logger.removeHandler, self.recording) 3866 self.addCleanup(self.recording.close) 3867 3868 def cleanup(): 3869 logging._handlerList[:] = old_handler_list 3870 3871 self.addCleanup(cleanup) 3872 self.addCleanup(logging.shutdown) 3873 self.adapter = logging.LoggerAdapter(logger=self.logger, extra=None) 3874 3875 def test_exception(self): 3876 msg = 'testing exception: %r' 3877 exc = None 3878 try: 3879 1 / 0 3880 except ZeroDivisionError as e: 3881 exc = e 3882 self.adapter.exception(msg, self.recording) 3883 3884 self.assertEqual(len(self.recording.records), 1) 3885 record = self.recording.records[0] 3886 self.assertEqual(record.levelno, logging.ERROR) 3887 self.assertEqual(record.msg, msg) 3888 self.assertEqual(record.args, (self.recording,)) 3889 self.assertEqual(record.exc_info, 3890 (exc.__class__, exc, exc.__traceback__)) 3891 3892 def test_exception_excinfo(self): 3893 try: 3894 1 / 0 3895 except ZeroDivisionError as e: 3896 exc = e 3897 3898 self.adapter.exception('exc_info test', exc_info=exc) 3899 3900 self.assertEqual(len(self.recording.records), 1) 3901 record = self.recording.records[0] 3902 self.assertEqual(record.exc_info, 3903 (exc.__class__, exc, exc.__traceback__)) 3904 3905 def test_critical(self): 3906 msg = 'critical test! %r' 3907 self.adapter.critical(msg, self.recording) 3908 3909 self.assertEqual(len(self.recording.records), 1) 3910 record = self.recording.records[0] 3911 self.assertEqual(record.levelno, logging.CRITICAL) 3912 self.assertEqual(record.msg, msg) 3913 self.assertEqual(record.args, (self.recording,)) 3914 3915 def test_is_enabled_for(self): 3916 old_disable = self.adapter.logger.manager.disable 3917 self.adapter.logger.manager.disable = 33 3918 self.addCleanup(setattr, self.adapter.logger.manager, 'disable', 3919 old_disable) 3920 self.assertFalse(self.adapter.isEnabledFor(32)) 3921 3922 def test_has_handlers(self): 3923 self.assertTrue(self.adapter.hasHandlers()) 3924 3925 for handler in self.logger.handlers: 3926 self.logger.removeHandler(handler) 3927 3928 self.assertFalse(self.logger.hasHandlers()) 3929 self.assertFalse(self.adapter.hasHandlers()) 3930 3931 3932class LoggerTest(BaseTest): 3933 3934 def setUp(self): 3935 super(LoggerTest, self).setUp() 3936 self.recording = RecordingHandler() 3937 self.logger = logging.Logger(name='blah') 3938 self.logger.addHandler(self.recording) 3939 self.addCleanup(self.logger.removeHandler, self.recording) 3940 self.addCleanup(self.recording.close) 3941 self.addCleanup(logging.shutdown) 3942 3943 def test_set_invalid_level(self): 3944 self.assertRaises(TypeError, self.logger.setLevel, object()) 3945 3946 def test_exception(self): 3947 msg = 'testing exception: %r' 3948 exc = None 3949 try: 3950 1 / 0 3951 except ZeroDivisionError as e: 3952 exc = e 3953 self.logger.exception(msg, self.recording) 3954 3955 self.assertEqual(len(self.recording.records), 1) 3956 record = self.recording.records[0] 3957 self.assertEqual(record.levelno, logging.ERROR) 3958 self.assertEqual(record.msg, msg) 3959 self.assertEqual(record.args, (self.recording,)) 3960 self.assertEqual(record.exc_info, 3961 (exc.__class__, exc, exc.__traceback__)) 3962 3963 def test_log_invalid_level_with_raise(self): 3964 with support.swap_attr(logging, 'raiseExceptions', True): 3965 self.assertRaises(TypeError, self.logger.log, '10', 'test message') 3966 3967 def test_log_invalid_level_no_raise(self): 3968 with support.swap_attr(logging, 'raiseExceptions', False): 3969 self.logger.log('10', 'test message') # no exception happens 3970 3971 def test_find_caller_with_stack_info(self): 3972 called = [] 3973 support.patch(self, logging.traceback, 'print_stack', 3974 lambda f, file: called.append(file.getvalue())) 3975 3976 self.logger.findCaller(stack_info=True) 3977 3978 self.assertEqual(len(called), 1) 3979 self.assertEqual('Stack (most recent call last):\n', called[0]) 3980 3981 def test_make_record_with_extra_overwrite(self): 3982 name = 'my record' 3983 level = 13 3984 fn = lno = msg = args = exc_info = func = sinfo = None 3985 rv = logging._logRecordFactory(name, level, fn, lno, msg, args, 3986 exc_info, func, sinfo) 3987 3988 for key in ('message', 'asctime') + tuple(rv.__dict__.keys()): 3989 extra = {key: 'some value'} 3990 self.assertRaises(KeyError, self.logger.makeRecord, name, level, 3991 fn, lno, msg, args, exc_info, 3992 extra=extra, sinfo=sinfo) 3993 3994 def test_make_record_with_extra_no_overwrite(self): 3995 name = 'my record' 3996 level = 13 3997 fn = lno = msg = args = exc_info = func = sinfo = None 3998 extra = {'valid_key': 'some value'} 3999 result = self.logger.makeRecord(name, level, fn, lno, msg, args, 4000 exc_info, extra=extra, sinfo=sinfo) 4001 self.assertIn('valid_key', result.__dict__) 4002 4003 def test_has_handlers(self): 4004 self.assertTrue(self.logger.hasHandlers()) 4005 4006 for handler in self.logger.handlers: 4007 self.logger.removeHandler(handler) 4008 self.assertFalse(self.logger.hasHandlers()) 4009 4010 def test_has_handlers_no_propagate(self): 4011 child_logger = logging.getLogger('blah.child') 4012 child_logger.propagate = False 4013 self.assertFalse(child_logger.hasHandlers()) 4014 4015 def test_is_enabled_for(self): 4016 old_disable = self.logger.manager.disable 4017 self.logger.manager.disable = 23 4018 self.addCleanup(setattr, self.logger.manager, 'disable', old_disable) 4019 self.assertFalse(self.logger.isEnabledFor(22)) 4020 4021 def test_root_logger_aliases(self): 4022 root = logging.getLogger() 4023 self.assertIs(root, logging.root) 4024 self.assertIs(root, logging.getLogger(None)) 4025 self.assertIs(root, logging.getLogger('')) 4026 self.assertIs(root, logging.getLogger('foo').root) 4027 self.assertIs(root, logging.getLogger('foo.bar').root) 4028 self.assertIs(root, logging.getLogger('foo').parent) 4029 4030 self.assertIsNot(root, logging.getLogger('\0')) 4031 self.assertIsNot(root, logging.getLogger('foo.bar').parent) 4032 4033 def test_invalid_names(self): 4034 self.assertRaises(TypeError, logging.getLogger, any) 4035 self.assertRaises(TypeError, logging.getLogger, b'foo') 4036 4037 4038class BaseFileTest(BaseTest): 4039 "Base class for handler tests that write log files" 4040 4041 def setUp(self): 4042 BaseTest.setUp(self) 4043 fd, self.fn = tempfile.mkstemp(".log", "test_logging-2-") 4044 os.close(fd) 4045 self.rmfiles = [] 4046 4047 def tearDown(self): 4048 for fn in self.rmfiles: 4049 os.unlink(fn) 4050 if os.path.exists(self.fn): 4051 os.unlink(self.fn) 4052 BaseTest.tearDown(self) 4053 4054 def assertLogFile(self, filename): 4055 "Assert a log file is there and register it for deletion" 4056 self.assertTrue(os.path.exists(filename), 4057 msg="Log file %r does not exist" % filename) 4058 self.rmfiles.append(filename) 4059 4060 4061class FileHandlerTest(BaseFileTest): 4062 def test_delay(self): 4063 os.unlink(self.fn) 4064 fh = logging.FileHandler(self.fn, delay=True) 4065 self.assertIsNone(fh.stream) 4066 self.assertFalse(os.path.exists(self.fn)) 4067 fh.handle(logging.makeLogRecord({})) 4068 self.assertIsNotNone(fh.stream) 4069 self.assertTrue(os.path.exists(self.fn)) 4070 fh.close() 4071 4072class RotatingFileHandlerTest(BaseFileTest): 4073 def next_rec(self): 4074 return logging.LogRecord('n', logging.DEBUG, 'p', 1, 4075 self.next_message(), None, None, None) 4076 4077 def test_should_not_rollover(self): 4078 # If maxbytes is zero rollover never occurs 4079 rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=0) 4080 self.assertFalse(rh.shouldRollover(None)) 4081 rh.close() 4082 4083 def test_should_rollover(self): 4084 rh = logging.handlers.RotatingFileHandler(self.fn, maxBytes=1) 4085 self.assertTrue(rh.shouldRollover(self.next_rec())) 4086 rh.close() 4087 4088 def test_file_created(self): 4089 # checks that the file is created and assumes it was created 4090 # by us 4091 rh = logging.handlers.RotatingFileHandler(self.fn) 4092 rh.emit(self.next_rec()) 4093 self.assertLogFile(self.fn) 4094 rh.close() 4095 4096 def test_rollover_filenames(self): 4097 def namer(name): 4098 return name + ".test" 4099 rh = logging.handlers.RotatingFileHandler( 4100 self.fn, backupCount=2, maxBytes=1) 4101 rh.namer = namer 4102 rh.emit(self.next_rec()) 4103 self.assertLogFile(self.fn) 4104 rh.emit(self.next_rec()) 4105 self.assertLogFile(namer(self.fn + ".1")) 4106 rh.emit(self.next_rec()) 4107 self.assertLogFile(namer(self.fn + ".2")) 4108 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 4109 rh.close() 4110 4111 @support.requires_zlib 4112 def test_rotator(self): 4113 def namer(name): 4114 return name + ".gz" 4115 4116 def rotator(source, dest): 4117 with open(source, "rb") as sf: 4118 data = sf.read() 4119 compressed = zlib.compress(data, 9) 4120 with open(dest, "wb") as df: 4121 df.write(compressed) 4122 os.remove(source) 4123 4124 rh = logging.handlers.RotatingFileHandler( 4125 self.fn, backupCount=2, maxBytes=1) 4126 rh.rotator = rotator 4127 rh.namer = namer 4128 m1 = self.next_rec() 4129 rh.emit(m1) 4130 self.assertLogFile(self.fn) 4131 m2 = self.next_rec() 4132 rh.emit(m2) 4133 fn = namer(self.fn + ".1") 4134 self.assertLogFile(fn) 4135 newline = os.linesep 4136 with open(fn, "rb") as f: 4137 compressed = f.read() 4138 data = zlib.decompress(compressed) 4139 self.assertEqual(data.decode("ascii"), m1.msg + newline) 4140 rh.emit(self.next_rec()) 4141 fn = namer(self.fn + ".2") 4142 self.assertLogFile(fn) 4143 with open(fn, "rb") as f: 4144 compressed = f.read() 4145 data = zlib.decompress(compressed) 4146 self.assertEqual(data.decode("ascii"), m1.msg + newline) 4147 rh.emit(self.next_rec()) 4148 fn = namer(self.fn + ".2") 4149 with open(fn, "rb") as f: 4150 compressed = f.read() 4151 data = zlib.decompress(compressed) 4152 self.assertEqual(data.decode("ascii"), m2.msg + newline) 4153 self.assertFalse(os.path.exists(namer(self.fn + ".3"))) 4154 rh.close() 4155 4156class TimedRotatingFileHandlerTest(BaseFileTest): 4157 # other test methods added below 4158 def test_rollover(self): 4159 fh = logging.handlers.TimedRotatingFileHandler(self.fn, 'S', 4160 backupCount=1) 4161 fmt = logging.Formatter('%(asctime)s %(message)s') 4162 fh.setFormatter(fmt) 4163 r1 = logging.makeLogRecord({'msg': 'testing - initial'}) 4164 fh.emit(r1) 4165 self.assertLogFile(self.fn) 4166 time.sleep(1.1) # a little over a second ... 4167 r2 = logging.makeLogRecord({'msg': 'testing - after delay'}) 4168 fh.emit(r2) 4169 fh.close() 4170 # At this point, we should have a recent rotated file which we 4171 # can test for the existence of. However, in practice, on some 4172 # machines which run really slowly, we don't know how far back 4173 # in time to go to look for the log file. So, we go back a fair 4174 # bit, and stop as soon as we see a rotated file. In theory this 4175 # could of course still fail, but the chances are lower. 4176 found = False 4177 now = datetime.datetime.now() 4178 GO_BACK = 5 * 60 # seconds 4179 for secs in range(GO_BACK): 4180 prev = now - datetime.timedelta(seconds=secs) 4181 fn = self.fn + prev.strftime(".%Y-%m-%d_%H-%M-%S") 4182 found = os.path.exists(fn) 4183 if found: 4184 self.rmfiles.append(fn) 4185 break 4186 msg = 'No rotated files found, went back %d seconds' % GO_BACK 4187 if not found: 4188 #print additional diagnostics 4189 dn, fn = os.path.split(self.fn) 4190 files = [f for f in os.listdir(dn) if f.startswith(fn)] 4191 print('Test time: %s' % now.strftime("%Y-%m-%d %H-%M-%S"), file=sys.stderr) 4192 print('The only matching files are: %s' % files, file=sys.stderr) 4193 for f in files: 4194 print('Contents of %s:' % f) 4195 path = os.path.join(dn, f) 4196 with open(path, 'r') as tf: 4197 print(tf.read()) 4198 self.assertTrue(found, msg=msg) 4199 4200 def test_invalid(self): 4201 assertRaises = self.assertRaises 4202 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 4203 self.fn, 'X', delay=True) 4204 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 4205 self.fn, 'W', delay=True) 4206 assertRaises(ValueError, logging.handlers.TimedRotatingFileHandler, 4207 self.fn, 'W7', delay=True) 4208 4209 def test_compute_rollover_daily_attime(self): 4210 currentTime = 0 4211 atTime = datetime.time(12, 0, 0) 4212 rh = logging.handlers.TimedRotatingFileHandler( 4213 self.fn, when='MIDNIGHT', interval=1, backupCount=0, utc=True, 4214 atTime=atTime) 4215 try: 4216 actual = rh.computeRollover(currentTime) 4217 self.assertEqual(actual, currentTime + 12 * 60 * 60) 4218 4219 actual = rh.computeRollover(currentTime + 13 * 60 * 60) 4220 self.assertEqual(actual, currentTime + 36 * 60 * 60) 4221 finally: 4222 rh.close() 4223 4224 #@unittest.skipIf(True, 'Temporarily skipped while failures investigated.') 4225 def test_compute_rollover_weekly_attime(self): 4226 currentTime = int(time.time()) 4227 today = currentTime - currentTime % 86400 4228 4229 atTime = datetime.time(12, 0, 0) 4230 4231 wday = time.gmtime(today).tm_wday 4232 for day in range(7): 4233 rh = logging.handlers.TimedRotatingFileHandler( 4234 self.fn, when='W%d' % day, interval=1, backupCount=0, utc=True, 4235 atTime=atTime) 4236 try: 4237 if wday > day: 4238 # The rollover day has already passed this week, so we 4239 # go over into next week 4240 expected = (7 - wday + day) 4241 else: 4242 expected = (day - wday) 4243 # At this point expected is in days from now, convert to seconds 4244 expected *= 24 * 60 * 60 4245 # Add in the rollover time 4246 expected += 12 * 60 * 60 4247 # Add in adjustment for today 4248 expected += today 4249 actual = rh.computeRollover(today) 4250 if actual != expected: 4251 print('failed in timezone: %d' % time.timezone) 4252 print('local vars: %s' % locals()) 4253 self.assertEqual(actual, expected) 4254 if day == wday: 4255 # goes into following week 4256 expected += 7 * 24 * 60 * 60 4257 actual = rh.computeRollover(today + 13 * 60 * 60) 4258 if actual != expected: 4259 print('failed in timezone: %d' % time.timezone) 4260 print('local vars: %s' % locals()) 4261 self.assertEqual(actual, expected) 4262 finally: 4263 rh.close() 4264 4265 4266def secs(**kw): 4267 return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) 4268 4269for when, exp in (('S', 1), 4270 ('M', 60), 4271 ('H', 60 * 60), 4272 ('D', 60 * 60 * 24), 4273 ('MIDNIGHT', 60 * 60 * 24), 4274 # current time (epoch start) is a Thursday, W0 means Monday 4275 ('W0', secs(days=4, hours=24)), 4276 ): 4277 def test_compute_rollover(self, when=when, exp=exp): 4278 rh = logging.handlers.TimedRotatingFileHandler( 4279 self.fn, when=when, interval=1, backupCount=0, utc=True) 4280 currentTime = 0.0 4281 actual = rh.computeRollover(currentTime) 4282 if exp != actual: 4283 # Failures occur on some systems for MIDNIGHT and W0. 4284 # Print detailed calculation for MIDNIGHT so we can try to see 4285 # what's going on 4286 if when == 'MIDNIGHT': 4287 try: 4288 if rh.utc: 4289 t = time.gmtime(currentTime) 4290 else: 4291 t = time.localtime(currentTime) 4292 currentHour = t[3] 4293 currentMinute = t[4] 4294 currentSecond = t[5] 4295 # r is the number of seconds left between now and midnight 4296 r = logging.handlers._MIDNIGHT - ((currentHour * 60 + 4297 currentMinute) * 60 + 4298 currentSecond) 4299 result = currentTime + r 4300 print('t: %s (%s)' % (t, rh.utc), file=sys.stderr) 4301 print('currentHour: %s' % currentHour, file=sys.stderr) 4302 print('currentMinute: %s' % currentMinute, file=sys.stderr) 4303 print('currentSecond: %s' % currentSecond, file=sys.stderr) 4304 print('r: %s' % r, file=sys.stderr) 4305 print('result: %s' % result, file=sys.stderr) 4306 except Exception: 4307 print('exception in diagnostic code: %s' % sys.exc_info()[1], file=sys.stderr) 4308 self.assertEqual(exp, actual) 4309 rh.close() 4310 setattr(TimedRotatingFileHandlerTest, "test_compute_rollover_%s" % when, test_compute_rollover) 4311 4312 4313@unittest.skipUnless(win32evtlog, 'win32evtlog/win32evtlogutil/pywintypes required for this test.') 4314class NTEventLogHandlerTest(BaseTest): 4315 def test_basic(self): 4316 logtype = 'Application' 4317 elh = win32evtlog.OpenEventLog(None, logtype) 4318 num_recs = win32evtlog.GetNumberOfEventLogRecords(elh) 4319 4320 try: 4321 h = logging.handlers.NTEventLogHandler('test_logging') 4322 except pywintypes.error as e: 4323 if e.winerror == 5: # access denied 4324 raise unittest.SkipTest('Insufficient privileges to run test') 4325 raise 4326 4327 r = logging.makeLogRecord({'msg': 'Test Log Message'}) 4328 h.handle(r) 4329 h.close() 4330 # Now see if the event is recorded 4331 self.assertLess(num_recs, win32evtlog.GetNumberOfEventLogRecords(elh)) 4332 flags = win32evtlog.EVENTLOG_BACKWARDS_READ | \ 4333 win32evtlog.EVENTLOG_SEQUENTIAL_READ 4334 found = False 4335 GO_BACK = 100 4336 events = win32evtlog.ReadEventLog(elh, flags, GO_BACK) 4337 for e in events: 4338 if e.SourceName != 'test_logging': 4339 continue 4340 msg = win32evtlogutil.SafeFormatMessage(e, logtype) 4341 if msg != 'Test Log Message\r\n': 4342 continue 4343 found = True 4344 break 4345 msg = 'Record not found in event log, went back %d records' % GO_BACK 4346 self.assertTrue(found, msg=msg) 4347 4348 4349class MiscTestCase(unittest.TestCase): 4350 def test__all__(self): 4351 blacklist = {'logThreads', 'logMultiprocessing', 4352 'logProcesses', 'currentframe', 4353 'PercentStyle', 'StrFormatStyle', 'StringTemplateStyle', 4354 'Filterer', 'PlaceHolder', 'Manager', 'RootLogger', 4355 'root', 'threading'} 4356 support.check__all__(self, logging, blacklist=blacklist) 4357 4358 4359# Set the locale to the platform-dependent default. I have no idea 4360# why the test does this, but in any case we save the current locale 4361# first and restore it at the end. 4362@support.run_with_locale('LC_ALL', '') 4363def test_main(): 4364 tests = [ 4365 BuiltinLevelsTest, BasicFilterTest, CustomLevelsAndFiltersTest, 4366 HandlerTest, MemoryHandlerTest, ConfigFileTest, SocketHandlerTest, 4367 DatagramHandlerTest, MemoryTest, EncodingTest, WarningsTest, 4368 ConfigDictTest, ManagerTest, FormatterTest, BufferingFormatterTest, 4369 StreamHandlerTest, LogRecordFactoryTest, ChildLoggerTest, 4370 QueueHandlerTest, ShutdownTest, ModuleLevelMiscTest, BasicConfigTest, 4371 LoggerAdapterTest, LoggerTest, SMTPHandlerTest, FileHandlerTest, 4372 RotatingFileHandlerTest, LastResortTest, LogRecordTest, 4373 ExceptionTest, SysLogHandlerTest, HTTPHandlerTest, 4374 NTEventLogHandlerTest, TimedRotatingFileHandlerTest, 4375 UnixSocketHandlerTest, UnixDatagramHandlerTest, UnixSysLogHandlerTest, 4376 MiscTestCase 4377 ] 4378 if hasattr(logging.handlers, 'QueueListener'): 4379 tests.append(QueueListenerTest) 4380 support.run_unittest(*tests) 4381 4382if __name__ == "__main__": 4383 test_main() 4384