full_update_generator.cc revision 477aec2166a571cbe28081d806c3226e8b31b6e9
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "update_engine/payload_generator/full_update_generator.h"
6
7#include <fcntl.h>
8#include <inttypes.h>
9
10#include <algorithm>
11#include <deque>
12#include <memory>
13
14#include <base/format_macros.h>
15#include <base/strings/stringprintf.h>
16#include <base/strings/string_util.h>
17
18#include "update_engine/bzip.h"
19#include "update_engine/utils.h"
20
21using std::deque;
22using std::shared_ptr;
23using std::string;
24using std::vector;
25
26namespace chromeos_update_engine {
27
28namespace {
29
30const size_t kDefaultFullChunkSize = 1024 * 1024;  // 1 MiB
31
32// This class encapsulates a full update chunk processing thread. The processor
33// reads a chunk of data from the input file descriptor and compresses it. The
34// processor needs to be started through Start() then waited on through Wait().
35class ChunkProcessor {
36 public:
37  // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
38  ChunkProcessor(int fd, off_t offset, size_t size)
39      : thread_(nullptr),
40        fd_(fd),
41        offset_(offset),
42        buffer_in_(size) {}
43  ~ChunkProcessor() { Wait(); }
44
45  off_t offset() const { return offset_; }
46  const chromeos::Blob& buffer_in() const { return buffer_in_; }
47  const chromeos::Blob& buffer_compressed() const { return buffer_compressed_; }
48
49  // Starts the processor. Returns true on success, false on failure.
50  bool Start();
51
52  // Waits for the processor to complete. Returns true on success, false on
53  // failure.
54  bool Wait();
55
56  bool ShouldCompress() const {
57    return buffer_compressed_.size() < buffer_in_.size();
58  }
59
60 private:
61  // Reads the input data into |buffer_in_| and compresses it into
62  // |buffer_compressed_|. Returns true on success, false otherwise.
63  bool ReadAndCompress();
64  static gpointer ReadAndCompressThread(gpointer data);
65
66  GThread* thread_;
67  int fd_;
68  off_t offset_;
69  chromeos::Blob buffer_in_;
70  chromeos::Blob buffer_compressed_;
71
72  DISALLOW_COPY_AND_ASSIGN(ChunkProcessor);
73};
74
75bool ChunkProcessor::Start() {
76  // g_thread_create is deprecated since glib 2.32. Use
77  // g_thread_new instead.
78  thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this,
79                             nullptr);
80  TEST_AND_RETURN_FALSE(thread_ != nullptr);
81  return true;
82}
83
84bool ChunkProcessor::Wait() {
85  if (!thread_) {
86    return false;
87  }
88  gpointer result = g_thread_join(thread_);
89  thread_ = nullptr;
90  TEST_AND_RETURN_FALSE(result == this);
91  return true;
92}
93
94gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) {
95  return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ?
96      data : nullptr;
97}
98
99bool ChunkProcessor::ReadAndCompress() {
100  ssize_t bytes_read = -1;
101  TEST_AND_RETURN_FALSE(utils::PReadAll(fd_,
102                                        buffer_in_.data(),
103                                        buffer_in_.size(),
104                                        offset_,
105                                        &bytes_read));
106  TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size()));
107  TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_));
108  return true;
109}
110
111}  // namespace
112
113bool FullUpdateGenerator::GenerateOperations(
114    const PayloadGenerationConfig& config,
115    int fd,
116    off_t* data_file_size,
117    vector<AnnotatedOperation>* rootfs_ops,
118    vector<AnnotatedOperation>* kernel_ops) {
119  TEST_AND_RETURN_FALSE(config.Validate());
120  rootfs_ops->clear();
121  kernel_ops->clear();
122
123  // FullUpdateGenerator requires a positive chunk_size, otherwise there will
124  // be only one operation with the whole partition which should not be allowed.
125  size_t full_chunk_size = kDefaultFullChunkSize;
126  if (config.chunk_size >= 0) {
127    full_chunk_size = config.chunk_size;
128  } else {
129    LOG(INFO) << "No chunk_size provided, using the default chunk_size for the "
130              << "full operations: " << full_chunk_size << " bytes.";
131  }
132  TEST_AND_RETURN_FALSE(full_chunk_size > 0);
133  TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0);
134
135  const ImageConfig& target = config.target;  // Shortcut.
136  size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
137  LOG(INFO) << "Max threads: " << max_threads;
138
139  uint64_t part_sizes[] = { target.rootfs_size, target.kernel_size };
140  string paths[] = { target.rootfs_part, target.kernel_part };
141
142  for (int partition = 0; partition < 2; ++partition) {
143    const string& path = paths[partition];
144    LOG(INFO) << "compressing " << path;
145    int in_fd = open(path.c_str(), O_RDONLY, 0);
146    TEST_AND_RETURN_FALSE(in_fd >= 0);
147    ScopedFdCloser in_fd_closer(&in_fd);
148    deque<shared_ptr<ChunkProcessor>> threads;
149    int last_progress_update = INT_MIN;
150    size_t bytes_left = part_sizes[partition], counter = 0, offset = 0;
151    while (bytes_left > 0 || !threads.empty()) {
152      // Check and start new chunk processors if possible.
153      while (threads.size() < max_threads && bytes_left > 0) {
154        size_t this_chunk_bytes = std::min(bytes_left, full_chunk_size);
155        shared_ptr<ChunkProcessor> processor(
156            new ChunkProcessor(in_fd, offset, this_chunk_bytes));
157        threads.push_back(processor);
158        TEST_AND_RETURN_FALSE(processor->Start());
159        bytes_left -= this_chunk_bytes;
160        offset += this_chunk_bytes;
161      }
162
163      // Need to wait for a chunk processor to complete and process its output
164      // before spawning new processors.
165      shared_ptr<ChunkProcessor> processor = threads.front();
166      threads.pop_front();
167      TEST_AND_RETURN_FALSE(processor->Wait());
168
169      DeltaArchiveManifest_InstallOperation* op = nullptr;
170      if (partition == 0) {
171        rootfs_ops->emplace_back();
172        rootfs_ops->back().name =
173            base::StringPrintf("<rootfs-operation-%" PRIuS ">", counter++);
174        op = &rootfs_ops->back().op;
175      } else {
176        kernel_ops->emplace_back();
177        kernel_ops->back().name =
178            base::StringPrintf("<kernel-operation-%" PRIuS ">", counter++);
179        op = &kernel_ops->back().op;
180      }
181
182      const bool compress = processor->ShouldCompress();
183      const chromeos::Blob& use_buf =
184          compress ? processor->buffer_compressed() : processor->buffer_in();
185      op->set_type(compress ?
186                   DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ :
187                   DeltaArchiveManifest_InstallOperation_Type_REPLACE);
188      op->set_data_offset(*data_file_size);
189      TEST_AND_RETURN_FALSE(utils::WriteAll(fd, use_buf.data(),
190                                            use_buf.size()));
191      *data_file_size += use_buf.size();
192      op->set_data_length(use_buf.size());
193      Extent* dst_extent = op->add_dst_extents();
194      dst_extent->set_start_block(processor->offset() / config.block_size);
195      dst_extent->set_num_blocks(full_chunk_size / config.block_size);
196
197      int progress = static_cast<int>(
198          (processor->offset() + processor->buffer_in().size()) * 100.0 /
199          part_sizes[partition]);
200      if (last_progress_update < progress &&
201          (last_progress_update + 10 <= progress || progress == 100)) {
202        LOG(INFO) << progress << "% complete (output size: "
203                  << *data_file_size << ")";
204        last_progress_update = progress;
205      }
206    }
207  }
208
209  return true;
210}
211
212}  // namespace chromeos_update_engine
213