full_update_generator.cc revision 2d3b2d635e50c6886e285afb86c3187d9e0bd360
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "update_engine/payload_generator/full_update_generator.h"
6
7#include <fcntl.h>
8#include <inttypes.h>
9
10#include <algorithm>
11#include <deque>
12#include <memory>
13
14#include <base/format_macros.h>
15#include <base/strings/stringprintf.h>
16#include <base/strings/string_util.h>
17
18#include "update_engine/bzip.h"
19#include "update_engine/utils.h"
20
21using std::deque;
22using std::shared_ptr;
23using std::string;
24using std::vector;
25
26namespace chromeos_update_engine {
27
28namespace {
29
30const size_t kDefaultFullChunkSize = 1024 * 1024;  // 1 MiB
31
32// This class encapsulates a full update chunk processing thread. The processor
33// reads a chunk of data from the input file descriptor and compresses it. The
34// processor needs to be started through Start() then waited on through Wait().
35class ChunkProcessor {
36 public:
37  // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
38  ChunkProcessor(int fd, off_t offset, size_t size)
39      : thread_(nullptr),
40        fd_(fd),
41        offset_(offset),
42        buffer_in_(size) {}
43  ~ChunkProcessor() { Wait(); }
44
45  off_t offset() const { return offset_; }
46  const chromeos::Blob& buffer_in() const { return buffer_in_; }
47  const chromeos::Blob& buffer_compressed() const { return buffer_compressed_; }
48
49  // Starts the processor. Returns true on success, false on failure.
50  bool Start();
51
52  // Waits for the processor to complete. Returns true on success, false on
53  // failure.
54  bool Wait();
55
56  bool ShouldCompress() const {
57    return buffer_compressed_.size() < buffer_in_.size();
58  }
59
60 private:
61  // Reads the input data into |buffer_in_| and compresses it into
62  // |buffer_compressed_|. Returns true on success, false otherwise.
63  bool ReadAndCompress();
64  static gpointer ReadAndCompressThread(gpointer data);
65
66  GThread* thread_;
67  int fd_;
68  off_t offset_;
69  chromeos::Blob buffer_in_;
70  chromeos::Blob buffer_compressed_;
71
72  DISALLOW_COPY_AND_ASSIGN(ChunkProcessor);
73};
74
75bool ChunkProcessor::Start() {
76  // g_thread_create is deprecated since glib 2.32. Use
77  // g_thread_new instead.
78  thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this,
79                             nullptr);
80  TEST_AND_RETURN_FALSE(thread_ != nullptr);
81  return true;
82}
83
84bool ChunkProcessor::Wait() {
85  if (!thread_) {
86    return false;
87  }
88  gpointer result = g_thread_join(thread_);
89  thread_ = nullptr;
90  TEST_AND_RETURN_FALSE(result == this);
91  return true;
92}
93
94gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) {
95  return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ?
96      data : nullptr;
97}
98
99bool ChunkProcessor::ReadAndCompress() {
100  ssize_t bytes_read = -1;
101  TEST_AND_RETURN_FALSE(utils::PReadAll(fd_,
102                                        buffer_in_.data(),
103                                        buffer_in_.size(),
104                                        offset_,
105                                        &bytes_read));
106  TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size()));
107  TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_));
108  return true;
109}
110
111}  // namespace
112
113bool FullUpdateGenerator::GenerateOperations(
114    const PayloadGenerationConfig& config,
115    int fd,
116    off_t* data_file_size,
117    vector<AnnotatedOperation>* rootfs_ops,
118    vector<AnnotatedOperation>* kernel_ops) {
119  TEST_AND_RETURN_FALSE(config.Validate());
120  rootfs_ops->clear();
121  kernel_ops->clear();
122
123  // FullUpdateGenerator requires a positive chunk_size, otherwise there will
124  // be only one operation with the whole partition which should not be allowed.
125  // For performance reasons, we force a small default hard limit of 1 MiB. This
126  // limit can be changed in the config, and we will use the smaller of the two
127  // soft/hard limits.
128  size_t full_chunk_size;
129  if (config.hard_chunk_size >= 0) {
130    full_chunk_size = std::min(static_cast<size_t>(config.hard_chunk_size),
131                               config.soft_chunk_size);
132  } else {
133    full_chunk_size = std::min(kDefaultFullChunkSize, config.soft_chunk_size);
134    LOG(INFO) << "No chunk_size provided, using the default chunk_size for the "
135              << "full operations: " << full_chunk_size << " bytes.";
136  }
137  TEST_AND_RETURN_FALSE(full_chunk_size > 0);
138  TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0);
139
140  size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
141  LOG(INFO) << "Max threads: " << max_threads;
142
143  const PartitionConfig* partitions[] = {
144      &config.target.rootfs,
145      &config.target.kernel};
146
147  for (int part_id = 0; part_id < 2; ++part_id) {
148    const PartitionConfig* partition = partitions[part_id];
149    LOG(INFO) << "compressing " << partition->path;
150    int in_fd = open(partition->path.c_str(), O_RDONLY, 0);
151    TEST_AND_RETURN_FALSE(in_fd >= 0);
152    ScopedFdCloser in_fd_closer(&in_fd);
153    deque<shared_ptr<ChunkProcessor>> threads;
154    int last_progress_update = INT_MIN;
155    size_t bytes_left = partition->size, counter = 0, offset = 0;
156    while (bytes_left > 0 || !threads.empty()) {
157      // Check and start new chunk processors if possible.
158      while (threads.size() < max_threads && bytes_left > 0) {
159        size_t this_chunk_bytes = std::min(bytes_left, full_chunk_size);
160        shared_ptr<ChunkProcessor> processor(
161            new ChunkProcessor(in_fd, offset, this_chunk_bytes));
162        threads.push_back(processor);
163        TEST_AND_RETURN_FALSE(processor->Start());
164        bytes_left -= this_chunk_bytes;
165        offset += this_chunk_bytes;
166      }
167
168      // Need to wait for a chunk processor to complete and process its output
169      // before spawning new processors.
170      shared_ptr<ChunkProcessor> processor = threads.front();
171      threads.pop_front();
172      TEST_AND_RETURN_FALSE(processor->Wait());
173
174      DeltaArchiveManifest_InstallOperation* op = nullptr;
175      if (part_id == 0) {
176        rootfs_ops->emplace_back();
177        rootfs_ops->back().name =
178            base::StringPrintf("<rootfs-operation-%" PRIuS ">", counter++);
179        op = &rootfs_ops->back().op;
180      } else {
181        kernel_ops->emplace_back();
182        kernel_ops->back().name =
183            base::StringPrintf("<kernel-operation-%" PRIuS ">", counter++);
184        op = &kernel_ops->back().op;
185      }
186
187      const bool compress = processor->ShouldCompress();
188      const chromeos::Blob& use_buf =
189          compress ? processor->buffer_compressed() : processor->buffer_in();
190      op->set_type(compress ?
191                   DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ :
192                   DeltaArchiveManifest_InstallOperation_Type_REPLACE);
193      op->set_data_offset(*data_file_size);
194      TEST_AND_RETURN_FALSE(utils::WriteAll(fd, use_buf.data(),
195                                            use_buf.size()));
196      *data_file_size += use_buf.size();
197      op->set_data_length(use_buf.size());
198      Extent* dst_extent = op->add_dst_extents();
199      dst_extent->set_start_block(processor->offset() / config.block_size);
200      dst_extent->set_num_blocks(
201          processor->buffer_in().size() / config.block_size);
202
203      int progress = static_cast<int>(
204          (processor->offset() + processor->buffer_in().size()) * 100.0 /
205          partition->size);
206      if (last_progress_update < progress &&
207          (last_progress_update + 10 <= progress || progress == 100)) {
208        LOG(INFO) << progress << "% complete (output size: "
209                  << *data_file_size << ")";
210        last_progress_update = progress;
211      }
212    }
213  }
214
215  return true;
216}
217
218}  // namespace chromeos_update_engine
219