full_update_generator.cc revision 0bc2611b49aba30202fdb9c23871430eb67e4738
13e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust//
23e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust// Copyright (C) 2012 The Android Open Source Project
33e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust//
43e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust// Licensed under the Apache License, Version 2.0 (the "License");
53e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust// you may not use this file except in compliance with the License.
63e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust// You may obtain a copy of the License at
73e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust//
83e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust//      http://www.apache.org/licenses/LICENSE-2.0
93e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust//
103e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust// Unless required by applicable law or agreed to in writing, software
113e7cb52cbf4ca537cafb6617b20387225672ad1aCedric Beust// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15//
16
17#include "update_engine/payload_generator/full_update_generator.h"
18
19#include <fcntl.h>
20#include <inttypes.h>
21
22#include <algorithm>
23#include <deque>
24#include <memory>
25
26#include <base/format_macros.h>
27#include <base/strings/string_util.h>
28#include <base/strings/stringprintf.h>
29#include <base/synchronization/lock.h>
30#include <base/threading/simple_thread.h>
31#include <brillo/secure_blob.h>
32
33#include "update_engine/payload_generator/bzip.h"
34#include "update_engine/utils.h"
35
36using std::vector;
37
38namespace chromeos_update_engine {
39
40namespace {
41
42const size_t kDefaultFullChunkSize = 1024 * 1024;  // 1 MiB
43
44// This class encapsulates a full update chunk processing thread work. The
45// processor reads a chunk of data from the input file descriptor and compresses
46// it. The processor will destroy itself when the work is done.
47class ChunkProcessor : public base::DelegateSimpleThread::Delegate {
48 public:
49  // Read a chunk of |size| bytes from |fd| starting at offset |offset|.
50  ChunkProcessor(int fd, off_t offset, size_t size,
51                 BlobFileWriter* blob_file, AnnotatedOperation* aop)
52    : fd_(fd),
53      offset_(offset),
54      size_(size),
55      blob_file_(blob_file),
56      aop_(aop) {}
57  // We use a default move constructor since all the data members are POD types.
58  ChunkProcessor(ChunkProcessor&&) = default;
59  ~ChunkProcessor() override = default;
60
61  // Overrides DelegateSimpleThread::Delegate.
62  // Run() handles the read from |fd| in a thread-safe way, and stores the
63  // new operation to generate the region starting at |offset| of size |size|
64  // in the output operation |aop|. The associated blob data is stored in
65  // |blob_fd| and |blob_file_size| is updated.
66  void Run() override;
67
68 private:
69  bool ProcessChunk();
70
71  // Stores the operation blob in the |blob_fd_| and updates the
72  // |blob_file_size| accordingly.
73  // This method is thread-safe since it uses a mutex to access the file.
74  // Returns the data offset where the data was written to.
75  off_t StoreBlob(const brillo::Blob& blob);
76
77  // Work parameters.
78  int fd_;
79  off_t offset_;
80  size_t size_;
81  BlobFileWriter* blob_file_;
82  AnnotatedOperation* aop_;
83
84  DISALLOW_COPY_AND_ASSIGN(ChunkProcessor);
85};
86
87void ChunkProcessor::Run() {
88  if (!ProcessChunk()) {
89    LOG(ERROR) << "Error processing region at " << offset_ << " of size "
90               << size_;
91  }
92}
93
94bool ChunkProcessor::ProcessChunk() {
95  brillo::Blob buffer_in_(size_);
96  brillo::Blob op_blob;
97  ssize_t bytes_read = -1;
98  TEST_AND_RETURN_FALSE(utils::PReadAll(fd_,
99                                        buffer_in_.data(),
100                                        buffer_in_.size(),
101                                        offset_,
102                                        &bytes_read));
103  TEST_AND_RETURN_FALSE(bytes_read == static_cast<ssize_t>(size_));
104  TEST_AND_RETURN_FALSE(BzipCompress(buffer_in_, &op_blob));
105
106  InstallOperation_Type op_type = InstallOperation::REPLACE_BZ;
107
108  if (op_blob.size() >= buffer_in_.size()) {
109    // A REPLACE is cheaper than a REPLACE_BZ in this case.
110    op_blob = std::move(buffer_in_);
111    op_type = InstallOperation::REPLACE;
112  }
113
114  TEST_AND_RETURN_FALSE(aop_->SetOperationBlob(&op_blob, blob_file_));
115  aop_->op.set_type(op_type);
116  return true;
117}
118
119}  // namespace
120
121bool FullUpdateGenerator::GenerateOperations(
122    const PayloadGenerationConfig& config,
123    const PartitionConfig& old_part,
124    const PartitionConfig& new_part,
125    BlobFileWriter* blob_file,
126    vector<AnnotatedOperation>* aops) {
127  TEST_AND_RETURN_FALSE(new_part.ValidateExists());
128
129  // FullUpdateGenerator requires a positive chunk_size, otherwise there will
130  // be only one operation with the whole partition which should not be allowed.
131  // For performance reasons, we force a small default hard limit of 1 MiB. This
132  // limit can be changed in the config, and we will use the smaller of the two
133  // soft/hard limits.
134  size_t full_chunk_size;
135  if (config.hard_chunk_size >= 0) {
136    full_chunk_size = std::min(static_cast<size_t>(config.hard_chunk_size),
137                               config.soft_chunk_size);
138  } else {
139    full_chunk_size = std::min(kDefaultFullChunkSize, config.soft_chunk_size);
140    LOG(INFO) << "No chunk_size provided, using the default chunk_size for the "
141              << "full operations: " << full_chunk_size << " bytes.";
142  }
143  TEST_AND_RETURN_FALSE(full_chunk_size > 0);
144  TEST_AND_RETURN_FALSE(full_chunk_size % config.block_size == 0);
145
146  size_t chunk_blocks = full_chunk_size / config.block_size;
147  size_t max_threads = std::max(sysconf(_SC_NPROCESSORS_ONLN), 4L);
148  LOG(INFO) << "Compressing partition " << new_part.name
149            << " from " << new_part.path << " splitting in chunks of "
150            << chunk_blocks << " blocks (" << config.block_size
151            << " bytes each) using " << max_threads << " threads";
152
153  int in_fd = open(new_part.path.c_str(), O_RDONLY, 0);
154  TEST_AND_RETURN_FALSE(in_fd >= 0);
155  ScopedFdCloser in_fd_closer(&in_fd);
156
157  // We potentially have all the ChunkProcessors in memory but only
158  // |max_threads| will actually hold a block in memory while we process.
159  size_t partition_blocks = new_part.size / config.block_size;
160  size_t num_chunks = (partition_blocks + chunk_blocks - 1) / chunk_blocks;
161  aops->resize(num_chunks);
162  vector<ChunkProcessor> chunk_processors;
163  chunk_processors.reserve(num_chunks);
164  blob_file->SetTotalBlobs(num_chunks);
165
166  for (size_t i = 0; i < num_chunks; ++i) {
167    size_t start_block = i * chunk_blocks;
168    // The last chunk could be smaller.
169    size_t num_blocks = std::min(chunk_blocks,
170                                 partition_blocks - i * chunk_blocks);
171
172    // Preset all the static information about the operations. The
173    // ChunkProcessor will set the rest.
174    AnnotatedOperation* aop = aops->data() + i;
175    aop->name = base::StringPrintf("<%s-operation-%" PRIuS ">",
176                                   new_part.name.c_str(), i);
177    Extent* dst_extent = aop->op.add_dst_extents();
178    dst_extent->set_start_block(start_block);
179    dst_extent->set_num_blocks(num_blocks);
180
181    chunk_processors.emplace_back(
182        in_fd,
183        static_cast<off_t>(start_block) * config.block_size,
184        num_blocks * config.block_size,
185        blob_file,
186        aop);
187  }
188
189  // Thread pool used for worker threads.
190  base::DelegateSimpleThreadPool thread_pool("full-update-generator",
191                                             max_threads);
192  thread_pool.Start();
193  for (ChunkProcessor& processor : chunk_processors)
194    thread_pool.AddWork(&processor);
195  thread_pool.JoinAll();
196  // All the operations must have a type set at this point. Otherwise, a
197  // ChunkProcessor failed to complete.
198  for (const AnnotatedOperation& aop : *aops) {
199    if (!aop.op.has_type())
200      return false;
201  }
202  return true;
203}
204
205}  // namespace chromeos_update_engine
206