full_update_generator.cc revision 477aec2166a571cbe28081d806c3226e8b31b6e9
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "update_engine/payload_generator/full_update_generator.h" 6 7#include <fcntl.h> 8#include <inttypes.h> 9 10#include <algorithm> 11#include <deque> 12#include <memory> 13 14#include <base/format_macros.h> 15#include <base/strings/stringprintf.h> 16#include <base/strings/string_util.h> 17 18#include "update_engine/bzip.h" 19#include "update_engine/utils.h" 20 21using std::deque; 22using std::shared_ptr; 23using std::string; 24using std::vector; 25 26namespace chromeos_update_engine { 27 28namespace { 29 30const size_t kDefaultFullChunkSize = 1024 * 1024; // 1 MiB 31 32// This class encapsulates a full update chunk processing thread. The processor 33// reads a chunk of data from the input file descriptor and compresses it. The 34// processor needs to be started through Start() then waited on through Wait(). 35class ChunkProcessor { 36 public: 37 // Read a chunk of |size| bytes from |fd| starting at offset |offset|. 38 ChunkProcessor(int fd, off_t offset, size_t size) 39 : thread_(nullptr), 40 fd_(fd), 41 offset_(offset), 42 buffer_in_(size) {} 43 ~ChunkProcessor() { Wait(); } 44 45 off_t offset() const { return offset_; } 46 const chromeos::Blob& buffer_in() const { return buffer_in_; } 47 const chromeos::Blob& buffer_compressed() const { return buffer_compressed_; } 48 49 // Starts the processor. Returns true on success, false on failure. 50 bool Start(); 51 52 // Waits for the processor to complete. Returns true on success, false on 53 // failure. 54 bool Wait(); 55 56 bool ShouldCompress() const { 57 return buffer_compressed_.size() < buffer_in_.size(); 58 } 59 60 private: 61 // Reads the input data into |buffer_in_| and compresses it into 62 // |buffer_compressed_|. Returns true on success, false otherwise. 63 bool ReadAndCompress(); 64 static gpointer ReadAndCompressThread(gpointer data); 65 66 GThread* thread_; 67 int fd_; 68 off_t offset_; 69 chromeos::Blob buffer_in_; 70 chromeos::Blob buffer_compressed_; 71 72 DISALLOW_COPY_AND_ASSIGN(ChunkProcessor); 73}; 74 75bool ChunkProcessor::Start() { 76 // g_thread_create is deprecated since glib 2.32. Use 77 // g_thread_new instead. 78 thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this, 79 nullptr); 80 TEST_AND_RETURN_FALSE(thread_ != nullptr); 81 return true; 82} 83 84bool ChunkProcessor::Wait() { 85 if (!thread_) { 86 return false; 87 } 88 gpointer result = g_thread_join(thread_); 89 thread_ = nullptr; 90 TEST_AND_RETURN_FALSE(result == this); 91 return true; 92} 93 94gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) { 95 return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? 96 data : nullptr; 97} 98 99bool ChunkProcessor::ReadAndCompress() { 100 ssize_t bytes_read = -1; 101 TEST_AND_RETURN_FALSE(utils::PReadAll(fd_, 102 buffer_in_.data(), 103 buffer_in_.size(), 104 offset_, 105 &bytes_read)); 106 TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size())); 107 TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_)); 108 return true; 109} 110 111} // namespace 112 113bool FullUpdateGenerator::GenerateOperations( 114 const PayloadGenerationConfig& config, 115 int fd, 116 off_t* data_file_size, 117 vector<AnnotatedOperation>* rootfs_ops, 118 vector<AnnotatedOperation>* kernel_ops) { 119 TEST_AND_RETURN_FALSE(config.Validate()); 120 rootfs_ops->clear(); 121 kernel_ops->clear(); 122 123 // FullUpdateGenerator requires a positive chunk_size, otherwise there will 124 // be only one operation with the whole partition which should not be allowed. 125 size_t full_chunk_size = kDefaultFullChunkSize; 126 if (config.chunk_size >= 0) { 127 full_chunk_size = config.chunk_size; 128 } else { 129 LOG(INFO) << "No chunk_size provided, using the default chunk_size for the " 130 << "full operations: " << full_chunk_size << " bytes."; 131 } 132 TEST_AND_RETURN_FALSE(full_chunk_size > 0); 133 TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0); 134 135 const ImageConfig& target = config.target; // Shortcut. 136 size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L); 137 LOG(INFO) << "Max threads: " << max_threads; 138 139 uint64_t part_sizes[] = { target.rootfs_size, target.kernel_size }; 140 string paths[] = { target.rootfs_part, target.kernel_part }; 141 142 for (int partition = 0; partition < 2; ++partition) { 143 const string& path = paths[partition]; 144 LOG(INFO) << "compressing " << path; 145 int in_fd = open(path.c_str(), O_RDONLY, 0); 146 TEST_AND_RETURN_FALSE(in_fd >= 0); 147 ScopedFdCloser in_fd_closer(&in_fd); 148 deque<shared_ptr<ChunkProcessor>> threads; 149 int last_progress_update = INT_MIN; 150 size_t bytes_left = part_sizes[partition], counter = 0, offset = 0; 151 while (bytes_left > 0 || !threads.empty()) { 152 // Check and start new chunk processors if possible. 153 while (threads.size() < max_threads && bytes_left > 0) { 154 size_t this_chunk_bytes = std::min(bytes_left, full_chunk_size); 155 shared_ptr<ChunkProcessor> processor( 156 new ChunkProcessor(in_fd, offset, this_chunk_bytes)); 157 threads.push_back(processor); 158 TEST_AND_RETURN_FALSE(processor->Start()); 159 bytes_left -= this_chunk_bytes; 160 offset += this_chunk_bytes; 161 } 162 163 // Need to wait for a chunk processor to complete and process its output 164 // before spawning new processors. 165 shared_ptr<ChunkProcessor> processor = threads.front(); 166 threads.pop_front(); 167 TEST_AND_RETURN_FALSE(processor->Wait()); 168 169 DeltaArchiveManifest_InstallOperation* op = nullptr; 170 if (partition == 0) { 171 rootfs_ops->emplace_back(); 172 rootfs_ops->back().name = 173 base::StringPrintf("<rootfs-operation-%" PRIuS ">", counter++); 174 op = &rootfs_ops->back().op; 175 } else { 176 kernel_ops->emplace_back(); 177 kernel_ops->back().name = 178 base::StringPrintf("<kernel-operation-%" PRIuS ">", counter++); 179 op = &kernel_ops->back().op; 180 } 181 182 const bool compress = processor->ShouldCompress(); 183 const chromeos::Blob& use_buf = 184 compress ? processor->buffer_compressed() : processor->buffer_in(); 185 op->set_type(compress ? 186 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ : 187 DeltaArchiveManifest_InstallOperation_Type_REPLACE); 188 op->set_data_offset(*data_file_size); 189 TEST_AND_RETURN_FALSE(utils::WriteAll(fd, use_buf.data(), 190 use_buf.size())); 191 *data_file_size += use_buf.size(); 192 op->set_data_length(use_buf.size()); 193 Extent* dst_extent = op->add_dst_extents(); 194 dst_extent->set_start_block(processor->offset() / config.block_size); 195 dst_extent->set_num_blocks(full_chunk_size / config.block_size); 196 197 int progress = static_cast<int>( 198 (processor->offset() + processor->buffer_in().size()) * 100.0 / 199 part_sizes[partition]); 200 if (last_progress_update < progress && 201 (last_progress_update + 10 <= progress || progress == 100)) { 202 LOG(INFO) << progress << "% complete (output size: " 203 << *data_file_size << ")"; 204 last_progress_update = progress; 205 } 206 } 207 } 208 209 return true; 210} 211 212} // namespace chromeos_update_engine 213