1# Copyright 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import json
5import os
6import shutil
7import tempfile
8import unittest
9
10from telemetry.page import page
11from telemetry.page import page_set_archive_info
12from telemetry.util import cloud_storage
13
14
class MockPage(page.Page):
  """A page.Page that can be built from just a URL and an optional name."""

  def __init__(self, url, name=None):
    # The second positional argument of page.Page is always None for these
    # tests (presumably the owning page set -- confirm against page.Page).
    page.Page.__init__(self, url, None, name=name)
18
19
# Names of the two recordings referenced by the archive info fixture below.
recording1 = 'data_001.wpr'
recording2 = 'data_002.wpr'

# Pages shared by all tests. page3 is deliberately created without a name.
page1 = MockPage('http://www.foo.com/', name='Foo')
page2 = MockPage('http://www.bar.com/', name='Bar')
page3 = MockPage('http://www.baz.com/')

# Archive info fixture: recording1 is listed for page1 and page2, recording2
# for page3. Pages are identified by their display_name.
archive_info_contents = """
{
"archives": {
  "%s": ["%s", "%s"],
  "%s": ["%s"]
}
}
""" % (
    recording1, page1.display_name, page2.display_name,
    recording2, page3.display_name)
34
35
class TestPageSetArchiveInfo(unittest.TestCase):
  """Tests for page_set_archive_info.PageSetArchiveInfo.

  Every test runs in a fresh temporary directory that contains an info.json
  archive info file (listing recording1 for page1/page2 and recording2 for
  page3) and the two dummy .wpr files it refers to.
  """

  def setUp(self):
    self.tmp_dir = tempfile.mkdtemp()
    # Registered immediately so the directory is removed even if the rest of
    # setUp raises; tearDown would be skipped in that case, cleanups are not.
    self.addCleanup(shutil.rmtree, self.tmp_dir)

    # Write the metadata.
    self.page_set_archive_info_file = os.path.join(self.tmp_dir, 'info.json')
    with open(self.page_set_archive_info_file, 'w') as f:
      f.write(archive_info_contents)

    # Write the existing .wpr files. Their contents are irrelevant to the
    # tests, so the metadata text is simply reused as dummy data.
    for recording in (recording1, recording2):
      with open(os.path.join(self.tmp_dir, recording), 'w') as f:
        f.write(archive_info_contents)

    # Create the PageSetArchiveInfo object to be tested.
    self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file)

  def assertCorrectHashFile(self, file_path):
    """Asserts that file_path + '.sha1' exists and holds file_path's hash.

    The expected hash is whatever cloud_storage.CalculateHash returns for
    file_path.
    """
    hash_path = file_path + '.sha1'
    self.assertTrue(os.path.exists(hash_path))
    with open(hash_path, 'rb') as f:
      self.assertEqual(cloud_storage.CalculateHash(file_path), f.read())

  def _WriteTemporaryRecording(self):
    """Writes a dummy recording.wpr into tmp_dir and returns its path."""
    temp_recording = os.path.join(self.tmp_dir, 'recording.wpr')
    with open(temp_recording, 'w') as f:
      f.write('wpr data')
    return temp_recording

  def testReadingArchiveInfo(self):
    """Checks that each page maps to the recording listed in info.json."""
    self.assertIsNotNone(self.archive_info.WprFilePathForPage(page1))
    self.assertEqual(recording1, os.path.basename(
        self.archive_info.WprFilePathForPage(page1)))

    self.assertIsNotNone(self.archive_info.WprFilePathForPage(page2))
    self.assertEqual(recording1, os.path.basename(
        self.archive_info.WprFilePathForPage(page2)))

    self.assertIsNotNone(self.archive_info.WprFilePathForPage(page3))
    self.assertEqual(recording2, os.path.basename(
        self.archive_info.WprFilePathForPage(page3)))

  def testArchiveInfoFileGetsUpdated(self):
    """Ensures that the archive info file is updated correctly."""

    expected_archive_file_contents = {
        u'description': (u'Describes the Web Page Replay archives for a page'
                         u' set. Don\'t edit by hand! Use record_wpr for'
                         u' updating.'),
        u'archives': {
            u'data_003.wpr': [u'Bar', u'http://www.baz.com/'],
            u'data_001.wpr': [u'Foo']
        }
    }

    new_temp_recording = self._WriteTemporaryRecording()
    self.archive_info.AddNewTemporaryRecording(new_temp_recording)
    self.archive_info.AddRecordedPages([page2, page3])

    with open(self.page_set_archive_info_file, 'r') as f:
      archive_file_contents = json.load(f)
    self.assertEqual(expected_archive_file_contents, archive_file_contents)

  def testModifications(self):
    """Checks recording files and hashes as pages are re-recorded one by one."""
    recording1_path = os.path.join(self.tmp_dir, recording1)
    recording2_path = os.path.join(self.tmp_dir, recording2)

    new_recording1 = os.path.join(self.tmp_dir, 'data_003.wpr')
    new_temp_recording = self._WriteTemporaryRecording()

    self.archive_info.AddNewTemporaryRecording(new_temp_recording)

    # While a temporary recording is active, every page resolves to it.
    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page1))
    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page2))
    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page3))

    self.archive_info.AddRecordedPages([page2])

    # The temporary recording has been turned into data_003.wpr.
    self.assertTrue(os.path.exists(new_recording1))
    self.assertFalse(os.path.exists(new_temp_recording))

    # Both old recordings are still present at this point.
    self.assertTrue(os.path.exists(recording1_path))
    self.assertTrue(os.path.exists(recording2_path))
    self.assertCorrectHashFile(new_recording1)

    new_recording2 = os.path.join(self.tmp_dir, 'data_004.wpr')
    new_temp_recording = self._WriteTemporaryRecording()

    self.archive_info.AddNewTemporaryRecording(new_temp_recording)
    self.archive_info.AddRecordedPages([page3])

    self.assertTrue(os.path.exists(new_recording2))
    self.assertCorrectHashFile(new_recording2)
    self.assertFalse(os.path.exists(new_temp_recording))

    self.assertTrue(os.path.exists(recording1_path))
    # recording2 is no longer needed, so it was deleted.
    self.assertFalse(os.path.exists(recording2_path))

  def testCreatingNewArchiveInfo(self):
    """Checks that a missing archive info file is created on first recording."""
    # Write only the page set without the corresponding metadata file.
    page_set_contents = ("""
    {
        "archive_data_file": "new_archive_info.json",
        "pages": [
            {
                "url": "%s"
            }
        ]
    }""" % page1.url)

    page_set_file = os.path.join(self.tmp_dir, 'new_page_set.json')
    with open(page_set_file, 'w') as f:
      f.write(page_set_contents)

    self.page_set_archive_info_file = os.path.join(self.tmp_dir,
                                                   'new_archive_info.json')

    # Create the PageSetArchiveInfo object to be tested.
    self.archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file)

    # Add a recording for all the pages.
    new_temp_recording = self._WriteTemporaryRecording()

    self.archive_info.AddNewTemporaryRecording(new_temp_recording)

    self.assertEqual(new_temp_recording,
                     self.archive_info.WprFilePathForPage(page1))

    self.archive_info.AddRecordedPages([page1])

    # Expected name for the recording (decided by PageSetArchiveInfo).
    new_recording = os.path.join(self.tmp_dir, 'new_archive_info_000.wpr')

    self.assertTrue(os.path.exists(new_recording))
    self.assertFalse(os.path.exists(new_temp_recording))
    self.assertCorrectHashFile(new_recording)

    # Check that the archive info was written correctly.
    self.assertTrue(os.path.exists(self.page_set_archive_info_file))
    read_archive_info = page_set_archive_info.PageSetArchiveInfo.FromFile(
        self.page_set_archive_info_file)
    self.assertEqual(new_recording,
                     read_archive_info.WprFilePathForPage(page1))
187