full_update_generator.cc revision 161c4a132743f15fc4757112b673085c2a7a7f29
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "update_engine/payload_generator/full_update_generator.h" 6 7#include <fcntl.h> 8#include <inttypes.h> 9 10#include <tr1/memory> 11 12#include <base/strings/string_util.h> 13#include <base/strings/stringprintf.h> 14 15#include "update_engine/bzip.h" 16#include "update_engine/utils.h" 17 18using std::deque; 19using std::max; 20using std::min; 21using std::string; 22using std::tr1::shared_ptr; 23using std::vector; 24 25namespace chromeos_update_engine { 26 27namespace { 28 29// This class encapsulates a full update chunk processing thread. The processor 30// reads a chunk of data from the input file descriptor and compresses it. The 31// processor needs to be started through Start() then waited on through Wait(). 32class ChunkProcessor { 33 public: 34 // Read a chunk of |size| bytes from |fd| starting at offset |offset|. 35 ChunkProcessor(int fd, off_t offset, size_t size) 36 : thread_(NULL), 37 fd_(fd), 38 offset_(offset), 39 buffer_in_(size) {} 40 ~ChunkProcessor() { Wait(); } 41 42 off_t offset() const { return offset_; } 43 const vector<char>& buffer_in() const { return buffer_in_; } 44 const vector<char>& buffer_compressed() const { return buffer_compressed_; } 45 46 // Starts the processor. Returns true on success, false on failure. 47 bool Start(); 48 49 // Waits for the processor to complete. Returns true on success, false on 50 // failure. 51 bool Wait(); 52 53 bool ShouldCompress() const { 54 return buffer_compressed_.size() < buffer_in_.size(); 55 } 56 57 private: 58 // Reads the input data into |buffer_in_| and compresses it into 59 // |buffer_compressed_|. Returns true on success, false otherwise. 60 bool ReadAndCompress(); 61 static gpointer ReadAndCompressThread(gpointer data); 62 63 GThread* thread_; 64 int fd_; 65 off_t offset_; 66 vector<char> buffer_in_; 67 vector<char> buffer_compressed_; 68 69 DISALLOW_COPY_AND_ASSIGN(ChunkProcessor); 70}; 71 72bool ChunkProcessor::Start() { 73 // g_thread_create is deprecated since glib 2.32. Use 74 // g_thread_new instead. 75 thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this, NULL); 76 TEST_AND_RETURN_FALSE(thread_ != NULL); 77 return true; 78} 79 80bool ChunkProcessor::Wait() { 81 if (!thread_) { 82 return false; 83 } 84 gpointer result = g_thread_join(thread_); 85 thread_ = NULL; 86 TEST_AND_RETURN_FALSE(result == this); 87 return true; 88} 89 90gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) { 91 return 92 reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? data : NULL; 93} 94 95bool ChunkProcessor::ReadAndCompress() { 96 ssize_t bytes_read = -1; 97 TEST_AND_RETURN_FALSE(utils::PReadAll(fd_, 98 buffer_in_.data(), 99 buffer_in_.size(), 100 offset_, 101 &bytes_read)); 102 TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size())); 103 TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_)); 104 return true; 105} 106 107} // namespace 108 109bool FullUpdateGenerator::Run( 110 Graph* graph, 111 const std::string& new_kernel_part, 112 const std::string& new_image, 113 off_t image_size, 114 int fd, 115 off_t* data_file_size, 116 off_t chunk_size, 117 off_t block_size, 118 vector<DeltaArchiveManifest_InstallOperation>* kernel_ops, 119 std::vector<Vertex::Index>* final_order) { 120 TEST_AND_RETURN_FALSE(chunk_size > 0); 121 TEST_AND_RETURN_FALSE((chunk_size % block_size) == 0); 122 123 size_t max_threads = max(sysconf(_SC_NPROCESSORS_ONLN), 4L); 124 LOG(INFO) << "Max threads: " << max_threads; 125 126 // Get the sizes early in the function, so we can fail fast if the user 127 // passed us bad paths. 128 TEST_AND_RETURN_FALSE(image_size >= 0 && 129 image_size <= utils::FileSize(new_image)); 130 const off_t kernel_size = utils::FileSize(new_kernel_part); 131 TEST_AND_RETURN_FALSE(kernel_size >= 0); 132 133 off_t part_sizes[] = { image_size, kernel_size }; 134 string paths[] = { new_image, new_kernel_part }; 135 136 for (int partition = 0; partition < 2; ++partition) { 137 const string& path = paths[partition]; 138 LOG(INFO) << "compressing " << path; 139 int in_fd = open(path.c_str(), O_RDONLY, 0); 140 TEST_AND_RETURN_FALSE(in_fd >= 0); 141 ScopedFdCloser in_fd_closer(&in_fd); 142 deque<shared_ptr<ChunkProcessor> > threads; 143 int last_progress_update = INT_MIN; 144 off_t bytes_left = part_sizes[partition], counter = 0, offset = 0; 145 while (bytes_left > 0 || !threads.empty()) { 146 // Check and start new chunk processors if possible. 147 while (threads.size() < max_threads && bytes_left > 0) { 148 shared_ptr<ChunkProcessor> processor( 149 new ChunkProcessor(in_fd, offset, min(bytes_left, chunk_size))); 150 threads.push_back(processor); 151 TEST_AND_RETURN_FALSE(processor->Start()); 152 bytes_left -= chunk_size; 153 offset += chunk_size; 154 } 155 156 // Need to wait for a chunk processor to complete and process its ouput 157 // before spawning new processors. 158 shared_ptr<ChunkProcessor> processor = threads.front(); 159 threads.pop_front(); 160 TEST_AND_RETURN_FALSE(processor->Wait()); 161 162 DeltaArchiveManifest_InstallOperation* op = NULL; 163 if (partition == 0) { 164 graph->resize(graph->size() + 1); 165 graph->back().file_name = 166 base::StringPrintf("<rootfs-operation-%" PRIi64 ">", counter++); 167 op = &graph->back().op; 168 final_order->push_back(graph->size() - 1); 169 } else { 170 kernel_ops->resize(kernel_ops->size() + 1); 171 op = &kernel_ops->back(); 172 } 173 174 const bool compress = processor->ShouldCompress(); 175 const vector<char>& use_buf = 176 compress ? processor->buffer_compressed() : processor->buffer_in(); 177 op->set_type(compress ? 178 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ : 179 DeltaArchiveManifest_InstallOperation_Type_REPLACE); 180 op->set_data_offset(*data_file_size); 181 TEST_AND_RETURN_FALSE(utils::WriteAll(fd, &use_buf[0], use_buf.size())); 182 *data_file_size += use_buf.size(); 183 op->set_data_length(use_buf.size()); 184 Extent* dst_extent = op->add_dst_extents(); 185 dst_extent->set_start_block(processor->offset() / block_size); 186 dst_extent->set_num_blocks(chunk_size / block_size); 187 188 int progress = static_cast<int>( 189 (processor->offset() + processor->buffer_in().size()) * 100.0 / 190 part_sizes[partition]); 191 if (last_progress_update < progress && 192 (last_progress_update + 10 <= progress || progress == 100)) { 193 LOG(INFO) << progress << "% complete (output size: " 194 << *data_file_size << ")"; 195 last_progress_update = progress; 196 } 197 } 198 } 199 200 return true; 201} 202 203} // namespace chromeos_update_engine 204