1# Copyright 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4import json 5import os 6import shutil 7import tempfile 8import unittest 9 10from telemetry.page import page 11from telemetry.page import page_set_archive_info 12from telemetry.util import cloud_storage 13 14 15class MockPage(page.Page): 16 def __init__(self, url, name=None): 17 super(MockPage, self).__init__(url, None, name=name) 18 19 20page1 = MockPage('http://www.foo.com/', 'Foo') 21page2 = MockPage('http://www.bar.com/', 'Bar') 22page3 = MockPage('http://www.baz.com/') 23recording1 = 'data_001.wpr' 24recording2 = 'data_002.wpr' 25archive_info_contents = (""" 26{ 27"archives": { 28 "%s": ["%s", "%s"], 29 "%s": ["%s"] 30} 31} 32""" % (recording1, page1.display_name, page2.display_name, recording2, 33 page3.display_name)) 34 35 36class TestPageSetArchiveInfo(unittest.TestCase): 37 def setUp(self): 38 self.tmp_dir = tempfile.mkdtemp() 39 # Write the metadata. 40 self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json') 41 with open(self.page_set_archive_info_file, 'w') as f: 42 f.write(archive_info_contents) 43 44 # Write the existing .wpr files. 45 for i in [1, 2]: 46 with open(os.path.join(self.tmp_dir, ('data_00%d.wpr' % i)), 'w') as f: 47 f.write(archive_info_contents) 48 49 # Create the PageSetArchiveInfo object to be tested. 50 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( 51 self.page_set_archive_info_file) 52 53 def tearDown(self): 54 shutil.rmtree(self.tmp_dir) 55 56 def assertCorrectHashFile(self, file_path): 57 self.assertTrue(os.path.exists(file_path + '.sha1')) 58 with open(file_path + '.sha1', 'rb') as f: 59 self.assertEquals(cloud_storage.CalculateHash(file_path), f.read()) 60 61 def testReadingArchiveInfo(self): 62 self.assertIsNotNone(self.archive_info.WprFilePathForPage(page1)) 63 self.assertEquals(recording1, os.path.basename( 64 self.archive_info.WprFilePathForPage(page1))) 65 66 self.assertIsNotNone(self.archive_info.WprFilePathForPage(page2)) 67 self.assertEquals(recording1, os.path.basename( 68 self.archive_info.WprFilePathForPage(page2))) 69 70 self.assertIsNotNone(self.archive_info.WprFilePathForPage(page3)) 71 self.assertEquals(recording2, os.path.basename( 72 self.archive_info.WprFilePathForPage(page3))) 73 74 def testArchiveInfoFileGetsUpdated(self): 75 """Ensures that the archive info file is updated correctly.""" 76 77 expected_archive_file_contents = { 78 u'description': (u'Describes the Web Page Replay archives for a page' 79 u' set. Don\'t edit by hand! Use record_wpr for' 80 u' updating.'), 81 u'archives': { 82 u'data_003.wpr': [u'Bar', u'http://www.baz.com/'], 83 u'data_001.wpr': [u'Foo'] 84 } 85 } 86 87 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') 88 with open(new_temp_recording, 'w') as f: 89 f.write('wpr data') 90 self.archive_info.AddNewTemporaryRecording(new_temp_recording) 91 self.archive_info.AddRecordedPages([page2, page3]) 92 93 with open(self.page_set_archive_info_file, 'r') as f: 94 archive_file_contents = json.load(f) 95 self.assertEquals(expected_archive_file_contents, archive_file_contents) 96 97 def testModifications(self): 98 recording1_path = os.path.join(self.tmp_dir, recording1) 99 recording2_path = os.path.join(self.tmp_dir, recording2) 100 101 new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr') 102 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') 103 with open(new_temp_recording, 'w') as f: 104 f.write('wpr data') 105 106 self.archive_info.AddNewTemporaryRecording(new_temp_recording) 107 108 self.assertEquals(new_temp_recording, 109 self.archive_info.WprFilePathForPage(page1)) 110 self.assertEquals(new_temp_recording, 111 self.archive_info.WprFilePathForPage(page2)) 112 self.assertEquals(new_temp_recording, 113 self.archive_info.WprFilePathForPage(page3)) 114 115 self.archive_info.AddRecordedPages([page2]) 116 117 self.assertTrue(os.path.exists(new_recording1)) 118 self.assertFalse(os.path.exists(new_temp_recording)) 119 120 self.assertTrue(os.path.exists(recording1_path)) 121 self.assertTrue(os.path.exists(recording2_path)) 122 self.assertCorrectHashFile(new_recording1) 123 124 new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr') 125 with open(new_temp_recording, 'w') as f: 126 f.write('wpr data') 127 128 self.archive_info.AddNewTemporaryRecording(new_temp_recording) 129 self.archive_info.AddRecordedPages([page3]) 130 131 self.assertTrue(os.path.exists(new_recording2)) 132 self.assertCorrectHashFile(new_recording2) 133 self.assertFalse(os.path.exists(new_temp_recording)) 134 135 self.assertTrue(os.path.exists(recording1_path)) 136 # recording2 is no longer needed, so it was deleted. 137 self.assertFalse(os.path.exists(recording2_path)) 138 139 def testCreatingNewArchiveInfo(self): 140 # Write only the page set without the corresponding metadata file. 141 page_set_contents = (""" 142 { 143 archive_data_file": "new_archive_info.json", 144 "pages": [ 145 { 146 "url": "%s", 147 } 148 ] 149 }""" % page1.url) 150 151 page_set_file = os.path.join(self.tmp_dir, 'new_page_set.json') 152 with open(page_set_file, 'w') as f: 153 f.write(page_set_contents) 154 155 self.page_set_archive_info_file = os.path.join(self.tmp_dir, 156 'new_archive_info.json') 157 158 # Create the PageSetArchiveInfo object to be tested. 159 self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( 160 self.page_set_archive_info_file) 161 162 # Add a recording for all the pages. 163 new_temp_recording = os.path.join(self.tmp_dir, 'recording.wpr') 164 with open(new_temp_recording, 'w') as f: 165 f.write('wpr data') 166 167 self.archive_info.AddNewTemporaryRecording(new_temp_recording) 168 169 self.assertEquals(new_temp_recording, 170 self.archive_info.WprFilePathForPage(page1)) 171 172 self.archive_info.AddRecordedPages([page1]) 173 174 # Expected name for the recording (decided by PageSetArchiveInfo). 175 new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr') 176 177 self.assertTrue(os.path.exists(new_recording)) 178 self.assertFalse(os.path.exists(new_temp_recording)) 179 self.assertCorrectHashFile(new_recording) 180 181 # Check that the archive info was written correctly. 182 self.assertTrue(os.path.exists(self.page_set_archive_info_file)) 183 read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile( 184 self.page_set_archive_info_file) 185 self.assertEquals(new_recording, 186 read_archive_info.WprFilePathForPage(page1)) 187