1import os
2import sys
3import time
4import stat
5import socket
6import email
7import email.message
8import re
9import io
10import tempfile
11from test import support
12import unittest
13import textwrap
14import mailbox
15import glob
16
17
class TestBase:
    # Helpers shared by the mailbox test classes below.

    # Tuple of the message classes provided by the mailbox module.
    all_mailbox_types = (
        mailbox.Message,
        mailbox.MaildirMessage,
        mailbox.mboxMessage,
        mailbox.MHMessage,
        mailbox.BabylMessage,
        mailbox.MMDFMessage,
    )

    def _check_sample(self, msg):
        # Verify that msg is a mailbox.Message carrying the sample message:
        # the expected headers, a multipart body, and one sub-part per entry
        # of _sample_payloads, in order.
        for expected_type in (email.message.Message, mailbox.Message):
            self.assertIsInstance(msg, expected_type)
        for header_name, header_value in _sample_headers.items():
            self.assertIn(header_value, msg.get_all(header_name))
        self.assertTrue(msg.is_multipart())
        self.assertEqual(len(msg.get_payload()), len(_sample_payloads))
        for index, expected_payload in enumerate(_sample_payloads):
            subpart = msg.get_payload(index)
            # Sub-parts are plain email messages, not mailbox.Message.
            self.assertIsInstance(subpart, email.message.Message)
            self.assertNotIsInstance(subpart, mailbox.Message)
            self.assertEqual(subpart.get_payload(), expected_payload)

    def _delete_recursively(self, target):
        # Remove target whether it is a directory tree or a single file;
        # do nothing when it does not exist.
        if os.path.isdir(target):
            support.rmtree(target)
            return
        if os.path.exists(target):
            support.unlink(target)
44
45
46class TestMailbox(TestBase):
47
    # No limit on the length of the diffs unittest prints for failed
    # comparisons.
    maxDiff = None

    # Callable that builds the mailbox under test from a path; setUp() calls
    # it as self._factory(self._path).
    _factory = None     # Overridden by subclasses to reuse tests
    # Minimal message text: a From header, a blank line, then a body that is
    # filled in with the % operator.
    _template = 'From: foo\n\n%s\n'
52
53    def setUp(self):
54        self._path = support.TESTFN
55        self._delete_recursively(self._path)
56        self._box = self._factory(self._path)
57
58    def tearDown(self):
59        self._box.close()
60        self._delete_recursively(self._path)
61
62    def test_add(self):
63        # Add copies of a sample message
64        keys = []
65        keys.append(self._box.add(self._template % 0))
66        self.assertEqual(len(self._box), 1)
67        keys.append(self._box.add(mailbox.Message(_sample_message)))
68        self.assertEqual(len(self._box), 2)
69        keys.append(self._box.add(email.message_from_string(_sample_message)))
70        self.assertEqual(len(self._box), 3)
71        keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
72        self.assertEqual(len(self._box), 4)
73        keys.append(self._box.add(_sample_message))
74        self.assertEqual(len(self._box), 5)
75        keys.append(self._box.add(_bytes_sample_message))
76        self.assertEqual(len(self._box), 6)
77        with self.assertWarns(DeprecationWarning):
78            keys.append(self._box.add(
79                io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
80        self.assertEqual(len(self._box), 7)
81        self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
82        for i in (1, 2, 3, 4, 5, 6):
83            self._check_sample(self._box[keys[i]])
84
    # Message text with non-ASCII characters in its Subject header; the
    # tests below pass it to add() either as str or encoded to latin-1.
    _nonascii_msg = textwrap.dedent("""\
            From: foo
            Subject: Falinaptár házhozszállítással. Már rendeltél?

            0
            """)
91
92    def test_add_invalid_8bit_bytes_header(self):
93        key = self._box.add(self._nonascii_msg.encode('latin-1'))
94        self.assertEqual(len(self._box), 1)
95        self.assertEqual(self._box.get_bytes(key),
96            self._nonascii_msg.encode('latin-1'))
97
98    def test_invalid_nonascii_header_as_string(self):
99        subj = self._nonascii_msg.splitlines()[1]
100        key = self._box.add(subj.encode('latin-1'))
101        self.assertEqual(self._box.get_string(key),
102            'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz'
103            'YWwuIE3hciByZW5kZWx06Ww/?=\n\n')
104
105    def test_add_nonascii_string_header_raises(self):
106        with self.assertRaisesRegex(ValueError, "ASCII-only"):
107            self._box.add(self._nonascii_msg)
108        self._box.flush()
109        self.assertEqual(len(self._box), 0)
110        self.assertMailboxEmpty()
111
112    def test_add_that_raises_leaves_mailbox_empty(self):
113        def raiser(*args, **kw):
114            raise Exception("a fake error")
115        support.patch(self, email.generator.BytesGenerator, 'flatten', raiser)
116        with self.assertRaises(Exception):
117            self._box.add(email.message_from_string("From: Alphöso"))
118        self.assertEqual(len(self._box), 0)
119        self._box.close()
120        self.assertMailboxEmpty()
121
    # UTF-8 encoded message bytes: non-ASCII characters in the To and Subject
    # headers, a Subject continuation line that starts with a tab, and an
    # 8bit body in Cyrillic.
    _non_latin_bin_msg = textwrap.dedent("""\
        From: foo@bar.com
        To: báz
        Subject: Maintenant je vous présente mon collègue, le pouf célèbre
        \tJean de Baddie
        Mime-Version: 1.0
        Content-Type: text/plain; charset="utf-8"
        Content-Transfer-Encoding: 8bit

        Да, они летят.
        """).encode('utf-8')
133
134    def test_add_8bit_body(self):
135        key = self._box.add(self._non_latin_bin_msg)
136        self.assertEqual(self._box.get_bytes(key),
137                         self._non_latin_bin_msg)
138        with self._box.get_file(key) as f:
139            self.assertEqual(f.read(),
140                             self._non_latin_bin_msg.replace(b'\n',
141                                os.linesep.encode()))
142        self.assertEqual(self._box[key].get_payload(),
143                        "Да, они летят.\n")
144
145    def test_add_binary_file(self):
146        with tempfile.TemporaryFile('wb+') as f:
147            f.write(_bytes_sample_message)
148            f.seek(0)
149            key = self._box.add(f)
150        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
151            _bytes_sample_message.split(b'\n'))
152
153    def test_add_binary_nonascii_file(self):
154        with tempfile.TemporaryFile('wb+') as f:
155            f.write(self._non_latin_bin_msg)
156            f.seek(0)
157            key = self._box.add(f)
158        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
159            self._non_latin_bin_msg.split(b'\n'))
160
161    def test_add_text_file_warns(self):
162        with tempfile.TemporaryFile('w+') as f:
163            f.write(_sample_message)
164            f.seek(0)
165            with self.assertWarns(DeprecationWarning):
166                key = self._box.add(f)
167        self.assertEqual(self._box.get_bytes(key).split(b'\n'),
168            _bytes_sample_message.split(b'\n'))
169
170    def test_add_StringIO_warns(self):
171        with self.assertWarns(DeprecationWarning):
172            key = self._box.add(io.StringIO(self._template % "0"))
173        self.assertEqual(self._box.get_string(key), self._template % "0")
174
175    def test_add_nonascii_StringIO_raises(self):
176        with self.assertWarns(DeprecationWarning):
177            with self.assertRaisesRegex(ValueError, "ASCII-only"):
178                self._box.add(io.StringIO(self._nonascii_msg))
179        self.assertEqual(len(self._box), 0)
180        self._box.close()
181        self.assertMailboxEmpty()
182
183    def test_remove(self):
184        # Remove messages using remove()
185        self._test_remove_or_delitem(self._box.remove)
186
187    def test_delitem(self):
188        # Remove messages using __delitem__()
189        self._test_remove_or_delitem(self._box.__delitem__)
190
191    def _test_remove_or_delitem(self, method):
192        # (Used by test_remove() and test_delitem().)
193        key0 = self._box.add(self._template % 0)
194        key1 = self._box.add(self._template % 1)
195        self.assertEqual(len(self._box), 2)
196        method(key0)
197        self.assertEqual(len(self._box), 1)
198        self.assertRaises(KeyError, lambda: self._box[key0])
199        self.assertRaises(KeyError, lambda: method(key0))
200        self.assertEqual(self._box.get_string(key1), self._template % 1)
201        key2 = self._box.add(self._template % 2)
202        self.assertEqual(len(self._box), 2)
203        method(key2)
204        self.assertEqual(len(self._box), 1)
205        self.assertRaises(KeyError, lambda: self._box[key2])
206        self.assertRaises(KeyError, lambda: method(key2))
207        self.assertEqual(self._box.get_string(key1), self._template % 1)
208        method(key1)
209        self.assertEqual(len(self._box), 0)
210        self.assertRaises(KeyError, lambda: self._box[key1])
211        self.assertRaises(KeyError, lambda: method(key1))
212
213    def test_discard(self, repetitions=10):
214        # Discard messages
215        key0 = self._box.add(self._template % 0)
216        key1 = self._box.add(self._template % 1)
217        self.assertEqual(len(self._box), 2)
218        self._box.discard(key0)
219        self.assertEqual(len(self._box), 1)
220        self.assertRaises(KeyError, lambda: self._box[key0])
221        self._box.discard(key0)
222        self.assertEqual(len(self._box), 1)
223        self.assertRaises(KeyError, lambda: self._box[key0])
224
225    def test_get(self):
226        # Retrieve messages using get()
227        key0 = self._box.add(self._template % 0)
228        msg = self._box.get(key0)
229        self.assertEqual(msg['from'], 'foo')
230        self.assertEqual(msg.get_payload(), '0\n')
231        self.assertIsNone(self._box.get('foo'))
232        self.assertIs(self._box.get('foo', False), False)
233        self._box.close()
234        self._box = self._factory(self._path)
235        key1 = self._box.add(self._template % 1)
236        msg = self._box.get(key1)
237        self.assertEqual(msg['from'], 'foo')
238        self.assertEqual(msg.get_payload(), '1\n')
239
240    def test_getitem(self):
241        # Retrieve message using __getitem__()
242        key0 = self._box.add(self._template % 0)
243        msg = self._box[key0]
244        self.assertEqual(msg['from'], 'foo')
245        self.assertEqual(msg.get_payload(), '0\n')
246        self.assertRaises(KeyError, lambda: self._box['foo'])
247        self._box.discard(key0)
248        self.assertRaises(KeyError, lambda: self._box[key0])
249
250    def test_get_message(self):
251        # Get Message representations of messages
252        key0 = self._box.add(self._template % 0)
253        key1 = self._box.add(_sample_message)
254        msg0 = self._box.get_message(key0)
255        self.assertIsInstance(msg0, mailbox.Message)
256        self.assertEqual(msg0['from'], 'foo')
257        self.assertEqual(msg0.get_payload(), '0\n')
258        self._check_sample(self._box.get_message(key1))
259
260    def test_get_bytes(self):
261        # Get bytes representations of messages
262        key0 = self._box.add(self._template % 0)
263        key1 = self._box.add(_sample_message)
264        self.assertEqual(self._box.get_bytes(key0),
265            (self._template % 0).encode('ascii'))
266        self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message)
267
268    def test_get_string(self):
269        # Get string representations of messages
270        key0 = self._box.add(self._template % 0)
271        key1 = self._box.add(_sample_message)
272        self.assertEqual(self._box.get_string(key0), self._template % 0)
273        self.assertEqual(self._box.get_string(key1).split('\n'),
274                         _sample_message.split('\n'))
275
276    def test_get_file(self):
277        # Get file representations of messages
278        key0 = self._box.add(self._template % 0)
279        key1 = self._box.add(_sample_message)
280        with self._box.get_file(key0) as file:
281            data0 = file.read()
282        with self._box.get_file(key1) as file:
283            data1 = file.read()
284        self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'),
285                         self._template % 0)
286        self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'),
287                         _sample_message)
288
289    def test_get_file_can_be_closed_twice(self):
290        # Issue 11700
291        key = self._box.add(_sample_message)
292        f = self._box.get_file(key)
293        f.close()
294        f.close()
295
296    def test_iterkeys(self):
297        # Get keys using iterkeys()
298        self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False)
299
300    def test_keys(self):
301        # Get keys using keys()
302        self._check_iteration(self._box.keys, do_keys=True, do_values=False)
303
304    def test_itervalues(self):
305        # Get values using itervalues()
306        self._check_iteration(self._box.itervalues, do_keys=False,
307                              do_values=True)
308
309    def test_iter(self):
310        # Get values using __iter__()
311        self._check_iteration(self._box.__iter__, do_keys=False,
312                              do_values=True)
313
314    def test_values(self):
315        # Get values using values()
316        self._check_iteration(self._box.values, do_keys=False, do_values=True)
317
318    def test_iteritems(self):
319        # Get keys and values using iteritems()
320        self._check_iteration(self._box.iteritems, do_keys=True,
321                              do_values=True)
322
323    def test_items(self):
324        # Get keys and values using items()
325        self._check_iteration(self._box.items, do_keys=True, do_values=True)
326
327    def _check_iteration(self, method, do_keys, do_values, repetitions=10):
328        for value in method():
329            self.fail("Not empty")
330        keys, values = [], []
331        for i in range(repetitions):
332            keys.append(self._box.add(self._template % i))
333            values.append(self._template % i)
334        if do_keys and not do_values:
335            returned_keys = list(method())
336        elif do_values and not do_keys:
337            returned_values = list(method())
338        else:
339            returned_keys, returned_values = [], []
340            for key, value in method():
341                returned_keys.append(key)
342                returned_values.append(value)
343        if do_keys:
344            self.assertEqual(len(keys), len(returned_keys))
345            self.assertEqual(set(keys), set(returned_keys))
346        if do_values:
347            count = 0
348            for value in returned_values:
349                self.assertEqual(value['from'], 'foo')
350                self.assertLess(int(value.get_payload()), repetitions)
351                count += 1
352            self.assertEqual(len(values), count)
353
354    def test_contains(self):
355        # Check existence of keys using __contains__()
356        self.assertNotIn('foo', self._box)
357        key0 = self._box.add(self._template % 0)
358        self.assertIn(key0, self._box)
359        self.assertNotIn('foo', self._box)
360        key1 = self._box.add(self._template % 1)
361        self.assertIn(key1, self._box)
362        self.assertIn(key0, self._box)
363        self.assertNotIn('foo', self._box)
364        self._box.remove(key0)
365        self.assertNotIn(key0, self._box)
366        self.assertIn(key1, self._box)
367        self.assertNotIn('foo', self._box)
368        self._box.remove(key1)
369        self.assertNotIn(key1, self._box)
370        self.assertNotIn(key0, self._box)
371        self.assertNotIn('foo', self._box)
372
373    def test_len(self, repetitions=10):
374        # Get message count
375        keys = []
376        for i in range(repetitions):
377            self.assertEqual(len(self._box), i)
378            keys.append(self._box.add(self._template % i))
379            self.assertEqual(len(self._box), i + 1)
380        for i in range(repetitions):
381            self.assertEqual(len(self._box), repetitions - i)
382            self._box.remove(keys[i])
383            self.assertEqual(len(self._box), repetitions - i - 1)
384
385    def test_set_item(self):
386        # Modify messages using __setitem__()
387        key0 = self._box.add(self._template % 'original 0')
388        self.assertEqual(self._box.get_string(key0),
389                         self._template % 'original 0')
390        key1 = self._box.add(self._template % 'original 1')
391        self.assertEqual(self._box.get_string(key1),
392                         self._template % 'original 1')
393        self._box[key0] = self._template % 'changed 0'
394        self.assertEqual(self._box.get_string(key0),
395                         self._template % 'changed 0')
396        self._box[key1] = self._template % 'changed 1'
397        self.assertEqual(self._box.get_string(key1),
398                         self._template % 'changed 1')
399        self._box[key0] = _sample_message
400        self._check_sample(self._box[key0])
401        self._box[key1] = self._box[key0]
402        self._check_sample(self._box[key1])
403        self._box[key0] = self._template % 'original 0'
404        self.assertEqual(self._box.get_string(key0),
405                     self._template % 'original 0')
406        self._check_sample(self._box[key1])
407        self.assertRaises(KeyError,
408                          lambda: self._box.__setitem__('foo', 'bar'))
409        self.assertRaises(KeyError, lambda: self._box['foo'])
410        self.assertEqual(len(self._box), 2)
411
412    def test_clear(self, iterations=10):
413        # Remove all messages using clear()
414        keys = []
415        for i in range(iterations):
416            self._box.add(self._template % i)
417        for i, key in enumerate(keys):
418            self.assertEqual(self._box.get_string(key), self._template % i)
419        self._box.clear()
420        self.assertEqual(len(self._box), 0)
421        for i, key in enumerate(keys):
422            self.assertRaises(KeyError, lambda: self._box.get_string(key))
423
424    def test_pop(self):
425        # Get and remove a message using pop()
426        key0 = self._box.add(self._template % 0)
427        self.assertIn(key0, self._box)
428        key1 = self._box.add(self._template % 1)
429        self.assertIn(key1, self._box)
430        self.assertEqual(self._box.pop(key0).get_payload(), '0\n')
431        self.assertNotIn(key0, self._box)
432        self.assertIn(key1, self._box)
433        key2 = self._box.add(self._template % 2)
434        self.assertIn(key2, self._box)
435        self.assertEqual(self._box.pop(key2).get_payload(), '2\n')
436        self.assertNotIn(key2, self._box)
437        self.assertIn(key1, self._box)
438        self.assertEqual(self._box.pop(key1).get_payload(), '1\n')
439        self.assertNotIn(key1, self._box)
440        self.assertEqual(len(self._box), 0)
441
442    def test_popitem(self, iterations=10):
443        # Get and remove an arbitrary (key, message) using popitem()
444        keys = []
445        for i in range(10):
446            keys.append(self._box.add(self._template % i))
447        seen = []
448        for i in range(10):
449            key, msg = self._box.popitem()
450            self.assertIn(key, keys)
451            self.assertNotIn(key, seen)
452            seen.append(key)
453            self.assertEqual(int(msg.get_payload()), keys.index(key))
454        self.assertEqual(len(self._box), 0)
455        for key in keys:
456            self.assertRaises(KeyError, lambda: self._box[key])
457
458    def test_update(self):
459        # Modify multiple messages using update()
460        key0 = self._box.add(self._template % 'original 0')
461        key1 = self._box.add(self._template % 'original 1')
462        key2 = self._box.add(self._template % 'original 2')
463        self._box.update({key0: self._template % 'changed 0',
464                          key2: _sample_message})
465        self.assertEqual(len(self._box), 3)
466        self.assertEqual(self._box.get_string(key0),
467                     self._template % 'changed 0')
468        self.assertEqual(self._box.get_string(key1),
469                     self._template % 'original 1')
470        self._check_sample(self._box[key2])
471        self._box.update([(key2, self._template % 'changed 2'),
472                    (key1, self._template % 'changed 1'),
473                    (key0, self._template % 'original 0')])
474        self.assertEqual(len(self._box), 3)
475        self.assertEqual(self._box.get_string(key0),
476                     self._template % 'original 0')
477        self.assertEqual(self._box.get_string(key1),
478                     self._template % 'changed 1')
479        self.assertEqual(self._box.get_string(key2),
480                     self._template % 'changed 2')
481        self.assertRaises(KeyError,
482                          lambda: self._box.update({'foo': 'bar',
483                                          key0: self._template % "changed 0"}))
484        self.assertEqual(len(self._box), 3)
485        self.assertEqual(self._box.get_string(key0),
486                     self._template % "changed 0")
487        self.assertEqual(self._box.get_string(key1),
488                     self._template % "changed 1")
489        self.assertEqual(self._box.get_string(key2),
490                     self._template % "changed 2")
491
492    def test_flush(self):
493        # Write changes to disk
494        self._test_flush_or_close(self._box.flush, True)
495
496    def test_popitem_and_flush_twice(self):
497        # See #15036.
498        self._box.add(self._template % 0)
499        self._box.add(self._template % 1)
500        self._box.flush()
501
502        self._box.popitem()
503        self._box.flush()
504        self._box.popitem()
505        self._box.flush()
506
507    def test_lock_unlock(self):
508        # Lock and unlock the mailbox
509        self.assertFalse(os.path.exists(self._get_lock_path()))
510        self._box.lock()
511        self.assertTrue(os.path.exists(self._get_lock_path()))
512        self._box.unlock()
513        self.assertFalse(os.path.exists(self._get_lock_path()))
514
515    def test_close(self):
516        # Close mailbox and flush changes to disk
517        self._test_flush_or_close(self._box.close, False)
518
519    def _test_flush_or_close(self, method, should_call_close):
520        contents = [self._template % i for i in range(3)]
521        self._box.add(contents[0])
522        self._box.add(contents[1])
523        self._box.add(contents[2])
524        oldbox = self._box
525        method()
526        if should_call_close:
527            self._box.close()
528        self._box = self._factory(self._path)
529        keys = self._box.keys()
530        self.assertEqual(len(keys), 3)
531        for key in keys:
532            self.assertIn(self._box.get_string(key), contents)
533        oldbox.close()
534
535    def test_dump_message(self):
536        # Write message representations to disk
537        for input in (email.message_from_string(_sample_message),
538                      _sample_message, io.BytesIO(_bytes_sample_message)):
539            output = io.BytesIO()
540            self._box._dump_message(input, output)
541            self.assertEqual(output.getvalue(),
542                _bytes_sample_message.replace(b'\n', os.linesep.encode()))
543        output = io.BytesIO()
544        self.assertRaises(TypeError,
545                          lambda: self._box._dump_message(None, output))
546
547    def _get_lock_path(self):
548        # Return the path of the dot lock file. May be overridden.
549        return self._path + '.lock'
550
551
class TestMailboxSuperclass(TestBase, unittest.TestCase):

    def test_notimplemented(self):
        # Every method of the abstract Mailbox base class must raise
        # NotImplementedError.
        box = mailbox.Mailbox('path')
        abstract_calls = (
            lambda: box.add(''),
            lambda: box.remove(''),
            lambda: box.__delitem__(''),
            lambda: box.discard(''),
            lambda: box.__setitem__('', ''),
            lambda: box.iterkeys(),
            lambda: box.keys(),
            # The iterating variants are lazy: the error only surfaces when
            # the first item is requested.
            lambda: next(box.itervalues()),
            lambda: next(box.__iter__()),
            lambda: box.values(),
            lambda: next(box.iteritems()),
            lambda: box.items(),
            lambda: box.get(''),
            lambda: box.__getitem__(''),
            lambda: box.get_message(''),
            lambda: box.get_string(''),
            lambda: box.get_bytes(''),
            lambda: box.get_file(''),
            lambda: '' in box,
            lambda: box.__contains__(''),
            lambda: box.__len__(),
            lambda: box.clear(),
            lambda: box.pop(''),
            lambda: box.popitem(),
            lambda: box.update((('', ''),)),
            lambda: box.flush(),
            lambda: box.lock(),
            lambda: box.unlock(),
            lambda: box.close(),
        )
        for call in abstract_calls:
            self.assertRaises(NotImplementedError, call)
586
587
588class TestMaildir(TestMailbox, unittest.TestCase):
589
590    _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
591
592    def setUp(self):
593        TestMailbox.setUp(self)
594        if (os.name == 'nt') or (sys.platform == 'cygwin'):
595            self._box.colon = '!'
596
597    def assertMailboxEmpty(self):
598        self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), [])
599
600    def test_add_MM(self):
601        # Add a MaildirMessage instance
602        msg = mailbox.MaildirMessage(self._template % 0)
603        msg.set_subdir('cur')
604        msg.set_info('foo')
605        key = self._box.add(msg)
606        self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' %
607                                                 (key, self._box.colon))))
608
609    def test_get_MM(self):
610        # Get a MaildirMessage instance
611        msg = mailbox.MaildirMessage(self._template % 0)
612        msg.set_subdir('cur')
613        msg.set_flags('RF')
614        key = self._box.add(msg)
615        msg_returned = self._box.get_message(key)
616        self.assertIsInstance(msg_returned, mailbox.MaildirMessage)
617        self.assertEqual(msg_returned.get_subdir(), 'cur')
618        self.assertEqual(msg_returned.get_flags(), 'FR')
619
620    def test_set_MM(self):
621        # Set with a MaildirMessage instance
622        msg0 = mailbox.MaildirMessage(self._template % 0)
623        msg0.set_flags('TP')
624        key = self._box.add(msg0)
625        msg_returned = self._box.get_message(key)
626        self.assertEqual(msg_returned.get_subdir(), 'new')
627        self.assertEqual(msg_returned.get_flags(), 'PT')
628        msg1 = mailbox.MaildirMessage(self._template % 1)
629        self._box[key] = msg1
630        msg_returned = self._box.get_message(key)
631        self.assertEqual(msg_returned.get_subdir(), 'new')
632        self.assertEqual(msg_returned.get_flags(), '')
633        self.assertEqual(msg_returned.get_payload(), '1\n')
634        msg2 = mailbox.MaildirMessage(self._template % 2)
635        msg2.set_info('2,S')
636        self._box[key] = msg2
637        self._box[key] = self._template % 3
638        msg_returned = self._box.get_message(key)
639        self.assertEqual(msg_returned.get_subdir(), 'new')
640        self.assertEqual(msg_returned.get_flags(), 'S')
641        self.assertEqual(msg_returned.get_payload(), '3\n')
642
643    def test_consistent_factory(self):
644        # Add a message.
645        msg = mailbox.MaildirMessage(self._template % 0)
646        msg.set_subdir('cur')
647        msg.set_flags('RF')
648        key = self._box.add(msg)
649
650        # Create new mailbox with
651        class FakeMessage(mailbox.MaildirMessage):
652            pass
653        box = mailbox.Maildir(self._path, factory=FakeMessage)
654        box.colon = self._box.colon
655        msg2 = box.get_message(key)
656        self.assertIsInstance(msg2, FakeMessage)
657
658    def test_initialize_new(self):
659        # Initialize a non-existent mailbox
660        self.tearDown()
661        self._box = mailbox.Maildir(self._path)
662        self._check_basics()
663        self._delete_recursively(self._path)
664        self._box = self._factory(self._path, factory=None)
665        self._check_basics()
666
667    def test_initialize_existing(self):
668        # Initialize an existing mailbox
669        self.tearDown()
670        for subdir in '', 'tmp', 'new', 'cur':
671            os.mkdir(os.path.normpath(os.path.join(self._path, subdir)))
672        self._box = mailbox.Maildir(self._path)
673        self._check_basics()
674
675    def _check_basics(self, factory=None):
676        # (Used by test_open_new() and test_open_existing().)
677        self.assertEqual(self._box._path, os.path.abspath(self._path))
678        self.assertEqual(self._box._factory, factory)
679        for subdir in '', 'tmp', 'new', 'cur':
680            path = os.path.join(self._path, subdir)
681            mode = os.stat(path)[stat.ST_MODE]
682            self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path)
683
684    def test_list_folders(self):
685        # List folders
686        self._box.add_folder('one')
687        self._box.add_folder('two')
688        self._box.add_folder('three')
689        self.assertEqual(len(self._box.list_folders()), 3)
690        self.assertEqual(set(self._box.list_folders()),
691                     set(('one', 'two', 'three')))
692
693    def test_get_folder(self):
694        # Open folders
695        self._box.add_folder('foo.bar')
696        folder0 = self._box.get_folder('foo.bar')
697        folder0.add(self._template % 'bar')
698        self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar')))
699        folder1 = self._box.get_folder('foo.bar')
700        self.assertEqual(folder1.get_string(folder1.keys()[0]),
701                         self._template % 'bar')
702
703    def test_add_and_remove_folders(self):
704        # Delete folders
705        self._box.add_folder('one')
706        self._box.add_folder('two')
707        self.assertEqual(len(self._box.list_folders()), 2)
708        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
709        self._box.remove_folder('one')
710        self.assertEqual(len(self._box.list_folders()), 1)
711        self.assertEqual(set(self._box.list_folders()), set(('two',)))
712        self._box.add_folder('three')
713        self.assertEqual(len(self._box.list_folders()), 2)
714        self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
715        self._box.remove_folder('three')
716        self.assertEqual(len(self._box.list_folders()), 1)
717        self.assertEqual(set(self._box.list_folders()), set(('two',)))
718        self._box.remove_folder('two')
719        self.assertEqual(len(self._box.list_folders()), 0)
720        self.assertEqual(self._box.list_folders(), [])
721
722    def test_clean(self):
723        # Remove old files from 'tmp'
724        foo_path = os.path.join(self._path, 'tmp', 'foo')
725        bar_path = os.path.join(self._path, 'tmp', 'bar')
726        with open(foo_path, 'w') as f:
727            f.write("@")
728        with open(bar_path, 'w') as f:
729            f.write("@")
730        self._box.clean()
731        self.assertTrue(os.path.exists(foo_path))
732        self.assertTrue(os.path.exists(bar_path))
733        foo_stat = os.stat(foo_path)
734        os.utime(foo_path, (time.time() - 129600 - 2,
735                            foo_stat.st_mtime))
736        self._box.clean()
737        self.assertFalse(os.path.exists(foo_path))
738        self.assertTrue(os.path.exists(bar_path))
739
740    def test_create_tmp(self, repetitions=10):
741        # Create files in tmp directory
742        hostname = socket.gethostname()
743        if '/' in hostname:
744            hostname = hostname.replace('/', r'\057')
745        if ':' in hostname:
746            hostname = hostname.replace(':', r'\072')
747        pid = os.getpid()
748        pattern = re.compile(r"(?P<time>\d+)\.M(?P<M>\d{1,6})P(?P<P>\d+)"
749                             r"Q(?P<Q>\d+)\.(?P<host>[^:/]+)")
750        previous_groups = None
751        for x in range(repetitions):
752            tmp_file = self._box._create_tmp()
753            head, tail = os.path.split(tmp_file.name)
754            self.assertEqual(head, os.path.abspath(os.path.join(self._path,
755                                                                "tmp")),
756                             "File in wrong location: '%s'" % head)
757            match = pattern.match(tail)
758            self.assertIsNotNone(match, "Invalid file name: '%s'" % tail)
759            groups = match.groups()
760            if previous_groups is not None:
761                self.assertGreaterEqual(int(groups[0]), int(previous_groups[0]),
762                             "Non-monotonic seconds: '%s' before '%s'" %
763                             (previous_groups[0], groups[0]))
764                if int(groups[0]) == int(previous_groups[0]):
765                    self.assertGreaterEqual(int(groups[1]), int(previous_groups[1]),
766                                "Non-monotonic milliseconds: '%s' before '%s'" %
767                                (previous_groups[1], groups[1]))
768                self.assertEqual(int(groups[2]), pid,
769                             "Process ID mismatch: '%s' should be '%s'" %
770                             (groups[2], pid))
771                self.assertEqual(int(groups[3]), int(previous_groups[3]) + 1,
772                             "Non-sequential counter: '%s' before '%s'" %
773                             (previous_groups[3], groups[3]))
774                self.assertEqual(groups[4], hostname,
775                             "Host name mismatch: '%s' should be '%s'" %
776                             (groups[4], hostname))
777            previous_groups = groups
778            tmp_file.write(_bytes_sample_message)
779            tmp_file.seek(0)
780            self.assertEqual(tmp_file.read(), _bytes_sample_message)
781            tmp_file.close()
782        file_count = len(os.listdir(os.path.join(self._path, "tmp")))
783        self.assertEqual(file_count, repetitions,
784                     "Wrong file count: '%s' should be '%s'" %
785                     (file_count, repetitions))
786
787    def test_refresh(self):
788        # Update the table of contents
789        self.assertEqual(self._box._toc, {})
790        key0 = self._box.add(self._template % 0)
791        key1 = self._box.add(self._template % 1)
792        self.assertEqual(self._box._toc, {})
793        self._box._refresh()
794        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
795                                          key1: os.path.join('new', key1)})
796        key2 = self._box.add(self._template % 2)
797        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
798                                          key1: os.path.join('new', key1)})
799        self._box._refresh()
800        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0),
801                                          key1: os.path.join('new', key1),
802                                          key2: os.path.join('new', key2)})
803
804    def test_refresh_after_safety_period(self):
805        # Issue #13254: Call _refresh after the "file system safety
806        # period" of 2 seconds has passed; _toc should still be
807        # updated because this is the first call to _refresh.
808        key0 = self._box.add(self._template % 0)
809        key1 = self._box.add(self._template % 1)
810
811        self._box = self._factory(self._path)
812        self.assertEqual(self._box._toc, {})
813
814        # Emulate sleeping. Instead of sleeping for 2 seconds, use the
815        # skew factor to make _refresh think that the filesystem
816        # safety period has passed and re-reading the _toc is only
817        # required if mtimes differ.
818        self._box._skewfactor = -3
819
820        self._box._refresh()
821        self.assertEqual(sorted(self._box._toc.keys()), sorted([key0, key1]))
822
823    def test_lookup(self):
824        # Look up message subpaths in the TOC
825        self.assertRaises(KeyError, lambda: self._box._lookup('foo'))
826        key0 = self._box.add(self._template % 0)
827        self.assertEqual(self._box._lookup(key0), os.path.join('new', key0))
828        os.remove(os.path.join(self._path, 'new', key0))
829        self.assertEqual(self._box._toc, {key0: os.path.join('new', key0)})
830        # Be sure that the TOC is read back from disk (see issue #6896
831        # about bad mtime behaviour on some systems).
832        self._box.flush()
833        self.assertRaises(KeyError, lambda: self._box._lookup(key0))
834        self.assertEqual(self._box._toc, {})
835
836    def test_lock_unlock(self):
837        # Lock and unlock the mailbox. For Maildir, this does nothing.
838        self._box.lock()
839        self._box.unlock()
840
841    def test_folder (self):
842        # Test for bug #1569790: verify that folders returned by .get_folder()
843        # use the same factory function.
844        def dummy_factory (s):
845            return None
846        box = self._factory(self._path, factory=dummy_factory)
847        folder = box.add_folder('folder1')
848        self.assertIs(folder._factory, dummy_factory)
849
850        folder1_alias = box.get_folder('folder1')
851        self.assertIs(folder1_alias._factory, dummy_factory)
852
853    def test_directory_in_folder (self):
854        # Test that mailboxes still work if there's a stray extra directory
855        # in a folder.
856        for i in range(10):
857            self._box.add(mailbox.Message(_sample_message))
858
859        # Create a stray directory
860        os.mkdir(os.path.join(self._path, 'cur', 'stray-dir'))
861
862        # Check that looping still works with the directory present.
863        for msg in self._box:
864            pass
865
866    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
867    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
868    def test_file_permissions(self):
869        # Verify that message files are created without execute permissions
870        msg = mailbox.MaildirMessage(self._template % 0)
871        orig_umask = os.umask(0)
872        try:
873            key = self._box.add(msg)
874        finally:
875            os.umask(orig_umask)
876        path = os.path.join(self._path, self._box._lookup(key))
877        mode = os.stat(path).st_mode
878        self.assertFalse(mode & 0o111)
879
880    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
881    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
882    def test_folder_file_perms(self):
883        # From bug #3228, we want to verify that the file created inside a Maildir
884        # subfolder isn't marked as executable.
885        orig_umask = os.umask(0)
886        try:
887            subfolder = self._box.add_folder('subfolder')
888        finally:
889            os.umask(orig_umask)
890
891        path = os.path.join(subfolder._path, 'maildirfolder')
892        st = os.stat(path)
893        perms = st.st_mode
894        self.assertFalse((perms & 0o111)) # Execute bits should all be off.
895
896    def test_reread(self):
897        # Do an initial unconditional refresh
898        self._box._refresh()
899
900        # Put the last modified times more than two seconds into the past
901        # (because mtime may have a two second granularity)
902        for subdir in ('cur', 'new'):
903            os.utime(os.path.join(self._box._path, subdir),
904                     (time.time()-5,)*2)
905
906        # Because mtime has a two second granularity in worst case (FAT), a
907        # refresh is done unconditionally if called for within
908        # two-second-plus-a-bit of the last one, just in case the mbox has
909        # changed; so now we have to wait for that interval to expire.
910        #
911        # Because this is a test, emulate sleeping. Instead of
912        # sleeping for 2 seconds, use the skew factor to make _refresh
913        # think that 2 seconds have passed and re-reading the _toc is
914        # only required if mtimes differ.
915        self._box._skewfactor = -3
916
917        # Re-reading causes the ._toc attribute to be assigned a new dictionary
918        # object, so we'll check that the ._toc attribute isn't a different
919        # object.
920        orig_toc = self._box._toc
921        def refreshed():
922            return self._box._toc is not orig_toc
923
924        self._box._refresh()
925        self.assertFalse(refreshed())
926
927        # Now, write something into cur and remove it.  This changes
928        # the mtime and should cause a re-read. Note that "sleep
929        # emulation" is still in effect, as skewfactor is -3.
930        filename = os.path.join(self._path, 'cur', 'stray-file')
931        support.create_empty_file(filename)
932        os.unlink(filename)
933        self._box._refresh()
934        self.assertTrue(refreshed())
935
936
class _TestSingleFile(TestMailbox):
    """Common tests for single-file mailboxes"""

    def test_add_doesnt_rewrite(self):
        # Issue #9559: when messages were only added, flush() should not
        # rewrite the mailbox file.
        #
        # A rewrite writes the contents to another file that is then renamed
        # over the original, which changes the inode number.  An unchanged
        # inode number therefore shows that no rewrite happened.
        def current_inode():
            return os.stat(self._path).st_ino

        original_inode = current_inode()

        self._box.add(self._template % 0)
        self._box.flush()

        self.assertEqual(original_inode, current_inode())

        # Reopen the mailbox to make sure the message was really added.
        self._box.close()
        self._box = self._factory(self._path)
        self.assertEqual(len(self._box), 1)

    def test_permissions_after_flush(self):
        # Issue #5346: flush() must keep the mode of the mailbox file.

        # Make the mailbox world writable.  A newly created mailbox file is
        # unlikely to get these permissions by itself after flush(),
        # because umask usually prevents it.
        world_writable = os.stat(self._path).st_mode | 0o666
        os.chmod(self._path, world_writable)

        self._box.add(self._template % 0)
        removed_key = self._box.add(self._template % 1)
        # One message has to be removed to make flush() create a new file.
        self._box.remove(removed_key)
        self._box.flush()

        self.assertEqual(os.stat(self._path).st_mode, world_writable)
976
977
class _TestMboxMMDF(_TestSingleFile):
    """Tests shared by the mbox and MMDF test cases."""

    def tearDown(self):
        super().tearDown()
        self._box.close()
        self._delete_recursively(self._path)
        # Also remove anything left next to the mailbox file, such as
        # lock remnants.
        for leftover in glob.glob(self._path + '.*'):
            support.unlink(leftover)

    def assertMailboxEmpty(self):
        # The mailbox file must not contain a single line.
        with open(self._path) as f:
            lines = f.readlines()
        self.assertEqual(lines, [])

    def test_add_from_string(self):
        # Add a string starting with 'From ' to the mailbox
        raw_message = 'From foo@bar blah\nFrom: foo\n\n0\n'
        key = self._box.add(raw_message)
        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
        self.assertEqual(self._box[key].get_payload(), '0\n')

    def test_add_from_bytes(self):
        # Add a byte string starting with 'From ' to the mailbox
        raw_message = b'From foo@bar blah\nFrom: foo\n\n0\n'
        key = self._box.add(raw_message)
        self.assertEqual(self._box[key].get_from(), 'foo@bar blah')
        self.assertEqual(self._box[key].get_payload(), '0\n')

    def test_add_mbox_or_mmdf_message(self):
        # Add an mboxMessage or MMDFMessage
        for message_class in (mailbox.mboxMessage, mailbox.MMDFMessage):
            msg = message_class('From foo@bar blah\nFrom: foo\n\n0\n')
            self._box.add(msg)

    def test_open_close_open(self):
        # Open and inspect previously-created mailbox
        values = [self._template % i for i in range(3)]
        for value in values:
            self._box.add(value)
        self._box.close()
        mtime_after_write = os.path.getmtime(self._path)

        self._box = self._factory(self._path)
        self.assertEqual(len(self._box), 3)
        for key in self._box.iterkeys():
            self.assertIn(self._box.get_string(key), values)
        self._box.close()

        # Merely reading the mailbox must not have modified the file.
        self.assertEqual(mtime_after_write, os.path.getmtime(self._path))

    def test_add_and_close(self):
        # Verifying that closing a mailbox doesn't change added items
        self._box.add(_sample_message)
        for i in range(3):
            self._box.add(self._template % i)
        self._box.add(_sample_message)

        # Take a snapshot of the file through the mailbox's own handle.
        mailbox_file = self._box._file
        mailbox_file.flush()
        mailbox_file.seek(0)
        contents_before_close = mailbox_file.read()

        self._box.close()
        with open(self._path, 'rb') as f:
            self.assertEqual(contents_before_close, f.read())
        self._box = self._factory(self._path)

    @unittest.skipUnless(hasattr(os, 'fork'), "Test needs fork().")
    @unittest.skipUnless(hasattr(socket, 'socketpair'), "Test needs socketpair().")
    def test_lock_conflict(self):
        # Fork off a child process that will lock the mailbox temporarily,
        # unlock it and exit.  The socket pair is used to synchronize the
        # two processes.
        c, p = socket.socketpair()
        self.addCleanup(c.close)
        self.addCleanup(p.close)

        pid = os.fork()
        if pid == 0:
            # Child process.
            try:
                # Take the lock and tell the parent that it may proceed.
                self._box.lock()
                c.send(b'c')

                # Hold the lock until the parent is done, then release it.
                c.recv(1)
                self._box.unlock()
            finally:
                # Never return into the test runner from the child.
                os._exit(0)

        # Parent process: wait until the child reports the mailbox locked.
        p.recv(1)
        try:
            self.assertRaises(mailbox.ExternalClashError,
                              self._box.lock)
        finally:
            # Signal the child it can now release the lock and exit.
            p.send(b'p')
            # Wait for child to exit.  Locking should now succeed.
            os.waitpid(pid, 0)

        self._box.lock()
        self._box.unlock()

    def test_relock(self):
        # Test case for bug #1575506: the mailbox class was locking the
        # wrong file object in its flush() method.
        msg = "Subject: sub\n\nbody\n"
        self._box.add(msg)
        self._box.flush()
        self._box.close()

        # Reopen, lock, and check that the lock survives add() + flush().
        self._box = self._factory(self._path)
        self._box.lock()
        self._box.add(msg)
        self._box.flush()
        self.assertTrue(self._box._locked)
        self._box.close()
1088
1089
class TestMbox(_TestMboxMMDF, unittest.TestCase):
    """Run the shared mbox/MMDF tests, plus mbox-only ones, on mailbox.mbox."""

    def _factory(self, path, factory=None):
        # Create the mailbox under test; called by the inherited fixtures.
        return mailbox.mbox(path, factory)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
    def test_file_perms(self):
        # From bug #3228, we want to verify that the mailbox file isn't
        # executable, even if the umask is set to something that would leave
        # executable bits set.
        # We only run this test on platforms that support umask.

        # Change the umask before entering the try block: should os.umask()
        # itself fail there is nothing to restore, and the finally clause
        # must not trip over an unbound old_umask.
        old_umask = os.umask(0o077)
        try:
            # Recreate the mailbox file from scratch under the new umask.
            self._box.close()
            os.unlink(self._path)
            self._box = mailbox.mbox(self._path, create=True)
            self._box.add('')
            self._box.close()
        finally:
            os.umask(old_umask)

        perms = os.stat(self._path).st_mode
        self.assertFalse((perms & 0o111)) # Execute bits should all be off.

    def test_terminating_newline(self):
        # A payload that lacks a final newline gets one when it is stored
        message = email.message.Message()
        message['From'] = 'john@example.com'
        message.set_payload('No newline at the end')
        key = self._box.add(message)

        # A newline should have been appended to the payload
        stored = self._box.get(key)
        self.assertEqual(stored.get_payload(), 'No newline at the end\n')

    def test_message_separator(self):
        # Check there's always a single blank line after each message
        self._box.add('From: foo\n\n0')  # No newline at the end
        with open(self._path) as f:
            data = f.read()
        self.assertEqual(data[-3:], '0\n\n')

        self._box.add('From: foo\n\n0\n')  # Newline at the end
        with open(self._path) as f:
            data = f.read()
        self.assertEqual(data[-3:], '0\n\n')
1135
1136
class TestMMDF(_TestMboxMMDF, unittest.TestCase):
    """Run the shared mbox/MMDF tests on mailbox.MMDF."""

    def _factory(self, path, factory=None):
        # Create the mailbox under test; called by the inherited fixtures.
        return mailbox.MMDF(path, factory)
1140
1141
class TestMH(TestMailbox, unittest.TestCase):
    """Run the generic mailbox tests, plus MH-specific ones, on mailbox.MH."""

    def _factory(self, path, factory=None):
        # Create the mailbox under test; called by the inherited fixtures.
        return mailbox.MH(path, factory)

    def assertMailboxEmpty(self):
        # Only the sequences file may be left in the mailbox directory.
        self.assertEqual(os.listdir(self._path), ['.mh_sequences'])

    def test_list_folders(self):
        # List folders
        folder_names = ('one', 'two', 'three')
        for folder_name in folder_names:
            self._box.add_folder(folder_name)
        self.assertEqual(len(self._box.list_folders()), 3)
        self.assertEqual(set(self._box.list_folders()), set(folder_names))

    def test_get_folder(self):
        # Open folders
        def dummy_factory(s):
            return None
        self._box = self._factory(self._path, dummy_factory)

        new_folder = self._box.add_folder('foo.bar')
        folder0 = self._box.get_folder('foo.bar')
        folder0.add(self._template % 'bar')
        # The folder is expected on disk under its plain name.
        self.assertTrue(os.path.isdir(os.path.join(self._path, 'foo.bar')))
        folder1 = self._box.get_folder('foo.bar')
        self.assertEqual(folder1.get_string(folder1.keys()[0]),
                         self._template % 'bar')

        # Test for bug #1569790: verify that folders returned by .get_folder()
        # use the same factory function.
        self.assertIs(new_folder._factory, self._box._factory)
        self.assertIs(folder0._factory, self._box._factory)

    def test_add_and_remove_folders(self):
        # Delete folders
        self._box.add_folder('one')
        self._box.add_folder('two')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), set(('one', 'two')))
        self._box.remove_folder('one')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), set(('two',)))
        self._box.add_folder('three')
        self.assertEqual(len(self._box.list_folders()), 2)
        self.assertEqual(set(self._box.list_folders()), set(('two', 'three')))
        self._box.remove_folder('three')
        self.assertEqual(len(self._box.list_folders()), 1)
        self.assertEqual(set(self._box.list_folders()), set(('two',)))
        self._box.remove_folder('two')
        self.assertEqual(len(self._box.list_folders()), 0)
        self.assertEqual(self._box.list_folders(), [])

    def test_sequences(self):
        # Get and set sequences
        self.assertEqual(self._box.get_sequences(), {})

        msg0 = mailbox.MHMessage(self._template % 0)
        msg0.add_sequence('foo')
        key0 = self._box.add(msg0)
        self.assertEqual(self._box.get_sequences(), {'foo': [key0]})

        msg1 = mailbox.MHMessage(self._template % 1)
        msg1.set_sequences(['bar', 'replied', 'foo'])
        key1 = self._box.add(msg1)
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1], 'bar': [key1],
                          'replied': [key1]})

        # Storing msg0 again with other sequences replaces its old ones.
        msg0.set_sequences(['flagged'])
        self._box[key0] = msg0
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key1], 'bar': [key1], 'replied': [key1],
                          'flagged': [key0]})

        # A removed message disappears from every sequence.
        self._box.remove(key1)
        self.assertEqual(self._box.get_sequences(), {'flagged': [key0]})

    def test_issue2625(self):
        # Regression test: get_message() must succeed for a message that
        # belongs to a sequence.
        msg0 = mailbox.MHMessage(self._template % 0)
        msg0.add_sequence('foo')
        key0 = self._box.add(msg0)
        self._box.get_message(key0)

    def test_issue7627(self):
        # Regression test: remove() must succeed while the mailbox is locked.
        msg0 = mailbox.MHMessage(self._template % 0)
        key0 = self._box.add(msg0)
        self._box.lock()
        self._box.remove(key0)
        self._box.unlock()

    def test_pack(self):
        # Pack the contents of the mailbox
        messages = [mailbox.MHMessage(self._template % i) for i in range(4)]
        msg0, msg1, msg2, msg3 = messages
        msg0.set_sequences(['foo', 'unseen'])
        msg1.set_sequences(['foo'])
        msg2.set_sequences(['foo', 'flagged'])
        msg3.set_sequences(['foo', 'bar', 'replied'])
        key0, key1, key2, key3 = [self._box.add(msg) for msg in messages]
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1, key2, key3], 'unseen': [key0],
                          'flagged': [key2], 'bar': [key3],
                          'replied': [key3]})

        # Removing a message leaves a gap in the key sequence.
        self._box.remove(key2)
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [key0, key1, key3], 'unseen': [key0],
                          'bar': [key3], 'replied': [key3]})

        # pack() must renumber the remaining messages without gaps and
        # rewrite the sequences to match.
        self._box.pack()
        self.assertEqual(self._box.keys(), [1, 2, 3])
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [1, 2, 3], 'unseen': [1], 'bar': [3],
                          'replied': [3]})

        # Test case for packing while holding the mailbox locked.
        extra_keys = [self._box.add(msg1) for _ in range(4)]

        self._box.remove(extra_keys[0])
        self._box.remove(extra_keys[2])
        self._box.lock()
        self._box.pack()
        self._box.unlock()
        self.assertEqual(self._box.get_sequences(),
                         {'foo': [1, 2, 3, 4, 5],
                          'unseen': [1], 'bar': [3], 'replied': [3]})

    def _get_lock_path(self):
        # Path of the lock file that belongs to the sequences file.
        return os.path.join(self._path, '.mh_sequences.lock')
1275
1276
class TestBabyl(_TestSingleFile, unittest.TestCase):
    """Run the single-file mailbox tests, plus label tests, on mailbox.Babyl."""

    def _factory(self, path, factory=None):
        # Create the mailbox under test; called by the inherited fixtures.
        return mailbox.Babyl(path, factory)

    def assertMailboxEmpty(self):
        # The mailbox file must not contain a single line.
        with open(self._path) as f:
            lines = f.readlines()
        self.assertEqual(lines, [])

    def tearDown(self):
        super().tearDown()
        self._box.close()
        self._delete_recursively(self._path)
        # Also remove anything left next to the mailbox file, such as
        # lock remnants.
        for leftover in glob.glob(self._path + '.*'):
            support.unlink(leftover)

    def test_labels(self):
        # Get labels from the mailbox
        self.assertEqual(self._box.get_labels(), [])

        msg0 = mailbox.BabylMessage(self._template % 0)
        msg0.add_label('foo')
        key0 = self._box.add(msg0)
        self.assertEqual(self._box.get_labels(), ['foo'])

        # 'answered' is set on the message but not expected in the listing.
        msg1 = mailbox.BabylMessage(self._template % 1)
        msg1.set_labels(['bar', 'answered', 'foo'])
        key1 = self._box.add(msg1)
        self.assertEqual(set(self._box.get_labels()), {'foo', 'bar'})

        # Likewise 'filed' is not expected in the listing.
        msg0.set_labels(['blah', 'filed'])
        self._box[key0] = msg0
        self.assertEqual(set(self._box.get_labels()),
                         {'foo', 'bar', 'blah'})

        # Labels that were only used by a removed message go away.
        self._box.remove(key1)
        self.assertEqual(set(self._box.get_labels()), {'blah'})
1309
1310
class FakeFileLikeObject:
    """Minimal stand-in for a file that only records whether it was closed."""

    def __init__(self):
        # Becomes True once close() has been called.
        self.closed = False

    def close(self):
        """Mark the object as closed."""
        self.closed = True
1318
1319
class FakeMailBox(mailbox.Mailbox):
    """Mailbox whose get_file() hands out pre-built fake file objects."""

    def __init__(self):
        def null_factory(file):
            # Message factory that ignores the file it is given.
            return None

        super().__init__('', null_factory)
        # One fake file per valid key, 0 through 9.
        self.files = [FakeFileLikeObject() for _ in range(10)]

    def get_file(self, key):
        """Return the fake file object stored at index key."""
        return self.files[key]
1328
1329
class TestFakeMailBox(unittest.TestCase):
    """Check that item access on a mailbox closes the file it obtained."""

    def test_closing_fd(self):
        # Every file returned by get_file() must be closed after box[key]
        box = FakeMailBox()
        indices = range(10)
        for index in indices:
            self.assertFalse(box.files[index].closed)
        # Fetch each item once; only the effect on the files matters.
        for index in indices:
            box[index]
        for index in indices:
            self.assertTrue(box.files[index].closed)
1340
1341
class TestMessage(TestBase, unittest.TestCase):
    """Tests for mailbox.Message, reused by subclasses that set _factory."""

    _factory = mailbox.Message      # Overridden by subclasses to reuse tests

    def setUp(self):
        self._path = support.TESTFN

    def tearDown(self):
        self._delete_recursively(self._path)

    def test_initialize_with_eMM(self):
        # Initialize based on email.message.Message instance
        source = email.message_from_string(_sample_message)
        msg = self._factory(source)
        self._post_initialize_hook(msg)
        self._check_sample(msg)

    def test_initialize_with_string(self):
        # Initialize based on string
        msg = self._factory(_sample_message)
        self._post_initialize_hook(msg)
        self._check_sample(msg)

    def test_initialize_with_file(self):
        # Initialize based on contents of file
        with open(self._path, 'w+') as text_file:
            text_file.write(_sample_message)
            # Rewind so that the factory reads what was just written.
            text_file.seek(0)
            msg = self._factory(text_file)
            self._post_initialize_hook(msg)
            self._check_sample(msg)

    def test_initialize_with_binary_file(self):
        # Initialize based on contents of binary file
        with open(self._path, 'wb+') as binary_file:
            binary_file.write(_bytes_sample_message)
            # Rewind so that the factory reads what was just written.
            binary_file.seek(0)
            msg = self._factory(binary_file)
            self._post_initialize_hook(msg)
            self._check_sample(msg)

    def test_initialize_with_nothing(self):
        # Initialize without arguments
        msg = self._factory()
        self._post_initialize_hook(msg)
        for expected_type in (email.message.Message, mailbox.Message,
                              self._factory):
            self.assertIsInstance(msg, expected_type)
        # The new message has no headers and no payload.
        self.assertEqual(msg.keys(), [])
        self.assertFalse(msg.is_multipart())
        self.assertIsNone(msg.get_payload())

    def test_initialize_incorrectly(self):
        # Initialize with invalid argument
        with self.assertRaises(TypeError):
            self._factory(object())

    def test_all_eMM_attribues_exist(self):
        # Issue 12537
        reference = email.message_from_string(_sample_message)
        msg = self._factory(_sample_message)
        for attr in reference.__dict__:
            self.assertIn(attr, msg.__dict__,
                '{} attribute does not exist'.format(attr))

    def test_become_message(self):
        # Take on the state of another message
        source = email.message_from_string(_sample_message)
        msg = self._factory()
        msg._become_message(source)
        self._check_sample(msg)

    def test_explain_to(self):
        # Copy self's format-specific data to other message formats.
        # This test is superficial; better ones are in TestMessageConversion.
        msg = self._factory()
        for message_class in self.all_mailbox_types:
            msg._explain_to(message_class())
        # A plain email.message.Message is not an acceptable target.
        plain_message = email.message.Message()
        with self.assertRaises(TypeError):
            msg._explain_to(plain_message)

    def _post_initialize_hook(self, msg):
        # Overridden by subclasses to check extra things after initialization
        pass
1426
1427
class TestMaildirMessage(TestMessage, unittest.TestCase):
    """Rerun the message tests, plus Maildir-specific ones, on MaildirMessage."""

    _factory = mailbox.MaildirMessage

    def _post_initialize_hook(self, msg):
        # A new message starts in 'new' and has no info string.
        self.assertEqual(msg._subdir, 'new')
        self.assertEqual(msg._info, '')

    def test_subdir(self):
        # Use get_subdir() and set_subdir()
        msg = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(msg.get_subdir(), 'new')
        for subdir in ('cur', 'new'):
            msg.set_subdir(subdir)
            self.assertEqual(msg.get_subdir(), subdir)

        # 'tmp' is rejected and the previous value stays in place.
        self.assertRaises(ValueError, msg.set_subdir, 'tmp')
        self.assertEqual(msg.get_subdir(), 'new')

        msg.set_subdir('new')
        self.assertEqual(msg.get_subdir(), 'new')
        self._check_sample(msg)

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        msg = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(msg.get_flags(), '')
        self.assertEqual(msg.get_subdir(), 'new')

        msg.set_flags('F')
        self.assertEqual(msg.get_subdir(), 'new')
        self.assertEqual(msg.get_flags(), 'F')

        # Flags are reported in sorted order and without duplicates.
        flag_steps = (
            (msg.set_flags, 'SDTP', 'DPST'),
            (msg.add_flag, 'FT', 'DFPST'),
            (msg.remove_flag, 'TDRP', 'FS'),
        )
        for change_flags, argument, expected_flags in flag_steps:
            change_flags(argument)
            self.assertEqual(msg.get_flags(), expected_flags)

        # None of the flag changes may have moved the message.
        self.assertEqual(msg.get_subdir(), 'new')
        self._check_sample(msg)

    def test_date(self):
        # Use get_date() and set_date()
        msg = mailbox.MaildirMessage(_sample_message)
        # The initial date must lie within a minute of the current time.
        age = abs(msg.get_date() - time.time())
        self.assertLess(age, 60)
        msg.set_date(0.0)
        self.assertEqual(msg.get_date(), 0.0)

    def test_info(self):
        # Use get_info() and set_info()
        msg = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(msg.get_info(), '')
        msg.set_info('1,foo=bar')
        self.assertEqual(msg.get_info(), '1,foo=bar')
        self.assertRaises(TypeError, msg.set_info, None)
        self._check_sample(msg)

    def test_info_and_flags(self):
        # Test interaction of info and flag methods
        msg = mailbox.MaildirMessage(_sample_message)
        self.assertEqual(msg.get_info(), '')

        # Each row: method to call, its argument, then the flags and the
        # info string expected afterwards.
        steps = (
            (msg.set_flags, 'SF', 'FS', '2,FS'),
            (msg.set_info, '1,', '', '1,'),
            (msg.remove_flag, 'RPT', '', '1,'),
            (msg.add_flag, 'D', 'D', '2,D'),
        )
        for mutate, argument, expected_flags, expected_info in steps:
            mutate(argument)
            self.assertEqual(msg.get_flags(), expected_flags)
            self.assertEqual(msg.get_info(), expected_info)
        self._check_sample(msg)
1500
1501
class _TestMboxMMDFMessage:
    # Mixin holding the checks shared by the mbox and MMDF message classes.
    # Concrete subclasses override _factory with the class under test; the
    # tests build their messages through self._factory so that each subclass
    # exercises its own message class.

    _factory = mailbox._mboxMMDFMessage

    def _post_initialize_hook(self, msg):
        self._check_from(msg)

    def test_initialize_with_unixfrom(self):
        # Initialize with a message that already has a _unixfrom attribute
        msg = mailbox.Message(_sample_message)
        msg.set_unixfrom('From foo@bar blah')
        msg = self._factory(msg)
        self.assertEqual(msg.get_from(), 'foo@bar blah', msg.get_from())

    def test_from(self):
        # Get and set "From " line
        msg = self._factory(_sample_message)
        self._check_from(msg)
        msg.set_from('foo bar')
        self.assertEqual(msg.get_from(), 'foo bar')
        msg.set_from('foo@bar', True)
        self._check_from(msg, 'foo@bar')
        msg.set_from('blah@temp', time.localtime())
        self._check_from(msg, 'blah@temp')

    def test_flags(self):
        # Use get_flags(), set_flags(), add_flag(), remove_flag()
        msg = self._factory(_sample_message)
        self.assertEqual(msg.get_flags(), '')
        msg.set_flags('F')
        self.assertEqual(msg.get_flags(), 'F')
        msg.set_flags('XODR')
        self.assertEqual(msg.get_flags(), 'RODX')
        msg.add_flag('FA')
        self.assertEqual(msg.get_flags(), 'RODFAX')
        msg.remove_flag('FDXA')
        self.assertEqual(msg.get_flags(), 'RO')
        self._check_sample(msg)

    def _check_from(self, msg, sender=None):
        # Check contents of "From " line: the sender followed by a
        # timestamp shaped like "Sat Jul 24 13:43:35 2004".
        # sender defaults to "MAILER-DAEMON" when not given.
        if sender is None:
            sender = "MAILER-DAEMON"
        # Escape the sender so characters such as '.' are matched literally
        # instead of being interpreted as regex syntax.
        pattern = (re.escape(sender) +
                   r" \w{3} \w{3} [\d ]\d [\d ]\d:\d{2}:\d{2} \d{4}")
        self.assertIsNotNone(re.match(pattern, msg.get_from()))
1548
1549
class TestMboxMessage(_TestMboxMMDFMessage, TestMessage):
    # Inherits the tests of _TestMboxMMDFMessage and TestMessage, with
    # _factory bound to mailbox.mboxMessage.
    # NOTE(review): unittest.TestCase is not listed as a base here, unlike
    # the sibling Test*Message classes -- presumably it is inherited through
    # TestMessage; confirm.

    _factory = mailbox.mboxMessage
1553
1554
class TestMHMessage(TestMessage, unittest.TestCase):
    # Covers the sequence bookkeeping of mailbox.MHMessage.

    _factory = mailbox.MHMessage

    def _post_initialize_hook(self, msg):
        # A new message is expected to belong to no sequence
        self.assertEqual(msg._sequences, [])

    def test_sequences(self):
        # Get, set, join, and leave sequences
        message = mailbox.MHMessage(_sample_message)
        self.assertEqual(message.get_sequences(), [])
        # (operation, argument, sequences expected afterwards)
        steps = (
            (message.set_sequences, ['foobar'], ['foobar']),
            (message.set_sequences, [], []),
            (message.add_sequence, 'unseen', ['unseen']),
            (message.add_sequence, 'flagged', ['unseen', 'flagged']),
            # Joining the same sequence again leaves the list unchanged
            (message.add_sequence, 'flagged', ['unseen', 'flagged']),
            (message.remove_sequence, 'unseen', ['flagged']),
            (message.add_sequence, 'foobar', ['flagged', 'foobar']),
            # Leaving a sequence the message is not in changes nothing
            (message.remove_sequence, 'replied', ['flagged', 'foobar']),
            (message.set_sequences, ['foobar', 'replied'],
             ['foobar', 'replied']),
        )
        for operation, argument, expected in steps:
            operation(argument)
            self.assertEqual(message.get_sequences(), expected)
1584
1585
class TestBabylMessage(TestMessage, unittest.TestCase):
    # Covers the label list and the visible-headers message of
    # mailbox.BabylMessage.

    _factory = mailbox.BabylMessage

    def _post_initialize_hook(self, msg):
        # A new message is expected to carry no labels
        self.assertEqual(msg._labels, [])

    def test_labels(self):
        # Get, set, join, and leave labels
        message = mailbox.BabylMessage(_sample_message)
        self.assertEqual(message.get_labels(), [])
        # (operation, argument, labels expected afterwards)
        steps = (
            (message.set_labels, ['foobar'], ['foobar']),
            (message.set_labels, [], []),
            (message.add_label, 'filed', ['filed']),
            (message.add_label, 'resent', ['filed', 'resent']),
            # Adding the same label again leaves the list unchanged
            (message.add_label, 'resent', ['filed', 'resent']),
            (message.remove_label, 'filed', ['resent']),
            (message.add_label, 'foobar', ['resent', 'foobar']),
            # Removing a label that is not set changes nothing
            (message.remove_label, 'unseen', ['resent', 'foobar']),
            (message.set_labels, ['foobar', 'answered'],
             ['foobar', 'answered']),
        )
        for operation, argument, expected in steps:
            operation(argument)
            self.assertEqual(message.get_labels(), expected)

    def test_visible(self):
        # Get, set, and update visible headers
        message = mailbox.BabylMessage(_sample_message)
        shown = message.get_visible()
        self.assertEqual(shown.keys(), [])
        self.assertIsNone(shown.get_payload())
        shown['User-Agent'] = 'FooBar 1.0'
        shown['X-Whatever'] = 'Blah'
        # Headers added to the fetched object are not yet on the message
        self.assertEqual(message.get_visible().keys(), [])
        message.set_visible(shown)
        shown = message.get_visible()
        self.assertEqual(shown.keys(), ['User-Agent', 'X-Whatever'])
        self.assertEqual(shown['User-Agent'], 'FooBar 1.0')
        self.assertEqual(shown['X-Whatever'], 'Blah')
        self.assertIsNone(shown.get_payload())
        message.update_visible()
        # The object fetched before update_visible() keeps its old headers
        self.assertEqual(shown.keys(), ['User-Agent', 'X-Whatever'])
        self.assertIsNone(shown.get_payload())
        shown = message.get_visible()
        refreshed = ['User-Agent', 'Date', 'From', 'To', 'Subject']
        self.assertEqual(shown.keys(), refreshed)
        for header in refreshed:
            self.assertEqual(shown[header], message[header])
1639
1640
class TestMMDFMessage(_TestMboxMMDFMessage, TestMessage):
    # Inherits the tests of _TestMboxMMDFMessage and TestMessage, with
    # _factory bound to mailbox.MMDFMessage.
    # NOTE(review): unittest.TestCase is not listed as a base here, unlike
    # the sibling Test*Message classes -- presumably it is inherited through
    # TestMessage; confirm.

    _factory = mailbox.MMDFMessage
1644
1645
class TestMessageConversion(TestBase, unittest.TestCase):
    # Checks how format-specific state (flags, sequences, labels, subdir,
    # date, "From " line) carries over when one message class is built from
    # an instance of another.

    _mbox_like = (mailbox.mboxMessage, mailbox.MMDFMessage)
    _all_mh_sequences = ('unseen', 'replied', 'flagged')
    _all_babyl_labels = ('unseen', 'deleted', 'filed', 'answered',
                         'forwarded', 'edited', 'resent')

    def _mh_with(self, *sequences):
        # Build an MHMessage from the sample and join the given sequences
        message = mailbox.MHMessage(_sample_message)
        for name in sequences:
            message.add_sequence(name)
        return message

    def _babyl_with(self, *labels):
        # Build a BabylMessage from the sample and add the given labels
        message = mailbox.BabylMessage(_sample_message)
        for name in labels:
            message.add_label(name)
        return message

    def test_plain_to_x(self):
        # Convert Message to all formats
        for target_class in self.all_mailbox_types:
            plain = mailbox.Message(_sample_message)
            converted = target_class(plain)
            self._check_sample(converted)

    def test_x_to_plain(self):
        # Convert all formats to Message
        for source_class in self.all_mailbox_types:
            original = source_class(_sample_message)
            self._check_sample(mailbox.Message(original))

    def test_x_from_bytes(self):
        # Initialize all formats from _bytes_sample_message
        for target_class in self.all_mailbox_types:
            self._check_sample(target_class(_bytes_sample_message))

    def test_x_to_invalid(self):
        # Convert all formats to an invalid format
        for target_class in self.all_mailbox_types:
            with self.assertRaises(TypeError):
                target_class(False)

    def test_type_specific_attributes_removed_on_conversion(self):
        # Attributes that only the source class defines must not appear on
        # the converted instance.
        baseline = {}
        for class_ in self.all_mailbox_types:
            baseline[class_] = class_(_sample_message).__dict__
        for source_class in self.all_mailbox_types:
            for target_class in self.all_mailbox_types:
                if source_class is target_class:
                    continue
                converted = target_class(source_class(_sample_message))
                note = "while converting {} to {}".format(source_class,
                                                          target_class)
                for attr in baseline[source_class]:
                    if attr in baseline[target_class]:
                        continue
                    self.assertNotIn(attr, converted.__dict__, note)

    def test_maildir_to_maildir(self):
        # Convert MaildirMessage to MaildirMessage
        source = mailbox.MaildirMessage(_sample_message)
        source.set_flags('DFPRST')
        source.set_subdir('cur')
        stamp = source.get_date()
        copy_ = mailbox.MaildirMessage(source)
        self._check_sample(copy_)
        self.assertEqual(copy_.get_flags(), 'DFPRST')
        self.assertEqual(copy_.get_subdir(), 'cur')
        self.assertEqual(copy_.get_date(), stamp)

    def test_maildir_to_mboxmmdf(self):
        # Convert MaildirMessage to mboxmessage and MMDFMessage
        flag_map = (('D', ''), ('F', 'F'), ('P', ''), ('R', 'A'), ('S', 'R'),
                    ('T', 'D'), ('DFPRST', 'RDFA'))
        epoch_from = 'MAILER-DAEMON %s' % time.asctime(time.gmtime(0.0))
        for target_class in self._mbox_like:
            source = mailbox.MaildirMessage(_sample_message)
            source.set_date(0.0)
            for maildir_flags, expected in flag_map:
                source.set_flags(maildir_flags)
                converted = target_class(source)
                self.assertEqual(converted.get_flags(), expected)
                self.assertEqual(converted.get_from(), epoch_from)
            # Still carrying 'DFPRST' from the last iteration
            source.set_subdir('cur')
            self.assertEqual(target_class(source).get_flags(), 'RODFA')

    def test_maildir_to_mh(self):
        # Convert MaildirMessage to MHMessage
        source = mailbox.MaildirMessage(_sample_message)
        flag_map = (('D', ['unseen']), ('F', ['unseen', 'flagged']),
                    ('P', ['unseen']), ('R', ['unseen', 'replied']),
                    ('S', []), ('T', ['unseen']),
                    ('DFPRST', ['replied', 'flagged']))
        for maildir_flags, expected in flag_map:
            source.set_flags(maildir_flags)
            converted = mailbox.MHMessage(source)
            self.assertEqual(converted.get_sequences(), expected)

    def test_maildir_to_babyl(self):
        # Convert MaildirMessage to Babyl
        source = mailbox.MaildirMessage(_sample_message)
        flag_map = (('D', ['unseen']), ('F', ['unseen']),
                    ('P', ['unseen', 'forwarded']),
                    ('R', ['unseen', 'answered']),
                    ('S', []), ('T', ['unseen', 'deleted']),
                    ('DFPRST', ['deleted', 'answered', 'forwarded']))
        for maildir_flags, expected in flag_map:
            source.set_flags(maildir_flags)
            converted = mailbox.BabylMessage(source)
            self.assertEqual(converted.get_labels(), expected)

    def test_mboxmmdf_to_maildir(self):
        # Convert mboxMessage and MMDFMessage to MaildirMessage
        flag_map = (('R', 'S'), ('O', ''), ('D', 'T'), ('F', 'F'),
                    ('A', 'R'), ('RODFA', 'FRST'))
        for source_class in self._mbox_like:
            source = source_class(_sample_message)
            source.set_from('foo@bar', time.gmtime(0.0))
            for mbox_flags, expected in flag_map:
                source.set_flags(mbox_flags)
                converted = mailbox.MaildirMessage(source)
                self.assertEqual(converted.get_flags(), expected)
                self.assertEqual(converted.get_date(), 0.0)
            source.set_flags('O')
            converted = mailbox.MaildirMessage(source)
            self.assertEqual(converted.get_subdir(), 'cur')

    def test_mboxmmdf_to_mboxmmdf(self):
        # Convert mboxMessage and MMDFMessage to mboxMessage and MMDFMessage
        for source_class in self._mbox_like:
            source = source_class(_sample_message)
            source.set_flags('RODFA')
            source.set_from('foo@bar')
            for target_class in self._mbox_like:
                converted = target_class(source)
                self.assertEqual(converted.get_flags(), 'RODFA')
                self.assertEqual(converted.get_from(), 'foo@bar')

    def test_mboxmmdf_to_mh(self):
        # Convert mboxMessage and MMDFMessage to MHMessage
        flag_map = (('R', []), ('O', ['unseen']), ('D', ['unseen']),
                    ('F', ['unseen', 'flagged']),
                    ('A', ['unseen', 'replied']),
                    ('RODFA', ['replied', 'flagged']))
        for source_class in self._mbox_like:
            source = source_class(_sample_message)
            for mbox_flags, expected in flag_map:
                source.set_flags(mbox_flags)
                converted = mailbox.MHMessage(source)
                self.assertEqual(converted.get_sequences(), expected)

    def test_mboxmmdf_to_babyl(self):
        # Convert mboxMessage and MMDFMessage to BabylMessage
        flag_map = (('R', []), ('O', ['unseen']),
                    ('D', ['unseen', 'deleted']), ('F', ['unseen']),
                    ('A', ['unseen', 'answered']),
                    ('RODFA', ['deleted', 'answered']))
        for source_class in self._mbox_like:
            source = source_class(_sample_message)
            for mbox_flags, expected in flag_map:
                source.set_flags(mbox_flags)
                converted = mailbox.BabylMessage(source)
                self.assertEqual(converted.get_labels(), expected)

    def test_mh_to_maildir(self):
        # Convert MHMessage to MaildirMessage
        sequence_map = (('unseen', ''), ('replied', 'RS'), ('flagged', 'FS'))
        for sequence, expected in sequence_map:
            converted = mailbox.MaildirMessage(self._mh_with(sequence))
            self.assertEqual(converted.get_flags(), expected)
            self.assertEqual(converted.get_subdir(), 'cur')
        source = self._mh_with(*self._all_mh_sequences)
        converted = mailbox.MaildirMessage(source)
        self.assertEqual(converted.get_flags(), 'FR')
        self.assertEqual(converted.get_subdir(), 'cur')

    def test_mh_to_mboxmmdf(self):
        # Convert MHMessage to mboxMessage and MMDFMessage
        sequence_map = (('unseen', 'O'), ('replied', 'ROA'),
                        ('flagged', 'ROF'))
        for sequence, expected in sequence_map:
            source = self._mh_with(sequence)
            for target_class in self._mbox_like:
                self.assertEqual(target_class(source).get_flags(), expected)
        source = self._mh_with(*self._all_mh_sequences)
        for target_class in self._mbox_like:
            self.assertEqual(target_class(source).get_flags(), 'OFA')

    def test_mh_to_mh(self):
        # Convert MHMessage to MHMessage
        source = self._mh_with(*self._all_mh_sequences)
        converted = mailbox.MHMessage(source)
        self.assertEqual(converted.get_sequences(),
                         ['unseen', 'replied', 'flagged'])

    def test_mh_to_babyl(self):
        # Convert MHMessage to BabylMessage
        sequence_map = (('unseen', ['unseen']), ('replied', ['answered']),
                        ('flagged', []))
        for sequence, expected in sequence_map:
            converted = mailbox.BabylMessage(self._mh_with(sequence))
            self.assertEqual(converted.get_labels(), expected)
        source = self._mh_with(*self._all_mh_sequences)
        self.assertEqual(mailbox.BabylMessage(source).get_labels(),
                         ['unseen', 'answered'])

    def test_babyl_to_maildir(self):
        # Convert BabylMessage to MaildirMessage
        label_map = (('unseen', ''), ('deleted', 'ST'), ('filed', 'S'),
                     ('answered', 'RS'), ('forwarded', 'PS'),
                     ('edited', 'S'), ('resent', 'PS'))
        for label, expected in label_map:
            converted = mailbox.MaildirMessage(self._babyl_with(label))
            self.assertEqual(converted.get_flags(), expected)
            self.assertEqual(converted.get_subdir(), 'cur')
        source = self._babyl_with(*self._all_babyl_labels)
        converted = mailbox.MaildirMessage(source)
        self.assertEqual(converted.get_flags(), 'PRT')
        self.assertEqual(converted.get_subdir(), 'cur')

    def test_babyl_to_mboxmmdf(self):
        # Convert BabylMessage to mboxMessage and MMDFMessage
        label_map = (('unseen', 'O'), ('deleted', 'ROD'), ('filed', 'RO'),
                     ('answered', 'ROA'), ('forwarded', 'RO'),
                     ('edited', 'RO'), ('resent', 'RO'))
        for label, expected in label_map:
            for target_class in self._mbox_like:
                source = self._babyl_with(label)
                self.assertEqual(target_class(source).get_flags(), expected)
        source = self._babyl_with(*self._all_babyl_labels)
        for target_class in self._mbox_like:
            self.assertEqual(target_class(source).get_flags(), 'ODA')

    def test_babyl_to_mh(self):
        # Convert BabylMessage to MHMessage
        label_map = (('unseen', ['unseen']), ('deleted', []), ('filed', []),
                     ('answered', ['replied']), ('forwarded', []),
                     ('edited', []), ('resent', []))
        for label, expected in label_map:
            converted = mailbox.MHMessage(self._babyl_with(label))
            self.assertEqual(converted.get_sequences(), expected)
        source = self._babyl_with(*self._all_babyl_labels)
        self.assertEqual(mailbox.MHMessage(source).get_sequences(),
                         ['unseen', 'replied'])

    def test_babyl_to_babyl(self):
        # Convert BabylMessage to BabylMessage
        source = mailbox.BabylMessage(_sample_message)
        source.update_visible()
        for label in self._all_babyl_labels:
            source.add_label(label)
        duplicate = mailbox.BabylMessage(source)
        self.assertEqual(duplicate.get_labels(),
                         ['unseen', 'deleted', 'filed', 'answered',
                          'forwarded', 'edited', 'resent'])
        self.assertEqual(source.get_visible().keys(),
                         duplicate.get_visible().keys())
        for header in source.get_visible().keys():
            self.assertEqual(source.get_visible()[header],
                             duplicate.get_visible()[header])
1909
1910
class TestProxyFileBase(TestBase):
    # Shared checks for file-like proxy objects.  Subclasses write known
    # content to a file, wrap it in a proxy and hand the proxy to one of the
    # _test_* helpers below.

    def _test_read(self, proxy):
        # Read by byte
        # (seek offset, arguments for read(), expected bytes)
        cases = (
            (0, (), b'bar'),
            (1, (), b'ar'),
            (0, (2,), b'ba'),
            (1, (-1,), b'ar'),
            (2, (1000,), b'r'),
        )
        for offset, read_args, expected in cases:
            proxy.seek(offset)
            self.assertEqual(proxy.read(*read_args), expected)

    def _test_readline(self, proxy):
        # Read by line
        eol = os.linesep.encode()
        proxy.seek(0)
        for expected in (b'foo' + eol, b'bar' + eol, b'fred' + eol, b'bob'):
            self.assertEqual(proxy.readline(), expected)
        proxy.seek(2)
        self.assertEqual(proxy.readline(), b'o' + eol)
        # Offset of the third line: 'foo' + 'bar' plus two line separators
        third_line = 6 + 2 * len(os.linesep)
        proxy.seek(third_line)
        self.assertEqual(proxy.readline(), b'fred' + eol)
        proxy.seek(third_line)
        self.assertEqual(proxy.readline(2), b'fr')
        self.assertEqual(proxy.readline(-10), b'ed' + eol)

    def _test_readlines(self, proxy):
        # Read multiple lines
        eol = os.linesep.encode()
        foo, bar, fred = b'foo' + eol, b'bar' + eol, b'fred' + eol
        proxy.seek(0)
        self.assertEqual(proxy.readlines(), [foo, bar, fred, b'bob'])
        proxy.seek(0)
        self.assertEqual(proxy.readlines(2), [foo])
        proxy.seek(3 + len(eol))
        self.assertEqual(proxy.readlines(4 + len(eol)), [bar, fred])
        proxy.seek(3)
        self.assertEqual(proxy.readlines(1000), [eol, bar, fred, b'bob'])

    def _test_iteration(self, proxy):
        # Iterate by line
        eol = os.linesep.encode()
        proxy.seek(0)
        lines = iter(proxy)
        for expected in (b'foo' + eol, b'bar' + eol, b'fred' + eol, b'bob'):
            self.assertEqual(next(lines), expected)
        with self.assertRaises(StopIteration):
            next(lines)

    def _test_seek_and_tell(self, proxy):
        # Seek and use tell to check position
        eol = os.linesep.encode()
        eol_size = len(eol)
        proxy.seek(3)
        self.assertEqual(proxy.tell(), 3)
        self.assertEqual(proxy.read(eol_size), eol)
        # Relative to the current position
        proxy.seek(2, 1)
        self.assertEqual(proxy.read(1 + eol_size), b'r' + eol)
        # Relative to the end
        proxy.seek(-3 - eol_size, 2)
        self.assertEqual(proxy.read(3), b'bar')
        proxy.seek(2, 0)
        self.assertEqual(proxy.read(), b'o' + eol + b'bar' + eol)
        proxy.seek(100)
        self.assertFalse(proxy.read())

    def _test_close(self, proxy):
        # Close a file
        self.assertFalse(proxy.closed)
        # Issue 11700 subsequent closes should be a no-op.
        for _ in range(2):
            proxy.close()
            self.assertTrue(proxy.closed)
1992
1993
class TestProxyFile(TestProxyFileBase, unittest.TestCase):
    # Runs the shared proxy checks against mailbox._ProxyFile wrapped around
    # a scratch file opened in 'wb+' mode.

    def setUp(self):
        self._path = support.TESTFN
        self._file = open(self._path, 'wb+')

    def tearDown(self):
        self._file.close()
        self._delete_recursively(self._path)

    def _proxy_over(self, text):
        # Write *text* as ASCII to the scratch file, then wrap the file
        self._file.write(bytes(text, 'ascii'))
        return mailbox._ProxyFile(self._file)

    def test_initialize(self):
        # Initialize and check position
        self._file.write(b'foo')
        end = self._file.tell()
        at_current = mailbox._ProxyFile(self._file)
        self.assertEqual(at_current.tell(), end)
        self.assertEqual(self._file.tell(), end)
        at_start = mailbox._ProxyFile(self._file, 0)
        self.assertEqual(at_start.tell(), 0)
        self.assertEqual(self._file.tell(), end)

    def test_read(self):
        self._test_read(self._proxy_over('bar'))

    def test_readline(self):
        content = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        self._test_readline(self._proxy_over(content))

    def test_readlines(self):
        content = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        self._test_readlines(self._proxy_over(content))

    def test_iteration(self):
        content = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        self._test_iteration(self._proxy_over(content))

    def test_seek_and_tell(self):
        content = 'foo' + os.linesep + 'bar' + os.linesep
        self._test_seek_and_tell(self._proxy_over(content))

    def test_close(self):
        content = 'foo' + os.linesep + 'bar' + os.linesep
        self._test_close(self._proxy_over(content))
2041
2042
class TestPartialFile(TestProxyFileBase, unittest.TestCase):
    # Runs the shared proxy checks against mailbox._PartialFile, which is
    # given a start and stop offset into a scratch file that also holds
    # padding outside that window.

    def setUp(self):
        self._path = support.TESTFN
        self._file = open(self._path, 'wb+')

    def tearDown(self):
        self._file.close()
        self._delete_recursively(self._path)

    def _partial_over(self, text, start, stop):
        # Write *text* as ASCII, then wrap the [start, stop) window of it
        self._file.write(bytes(text, 'ascii'))
        return mailbox._PartialFile(self._file, start, stop)

    def test_initialize(self):
        # Initialize and check position
        self._file.write(bytes('foo' + os.linesep + 'bar', 'ascii'))
        end = self._file.tell()
        proxy = mailbox._PartialFile(self._file, 2, 5)
        self.assertEqual(proxy.tell(), 0)
        self.assertEqual(self._file.tell(), end)

    def test_read(self):
        self._test_read(self._partial_over('***bar***', 3, 6))

    def test_readline(self):
        lines = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        stop = 18 + 3 * len(os.linesep)
        self._test_readline(
            self._partial_over('!!!!!' + lines + '!!!!!', 5, stop))

    def test_readlines(self):
        lines = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        stop = 13 + 3 * len(os.linesep)
        self._test_readlines(self._partial_over(lines + '?????', 0, stop))

    def test_iteration(self):
        lines = os.linesep.join(('foo', 'bar', 'fred', 'bob'))
        stop = 17 + 3 * len(os.linesep)
        self._test_iteration(
            self._partial_over('____' + lines + '####', 4, stop))

    def test_seek_and_tell(self):
        content = '(((foo' + os.linesep + 'bar' + os.linesep + '$$$'
        stop = 9 + 2 * len(os.linesep)
        self._test_seek_and_tell(self._partial_over(content, 3, stop))

    def test_close(self):
        content = '&foo' + os.linesep + 'bar' + os.linesep + '^'
        stop = 6 + 3 * len(os.linesep)
        self._test_close(self._partial_over(content, 1, stop))
2092
2093
2094## Start: tests from the original module (for backward compatibility).
2095
# mbox-style "From " separator line written ahead of the message body.
FROM_ = "From some.body@dummy.domain  Sat Jul 24 13:43:35 2004\n"
# Minimal message: three headers, a blank line, and a one-line body.
DUMMY_MESSAGE = (
    "From: some.body@dummy.domain\n"
    "To: me@my.domain\n"
    "Subject: Simple Test\n"
    "\n"
    "This is a dummy message.\n"
)
2104
class MaildirTestCase(unittest.TestCase):
    # Legacy Maildir checks: build a maildir layout by hand under
    # support.TESTFN and iterate it with mailbox.Maildir.next().

    def setUp(self):
        # create a new maildir mailbox to work with:
        self._dir = support.TESTFN
        if os.path.isdir(self._dir):
            support.rmtree(self._dir)
        elif os.path.isfile(self._dir):
            support.unlink(self._dir)
        os.mkdir(self._dir)
        for subdir in ("cur", "tmp", "new"):
            os.mkdir(os.path.join(self._dir, subdir))
        self._counter = 1
        self._msgfiles = []

    def tearDown(self):
        # Remove every file createMessage() recorded, then the directories
        for path in self._msgfiles:
            os.unlink(path)
        for subdir in ("cur", "tmp", "new"):
            support.rmdir(os.path.join(self._dir, subdir))
        support.rmdir(self._dir)

    def createMessage(self, dir, mbox=False):
        # Write a dummy message into tmp/ and make it available under the
        # subdirectory *dir* with the same file name.  When *mbox* is true
        # the content is prefixed with the FROM_ line.  Both paths are
        # recorded in self._msgfiles for tearDown(); returns the tmp/ path.
        t = int(time.time() % 1000000)
        pid = self._counter
        self._counter += 1
        filename = ".".join((str(t), str(pid), "myhostname", "mydomain"))
        tmpname = os.path.join(self._dir, "tmp", filename)
        newname = os.path.join(self._dir, dir, filename)
        content = (FROM_ if mbox else "") + DUMMY_MESSAGE
        with open(tmpname, "w") as fp:
            self._msgfiles.append(tmpname)
            fp.write(content)
        try:
            os.link(tmpname, newname)
        except (AttributeError, PermissionError):
            # Hard links are unavailable: write an independent copy with
            # the same content as the tmp/ file, including the FROM_ line.
            with open(newname, "w") as fp:
                fp.write(content)
        self._msgfiles.append(newname)
        return tmpname

    def test_empty_maildir(self):
        """Test an empty maildir mailbox"""
        # Test for regression on bug #117490:
        # Make sure the boxes attribute actually gets set.
        self.mbox = mailbox.Maildir(self._dir)
        #self.assertTrue(hasattr(self.mbox, "boxes"))
        #self.assertEqual(len(self.mbox.boxes), 0)
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())

    def test_nonempty_maildir_cur(self):
        # One message in cur/: next() yields it once, then None
        self.createMessage("cur")
        self.mbox = mailbox.Maildir(self._dir)
        #self.assertEqual(len(self.mbox.boxes), 1)
        self.assertIsNotNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())

    def test_nonempty_maildir_new(self):
        # One message in new/: next() yields it once, then None
        self.createMessage("new")
        self.mbox = mailbox.Maildir(self._dir)
        #self.assertEqual(len(self.mbox.boxes), 1)
        self.assertIsNotNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())

    def test_nonempty_maildir_both(self):
        # One message in each of cur/ and new/: two results, then None
        self.createMessage("cur")
        self.createMessage("new")
        self.mbox = mailbox.Maildir(self._dir)
        #self.assertEqual(len(self.mbox.boxes), 2)
        self.assertIsNotNone(self.mbox.next())
        self.assertIsNotNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
        self.assertIsNone(self.mbox.next())
2183
2184## End: tests from the original module (for backward compatibility).
2185
2186
# Sample message text: a multipart/mixed message whose first part is
# text/plain and whose second part is a base64-encoded
# application/octet-stream attachment.  The text is significant byte for
# byte (folded header continuation lines, blank lines, boundary markers),
# so do not reflow or re-indent it.
_sample_message = """\
Return-Path: <gkj@gregorykjohnson.com>
X-Original-To: gkj+person@localhost
Delivered-To: gkj+person@localhost
Received: from localhost (localhost [127.0.0.1])
        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
Delivered-To: gkj@sundance.gregorykjohnson.com
Received: from localhost [127.0.0.1]
        by localhost with POP3 (fetchmail-6.2.5)
        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)
Received: from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
        for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
Received: by andy.gregorykjohnson.com (Postfix, from userid 1000)
        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)
Date: Wed, 13 Jul 2005 17:23:11 -0400
From: "Gregory K. Johnson" <gkj@gregorykjohnson.com>
To: gkj@gregorykjohnson.com
Subject: Sample message
Message-ID: <20050713212311.GC4701@andy.gregorykjohnson.com>
Mime-Version: 1.0
Content-Type: multipart/mixed; boundary="NMuMz9nt05w80d4+"
Content-Disposition: inline
User-Agent: Mutt/1.5.9i


--NMuMz9nt05w80d4+
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline

This is a sample message.

--
Gregory K. Johnson

--NMuMz9nt05w80d4+
Content-Type: application/octet-stream
Content-Disposition: attachment; filename="text.gz"
Content-Transfer-Encoding: base64

H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
3FYlAAAA

--NMuMz9nt05w80d4+--
"""

# The same sample message as ASCII-encoded bytes.
_bytes_sample_message = _sample_message.encode('ascii')
2235
# Header (name, value) pairs expected in the parsed sample message; the
# consumer iterates over _sample_headers.items().
#
# The sample message repeats "Delivered-To" and "Received".  A dict literal
# silently keeps only the last value of a repeated key, which would leave
# the earlier occurrences unchecked.  An email.message.Message keeps every
# header that is assigned (assignment appends, it never replaces) and its
# items() returns all pairs in insertion order, so every listed header is
# verified.
_sample_headers = email.message.Message()
for _header_name, _header_value in (
    ("Return-Path", "<gkj@gregorykjohnson.com>"),
    ("X-Original-To", "gkj+person@localhost"),
    ("Delivered-To", "gkj+person@localhost"),
    ("Received", """from localhost (localhost [127.0.0.1])
        by andy.gregorykjohnson.com (Postfix) with ESMTP id 356ED9DD17
        for <gkj+person@localhost>; Wed, 13 Jul 2005 17:23:16 -0400 (EDT)"""),
    ("Delivered-To", "gkj@sundance.gregorykjohnson.com"),
    ("Received", """from localhost [127.0.0.1]
        by localhost with POP3 (fetchmail-6.2.5)
        for gkj+person@localhost (single-drop); Wed, 13 Jul 2005 17:23:16 -0400 (EDT)"""),
    ("Received", """from andy.gregorykjohnson.com (andy.gregorykjohnson.com [64.32.235.228])
        by sundance.gregorykjohnson.com (Postfix) with ESMTP id 5B056316746
        for <gkj@gregorykjohnson.com>; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)"""),
    ("Received", """by andy.gregorykjohnson.com (Postfix, from userid 1000)
        id 490CD9DD17; Wed, 13 Jul 2005 17:23:11 -0400 (EDT)"""),
    ("Date", "Wed, 13 Jul 2005 17:23:11 -0400"),
    ("From", '"Gregory K. Johnson" <gkj@gregorykjohnson.com>'),
    ("To", "gkj@gregorykjohnson.com"),
    ("Subject", "Sample message"),
    ("Mime-Version", "1.0"),
    ("Content-Type", 'multipart/mixed; boundary="NMuMz9nt05w80d4+"'),
    ("Content-Disposition", "inline"),
    ("User-Agent", "Mutt/1.5.9i"),
):
    _sample_headers[_header_name] = _header_value
# Do not leave the loop variables behind as module globals.
del _header_name, _header_value
2260
# Expected payload text of each part of the sample message, in order: the
# text/plain body first, then the base64 text of the attachment.  Each
# string ends with a newline and must match the message text exactly.
_sample_payloads = ("""This is a sample message.

--
Gregory K. Johnson
""",
"""H4sICM2D1UIAA3RleHQAC8nILFYAokSFktSKEoW0zJxUPa7wzJIMhZLyfIWczLzUYj0uAHTs
3FYlAAAA
""")
2269
2270
class MiscTestCase(unittest.TestCase):
    """Module-level checks that are not tied to a particular mailbox type."""

    def test__all__(self):
        # Validate mailbox.__all__ via the shared support helper, telling
        # it to disregard the names "linesep" and "fcntl".
        support.check__all__(self, mailbox, blacklist={"linesep", "fcntl"})
2275
2276
def test_main():
    """Run all test case classes of this module, then reap child processes."""
    support.run_unittest(
        TestMailboxSuperclass,
        TestMaildir,
        TestMbox,
        TestMMDF,
        TestMH,
        TestBabyl,
        TestMessage,
        TestMaildirMessage,
        TestMboxMessage,
        TestMHMessage,
        TestBabylMessage,
        TestMMDFMessage,
        TestMessageConversion,
        TestProxyFile,
        TestPartialFile,
        MaildirTestCase,
        TestFakeMailBox,
        MiscTestCase,
    )
    support.reap_children()
2285
2286
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()
2289