full_update_generator.cc revision 2d3b2d635e50c6886e285afb86c3187d9e0bd360
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "update_engine/payload_generator/full_update_generator.h" 6 7#include <fcntl.h> 8#include <inttypes.h> 9 10#include <algorithm> 11#include <deque> 12#include <memory> 13 14#include <base/format_macros.h> 15#include <base/strings/stringprintf.h> 16#include <base/strings/string_util.h> 17 18#include "update_engine/bzip.h" 19#include "update_engine/utils.h" 20 21using std::deque; 22using std::shared_ptr; 23using std::string; 24using std::vector; 25 26namespace chromeos_update_engine { 27 28namespace { 29 30const size_t kDefaultFullChunkSize = 1024 * 1024; // 1 MiB 31 32// This class encapsulates a full update chunk processing thread. The processor 33// reads a chunk of data from the input file descriptor and compresses it. The 34// processor needs to be started through Start() then waited on through Wait(). 35class ChunkProcessor { 36 public: 37 // Read a chunk of |size| bytes from |fd| starting at offset |offset|. 38 ChunkProcessor(int fd, off_t offset, size_t size) 39 : thread_(nullptr), 40 fd_(fd), 41 offset_(offset), 42 buffer_in_(size) {} 43 ~ChunkProcessor() { Wait(); } 44 45 off_t offset() const { return offset_; } 46 const chromeos::Blob& buffer_in() const { return buffer_in_; } 47 const chromeos::Blob& buffer_compressed() const { return buffer_compressed_; } 48 49 // Starts the processor. Returns true on success, false on failure. 50 bool Start(); 51 52 // Waits for the processor to complete. Returns true on success, false on 53 // failure. 54 bool Wait(); 55 56 bool ShouldCompress() const { 57 return buffer_compressed_.size() < buffer_in_.size(); 58 } 59 60 private: 61 // Reads the input data into |buffer_in_| and compresses it into 62 // |buffer_compressed_|. Returns true on success, false otherwise. 63 bool ReadAndCompress(); 64 static gpointer ReadAndCompressThread(gpointer data); 65 66 GThread* thread_; 67 int fd_; 68 off_t offset_; 69 chromeos::Blob buffer_in_; 70 chromeos::Blob buffer_compressed_; 71 72 DISALLOW_COPY_AND_ASSIGN(ChunkProcessor); 73}; 74 75bool ChunkProcessor::Start() { 76 // g_thread_create is deprecated since glib 2.32. Use 77 // g_thread_new instead. 78 thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this, 79 nullptr); 80 TEST_AND_RETURN_FALSE(thread_ != nullptr); 81 return true; 82} 83 84bool ChunkProcessor::Wait() { 85 if (!thread_) { 86 return false; 87 } 88 gpointer result = g_thread_join(thread_); 89 thread_ = nullptr; 90 TEST_AND_RETURN_FALSE(result == this); 91 return true; 92} 93 94gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) { 95 return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? 96 data : nullptr; 97} 98 99bool ChunkProcessor::ReadAndCompress() { 100 ssize_t bytes_read = -1; 101 TEST_AND_RETURN_FALSE(utils::PReadAll(fd_, 102 buffer_in_.data(), 103 buffer_in_.size(), 104 offset_, 105 &bytes_read)); 106 TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size())); 107 TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_)); 108 return true; 109} 110 111} // namespace 112 113bool FullUpdateGenerator::GenerateOperations( 114 const PayloadGenerationConfig& config, 115 int fd, 116 off_t* data_file_size, 117 vector<AnnotatedOperation>* rootfs_ops, 118 vector<AnnotatedOperation>* kernel_ops) { 119 TEST_AND_RETURN_FALSE(config.Validate()); 120 rootfs_ops->clear(); 121 kernel_ops->clear(); 122 123 // FullUpdateGenerator requires a positive chunk_size, otherwise there will 124 // be only one operation with the whole partition which should not be allowed. 125 // For performance reasons, we force a small default hard limit of 1 MiB. This 126 // limit can be changed in the config, and we will use the smaller of the two 127 // soft/hard limits. 128 size_t full_chunk_size; 129 if (config.hard_chunk_size >= 0) { 130 full_chunk_size = std::min(static_cast<size_t>(config.hard_chunk_size), 131 config.soft_chunk_size); 132 } else { 133 full_chunk_size = std::min(kDefaultFullChunkSize, config.soft_chunk_size); 134 LOG(INFO) << "No chunk_size provided, using the default chunk_size for the " 135 << "full operations: " << full_chunk_size << " bytes."; 136 } 137 TEST_AND_RETURN_FALSE(full_chunk_size > 0); 138 TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0); 139 140 size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L); 141 LOG(INFO) << "Max threads: " << max_threads; 142 143 const PartitionConfig* partitions[] = { 144 &config.target.rootfs, 145 &config.target.kernel}; 146 147 for (int part_id = 0; part_id < 2; ++part_id) { 148 const PartitionConfig* partition = partitions[part_id]; 149 LOG(INFO) << "compressing " << partition->path; 150 int in_fd = open(partition->path.c_str(), O_RDONLY, 0); 151 TEST_AND_RETURN_FALSE(in_fd >= 0); 152 ScopedFdCloser in_fd_closer(&in_fd); 153 deque<shared_ptr<ChunkProcessor>> threads; 154 int last_progress_update = INT_MIN; 155 size_t bytes_left = partition->size, counter = 0, offset = 0; 156 while (bytes_left > 0 || !threads.empty()) { 157 // Check and start new chunk processors if possible. 158 while (threads.size() < max_threads && bytes_left > 0) { 159 size_t this_chunk_bytes = std::min(bytes_left, full_chunk_size); 160 shared_ptr<ChunkProcessor> processor( 161 new ChunkProcessor(in_fd, offset, this_chunk_bytes)); 162 threads.push_back(processor); 163 TEST_AND_RETURN_FALSE(processor->Start()); 164 bytes_left -= this_chunk_bytes; 165 offset += this_chunk_bytes; 166 } 167 168 // Need to wait for a chunk processor to complete and process its output 169 // before spawning new processors. 170 shared_ptr<ChunkProcessor> processor = threads.front(); 171 threads.pop_front(); 172 TEST_AND_RETURN_FALSE(processor->Wait()); 173 174 DeltaArchiveManifest_InstallOperation* op = nullptr; 175 if (part_id == 0) { 176 rootfs_ops->emplace_back(); 177 rootfs_ops->back().name = 178 base::StringPrintf("<rootfs-operation-%" PRIuS ">", counter++); 179 op = &rootfs_ops->back().op; 180 } else { 181 kernel_ops->emplace_back(); 182 kernel_ops->back().name = 183 base::StringPrintf("<kernel-operation-%" PRIuS ">", counter++); 184 op = &kernel_ops->back().op; 185 } 186 187 const bool compress = processor->ShouldCompress(); 188 const chromeos::Blob& use_buf = 189 compress ? processor->buffer_compressed() : processor->buffer_in(); 190 op->set_type(compress ? 191 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ : 192 DeltaArchiveManifest_InstallOperation_Type_REPLACE); 193 op->set_data_offset(*data_file_size); 194 TEST_AND_RETURN_FALSE(utils::WriteAll(fd, use_buf.data(), 195 use_buf.size())); 196 *data_file_size += use_buf.size(); 197 op->set_data_length(use_buf.size()); 198 Extent* dst_extent = op->add_dst_extents(); 199 dst_extent->set_start_block(processor->offset() / config.block_size); 200 dst_extent->set_num_blocks( 201 processor->buffer_in().size() / config.block_size); 202 203 int progress = static_cast<int>( 204 (processor->offset() + processor->buffer_in().size()) * 100.0 / 205 partition->size); 206 if (last_progress_update < progress && 207 (last_progress_update + 10 <= progress || progress == 100)) { 208 LOG(INFO) << progress << "% complete (output size: " 209 << *data_file_size << ")"; 210 last_progress_update = progress; 211 } 212 } 213 } 214 215 return true; 216} 217 218} // namespace chromeos_update_engine 219