full_update_generator.cc revision b42b98db059a12c44110588fc0b3d5f82d32a2f8
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "update_engine/payload_generator/full_update_generator.h" 6 7#include <fcntl.h> 8#include <inttypes.h> 9 10#include <algorithm> 11#include <deque> 12#include <memory> 13 14#include <base/format_macros.h> 15#include <base/strings/stringprintf.h> 16#include <base/strings/string_util.h> 17 18#include "update_engine/bzip.h" 19#include "update_engine/utils.h" 20 21using std::deque; 22using std::shared_ptr; 23using std::string; 24using std::vector; 25 26namespace chromeos_update_engine { 27 28namespace { 29 30const size_t kDefaultFullChunkSize = 1024 * 1024; // 1 MiB 31 32// This class encapsulates a full update chunk processing thread. The processor 33// reads a chunk of data from the input file descriptor and compresses it. The 34// processor needs to be started through Start() then waited on through Wait(). 35class ChunkProcessor { 36 public: 37 // Read a chunk of |size| bytes from |fd| starting at offset |offset|. 38 ChunkProcessor(int fd, off_t offset, size_t size) 39 : thread_(nullptr), 40 fd_(fd), 41 offset_(offset), 42 buffer_in_(size) {} 43 ~ChunkProcessor() { Wait(); } 44 45 off_t offset() const { return offset_; } 46 const chromeos::Blob& buffer_in() const { return buffer_in_; } 47 const chromeos::Blob& buffer_compressed() const { return buffer_compressed_; } 48 49 // Starts the processor. Returns true on success, false on failure. 50 bool Start(); 51 52 // Waits for the processor to complete. Returns true on success, false on 53 // failure. 54 bool Wait(); 55 56 bool ShouldCompress() const { 57 return buffer_compressed_.size() < buffer_in_.size(); 58 } 59 60 private: 61 // Reads the input data into |buffer_in_| and compresses it into 62 // |buffer_compressed_|. Returns true on success, false otherwise. 63 bool ReadAndCompress(); 64 static gpointer ReadAndCompressThread(gpointer data); 65 66 GThread* thread_; 67 int fd_; 68 off_t offset_; 69 chromeos::Blob buffer_in_; 70 chromeos::Blob buffer_compressed_; 71 72 DISALLOW_COPY_AND_ASSIGN(ChunkProcessor); 73}; 74 75bool ChunkProcessor::Start() { 76 // g_thread_create is deprecated since glib 2.32. Use 77 // g_thread_new instead. 78 thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this, 79 nullptr); 80 TEST_AND_RETURN_FALSE(thread_ != nullptr); 81 return true; 82} 83 84bool ChunkProcessor::Wait() { 85 if (!thread_) { 86 return false; 87 } 88 gpointer result = g_thread_join(thread_); 89 thread_ = nullptr; 90 TEST_AND_RETURN_FALSE(result == this); 91 return true; 92} 93 94gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) { 95 return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? 96 data : nullptr; 97} 98 99bool ChunkProcessor::ReadAndCompress() { 100 ssize_t bytes_read = -1; 101 TEST_AND_RETURN_FALSE(utils::PReadAll(fd_, 102 buffer_in_.data(), 103 buffer_in_.size(), 104 offset_, 105 &bytes_read)); 106 TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size())); 107 TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_)); 108 return true; 109} 110 111} // namespace 112 113bool FullUpdateGenerator::GenerateOperations( 114 const PayloadGenerationConfig& config, 115 int fd, 116 off_t* data_file_size, 117 vector<AnnotatedOperation>* rootfs_ops, 118 vector<AnnotatedOperation>* kernel_ops) { 119 TEST_AND_RETURN_FALSE(config.Validate()); 120 rootfs_ops->clear(); 121 kernel_ops->clear(); 122 123 // FullUpdateGenerator requires a positive chunk_size, otherwise there will 124 // be only one operation with the whole partition which should not be allowed. 125 size_t full_chunk_size = kDefaultFullChunkSize; 126 if (config.chunk_size >= 0) { 127 full_chunk_size = config.chunk_size; 128 } else { 129 LOG(INFO) << "No chunk_size provided, using the default chunk_size for the " 130 << "full operations: " << full_chunk_size << " bytes."; 131 } 132 TEST_AND_RETURN_FALSE(full_chunk_size > 0); 133 TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0); 134 135 size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L); 136 LOG(INFO) << "Max threads: " << max_threads; 137 138 const PartitionConfig* partitions[] = { 139 &config.target.rootfs, 140 &config.target.kernel}; 141 142 for (int part_id = 0; part_id < 2; ++part_id) { 143 const PartitionConfig* partition = partitions[part_id]; 144 LOG(INFO) << "compressing " << partition->path; 145 int in_fd = open(partition->path.c_str(), O_RDONLY, 0); 146 TEST_AND_RETURN_FALSE(in_fd >= 0); 147 ScopedFdCloser in_fd_closer(&in_fd); 148 deque<shared_ptr<ChunkProcessor>> threads; 149 int last_progress_update = INT_MIN; 150 size_t bytes_left = partition->size, counter = 0, offset = 0; 151 while (bytes_left > 0 || !threads.empty()) { 152 // Check and start new chunk processors if possible. 153 while (threads.size() < max_threads && bytes_left > 0) { 154 size_t this_chunk_bytes = std::min(bytes_left, full_chunk_size); 155 shared_ptr<ChunkProcessor> processor( 156 new ChunkProcessor(in_fd, offset, this_chunk_bytes)); 157 threads.push_back(processor); 158 TEST_AND_RETURN_FALSE(processor->Start()); 159 bytes_left -= this_chunk_bytes; 160 offset += this_chunk_bytes; 161 } 162 163 // Need to wait for a chunk processor to complete and process its output 164 // before spawning new processors. 165 shared_ptr<ChunkProcessor> processor = threads.front(); 166 threads.pop_front(); 167 TEST_AND_RETURN_FALSE(processor->Wait()); 168 169 DeltaArchiveManifest_InstallOperation* op = nullptr; 170 if (part_id == 0) { 171 rootfs_ops->emplace_back(); 172 rootfs_ops->back().name = 173 base::StringPrintf("<rootfs-operation-%" PRIuS ">", counter++); 174 op = &rootfs_ops->back().op; 175 } else { 176 kernel_ops->emplace_back(); 177 kernel_ops->back().name = 178 base::StringPrintf("<kernel-operation-%" PRIuS ">", counter++); 179 op = &kernel_ops->back().op; 180 } 181 182 const bool compress = processor->ShouldCompress(); 183 const chromeos::Blob& use_buf = 184 compress ? processor->buffer_compressed() : processor->buffer_in(); 185 op->set_type(compress ? 186 DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ : 187 DeltaArchiveManifest_InstallOperation_Type_REPLACE); 188 op->set_data_offset(*data_file_size); 189 TEST_AND_RETURN_FALSE(utils::WriteAll(fd, use_buf.data(), 190 use_buf.size())); 191 *data_file_size += use_buf.size(); 192 op->set_data_length(use_buf.size()); 193 Extent* dst_extent = op->add_dst_extents(); 194 dst_extent->set_start_block(processor->offset() / config.block_size); 195 dst_extent->set_num_blocks( 196 processor->buffer_in().size() / config.block_size); 197 198 int progress = static_cast<int>( 199 (processor->offset() + processor->buffer_in().size()) * 100.0 / 200 partition->size); 201 if (last_progress_update < progress && 202 (last_progress_update + 10 <= progress || progress == 100)) { 203 LOG(INFO) << progress << "% complete (output size: " 204 << *data_file_size << ")"; 205 last_progress_update = progress; 206 } 207 } 208 } 209 210 return true; 211} 212 213} // namespace chromeos_update_engine 214