1/* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17package com.google.android.apps.common.testing.ui.espresso.base; 18 19import static com.google.common.base.Preconditions.checkNotNull; 20import static com.google.common.base.Preconditions.checkState; 21 22import java.util.concurrent.BrokenBarrierException; 23import java.util.concurrent.CyclicBarrier; 24import java.util.concurrent.ThreadPoolExecutor; 25import java.util.concurrent.atomic.AtomicInteger; 26import java.util.concurrent.atomic.AtomicReference; 27 28/** 29 * Provides a way to monitor AsyncTask's work queue to ensure that there is no work pending 30 * or executing (and to allow notification of idleness). 31 * 32 * This class is based on the assumption that we can get at the ThreadPoolExecutor AsyncTask uses. 33 * That is currently possible and easy in Froyo to JB. If it ever becomes impossible, as long as we 34 * know the max # of executor threads the AsyncTask framework allows we can still use this 35 * interface, just need a different implementation. 36 */ 37class AsyncTaskPoolMonitor { 38 private final AtomicReference<IdleMonitor> monitor = new AtomicReference<IdleMonitor>(null); 39 private final ThreadPoolExecutor pool; 40 private final AtomicInteger activeBarrierChecks = new AtomicInteger(0); 41 42 AsyncTaskPoolMonitor(ThreadPoolExecutor pool) { 43 this.pool = checkNotNull(pool); 44 } 45 46 /** 47 * Checks if the pool is idle at this moment. 48 * 49 * @return true if the pool is idle, false otherwise. 50 */ 51 boolean isIdleNow() { 52 if (!pool.getQueue().isEmpty()) { 53 return false; 54 } else { 55 int activeCount = pool.getActiveCount(); 56 if (0 != activeCount) { 57 if (monitor.get() == null) { 58 // if there's no idle monitor scheduled and there are still barrier 59 // checks running, they are about to exit, ignore them. 60 activeCount = activeCount - activeBarrierChecks.get(); 61 } 62 } 63 return 0 == activeCount; 64 } 65 } 66 67 /** 68 * Notifies caller once the pool is idle. 69 * 70 * We check for idle-ness by submitting the max # of tasks the pool will take and blocking 71 * the tasks until they are all executing. Then we know there are no other tasks _currently_ 72 * executing in the pool, we look back at the work queue to see if its backed up, if it is 73 * we reenqueue ourselves and try again. 74 * 75 * Obviously this strategy will fail horribly if 2 parties are doing it at the same time, 76 * we prevent recursion here the best we can. 77 * 78 * @param idleCallback called once the pool is idle. 79 */ 80 void notifyWhenIdle(final Runnable idleCallback) { 81 checkNotNull(idleCallback); 82 IdleMonitor myMonitor = new IdleMonitor(idleCallback); 83 checkState(monitor.compareAndSet(null, myMonitor), "cannot monitor for idle recursively!"); 84 myMonitor.monitorForIdle(); 85 } 86 87 /** 88 * Stops the idle monitoring mechanism if it is in place. 89 * 90 * Note: the callback may still be invoked after this method is called. The only thing 91 * this method guarantees is that we will stop/cancel any blockign tasks we've placed 92 * on the thread pool. 93 */ 94 void cancelIdleMonitor() { 95 IdleMonitor myMonitor = monitor.getAndSet(null); 96 if (null != myMonitor) { 97 myMonitor.poison(); 98 } 99 } 100 101 private class IdleMonitor { 102 private final Runnable onIdle; 103 private final AtomicInteger barrierGeneration = new AtomicInteger(0); 104 private final CyclicBarrier barrier; 105 // written by main, read by all. 106 private volatile boolean poisoned; 107 108 private IdleMonitor(final Runnable onIdle) { 109 this.onIdle = checkNotNull(onIdle); 110 this.barrier = new CyclicBarrier(pool.getCorePoolSize(), 111 new Runnable() { 112 @Override 113 public void run() { 114 if (pool.getQueue().isEmpty()) { 115 // no one is behind us, so the queue is idle! 116 monitor.compareAndSet(IdleMonitor.this, null); 117 onIdle.run(); 118 } else { 119 // work is waiting behind us, enqueue another block of tasks and 120 // hopefully when they're all running, the queue will be empty. 121 monitorForIdle(); 122 } 123 124 } 125 }); 126 } 127 128 /** 129 * Stops this monitor from using the thread pool's resources, it may still cause the 130 * callback to be executed though. 131 */ 132 private void poison() { 133 poisoned = true; 134 barrier.reset(); 135 } 136 137 private void monitorForIdle() { 138 if (poisoned) { 139 return; 140 } 141 142 if (isIdleNow()) { 143 monitor.compareAndSet(this, null); 144 onIdle.run(); 145 } else { 146 // Submit N tasks that will block until they are all running on the thread pool. 147 // at this point we can check the pool's queue and verify that there are no new 148 // tasks behind us and deem the queue idle. 149 150 int poolSize = pool.getCorePoolSize(); 151 final BarrierRestarter restarter = new BarrierRestarter(barrier, barrierGeneration); 152 153 for (int i = 0; i < poolSize; i++) { 154 pool.execute(new Runnable() { 155 @Override 156 public void run() { 157 while (!poisoned) { 158 activeBarrierChecks.incrementAndGet(); 159 int myGeneration = barrierGeneration.get(); 160 try { 161 barrier.await(); 162 return; 163 } catch (InterruptedException ie) { 164 // sorry - I cant let you interrupt me! 165 restarter.restart(myGeneration); 166 } catch (BrokenBarrierException bbe) { 167 restarter.restart(myGeneration); 168 } finally { 169 activeBarrierChecks.decrementAndGet(); 170 } 171 } 172 } 173 }); 174 } 175 } 176 } 177 } 178 179 180 private static class BarrierRestarter { 181 private final CyclicBarrier barrier; 182 private final AtomicInteger barrierGeneration; 183 BarrierRestarter(CyclicBarrier barrier, AtomicInteger barrierGeneration) { 184 this.barrier = barrier; 185 this.barrierGeneration = barrierGeneration; 186 } 187 188 /** 189 * restarts the barrier. 190 * 191 * After the calling this function it is guaranteed that barrier generation has been incremented 192 * and the barrier can be awaited on again. 193 * 194 * @param fromGeneration the generation that encountered the breaking exception. 195 */ 196 synchronized void restart(int fromGeneration) { 197 // must be synchronized. T1 could pass the if check, be suspended before calling reset, T2 198 // sails thru - and awaits on the barrier again before T1 has awoken and reset it. 199 int nextGen = fromGeneration + 1; 200 if (barrierGeneration.compareAndSet(fromGeneration, nextGen)) { 201 // first time we've seen fromGeneration request a reset. lets reset the barrier. 202 barrier.reset(); 203 } else { 204 // some other thread has already reset the barrier - this request is a no op. 205 } 206 } 207 } 208} 209