/*
 * Copyright (C) 2014 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.android.apps.common.testing.ui.espresso.base;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

28/**
29 * Provides a way to monitor AsyncTask's work queue to ensure that there is no work pending
30 * or executing (and to allow notification of idleness).
31 *
32 * This class is based on the assumption that we can get at the ThreadPoolExecutor AsyncTask uses.
33 * That is currently possible and easy in Froyo to JB. If it ever becomes impossible, as long as we
34 * know the max # of executor threads the AsyncTask framework allows we can still use this
35 * interface, just need a different implementation.
36 */
37class AsyncTaskPoolMonitor {
38  private final AtomicReference<IdleMonitor> monitor = new AtomicReference<IdleMonitor>(null);
39  private final ThreadPoolExecutor pool;
40  private final AtomicInteger activeBarrierChecks = new AtomicInteger(0);
41
42  AsyncTaskPoolMonitor(ThreadPoolExecutor pool) {
43    this.pool = checkNotNull(pool);
44  }
45
46  /**
47   * Checks if the pool is idle at this moment.
48   *
49   * @return true if the pool is idle, false otherwise.
50   */
51  boolean isIdleNow() {
52    if (!pool.getQueue().isEmpty()) {
53      return false;
54    } else {
55      int activeCount = pool.getActiveCount();
56      if (0 != activeCount) {
57        if (monitor.get() == null) {
58          // if there's no idle monitor scheduled and there are still barrier
59          // checks running, they are about to exit, ignore them.
60          activeCount = activeCount - activeBarrierChecks.get();
61        }
62      }
63      return 0 == activeCount;
64    }
65  }
66
67  /**
68   * Notifies caller once the pool is idle.
69   *
70   * We check for idle-ness by submitting the max # of tasks the pool will take and blocking
71   * the tasks until they are all executing. Then we know there are no other tasks _currently_
72   * executing in the pool, we look back at the work queue to see if its backed up, if it is
73   * we reenqueue ourselves and try again.
74   *
75   * Obviously this strategy will fail horribly if 2 parties are doing it at the same time,
76   * we prevent recursion here the best we can.
77   *
78   * @param idleCallback called once the pool is idle.
79   */
80  void notifyWhenIdle(final Runnable idleCallback) {
81    checkNotNull(idleCallback);
82    IdleMonitor myMonitor = new IdleMonitor(idleCallback);
83    checkState(monitor.compareAndSet(null, myMonitor), "cannot monitor for idle recursively!");
84    myMonitor.monitorForIdle();
85  }
86
87  /**
88   * Stops the idle monitoring mechanism if it is in place.
89   *
90   * Note: the callback may still be invoked after this method is called. The only thing
91   * this method guarantees is that we will stop/cancel any blockign tasks we've placed
92   * on the thread pool.
93   */
94  void cancelIdleMonitor() {
95    IdleMonitor myMonitor = monitor.getAndSet(null);
96    if (null != myMonitor) {
97      myMonitor.poison();
98    }
99  }
100
101  private class IdleMonitor {
102    private final Runnable onIdle;
103    private final AtomicInteger barrierGeneration = new AtomicInteger(0);
104    private final CyclicBarrier barrier;
105    // written by main, read by all.
106    private volatile boolean poisoned;
107
108    private IdleMonitor(final Runnable onIdle) {
109      this.onIdle = checkNotNull(onIdle);
110      this.barrier = new CyclicBarrier(pool.getCorePoolSize(),
111          new Runnable() {
112            @Override
113            public void run() {
114              if (pool.getQueue().isEmpty()) {
115                // no one is behind us, so the queue is idle!
116                monitor.compareAndSet(IdleMonitor.this, null);
117                onIdle.run();
118              } else {
119                // work is waiting behind us, enqueue another block of tasks and
120                // hopefully when they're all running, the queue will be empty.
121                monitorForIdle();
122              }
123
124            }
125          });
126    }
127
128    /**
129     * Stops this monitor from using the thread pool's resources, it may still cause the
130     * callback to be executed though.
131     */
132    private void poison() {
133      poisoned = true;
134      barrier.reset();
135    }
136
137    private void monitorForIdle() {
138      if (poisoned) {
139        return;
140      }
141
142      if (isIdleNow()) {
143        monitor.compareAndSet(this, null);
144        onIdle.run();
145      } else {
146        // Submit N tasks that will block until they are all running on the thread pool.
147        // at this point we can check the pool's queue and verify that there are no new
148        // tasks behind us and deem the queue idle.
149
150        int poolSize = pool.getCorePoolSize();
151        final BarrierRestarter restarter = new BarrierRestarter(barrier, barrierGeneration);
152
153        for (int i = 0; i < poolSize; i++) {
154          pool.execute(new Runnable() {
155            @Override
156            public void run() {
157              while (!poisoned) {
158                activeBarrierChecks.incrementAndGet();
159                int myGeneration = barrierGeneration.get();
160                try {
161                  barrier.await();
162                  return;
163                } catch (InterruptedException ie) {
164                  // sorry - I cant let you interrupt me!
165                  restarter.restart(myGeneration);
166                } catch (BrokenBarrierException bbe) {
167                  restarter.restart(myGeneration);
168                } finally {
169                  activeBarrierChecks.decrementAndGet();
170                }
171              }
172            }
173          });
174        }
175      }
176    }
177  }
178
179
180  private static class BarrierRestarter {
181    private final CyclicBarrier barrier;
182    private final AtomicInteger barrierGeneration;
183    BarrierRestarter(CyclicBarrier barrier, AtomicInteger barrierGeneration) {
184      this.barrier = barrier;
185      this.barrierGeneration = barrierGeneration;
186    }
187
188    /**
189     * restarts the barrier.
190     *
191     * After the calling this function it is guaranteed that barrier generation has been incremented
192     * and the barrier can be awaited on again.
193     *
194     * @param fromGeneration the generation that encountered the breaking exception.
195     */
196    synchronized void restart(int fromGeneration) {
197      // must be synchronized. T1 could pass the if check, be suspended before calling reset, T2
198      // sails thru - and awaits on the barrier again before T1 has awoken and reset it.
199      int nextGen = fromGeneration + 1;
200      if (barrierGeneration.compareAndSet(fromGeneration, nextGen)) {
201        // first time we've seen fromGeneration request a reset. lets reset the barrier.
202        barrier.reset();
203      } else {
204        // some other thread has already reset the barrier - this request is a no op.
205      }
206    }
207  }
208}
209