1/*
2 * Copyright (C) 2014 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "base/time_utils.h"
18#include "common_runtime_test.h"
19#include "task_processor.h"
20#include "thread_pool.h"
21#include "thread-inl.h"
22
23namespace art {
24namespace gc {
25
26class TaskProcessorTest : public CommonRuntimeTest {
27 public:
28};
29
30class RecursiveTask : public HeapTask {
31 public:
32  RecursiveTask(TaskProcessor* task_processor, Atomic<size_t>* counter, size_t max_recursion)
33     : HeapTask(NanoTime() + MsToNs(10)), task_processor_(task_processor), counter_(counter),
34       max_recursion_(max_recursion) {
35  }
36  virtual void Run(Thread* self) OVERRIDE {
37    if (max_recursion_ > 0) {
38      task_processor_->AddTask(self,
39                               new RecursiveTask(task_processor_, counter_, max_recursion_ - 1));
40      counter_->FetchAndAddSequentiallyConsistent(1U);
41    }
42  }
43
44 private:
45  TaskProcessor* const task_processor_;
46  Atomic<size_t>* const counter_;
47  const size_t max_recursion_;
48};
49
50class WorkUntilDoneTask : public SelfDeletingTask {
51 public:
52  WorkUntilDoneTask(TaskProcessor* task_processor, Atomic<bool>* done_running)
53      : task_processor_(task_processor), done_running_(done_running) {
54  }
55  virtual void Run(Thread* self) OVERRIDE {
56    task_processor_->RunAllTasks(self);
57    done_running_->StoreSequentiallyConsistent(true);
58  }
59
60 private:
61  TaskProcessor* const task_processor_;
62  Atomic<bool>* done_running_;
63};
64
65TEST_F(TaskProcessorTest, Interrupt) {
66  ThreadPool thread_pool("task processor test", 1U);
67  Thread* const self = Thread::Current();
68  TaskProcessor task_processor;
69  static constexpr size_t kRecursion = 10;
70  Atomic<bool> done_running(false);
71  Atomic<size_t> counter(0);
72  task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion));
73  task_processor.Start(self);
74  // Add a task which will wait until interrupted to the thread pool.
75  thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
76  thread_pool.StartWorkers(self);
77  ASSERT_FALSE(done_running);
78  // Wait until all the tasks are done, but since we didn't interrupt, done_running should be 0.
79  while (counter.LoadSequentiallyConsistent() != kRecursion) {
80    usleep(10);
81  }
82  ASSERT_FALSE(done_running);
83  task_processor.Stop(self);
84  thread_pool.Wait(self, true, false);
85  // After the interrupt and wait, the WorkUntilInterruptedTasktask should have terminated and
86  // set done_running_ to true.
87  ASSERT_TRUE(done_running.LoadSequentiallyConsistent());
88
89  // Test that we finish remaining tasks before returning from RunTasksUntilInterrupted.
90  counter.StoreSequentiallyConsistent(0);
91  done_running.StoreSequentiallyConsistent(false);
92  // Self interrupt before any of the other tasks run, but since we added them we should keep on
93  // working until all the tasks are completed.
94  task_processor.Stop(self);
95  task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion));
96  thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
97  thread_pool.StartWorkers(self);
98  thread_pool.Wait(self, true, false);
99  ASSERT_TRUE(done_running.LoadSequentiallyConsistent());
100  ASSERT_EQ(counter.LoadSequentiallyConsistent(), kRecursion);
101}
102
103class TestOrderTask : public HeapTask {
104 public:
105  TestOrderTask(uint64_t expected_time, size_t expected_counter, size_t* counter)
106     : HeapTask(expected_time), expected_counter_(expected_counter), counter_(counter) {
107  }
108  virtual void Run(Thread* thread ATTRIBUTE_UNUSED) OVERRIDE {
109    ASSERT_EQ(*counter_, expected_counter_);
110    ++*counter_;
111  }
112
113 private:
114  const size_t expected_counter_;
115  size_t* const counter_;
116};
117
118TEST_F(TaskProcessorTest, Ordering) {
119  static const size_t kNumTasks = 25;
120  const uint64_t current_time = NanoTime();
121  Thread* const self = Thread::Current();
122  TaskProcessor task_processor;
123  task_processor.Stop(self);
124  size_t counter = 0;
125  std::vector<std::pair<uint64_t, size_t>> orderings;
126  for (size_t i = 0; i < kNumTasks; ++i) {
127    orderings.push_back(std::make_pair(current_time + MsToNs(10U * i), i));
128  }
129  for (size_t i = 0; i < kNumTasks; ++i) {
130    std::swap(orderings[i], orderings[(i * 87654231 + 12345) % orderings.size()]);
131  }
132  for (const auto& pair : orderings) {
133    auto* task = new TestOrderTask(pair.first, pair.second, &counter);
134    task_processor.AddTask(self, task);
135  }
136  ThreadPool thread_pool("task processor test", 1U);
137  Atomic<bool> done_running(false);
138  // Add a task which will wait until interrupted to the thread pool.
139  thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running));
140  ASSERT_FALSE(done_running.LoadSequentiallyConsistent());
141  thread_pool.StartWorkers(self);
142  thread_pool.Wait(self, true, false);
143  ASSERT_TRUE(done_running.LoadSequentiallyConsistent());
144  ASSERT_EQ(counter, kNumTasks);
145}
146
147}  // namespace gc
148}  // namespace art
149