1// Copyright 2016 the V8 project authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#ifndef V8_HEAP_PAGE_PARALLEL_JOB_
6#define V8_HEAP_PAGE_PARALLEL_JOB_
7
8#include "src/allocation.h"
9#include "src/cancelable-task.h"
10#include "src/utils.h"
11#include "src/v8.h"
12
13namespace v8 {
14namespace internal {
15
16class Heap;
17class Isolate;
18
// This class manages background tasks that process a set of pages in parallel.
// The JobTraits class needs to define:
// - PerPageData type - state associated with each page.
// - PerTaskData type - state associated with each task.
// - static bool ProcessPageInParallel(Heap* heap,
//                                     PerTaskData task_data,
//                                     MemoryChunk* page,
//                                     PerPageData page_data)
//   The function should return true iff processing succeeded.
// - static const bool NeedSequentialFinalization
// - static void FinalizePageSequentially(Heap* heap,
//                                        MemoryChunk* page,
//                                        bool processing_succeeded,
//                                        PerPageData page_data)
33template <typename JobTraits>
34class PageParallelJob {
35 public:
36  // PageParallelJob cannot dynamically create a semaphore because of a bug in
37  // glibc. See http://crbug.com/609249 and
38  // https://sourceware.org/bugzilla/show_bug.cgi?id=12674.
39  // The caller must provide a semaphore with value 0 and ensure that
40  // the lifetime of the semaphore is the same as the lifetime of the Isolate.
41  // It is guaranteed that the semaphore value will be 0 after Run() call.
42  PageParallelJob(Heap* heap, CancelableTaskManager* cancelable_task_manager,
43                  base::Semaphore* semaphore)
44      : heap_(heap),
45        cancelable_task_manager_(cancelable_task_manager),
46        items_(nullptr),
47        num_items_(0),
48        num_tasks_(0),
49        pending_tasks_(semaphore) {}
50
51  ~PageParallelJob() {
52    Item* item = items_;
53    while (item != nullptr) {
54      Item* next = item->next;
55      delete item;
56      item = next;
57    }
58  }
59
60  void AddPage(MemoryChunk* chunk, typename JobTraits::PerPageData data) {
61    Item* item = new Item(chunk, data, items_);
62    items_ = item;
63    ++num_items_;
64  }
65
66  int NumberOfPages() { return num_items_; }
67
68  // Returns the number of tasks that were spawned when running the job.
69  int NumberOfTasks() { return num_tasks_; }
70
71  // Runs the given number of tasks in parallel and processes the previously
72  // added pages. This function blocks until all tasks finish.
73  // The callback takes the index of a task and returns data for that task.
74  template <typename Callback>
75  void Run(int num_tasks, Callback per_task_data_callback) {
76    if (num_items_ == 0) return;
77    DCHECK_GE(num_tasks, 1);
78    uint32_t task_ids[kMaxNumberOfTasks];
79    const int max_num_tasks = Min(
80        kMaxNumberOfTasks,
81        static_cast<int>(
82            V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads()));
83    num_tasks_ = Max(1, Min(num_tasks, max_num_tasks));
84    int items_per_task = (num_items_ + num_tasks_ - 1) / num_tasks_;
85    int start_index = 0;
86    Task* main_task = nullptr;
87    for (int i = 0; i < num_tasks_; i++, start_index += items_per_task) {
88      if (start_index >= num_items_) {
89        start_index -= num_items_;
90      }
91      Task* task = new Task(heap_, items_, num_items_, start_index,
92                            pending_tasks_, per_task_data_callback(i));
93      task_ids[i] = task->id();
94      if (i > 0) {
95        V8::GetCurrentPlatform()->CallOnBackgroundThread(
96            task, v8::Platform::kShortRunningTask);
97      } else {
98        main_task = task;
99      }
100    }
101    // Contribute on main thread.
102    main_task->Run();
103    delete main_task;
104    // Wait for background tasks.
105    for (int i = 0; i < num_tasks_; i++) {
106      if (cancelable_task_manager_->TryAbort(task_ids[i]) !=
107          CancelableTaskManager::kTaskAborted) {
108        pending_tasks_->Wait();
109      }
110    }
111    if (JobTraits::NeedSequentialFinalization) {
112      Item* item = items_;
113      while (item != nullptr) {
114        bool success = (item->state.Value() == kFinished);
115        JobTraits::FinalizePageSequentially(heap_, item->chunk, success,
116                                            item->data);
117        item = item->next;
118      }
119    }
120  }
121
122 private:
123  static const int kMaxNumberOfTasks = 10;
124
125  enum ProcessingState { kAvailable, kProcessing, kFinished, kFailed };
126
127  struct Item : public Malloced {
128    Item(MemoryChunk* chunk, typename JobTraits::PerPageData data, Item* next)
129        : chunk(chunk), state(kAvailable), data(data), next(next) {}
130    MemoryChunk* chunk;
131    base::AtomicValue<ProcessingState> state;
132    typename JobTraits::PerPageData data;
133    Item* next;
134  };
135
136  class Task : public CancelableTask {
137   public:
138    Task(Heap* heap, Item* items, int num_items, int start_index,
139         base::Semaphore* on_finish, typename JobTraits::PerTaskData data)
140        : CancelableTask(heap->isolate()),
141          heap_(heap),
142          items_(items),
143          num_items_(num_items),
144          start_index_(start_index),
145          on_finish_(on_finish),
146          data_(data) {}
147
148    virtual ~Task() {}
149
150   private:
151    // v8::internal::CancelableTask overrides.
152    void RunInternal() override {
153      // Each task starts at a different index to improve parallelization.
154      Item* current = items_;
155      int skip = start_index_;
156      while (skip-- > 0) {
157        current = current->next;
158      }
159      for (int i = 0; i < num_items_; i++) {
160        if (current->state.TrySetValue(kAvailable, kProcessing)) {
161          bool success = JobTraits::ProcessPageInParallel(
162              heap_, data_, current->chunk, current->data);
163          current->state.SetValue(success ? kFinished : kFailed);
164        }
165        current = current->next;
166        // Wrap around if needed.
167        if (current == nullptr) {
168          current = items_;
169        }
170      }
171      on_finish_->Signal();
172    }
173
174    Heap* heap_;
175    Item* items_;
176    int num_items_;
177    int start_index_;
178    base::Semaphore* on_finish_;
179    typename JobTraits::PerTaskData data_;
180    DISALLOW_COPY_AND_ASSIGN(Task);
181  };
182
183  Heap* heap_;
184  CancelableTaskManager* cancelable_task_manager_;
185  Item* items_;
186  int num_items_;
187  int num_tasks_;
188  base::Semaphore* pending_tasks_;
189  DISALLOW_COPY_AND_ASSIGN(PageParallelJob);
190};
191
192}  // namespace internal
193}  // namespace v8
194
195#endif  // V8_HEAP_PAGE_PARALLEL_JOB_
196