full_update_generator.cc revision 161c4a132743f15fc4757112b673085c2a7a7f29
1// Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "update_engine/payload_generator/full_update_generator.h"
6
7#include <fcntl.h>
8#include <inttypes.h>
9
10#include <tr1/memory>
11
12#include <base/strings/string_util.h>
13#include <base/strings/stringprintf.h>
14
15#include "update_engine/bzip.h"
16#include "update_engine/utils.h"
17
18using std::deque;
19using std::max;
20using std::min;
21using std::string;
22using std::tr1::shared_ptr;
23using std::vector;
24
25namespace chromeos_update_engine {
26
27namespace {
28
29// This class encapsulates a full update chunk processing thread. The processor
30// reads a chunk of data from the input file descriptor and compresses it. The
31// processor needs to be started through Start() then waited on through Wait().
32class ChunkProcessor {
33 public:
34  // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
35  ChunkProcessor(int fd, off_t offset, size_t size)
36      : thread_(NULL),
37        fd_(fd),
38        offset_(offset),
39        buffer_in_(size) {}
40  ~ChunkProcessor() { Wait(); }
41
42  off_t offset() const { return offset_; }
43  const vector<char>& buffer_in() const { return buffer_in_; }
44  const vector<char>& buffer_compressed() const { return buffer_compressed_; }
45
46  // Starts the processor. Returns true on success, false on failure.
47  bool Start();
48
49  // Waits for the processor to complete. Returns true on success, false on
50  // failure.
51  bool Wait();
52
53  bool ShouldCompress() const {
54    return buffer_compressed_.size() < buffer_in_.size();
55  }
56
57 private:
58  // Reads the input data into |buffer_in_| and compresses it into
59  // |buffer_compressed_|. Returns true on success, false otherwise.
60  bool ReadAndCompress();
61  static gpointer ReadAndCompressThread(gpointer data);
62
63  GThread* thread_;
64  int fd_;
65  off_t offset_;
66  vector<char> buffer_in_;
67  vector<char> buffer_compressed_;
68
69  DISALLOW_COPY_AND_ASSIGN(ChunkProcessor);
70};
71
72bool ChunkProcessor::Start() {
73  // g_thread_create is deprecated since glib 2.32. Use
74  // g_thread_new instead.
75  thread_ = g_thread_try_new("chunk_proc", ReadAndCompressThread, this, NULL);
76  TEST_AND_RETURN_FALSE(thread_ != NULL);
77  return true;
78}
79
80bool ChunkProcessor::Wait() {
81  if (!thread_) {
82    return false;
83  }
84  gpointer result = g_thread_join(thread_);
85  thread_ = NULL;
86  TEST_AND_RETURN_FALSE(result == this);
87  return true;
88}
89
90gpointer ChunkProcessor::ReadAndCompressThread(gpointer data) {
91  return
92      reinterpret_cast<ChunkProcessor*>(data)->ReadAndCompress() ? data : NULL;
93}
94
95bool ChunkProcessor::ReadAndCompress() {
96  ssize_t bytes_read = -1;
97  TEST_AND_RETURN_FALSE(utils::PReadAll(fd_,
98                                        buffer_in_.data(),
99                                        buffer_in_.size(),
100                                        offset_,
101                                        &bytes_read));
102  TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(buffer_in_.size()));
103  TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &buffer_compressed_));
104  return true;
105}
106
107}  // namespace
108
109bool FullUpdateGenerator::Run(
110    Graph* graph,
111    const std::string& new_kernel_part,
112    const std::string& new_image,
113    off_t image_size,
114    int fd,
115    off_t* data_file_size,
116    off_t chunk_size,
117    off_t block_size,
118    vector<DeltaArchiveManifest_InstallOperation>* kernel_ops,
119    std::vector<Vertex::Index>* final_order) {
120  TEST_AND_RETURN_FALSE(chunk_size > 0);
121  TEST_AND_RETURN_FALSE((chunk_size % block_size) == 0);
122
123  size_t max_threads = max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
124  LOG(INFO) << "Max threads: " << max_threads;
125
126  // Get the sizes early in the function, so we can fail fast if the user
127  // passed us bad paths.
128  TEST_AND_RETURN_FALSE(image_size >= 0 &&
129                        image_size <= utils::FileSize(new_image));
130  const off_t kernel_size = utils::FileSize(new_kernel_part);
131  TEST_AND_RETURN_FALSE(kernel_size >= 0);
132
133  off_t part_sizes[] = { image_size, kernel_size };
134  string paths[] = { new_image, new_kernel_part };
135
136  for (int partition = 0; partition < 2; ++partition) {
137    const string& path = paths[partition];
138    LOG(INFO) << "compressing " << path;
139    int in_fd = open(path.c_str(), O_RDONLY, 0);
140    TEST_AND_RETURN_FALSE(in_fd >= 0);
141    ScopedFdCloser in_fd_closer(&in_fd);
142    deque<shared_ptr<ChunkProcessor> > threads;
143    int last_progress_update = INT_MIN;
144    off_t bytes_left = part_sizes[partition], counter = 0, offset = 0;
145    while (bytes_left > 0 || !threads.empty()) {
146      // Check and start new chunk processors if possible.
147      while (threads.size() < max_threads && bytes_left > 0) {
148        shared_ptr<ChunkProcessor> processor(
149            new ChunkProcessor(in_fd, offset, min(bytes_left, chunk_size)));
150        threads.push_back(processor);
151        TEST_AND_RETURN_FALSE(processor->Start());
152        bytes_left -= chunk_size;
153        offset += chunk_size;
154      }
155
156      // Need to wait for a chunk processor to complete and process its ouput
157      // before spawning new processors.
158      shared_ptr<ChunkProcessor> processor = threads.front();
159      threads.pop_front();
160      TEST_AND_RETURN_FALSE(processor->Wait());
161
162      DeltaArchiveManifest_InstallOperation* op = NULL;
163      if (partition == 0) {
164        graph->resize(graph->size() + 1);
165        graph->back().file_name =
166            base::StringPrintf("<rootfs-operation-%" PRIi64 ">", counter++);
167        op = &graph->back().op;
168        final_order->push_back(graph->size() - 1);
169      } else {
170        kernel_ops->resize(kernel_ops->size() + 1);
171        op = &kernel_ops->back();
172      }
173
174      const bool compress = processor->ShouldCompress();
175      const vector<char>& use_buf =
176          compress ? processor->buffer_compressed() : processor->buffer_in();
177      op->set_type(compress ?
178                   DeltaArchiveManifest_InstallOperation_Type_REPLACE_BZ :
179                   DeltaArchiveManifest_InstallOperation_Type_REPLACE);
180      op->set_data_offset(*data_file_size);
181      TEST_AND_RETURN_FALSE(utils::WriteAll(fd, &use_buf[0], use_buf.size()));
182      *data_file_size += use_buf.size();
183      op->set_data_length(use_buf.size());
184      Extent* dst_extent = op->add_dst_extents();
185      dst_extent->set_start_block(processor->offset() / block_size);
186      dst_extent->set_num_blocks(chunk_size / block_size);
187
188      int progress = static_cast<int>(
189          (processor->offset() + processor->buffer_in().size()) * 100.0 /
190          part_sizes[partition]);
191      if (last_progress_update < progress &&
192          (last_progress_update + 10 <= progress || progress == 100)) {
193        LOG(INFO) << progress << "% complete (output size: "
194                  << *data_file_size << ")";
195        last_progress_update = progress;
196      }
197    }
198  }
199
200  return true;
201}
202
203}  // namespace chromeos_update_engine
204