TaskQueue.java revision f83be5e4273263df2bb9ef609946b911695b3996
1/*
2 * Copyright (C) 2011 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package vogar.tasks;
18
19import java.util.ArrayList;
20import java.util.Collection;
21import java.util.Iterator;
22import java.util.LinkedList;
23import java.util.List;
24import java.util.concurrent.ExecutorService;
25import java.util.concurrent.TimeUnit;
26import vogar.Console;
27import vogar.Result;
28import vogar.util.Threads;
29
30/**
31 * A set of tasks to execute.
32 */
33public final class TaskQueue {
34    private static final int FOREVER = 60 * 60 * 24 * 28; // four weeks
35    private final Console console;
36    private int runningTasks;
37    private final LinkedList<Task> tasks = new LinkedList<Task>();
38    private final LinkedList<Task> runnableTasks = new LinkedList<Task>();
39    private final List<Task> failedTasks = new ArrayList<Task>();
40
41    public TaskQueue(Console console) {
42        this.console = console;
43    }
44
45    /**
46     * Adds a task to the queue.
47     */
48    public synchronized void enqueue(Task task) {
49        tasks.add(task);
50    }
51
52    public void enqueueAll(Collection<Task> tasks) {
53        this.tasks.addAll(tasks);
54    }
55
56    public void runTasks() {
57        promoteBlockedTasks();
58
59        ExecutorService runners = Threads.threadPerCpuExecutor(console, "TaskQueue");
60        for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) {
61            runners.execute(new Runnable() {
62                @Override public void run() {
63                    while (runOneTask()) {
64                    }
65                }
66            });
67        }
68
69        runners.shutdown();
70        try {
71            runners.awaitTermination(FOREVER, TimeUnit.SECONDS);
72        } catch (InterruptedException e) {
73            throw new AssertionError();
74        }
75    }
76
77    public void printTasks() {
78        if (!console.isVerbose()) {
79            return;
80        }
81
82        int i = 0;
83        for (Task task : tasks) {
84            StringBuilder message = new StringBuilder()
85                    .append("Task ").append(i++).append(": ").append(task);
86            for (Task blocker : task.tasksThatMustFinishFirst) {
87                message.append("\n  depends on completed task: ").append(blocker);
88            }
89            for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
90                message.append("\n  depends on successful task: ").append(blocker);
91            }
92            console.verbose(message.toString());
93        }
94    }
95
96    public void printProblemTasks() {
97        for (Task task : failedTasks) {
98            String message = "Failed task: " + task + " " + task.result;
99            if (task.thrown != null) {
100                console.info(message, task.thrown);
101            } else {
102                console.info(message);
103            }
104        }
105        if (!console.isVerbose()) {
106            return;
107        }
108        for (Task task : tasks) {
109            StringBuilder message = new StringBuilder()
110                    .append("Failed to execute task: ").append(task);
111            for (Task blocker : task.tasksThatMustFinishFirst) {
112                if (blocker.result == null) {
113                    message.append("\n  blocked by unexecuted task: ").append(blocker);
114                }
115            }
116            for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) {
117                if (blocker.result == null) {
118                    message.append("\n  blocked by unexecuted task: ").append(blocker);
119                } else if (blocker.result != Result.SUCCESS) {
120                    message.append("\n  blocked by unsuccessful task: ").append(blocker);
121                }
122            }
123            console.verbose(message.toString());
124        }
125    }
126
127    private boolean runOneTask() {
128        Task task = takeTask();
129        if (task == null) {
130            return false;
131        }
132        String threadName = Thread.currentThread().getName();
133        Thread.currentThread().setName(task.toString());
134        try {
135            task.run(console);
136        } finally {
137            doneTask(task);
138            Thread.currentThread().setName(threadName);
139        }
140        return true;
141    }
142
143    private synchronized Task takeTask() {
144        while (true) {
145            Task task = runnableTasks.poll();
146            if (task != null) {
147                runningTasks++;
148                return task;
149            }
150
151            if (isExhausted()) {
152                return null;
153            }
154
155            try {
156                wait();
157            } catch (InterruptedException e) {
158                throw new AssertionError();
159            }
160        }
161    }
162
163    private synchronized void doneTask(Task task) {
164        if (task.result != Result.SUCCESS) {
165            failedTasks.add(task);
166        }
167        runningTasks--;
168        promoteBlockedTasks();
169        if (isExhausted()) {
170            notifyAll();
171        }
172    }
173
174    private synchronized void promoteBlockedTasks() {
175        for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) {
176            Task potentiallyUnblocked = it.next();
177            if (potentiallyUnblocked.isRunnable()) {
178                it.remove();
179                runnableTasks.add(potentiallyUnblocked);
180                notifyAll();
181            }
182        }
183    }
184
185    /**
186     * Returns true if there are no tasks to run and no tasks currently running.
187     */
188    private boolean isExhausted() {
189        return runnableTasks.isEmpty() && runningTasks == 0;
190    }
191}
192