page_set_archive_info_unittest.py revision a36e5920737c6adbddd3e43b760e5de8431db6e0
1# Copyright (c) 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4import os 5import shutil 6import tempfile 7import unittest 8 9from telemetry.page import cloud_storage 10from telemetry.page import page_set_archive_info 11 12 13class MockPage(object): 14 def __init__(self, url): 15 self.url = url 16 17 18url1 = 'http://www.foo.com/' 19url2 = 'http://www.bar.com/' 20url3 = 'http://www.baz.com/' 21recording1 = 'data_001.wpr' 22recording2 = 'data_002.wpr' 23archive_info_contents = (""" 24{ 25"archives": { 26 "%s": ["%s", "%s"], 27 "%s": ["%s"] 28} 29} 30""" % (recording1, url1, url2, recording2, url3)) 31page1 = MockPage(url1) 32page2 = MockPage(url2) 33page3 = MockPage(url3) 34 35 36class TestPageSetArchiveInfo(unittest.TestCase): 37 def setUp(self): 38 self.tmp_dir = tempfile.mkdtemp() 39 # Write the metadata. 40 self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json') 41 with open(self.page_set_archive_info_file, 'w') as f: 42 f.write(archive_info_contents) 43 44 # Write the existing .wpr files. 45 for i in [1, 2]: 46 with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f: 47 f.write(archive_info_contents) 48 49 # Create the PageSetArchiveInfo object to be tested. 50 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( 51 self.page_set_archive_info_file, 52 os.path.join(tempfile.gettempdir(), 'pageset.json')) 53 54 def tearDown(self): 55 shutil.rmtree(self.tmp_dir) 56 57 def assertCorrectHashFile(self, file_path): 58 self.assertTrue(os.path.exists(file_path + '.sha1')) 59 with open(file_path + '.sha1', 'rb') as f: 60 self.assertEquals(cloud_storage.GetHash(file_path), f.read()) 61 62 def testReadingArchiveInfo(self): 63 self.assertEquals(recording1, os.path.basename( 64 self.archive_info.WprFilePathForPage(page1))) 65 self.assertEquals(recording1, os.path.basename( 66 self.archive_info.WprFilePathForPage(page2))) 67 self.assertEquals(recording2, os.path.basename( 68 self.archive_info.WprFilePathForPage(page3))) 69 70 def testModifications(self): 71 recording1_path = os.path.join(self.tmp_dir, recording1) 72 recording2_path = os.path.join(self.tmp_dir, recording2) 73 74 new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr') 75 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') 76 with open(new_temp_recording, 'w') as f: 77 f.write('wpr data') 78 79 self.archive_info.AddNewTemporaryRecording(new_temp_recording) 80 81 self.assertEquals(new_temp_recording, 82 self.archive_info.WprFilePathForPage(page1)) 83 self.assertEquals(new_temp_recording, 84 self.archive_info.WprFilePathForPage(page2)) 85 self.assertEquals(new_temp_recording, 86 self.archive_info.WprFilePathForPage(page3)) 87 88 self.archive_info.AddRecordedPages([page2.url]) 89 90 self.assertTrue(os.path.exists(new_recording1)) 91 self.assertFalse(os.path.exists(new_temp_recording)) 92 93 self.assertTrue(os.path.exists(recording1_path)) 94 self.assertTrue(os.path.exists(recording2_path)) 95 self.assertCorrectHashFile(new_recording1) 96 97 new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr') 98 with open(new_temp_recording, 'w') as f: 99 f.write('wpr data') 100 101 self.archive_info.AddNewTemporaryRecording(new_temp_recording) 102 self.archive_info.AddRecordedPages([page3.url]) 103 104 self.assertTrue(os.path.exists(new_recording2)) 105 self.assertCorrectHashFile(new_recording2) 106 self.assertFalse(os.path.exists(new_temp_recording)) 107 108 self.assertTrue(os.path.exists(recording1_path)) 109 # recording2 is no longer needed, so it was deleted. 110 self.assertFalse(os.path.exists(recording2_path)) 111 112 def testCreatingNewArchiveInfo(self): 113 # Write only the page set without the corresponding metadata file. 114 page_set_contents = (""" 115 { 116 archive_data_file": "new-metadata.json", 117 "pages": [ 118 { 119 "url": "%s", 120 } 121 ] 122 }""" % url1) 123 124 page_set_file = os.path.join(self.tmp_dir, 'new.json') 125 with open(page_set_file, 'w') as f: 126 f.write(page_set_contents) 127 128 self.page_set_archive_info_file = os.path.join(self.tmp_dir, 129 'new-metadata.json') 130 131 # Create the PageSetArchiveInfo object to be tested. 132 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( 133 self.page_set_archive_info_file, page_set_file) 134 135 # Add a recording for all the pages. 136 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') 137 with open(new_temp_recording, 'w') as f: 138 f.write('wpr data') 139 140 self.archive_info.AddNewTemporaryRecording(new_temp_recording) 141 142 self.assertEquals(new_temp_recording, 143 self.archive_info.WprFilePathForPage(page1)) 144 145 self.archive_info.AddRecordedPages([page1.url]) 146 147 # Expected name for the recording (decided by PageSetArchiveInfo). 148 new_recording = os.path.join(self.tmp_dir, 'new_000.wpr') 149 150 self.assertTrue(os.path.exists(new_recording)) 151 self.assertFalse(os.path.exists(new_temp_recording)) 152 self.assertCorrectHashFile(new_recording) 153 154 # Check that the archive info was written correctly. 155 self.assertTrue(os.path.exists(self.page_set_archive_info_file)) 156 read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( 157 self.page_set_archive_info_file, 158 os.path.join(tempfile.gettempdir(), 'pageset.json')) 159 self.assertEquals(new_recording, 160 read_archive_info.WprFilePathForPage(page1)) 161 self.assertCorrectHashFile(self.page_set_archive_info_file) 162