10a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom test import test_support as support
20a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# If we end up with a significant number of tests that don't require
30a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# threading, this test module should be split.  Right now we skip
40a8c90248264a8b26970b4473770bcc3df8515fJosh Gao# them all if we don't have threading.
50a8c90248264a8b26970b4473770bcc3df8515fJosh Gaothreading = support.import_module('threading')
60a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
70a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom contextlib import contextmanager
80a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport imaplib
90a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport os.path
100a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport SocketServer
110a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport time
120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
130a8c90248264a8b26970b4473770bcc3df8515fJosh Gaofrom test_support import reap_threads, verbose, transient_internet
140a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoimport unittest
150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
# Path of the certificate used by the SSL test server.  It stays None
# until test_main() resolves it.
CERTFILE = None

# ssl is optional: when it cannot be imported the name is bound to None so
# the rest of the module can test it for truth.
try:
    import ssl
except ImportError:
    ssl = None
220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
class TestImaplib(unittest.TestCase):
    """Checks of imaplib helpers that do not need a server."""

    def test_that_Time2Internaldate_returns_a_result(self):
        """Each supported input form yields a double-quoted string."""
        # The exact text depends on the timezone the machine is in, so
        # only the shape of the result is verified: a string wrapped in
        # double quotes, as documented for Time2Internaldate.
        timevalues = [2000000000, 2000000000.0, time.localtime(2000000000),
                      '"18-May-2033 05:33:20 +0200"']

        for t in timevalues:
            stamp = imaplib.Time2Internaldate(t)
            self.assertIsInstance(stamp, str)
            self.assertTrue(
                stamp.startswith('"') and stamp.endswith('"'),
                "unquoted INTERNALDATE %r for input %r" % (stamp, t))
350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
if not ssl:

    # Placeholders so that the class bodies referring to these names can
    # still be evaluated when the ssl module is missing.
    class SecureTCPServer:
        pass

    IMAP4_SSL = None

else:

    class SecureTCPServer(SocketServer.TCPServer):
        """TCP server whose accepted connections are wrapped in SSL."""

        def get_request(self):
            """Accept one connection; return (ssl_socket, client_address)."""
            raw_sock, peer = self.socket.accept()
            secure_sock = ssl.wrap_socket(raw_sock,
                                          certfile=CERTFILE,
                                          server_side=True)
            return secure_sock, peer

    IMAP4_SSL = imaplib.IMAP4_SSL
560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
580a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass SimpleIMAPHandler(SocketServer.StreamRequestHandler):
590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    timeout = 1
610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def _send(self, message):
630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if verbose: print "SENT:", message.strip()
640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.wfile.write(message)
650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def handle(self):
670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        # Send a welcome message.
680a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._send('* OK IMAP4rev1\r\n')
690a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        while 1:
700a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # Gather up input until we receive a line terminator or we timeout.
710a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # Accumulate read(1) because it's simpler to handle the differences
720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # between naked sockets and SSL sockets.
730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            line = ''
740a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            while 1:
750a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                try:
760a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    part = self.rfile.read(1)
770a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    if part == '':
780a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                        # Naked sockets return empty strings..
790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                        return
800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    line += part
810a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                except IOError:
820a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    # ..but SSLSockets raise exceptions.
830a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    return
840a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                if line.endswith('\r\n'):
850a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                    break
860a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
870a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if verbose: print 'GOT:', line.strip()
880a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            splitline = line.split()
890a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            tag = splitline[0]
900a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            cmd = splitline[1]
910a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            args = splitline[2:]
920a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
930a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            if hasattr(self, 'cmd_%s' % (cmd,)):
940a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                getattr(self, 'cmd_%s' % (cmd,))(tag, args)
950a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            else:
960a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self._send('%s BAD %s unknown\r\n' % (tag, cmd))
970a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
980a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def cmd_CAPABILITY(self, tag, args):
990a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._send('* CAPABILITY IMAP4rev1\r\n')
1000a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self._send('%s OK CAPABILITY completed\r\n' % (tag,))
1010a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1020a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1030a8c90248264a8b26970b4473770bcc3df8515fJosh Gaoclass BaseThreadedNetworkedTests(unittest.TestCase):
1040a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1050a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def make_server(self, addr, hdlr):
1060a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1070a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        class MyServer(self.server_class):
1080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            def handle_error(self, request, client_address):
1090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.close_request(request)
1100a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.server_close()
1110a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                raise
1120a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1130a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if verbose: print "creating server"
1140a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        server = MyServer(addr, hdlr)
1150a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        self.assertEqual(server.server_address, server.socket.getsockname())
1160a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1170a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if verbose:
1180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            print "server created"
1190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            print "ADDR =", addr
1200a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            print "CLASS =", self.server_class
1210a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            print "HDLR =", server.RequestHandlerClass
1220a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1230a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        t = threading.Thread(
1240a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            name='%s serving' % self.server_class,
1250a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            target=server.serve_forever,
1260a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # Short poll interval to make the test finish quickly.
1270a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # Time between requests is short enough that we won't wake
1280a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            # up spuriously too many times.
1290a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            kwargs={'poll_interval':0.01})
1300a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        t.daemon = True  # In case this function raises.
1310a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        t.start()
1320a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if verbose: print "server running"
1330a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        return server, t
1340a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1350a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def reap_server(self, server, thread):
1360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if verbose: print "waiting for server"
1370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        server.shutdown()
1380a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        thread.join()
1390a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        if verbose: print "done"
1400a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1410a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @contextmanager
1420a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def reaped_server(self, hdlr):
1430a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        server, thread = self.make_server((support.HOST, 0), hdlr)
1440a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        try:
1450a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            yield server
1460a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        finally:
1470a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.reap_server(server, thread)
1480a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1490a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @reap_threads
1500a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_connect(self):
1510a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        with self.reaped_server(SimpleIMAPHandler) as server:
1520a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client = self.imap_class(*server.server_address)
1530a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            client.shutdown()
1540a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1550a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    @reap_threads
1560a8c90248264a8b26970b4473770bcc3df8515fJosh Gao    def test_issue5949(self):
1570a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1580a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        class EOFHandler(SocketServer.StreamRequestHandler):
1590a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            def handle(self):
1600a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                # EOF without sending a complete welcome message.
1610a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                self.wfile.write('* OK')
1620a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1630a8c90248264a8b26970b4473770bcc3df8515fJosh Gao        with self.reaped_server(EOFHandler) as server:
1640a8c90248264a8b26970b4473770bcc3df8515fJosh Gao            self.assertRaises(imaplib.IMAP4.abort,
1650a8c90248264a8b26970b4473770bcc3df8515fJosh Gao                              self.imap_class, *server.server_address)
1660a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1670a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
    """Threaded server tests over a plain TCP connection."""

    imap_class = imaplib.IMAP4
    server_class = SocketServer.TCPServer
1720a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1730a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
@unittest.skipUnless(ssl, "SSL not available")
class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
    """Threaded server tests over an SSL-wrapped connection."""

    imap_class = IMAP4_SSL
    server_class = SecureTCPServer
1790a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
1800a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
class RemoteIMAPTest(unittest.TestCase):
    """Tests against a remote IMAP server reached with ``imap_class``."""

    host = 'cyrus.andrew.cmu.edu'
    port = 143
    username = 'anonymous'
    password = 'pass'
    imap_class = imaplib.IMAP4

    def setUp(self):
        """Connect to the remote server inside transient_internet()."""
        with transient_internet(self.host):
            self.server = self.imap_class(self.host, self.port)

    def tearDown(self):
        """Log out unless the test already cleared ``self.server``."""
        if self.server is not None:
            self.server.logout()

    def test_logincapa(self):
        """The server advertises LOGINDISABLED on this connection."""
        # assertIn reports the actual capabilities when the check fails.
        self.assertIn('LOGINDISABLED', self.server.capabilities)

    def test_anonlogin(self):
        """Anonymous authentication is advertised and login returns OK."""
        self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
        rs = self.server.login(self.username, self.password)
        self.assertEqual(rs[0], 'OK')

    def test_logout(self):
        """logout() returns a BYE response."""
        rs = self.server.logout()
        # Already logged out: tell tearDown() not to do it again.
        self.server = None
        self.assertEqual(rs[0], 'BYE')
2080a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2090a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
@unittest.skipUnless(ssl, "SSL not available")
class RemoteIMAP_SSLTest(RemoteIMAPTest):
    """The remote-server tests repeated through IMAP4_SSL on port 993."""

    port = 993
    imap_class = IMAP4_SSL

    def test_logincapa(self):
        """Over SSL, LOGINDISABLED is absent and AUTH=PLAIN is offered."""
        # assertNotIn/assertIn report the actual capabilities on failure.
        self.assertNotIn('LOGINDISABLED', self.server.capabilities)
        self.assertIn('AUTH=PLAIN', self.server.capabilities)
2180a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2190a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
def test_main():
    """Run the offline tests, plus the networked ones when enabled.

    When the 'network' resource is enabled and ssl is available, CERTFILE
    is pointed at "keycert.pem" next to this file first.

    Raises support.TestFailed if that certificate file does not exist.
    """
    global CERTFILE

    selected = [TestImaplib]

    if support.is_resource_enabled('network'):
        if ssl:
            here = os.path.dirname(__file__) or os.curdir
            CERTFILE = os.path.join(here, "keycert.pem")
            if not os.path.exists(CERTFILE):
                raise support.TestFailed("Can't read certificate files!")
        selected += [
            ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
            RemoteIMAPTest, RemoteIMAP_SSLTest,
        ]

    support.run_unittest(*selected)
2360a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
2370a8c90248264a8b26970b4473770bcc3df8515fJosh Gao
if __name__ == "__main__":
    # Running the file directly enables the 'network' resource, so the
    # networked test classes are included by test_main().
    support.use_resources = ['network']
    test_main()
241