full_update_generator.cc revision f329b933db41d26644a97afef928eb1b319d6d99
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "update_engine/payload_generator/full_update_generator.h"
6
7#include <fcntl.h>
8#include <inttypes.h>
9
10#include <algorithm>
11#include <deque>
12#include <memory>
13
14#include <base/strings/string_util.h>
15#include <base/strings/stringprintf.h>
16
17#include "update_engine/bzip.h"
18#include "update_engine/utils.h"
19
20using std::deque;
21using std::shared_ptr;
22using std::string;
23using std::vector;
24
25namespace chromeos_update_engine {
26
27namespace {
28
29// This class encapsulates a full update chunk processing thread. The processor
30// reads a chunk of data from the input file descriptor and compresses it. The
31// processor needs to be started through Start() then waited on through Wait().
32class ChunkProcessor {
33 public:
34  // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
35  ChunkProcessor(int fd, off_t offset, size_t size)
36      : thread_(nullptr),
37        fd_(fd),
38        offset_(offset),
39        buffer_in_(size) {}
40  ~ChunkProcessor() { Wait(); }
41
42  off_t offset() const { return offset_; }
43  const vector<char>& buffer_in() const { return buffer_in_; }
44  const vector<char>& buffer_compressed() const { return buffer_compressed_; }
45
46  // Starts the processor. Returns true on success, false on failure.
47  bool Start();
48
49  // Waits for the processor to complete. Returns true on success, false on
50  // failure.
51  bool Wait();
52
53  bool ShouldCompress() const {
54    return buffer_compressed_.size() < buffer_in_.size();
55  }
56
57 private:
58  // Reads the input data into |buffer_in_| and compresses it into
59  // |buffer_compressed_|. Returns true on success, false otherwise.
60  bool ReadAndCompress();
61  static gpointer ReadAndCompressThread(gpointer data);
62
63  GThread* thread_;
64  int fd_;
65  off_t offset_;
66  vector<char> buffer_in_;
67  vector<char> buffer_compressed_;
68
69  DISALLOW_COPY_AND_ASSIGN(ChunkProcessor);
70};
71
72bool ChunkProcessor::Start() {
73  // g_thread_create is deprecated since glib 2.32. Use
74  // g_thread_new instead.
75  thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this,
76                             nullptr);
77  TEST_AND_RETURN_FALSE(thread_ != nullptr);
78  return true;
79}
80
81bool ChunkProcessor::Wait() {
82  if (!thread_) {
83    return false;
84  }
85  gpointer result = g_thread_join(thread_);
86  thread_ = nullptr;
87  TEST_AND_RETURN_FALSE(result == this);
88  return true;
89}
90
91gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) {
92  return reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ?
93      data : nullptr;
94}
95
96bool ChunkProcessor::ReadAndCompress() {
97  ssize_t bytes_read = -1;
98  TEST_AND_RETURN_FALSE(utils::PReadAll(fd_,
99                                        buffer_in_.data(),
100                                        buffer_in_.size(),
101                                        offset_,
102                                        &bytes_read));
103  TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size()));
104  TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_));
105  return true;
106}
107
108}  // namespace
109
110bool FullUpdateGenerator::Run(
111    Graph* graph,
112    const string& new_kernel_part,
113    const string& new_image,
114    off_t image_size,
115    int fd,
116    off_t* data_file_size,
117    off_t chunk_size,
118    off_t block_size,
119    vector<DeltaArchiveManifest_InstallOperation>* kernel_ops,
120    vector<Vertex::Index>* final_order) {
121  TEST_AND_RETURN_FALSE(chunk_size > 0);
122  TEST_AND_RETURN_FALSE((chunk_size % block_size) == 0);
123
124  size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
125  LOG(INFO) << "Max threads: " << max_threads;
126
127  // Get the sizes early in the function, so we can fail fast if the user
128  // passed us bad paths.
129  TEST_AND_RETURN_FALSE(image_size >= 0 &&
130                        image_size <= utils::FileSize(new_image));
131  const off_t kernel_size = utils::FileSize(new_kernel_part);
132  TEST_AND_RETURN_FALSE(kernel_size >= 0);
133
134  off_t part_sizes[] = { image_size, kernel_size };
135  string paths[] = { new_image, new_kernel_part };
136
137  for (int partition = 0; partition < 2; ++partition) {
138    const string& path = paths[partition];
139    LOG(INFO) << "compressing " << path;
140    int in_fd = open(path.c_str(), O_RDONLY, 0);
141    TEST_AND_RETURN_FALSE(in_fd >= 0);
142    ScopedFdCloser in_fd_closer(&in_fd);
143    deque<shared_ptr<ChunkProcessor>> threads;
144    int last_progress_update = INT_MIN;
145    off_t bytes_left = part_sizes[partition], counter = 0, offset = 0;
146    while (bytes_left > 0 || !threads.empty()) {
147      // Check and start new chunk processors if possible.
148      while (threads.size() < max_threads && bytes_left > 0) {
149        shared_ptr<ChunkProcessor> processor(
150            new ChunkProcessor(in_fd, offset,
151                               std::min(bytes_left, chunk_size)));
152        threads.push_back(processor);
153        TEST_AND_RETURN_FALSE(processor->Start());
154        bytes_left -= chunk_size;
155        offset += chunk_size;
156      }
157
158      // Need to wait for a chunk processor to complete and process its output
159      // before spawning new processors.
160      shared_ptr<ChunkProcessor> processor = threads.front();
161      threads.pop_front();
162      TEST_AND_RETURN_FALSE(processor->Wait());
163
164      DeltaArchiveManifest_InstallOperation* op = nullptr;
165      if (partition == 0) {
166        graph->resize(graph->size() + 1);
167        graph->back().file_name =
168            base::StringPrintf("<rootfs-operation-%" PRIi64 ">", counter++);
169        op = &graph->back().op;
170        final_order->push_back(graph->size() - 1);
171      } else {
172        kernel_ops->resize(kernel_ops->size() + 1);
173        op = &kernel_ops->back();
174      }
175
176      const bool compress = processor->ShouldCompress();
177      const vector<char>& use_buf =
178          compress ? processor->buffer_compressed() : processor->buffer_in();
179      op->set_type(compress ?
180                   DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ :
181                   DeltaArchiveManifest_InstallOperation_Type_REPLACE);
182      op->set_data_offset(*data_file_size);
183      TEST_AND_RETURN_FALSE(utils::WriteAll(fd, &use_buf[0], use_buf.size()));
184      *data_file_size += use_buf.size();
185      op->set_data_length(use_buf.size());
186      Extent* dst_extent = op->add_dst_extents();
187      dst_extent->set_start_block(processor->offset() / block_size);
188      dst_extent->set_num_blocks(chunk_size / block_size);
189
190      int progress = static_cast<int>(
191          (processor->offset() + processor->buffer_in().size()) * 100.0 /
192          part_sizes[partition]);
193      if (last_progress_update < progress &&
194          (last_progress_update + 10 <= progress || progress == 100)) {
195        LOG(INFO) << progress << "% complete (output size: "
196                  << *data_file_size << ")";
197        last_progress_update = progress;
198      }
199    }
200  }
201
202  return true;
203}
204
205}  // namespace chromeos_update_engine
206