10a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom test import test_support as support 20a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# If we end up with a significant number of tests that don't require 30a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# threading, this test module should be split. Right now we skip 40a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# them all if we don't have threading. 50a8c90248264a8b26970b4473770bcc3df8515fJosh Gaothreading = support.import_module('threading') 60a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 70a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom contextlib import contextmanager 80a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport imaplib 90a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport os.path 100a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport SocketServer 110a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport time 120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 130a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom test_support import reap_threads, verbose, transient_internet 140a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport unittest 150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 160a8c90248264a8b26970b4473770bcc3df8515fJosh Gaotry: 170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao import ssl 180a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoexcept ImportError: 190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ssl = None 200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 210a8c90248264a8b26970b4473770bcc3df8515fJosh GaoCERTFILE = None 220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 240a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass TestImaplib(unittest.TestCase): 250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def test_that_Time2Internaldate_returns_a_result(self): 270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # We can check only that it successfully produces a result, 280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # not the correctness of the result itself, since the result 290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # depends on the timezone the machine is in. 300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao timevalues = [2000000000, 2000000000.0, time.localtime(2000000000), 310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao '"18-May-2033 05:33:20 +0200"'] 320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao for t in timevalues: 340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao imaplib.Time2Internaldate(t) 350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 370a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoif ssl: 380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao class SecureTCPServer(SocketServer.TCPServer): 400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def get_request(self): 420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao newsocket, fromaddr = self.socket.accept() 430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao connstream = ssl.wrap_socket(newsocket, 440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao server_side=True, 450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao certfile=CERTFILE) 460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return connstream, fromaddr 470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao IMAP4_SSL = imaplib.IMAP4_SSL 490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 500a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoelse: 510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao class SecureTCPServer: 530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao pass 540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao IMAP4_SSL = None 560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 580a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass SimpleIMAPHandler(SocketServer.StreamRequestHandler): 590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao timeout = 1 610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def _send(self, message): 630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if verbose: print "SENT:", message.strip() 640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.wfile.write(message) 650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def handle(self): 670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # Send a welcome message. 680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._send('* OK IMAP4rev1\r\n') 690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while 1: 700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # Gather up input until we receive a line terminator or we timeout. 710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # Accumulate read(1) because it's simpler to handle the differences 720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # between naked sockets and SSL sockets. 730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao line = '' 740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao while 1: 750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao part = self.rfile.read(1) 770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if part == '': 780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # Naked sockets return empty strings.. 790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return 800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao line += part 810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao except IOError: 820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # ..but SSLSockets raise exceptions. 830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return 840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if line.endswith('\r\n'): 850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao break 860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if verbose: print 'GOT:', line.strip() 880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao splitline = line.split() 890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao tag = splitline[0] 900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao cmd = splitline[1] 910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao args = splitline[2:] 920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if hasattr(self, 'cmd_%s' % (cmd,)): 940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao getattr(self, 'cmd_%s' % (cmd,))(tag, args) 950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao else: 960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._send('%s BAD %s unknown\r\n' % (tag, cmd)) 970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def cmd_CAPABILITY(self, tag, args): 990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._send('* CAPABILITY IMAP4rev1\r\n') 1000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self._send('%s OK CAPABILITY completed\r\n' % (tag,)) 1010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1030a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass BaseThreadedNetworkedTests(unittest.TestCase): 1040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def make_server(self, addr, hdlr): 1060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao class MyServer(self.server_class): 1080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def handle_error(self, request, client_address): 1090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.close_request(request) 1100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.server_close() 1110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise 1120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if verbose: print "creating server" 1140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao server = MyServer(addr, hdlr) 1150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.assertEqual(server.server_address, server.socket.getsockname()) 1160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if verbose: 1180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao print "server created" 1190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao print "ADDR =", addr 1200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao print "CLASS =", self.server_class 1210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao print "HDLR =", server.RequestHandlerClass 1220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao t = threading.Thread( 1240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao name='%s serving' % self.server_class, 1250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao target=server.serve_forever, 1260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # Short poll interval to make the test finish quickly. 1270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # Time between requests is short enough that we won't wake 1280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # up spuriously too many times. 1290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao kwargs={'poll_interval':0.01}) 1300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao t.daemon = True # In case this function raises. 1310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao t.start() 1320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if verbose: print "server running" 1330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao return server, t 1340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def reap_server(self, server, thread): 1360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if verbose: print "waiting for server" 1370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao server.shutdown() 1380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao thread.join() 1390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if verbose: print "done" 1400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @contextmanager 1420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def reaped_server(self, hdlr): 1430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao server, thread = self.make_server((support.HOST, 0), hdlr) 1440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao try: 1450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao yield server 1460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao finally: 1470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.reap_server(server, thread) 1480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @reap_threads 1500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def test_connect(self): 1510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao with self.reaped_server(SimpleIMAPHandler) as server: 1520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao client = self.imap_class(*server.server_address) 1530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao client.shutdown() 1540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao @reap_threads 1560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def test_issue5949(self): 1570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao class EOFHandler(SocketServer.StreamRequestHandler): 1590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def handle(self): 1600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao # EOF without sending a complete welcome message. 1610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.wfile.write('* OK') 1620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao with self.reaped_server(EOFHandler) as server: 1640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.assertRaises(imaplib.IMAP4.abort, 1650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.imap_class, *server.server_address) 1660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1680a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ThreadedNetworkedTests(BaseThreadedNetworkedTests): 1690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao server_class = SocketServer.TCPServer 1710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao imap_class = imaplib.IMAP4 1720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao@unittest.skipUnless(ssl, "SSL not available") 1750a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): 1760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao server_class = SecureTCPServer 1780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao imap_class = IMAP4_SSL 1790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1810a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass RemoteIMAPTest(unittest.TestCase): 1820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao host = 'cyrus.andrew.cmu.edu' 1830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao port = 143 1840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao username = 'anonymous' 1850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao password = 'pass' 1860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao imap_class = imaplib.IMAP4 1870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def setUp(self): 1890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao with transient_internet(self.host): 1900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.server = self.imap_class(self.host, self.port) 1910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def tearDown(self): 1930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if self.server is not None: 1940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.server.logout() 1950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def test_logincapa(self): 1970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.assertTrue('LOGINDISABLED' in self.server.capabilities) 1980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 1990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def test_anonlogin(self): 2000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.assertTrue('AUTH=ANONYMOUS' in self.server.capabilities) 2010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao rs = self.server.login(self.username, self.password) 2020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.assertEqual(rs[0], 'OK') 2030a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def test_logout(self): 2050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao rs = self.server.logout() 2060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.server = None 2070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.assertEqual(rs[0], 'BYE') 2080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao@unittest.skipUnless(ssl, "SSL not available") 2110a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass RemoteIMAP_SSLTest(RemoteIMAPTest): 2120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao port = 993 2130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao imap_class = IMAP4_SSL 2140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao def test_logincapa(self): 2160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.assertFalse('LOGINDISABLED' in self.server.capabilities) 2170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao self.assertTrue('AUTH=PLAIN' in self.server.capabilities) 2180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2200a8c90248264a8b26970b4473770bcc3df8515fJosh Gaodef test_main(): 2210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao tests = [TestImaplib] 2220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if support.is_resource_enabled('network'): 2240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if ssl: 2250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao global CERTFILE 2260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, 2270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao "keycert.pem") 2280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao if not os.path.exists(CERTFILE): 2290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao raise support.TestFailed("Can't read certificate files!") 2300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao tests.extend([ 2310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, 2320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao RemoteIMAPTest, RemoteIMAP_SSLTest, 2330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao ]) 2340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao support.run_unittest(*tests) 2360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao 2380a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoif __name__ == "__main__": 2390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao support.use_resources = ['network'] 2400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao test_main() 241