TaskQueue.java revision f83be5e4273263df2bb9ef609946b911695b3996
1/* 2 * Copyright (C) 2011 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package vogar.tasks; 18 19import java.util.ArrayList; 20import java.util.Collection; 21import java.util.Iterator; 22import java.util.LinkedList; 23import java.util.List; 24import java.util.concurrent.ExecutorService; 25import java.util.concurrent.TimeUnit; 26import vogar.Console; 27import vogar.Result; 28import vogar.util.Threads; 29 30/** 31 * A set of tasks to execute. 32 */ 33public final class TaskQueue { 34 private static final int FOREVER = 60 * 60 * 24 * 28; // four weeks 35 private final Console console; 36 private int runningTasks; 37 private final LinkedList<Task> tasks = new LinkedList<Task>(); 38 private final LinkedList<Task> runnableTasks = new LinkedList<Task>(); 39 private final List<Task> failedTasks = new ArrayList<Task>(); 40 41 public TaskQueue(Console console) { 42 this.console = console; 43 } 44 45 /** 46 * Adds a task to the queue. 47 */ 48 public synchronized void enqueue(Task task) { 49 tasks.add(task); 50 } 51 52 public void enqueueAll(Collection<Task> tasks) { 53 this.tasks.addAll(tasks); 54 } 55 56 public void runTasks() { 57 promoteBlockedTasks(); 58 59 ExecutorService runners = Threads.threadPerCpuExecutor(console, "TaskQueue"); 60 for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) { 61 runners.execute(new Runnable() { 62 @Override public void run() { 63 while (runOneTask()) { 64 } 65 } 66 }); 67 } 68 69 runners.shutdown(); 70 try { 71 runners.awaitTermination(FOREVER, TimeUnit.SECONDS); 72 } catch (InterruptedException e) { 73 throw new AssertionError(); 74 } 75 } 76 77 public void printTasks() { 78 if (!console.isVerbose()) { 79 return; 80 } 81 82 int i = 0; 83 for (Task task : tasks) { 84 StringBuilder message = new StringBuilder() 85 .append("Task ").append(i++).append(": ").append(task); 86 for (Task blocker : task.tasksThatMustFinishFirst) { 87 message.append("\n depends on completed task: ").append(blocker); 88 } 89 for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) { 90 message.append("\n depends on successful task: ").append(blocker); 91 } 92 console.verbose(message.toString()); 93 } 94 } 95 96 public void printProblemTasks() { 97 for (Task task : failedTasks) { 98 String message = "Failed task: " + task + " " + task.result; 99 if (task.thrown != null) { 100 console.info(message, task.thrown); 101 } else { 102 console.info(message); 103 } 104 } 105 if (!console.isVerbose()) { 106 return; 107 } 108 for (Task task : tasks) { 109 StringBuilder message = new StringBuilder() 110 .append("Failed to execute task: ").append(task); 111 for (Task blocker : task.tasksThatMustFinishFirst) { 112 if (blocker.result == null) { 113 message.append("\n blocked by unexecuted task: ").append(blocker); 114 } 115 } 116 for (Task blocker : task.tasksThatMustFinishSuccessfullyFirst) { 117 if (blocker.result == null) { 118 message.append("\n blocked by unexecuted task: ").append(blocker); 119 } else if (blocker.result != Result.SUCCESS) { 120 message.append("\n blocked by unsuccessful task: ").append(blocker); 121 } 122 } 123 console.verbose(message.toString()); 124 } 125 } 126 127 private boolean runOneTask() { 128 Task task = takeTask(); 129 if (task == null) { 130 return false; 131 } 132 String threadName = Thread.currentThread().getName(); 133 Thread.currentThread().setName(task.toString()); 134 try { 135 task.run(console); 136 } finally { 137 doneTask(task); 138 Thread.currentThread().setName(threadName); 139 } 140 return true; 141 } 142 143 private synchronized Task takeTask() { 144 while (true) { 145 Task task = runnableTasks.poll(); 146 if (task != null) { 147 runningTasks++; 148 return task; 149 } 150 151 if (isExhausted()) { 152 return null; 153 } 154 155 try { 156 wait(); 157 } catch (InterruptedException e) { 158 throw new AssertionError(); 159 } 160 } 161 } 162 163 private synchronized void doneTask(Task task) { 164 if (task.result != Result.SUCCESS) { 165 failedTasks.add(task); 166 } 167 runningTasks--; 168 promoteBlockedTasks(); 169 if (isExhausted()) { 170 notifyAll(); 171 } 172 } 173 174 private synchronized void promoteBlockedTasks() { 175 for (Iterator<Task> it = tasks.iterator(); it.hasNext(); ) { 176 Task potentiallyUnblocked = it.next(); 177 if (potentiallyUnblocked.isRunnable()) { 178 it.remove(); 179 runnableTasks.add(potentiallyUnblocked); 180 notifyAll(); 181 } 182 } 183 } 184 185 /** 186 * Returns true if there are no tasks to run and no tasks currently running. 187 */ 188 private boolean isExhausted() { 189 return runnableTasks.isEmpty() && runningTasks == 0; 190 } 191} 192