full_update_generator.cc revision f329b933db41d26644a97afef928eb1b319d6d99
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "update_engine/payload_generator/full_update_generator.h" 6 7#include <fcntl.h> 8#include <inttypes.h> 9 10#include <algorithm> 11#include <deque> 12#include <memory> 13 14#include <base/strings/string_util.h> 15#include <base/strings/stringprintf.h> 16 17#include "update_engine/bzip.h" 18#include "update_engine/utils.h" 19 20using std::deque; 21using std::shared_ptr; 22using std::string; 23using std::vector; 24 25namespace chromeos_update_engine { 26 27namespace { 28 29// This class encapsulates a full update chunk processing thread. The processor 30// reads a chunk of data from the input file descriptor and compresses it. The 31// processor needs to be started through Start() then waited on through Wait(). 32class ChunkProcessor { 33 public: 34 // Read a chunk of |size| bytes from |fd| starting at offset |offset|. 35 ChunkProcessor(int fd, off_t offset, size_t size) 36 : thread_(nullptr), 37 fd_(fd), 38 offset_(offset), 39 buffer_in_(size) {} 40 ~ChunkProcessor() { Wait(); } 41 42 off_t offset() const { return offset_; } 43 const vector<char>& buffer_in() const { return buffer_in_; } 44 const vector<char>& buffer_compressed() const { return buffer_compressed_; } 45 46 // Starts the processor. Returns true on success, false on failure. 47 bool Start(); 48 49 // Waits for the processor to complete. Returns true on success, false on 50 // failure. 51 bool Wait(); 52 53 bool ShouldCompress() const { 54 return buffer_compressed_.size() < buffer_in_.size(); 55 } 56 57 private: 58 // Reads the input data into |buffer_in_| and compresses it into 59 // |buffer_compressed_|. Returns true on success, false otherwise. 60 bool ReadAndCompress(); 61 static gpointer ReadAndCompressThread(gpointer data); 62 63 GThread* thread_; 64 int fd_; 65 off_t offset_; 66 vector<char> buffer_in_; 67 vector<char> buffer_compressed_; 68 69 DISALLOW_COPY_AND_ASSIGN(ChunkProcessor); 70}; 71 72bool ChunkProcessor::Start() { 73 // g_thread_create is deprecated since glib 2.32. Use 74 // g_thread_new instead. 75 thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this, 76 nullptr); 77 TEST_AND_RETURN_FALSE(thread_ != nullptr); 78 return true; 79} 80 81bool ChunkProcessor::Wait() { 82 if (!thread_) { 83 return false; 84 } 85 gpointer result = g_thread_join(thread_); 86 thread_ = nullptr; 87 TEST_AND_RETURN_FALSE(result == this); 88 return true; 89} 90 91gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) { 92 return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? 93 data : nullptr; 94} 95 96bool ChunkProcessor::ReadAndCompress() { 97 ssize_t bytes_read = -1; 98 TEST_AND_RETURN_FALSE(utils::PReadAll(fd_, 99 buffer_in_.data(), 100 buffer_in_.size(), 101 offset_, 102 &bytes_read)); 103 TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size())); 104 TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_)); 105 return true; 106} 107 108} // namespace 109 110bool FullUpdateGenerator::Run( 111 Graph* graph, 112 const string& new_kernel_part, 113 const string& new_image, 114 off_t image_size, 115 int fd, 116 off_t* data_file_size, 117 off_t chunk_size, 118 off_t block_size, 119 vector<DeltaArchiveManifest_InstallOperation>* kernel_ops, 120 vector<Vertex::Index>* final_order) { 121 TEST_AND_RETURN_FALSE(chunk_size > 0); 122 TEST_AND_RETURN_FALSE((chunk_size % block_size) == 0); 123 124 size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L); 125 LOG(INFO) << "Max threads: " << max_threads; 126 127 // Get the sizes early in the function, so we can fail fast if the user 128 // passed us bad paths. 129 TEST_AND_RETURN_FALSE(image_size >= 0 && 130 image_size <= utils::FileSize(new_image)); 131 const off_t kernel_size = utils::FileSize(new_kernel_part); 132 TEST_AND_RETURN_FALSE(kernel_size >= 0); 133 134 off_t part_sizes[] = { image_size, kernel_size }; 135 string paths[] = { new_image, new_kernel_part }; 136 137 for (int partition = 0; partition < 2; ++partition) { 138 const string& path = paths[partition]; 139 LOG(INFO) << "compressing " << path; 140 int in_fd = open(path.c_str(), O_RDONLY, 0); 141 TEST_AND_RETURN_FALSE(in_fd >= 0); 142 ScopedFdCloser in_fd_closer(&in_fd); 143 deque<shared_ptr<ChunkProcessor>> threads; 144 int last_progress_update = INT_MIN; 145 off_t bytes_left = part_sizes[partition], counter = 0, offset = 0; 146 while (bytes_left > 0 || !threads.empty()) { 147 // Check and start new chunk processors if possible. 148 while (threads.size() < max_threads && bytes_left > 0) { 149 shared_ptr<ChunkProcessor> processor( 150 new ChunkProcessor(in_fd, offset, 151 std::min(bytes_left, chunk_size))); 152 threads.push_back(processor); 153 TEST_AND_RETURN_FALSE(processor->Start()); 154 bytes_left -= chunk_size; 155 offset += chunk_size; 156 } 157 158 // Need to wait for a chunk processor to complete and process its output 159 // before spawning new processors. 160 shared_ptr<ChunkProcessor> processor = threads.front(); 161 threads.pop_front(); 162 TEST_AND_RETURN_FALSE(processor->Wait()); 163 164 DeltaArchiveManifest_InstallOperation* op = nullptr; 165 if (partition == 0) { 166 graph->resize(graph->size() + 1); 167 graph->back().file_name = 168 base::StringPrintf("<rootfs-operation-%" PRIi64 ">", counter++); 169 op = &graph->back().op; 170 final_order->push_back(graph->size() - 1); 171 } else { 172 kernel_ops->resize(kernel_ops->size() + 1); 173 op = &kernel_ops->back(); 174 } 175 176 const bool compress = processor->ShouldCompress(); 177 const vector<char>& use_buf = 178 compress ? processor->buffer_compressed() : processor->buffer_in(); 179 op->set_type(compress ? 180 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ : 181 DeltaArchiveManifest_InstallOperation_Type_REPLACE); 182 op->set_data_offset(*data_file_size); 183 TEST_AND_RETURN_FALSE(utils::WriteAll(fd, &use_buf[0], use_buf.size())); 184 *data_file_size += use_buf.size(); 185 op->set_data_length(use_buf.size()); 186 Extent* dst_extent = op->add_dst_extents(); 187 dst_extent->set_start_block(processor->offset() / block_size); 188 dst_extent->set_num_blocks(chunk_size / block_size); 189 190 int progress = static_cast<int>( 191 (processor->offset() + processor->buffer_in().size()) * 100.0 / 192 part_sizes[partition]); 193 if (last_progress_update < progress && 194 (last_progress_update + 10 <= progress || progress == 100)) { 195 LOG(INFO) << progress << "% complete (output size: " 196 << *data_file_size << ")"; 197 last_progress_update = progress; 198 } 199 } 200 } 201 202 return true; 203} 204 205} // namespace chromeos_update_engine 206