page_set_archive_info_unittest.py revision a36e5920737c6adbddd3e43b760e5de8431db6e0
1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import os
5import shutil
6import tempfile
7import unittest
8
9from telemetry.page import cloud_storage
10from telemetry.page import page_set_archive_info
11
12
class MockPage(object):
  """Minimal page stand-in that exposes nothing but a |url| attribute."""

  def __init__(self, url):
    # Store the URL verbatim; no validation or normalization is applied.
    self.url = url
16
17
# URLs of the mock pages used by the tests.
url1 = 'http://www.foo.com/'
url2 = 'http://www.bar.com/'
url3 = 'http://www.baz.com/'

# File names of the recordings referenced by the archive info below.
recording1 = 'data_001.wpr'
recording2 = 'data_002.wpr'

# Archive info JSON: recording1 is listed for url1 and url2, recording2 is
# listed for url3.
archive_info_contents = """
{
"archives": {
  "%s": ["%s", "%s"],
  "%s": ["%s"]
}
}
""" % (
    recording1, url1, url2,
    recording2, url3,
)
# One mock page per URL, in the same order as url1..url3.
page1, page2, page3 = map(MockPage, (url1, url2, url3))
34
35
class TestPageSetArchiveInfo(unittest.TestCase):
  """Tests reading, modifying and creating PageSetArchiveInfo metadata.

  Each test runs against a fresh temporary directory that holds an archive
  info file ('info.json') plus the two .wpr files it refers to.
  """

  def setUp(self):
    self.tmp_dir = tempfile.mkdtemp()
    # Register the cleanup right away: tearDown is skipped when setUp raises,
    # whereas cleanups registered before the failure still run.
    self.addCleanup(shutil.rmtree, self.tmp_dir)

    # Write the metadata.
    self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json')
    with open(self.page_set_archive_info_file, 'w') as f:
      f.write(archive_info_contents)

    # Write the existing .wpr files. Their content is arbitrary; the metadata
    # text is simply reused as filler.
    for recording in (recording1, recording2):
      with open(os.path.join(self.tmp_dir, recording), 'w') as f:
        f.write(archive_info_contents)

    # Create the PageSetArchiveInfo object to be tested.
    self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file,
        os.path.join(tempfile.gettempdir(), 'pageset.json'))

  def _WriteTemporaryRecording(self):
    """Writes a dummy 'recording.wpr' into the temp dir.

    Returns:
      The path of the file that was written.
    """
    temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
    with open(temp_recording, 'w') as f:
      f.write('wpr data')
    return temp_recording

  def assertCorrectHashFile(self, file_path):
    """Asserts that |file_path| has a matching '.sha1' file next to it.

    Args:
      file_path: Path of the file whose hash file is checked. The content of
          file_path + '.sha1' must equal cloud_storage.GetHash(file_path).
    """
    hash_path = file_path + '.sha1'
    self.assertTrue(os.path.exists(hash_path))
    with open(hash_path, 'rb') as f:
      self.assertEqual(cloud_storage.GetHash(file_path), f.read())

  def testReadingArchiveInfo(self):
    """Checks that each page maps to the recording listed in the metadata."""
    expectations = (
        (recording1, page1),
        (recording1, page2),
        (recording2, page3),
    )
    for expected_name, page in expectations:
      self.assertEqual(expected_name, os.path.basename(
          self.archive_info.WprFilePathForPage(page)))

  def testModifications(self):
    """Checks the files produced by recording page2 and then page3."""
    recording1_path = os.path.join(self.tmp_dir, recording1)
    recording2_path = os.path.join(self.tmp_dir, recording2)
    new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
    new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')

    new_temp_recording = self._WriteTemporaryRecording()
    self.archive_info.AddNewTemporaryRecording(new_temp_recording)

    # Once a temporary recording has been added, every page is expected to
    # resolve to it.
    for page in (page1, page2, page3):
      self.assertEqual(new_temp_recording,
                       self.archive_info.WprFilePathForPage(page))

    self.archive_info.AddRecordedPages([page2.url])

    # The temporary recording has become data_003.wpr.
    self.assertTrue(os.path.exists(new_recording1))
    self.assertFalse(os.path.exists(new_temp_recording))

    # Both original recordings are expected to be still present.
    self.assertTrue(os.path.exists(recording1_path))
    self.assertTrue(os.path.exists(recording2_path))
    self.assertCorrectHashFile(new_recording1)

    # Second round: record page3 from a fresh temporary recording.
    new_temp_recording = self._WriteTemporaryRecording()
    self.archive_info.AddNewTemporaryRecording(new_temp_recording)
    self.archive_info.AddRecordedPages([page3.url])

    self.assertTrue(os.path.exists(new_recording2))
    self.assertCorrectHashFile(new_recording2)
    self.assertFalse(os.path.exists(new_temp_recording))

    self.assertTrue(os.path.exists(recording1_path))
    # recording2 is no longer needed, so it was deleted.
    self.assertFalse(os.path.exists(recording2_path))

  def testCreatingNewArchiveInfo(self):
    """Checks that a missing archive info file is created on first record."""
    # Write only the page set without the corresponding metadata file.
    # The fixture is well-formed JSON: every key is quoted and there is no
    # trailing comma.
    page_set_contents = ("""
    {
        "archive_data_file": "new-metadata.json",
        "pages": [
            {
                "url": "%s"
            }
        ]
    }""" % url1)

    page_set_file = os.path.join(self.tmp_dir, 'new.json')
    with open(page_set_file, 'w') as f:
      f.write(page_set_contents)

    self.page_set_archive_info_file = os.path.join(self.tmp_dir,
                                                   'new-metadata.json')

    # Create the PageSetArchiveInfo object to be tested.
    self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file, page_set_file)

    # Add a recording for all the pages.
    new_temp_recording = self._WriteTemporaryRecording()
    self.archive_info.AddNewTemporaryRecording(new_temp_recording)

    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page1))

    self.archive_info.AddRecordedPages([page1.url])

    # Expected name for the recording (decided by PageSetArchiveInfo).
    new_recording = os.path.join(self.tmp_dir, 'new_000.wpr')

    self.assertTrue(os.path.exists(new_recording))
    self.assertFalse(os.path.exists(new_temp_recording))
    self.assertCorrectHashFile(new_recording)

    # Check that the archive info was written correctly.
    self.assertTrue(os.path.exists(self.page_set_archive_info_file))
    read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file,
        os.path.join(tempfile.gettempdir(), 'pageset.json'))
    self.assertEqual(new_recording,
                     read_archive_info.WprFilePathForPage(page1))
    self.assertCorrectHashFile(self.page_set_archive_info_file)
162