1/* 2 * Copyright (C) 2014 Square, Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16package okio; 17 18import java.io.IOException; 19import java.io.InterruptedIOException; 20 21/** 22 * This timeout uses a background thread to take action exactly when the timeout 23 * occurs. Use this to implement timeouts where they aren't supported natively, 24 * such as to sockets that are blocked on writing. 25 * 26 * <p>Subclasses should override {@link #timedOut} to take action when a timeout 27 * occurs. This method will be invoked by the shared watchdog thread so it 28 * should not do any long-running operations. Otherwise we risk starving other 29 * timeouts from being triggered. 30 * 31 * <p>Use {@link #sink} and {@link #source} to apply this timeout to a stream. 32 * The returned value will apply the timeout to each operation on the wrapped 33 * stream. 34 * 35 * <p>Callers should call {@link #enter} before doing work that is subject to 36 * timeouts, and {@link #exit} afterwards. The return value of {@link #exit} 37 * indicates whether a timeout was triggered. Note that the call to {@link 38 * #timedOut} is asynchronous, and may be called after {@link #exit}. 39 */ 40public class AsyncTimeout extends Timeout { 41 /** 42 * The watchdog thread processes a linked list of pending timeouts, sorted in 43 * the order to be triggered. This class synchronizes on AsyncTimeout.class. 44 * This lock guards the queue. 45 * 46 * <p>Head's 'next' points to the first element of the linked list. The first 47 * element is the next node to time out, or null if the queue is empty. The 48 * head is null until the watchdog thread is started. 49 */ 50 private static AsyncTimeout head; 51 52 /** True if this node is currently in the queue. */ 53 private boolean inQueue; 54 55 /** The next node in the linked list. */ 56 private AsyncTimeout next; 57 58 /** If scheduled, this is the time that the watchdog should time this out. */ 59 private long timeoutAt; 60 61 public final void enter() { 62 if (inQueue) throw new IllegalStateException("Unbalanced enter/exit"); 63 long timeoutNanos = timeoutNanos(); 64 boolean hasDeadline = hasDeadline(); 65 if (timeoutNanos == 0 && !hasDeadline) { 66 return; // No timeout and no deadline? Don't bother with the queue. 67 } 68 inQueue = true; 69 scheduleTimeout(this, timeoutNanos, hasDeadline); 70 } 71 72 private static synchronized void scheduleTimeout( 73 AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { 74 // Start the watchdog thread and create the head node when the first timeout is scheduled. 75 if (head == null) { 76 head = new AsyncTimeout(); 77 new Watchdog().start(); 78 } 79 80 long now = System.nanoTime(); 81 if (timeoutNanos != 0 && hasDeadline) { 82 // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around, 83 // Math.min() is undefined for absolute values, but meaningful for relative ones. 84 node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now); 85 } else if (timeoutNanos != 0) { 86 node.timeoutAt = now + timeoutNanos; 87 } else if (hasDeadline) { 88 node.timeoutAt = node.deadlineNanoTime(); 89 } else { 90 throw new AssertionError(); 91 } 92 93 // Insert the node in sorted order. 94 long remainingNanos = node.remainingNanos(now); 95 for (AsyncTimeout prev = head; true; prev = prev.next) { 96 if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { 97 node.next = prev.next; 98 prev.next = node; 99 if (prev == head) { 100 AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front. 101 } 102 break; 103 } 104 } 105 } 106 107 /** Returns true if the timeout occurred. */ 108 public final boolean exit() { 109 if (!inQueue) return false; 110 inQueue = false; 111 return cancelScheduledTimeout(this); 112 } 113 114 /** Returns true if the timeout occurred. */ 115 private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) { 116 // Remove the node from the linked list. 117 for (AsyncTimeout prev = head; prev != null; prev = prev.next) { 118 if (prev.next == node) { 119 prev.next = node.next; 120 node.next = null; 121 return false; 122 } 123 } 124 125 // The node wasn't found in the linked list: it must have timed out! 126 return true; 127 } 128 129 /** 130 * Returns the amount of time left until the time out. This will be negative 131 * if the timeout has elapsed and the timeout should occur immediately. 132 */ 133 private long remainingNanos(long now) { 134 return timeoutAt - now; 135 } 136 137 /** 138 * Invoked by the watchdog thread when the time between calls to {@link 139 * #enter()} and {@link #exit()} has exceeded the timeout. 140 */ 141 protected void timedOut() { 142 } 143 144 /** 145 * Returns a new sink that delegates to {@code sink}, using this to implement 146 * timeouts. This works best if {@link #timedOut} is overridden to interrupt 147 * {@code sink}'s current operation. 148 */ 149 public final Sink sink(final Sink sink) { 150 return new Sink() { 151 @Override public void write(Buffer source, long byteCount) throws IOException { 152 boolean throwOnTimeout = false; 153 enter(); 154 try { 155 sink.write(source, byteCount); 156 throwOnTimeout = true; 157 } catch (IOException e) { 158 throw exit(e); 159 } finally { 160 exit(throwOnTimeout); 161 } 162 } 163 164 @Override public void flush() throws IOException { 165 boolean throwOnTimeout = false; 166 enter(); 167 try { 168 sink.flush(); 169 throwOnTimeout = true; 170 } catch (IOException e) { 171 throw exit(e); 172 } finally { 173 exit(throwOnTimeout); 174 } 175 } 176 177 @Override public void close() throws IOException { 178 boolean throwOnTimeout = false; 179 enter(); 180 try { 181 sink.close(); 182 throwOnTimeout = true; 183 } catch (IOException e) { 184 throw exit(e); 185 } finally { 186 exit(throwOnTimeout); 187 } 188 } 189 190 @Override public Timeout timeout() { 191 return AsyncTimeout.this; 192 } 193 194 @Override public String toString() { 195 return "AsyncTimeout.sink(" + sink + ")"; 196 } 197 }; 198 } 199 200 /** 201 * Returns a new source that delegates to {@code source}, using this to 202 * implement timeouts. This works best if {@link #timedOut} is overridden to 203 * interrupt {@code sink}'s current operation. 204 */ 205 public final Source source(final Source source) { 206 return new Source() { 207 @Override public long read(Buffer sink, long byteCount) throws IOException { 208 boolean throwOnTimeout = false; 209 enter(); 210 try { 211 long result = source.read(sink, byteCount); 212 throwOnTimeout = true; 213 return result; 214 } catch (IOException e) { 215 throw exit(e); 216 } finally { 217 exit(throwOnTimeout); 218 } 219 } 220 221 @Override public void close() throws IOException { 222 boolean throwOnTimeout = false; 223 try { 224 source.close(); 225 throwOnTimeout = true; 226 } catch (IOException e) { 227 throw exit(e); 228 } finally { 229 exit(throwOnTimeout); 230 } 231 } 232 233 @Override public Timeout timeout() { 234 return AsyncTimeout.this; 235 } 236 237 @Override public String toString() { 238 return "AsyncTimeout.source(" + source + ")"; 239 } 240 }; 241 } 242 243 /** 244 * Throws an IOException if {@code throwOnTimeout} is {@code true} and a 245 * timeout occurred. See {@link #newTimeoutException(java.io.IOException)} 246 * for the type of exception thrown. 247 */ 248 final void exit(boolean throwOnTimeout) throws IOException { 249 boolean timedOut = exit(); 250 if (timedOut && throwOnTimeout) throw newTimeoutException(null); 251 } 252 253 /** 254 * Returns either {@code cause} or an IOException that's caused by 255 * {@code cause} if a timeout occurred. See 256 * {@link #newTimeoutException(java.io.IOException)} for the type of 257 * exception returned. 258 */ 259 final IOException exit(IOException cause) throws IOException { 260 if (!exit()) return cause; 261 return newTimeoutException(cause); 262 } 263 264 /** 265 * Returns an {@link IOException} to represent a timeout. By default this method returns 266 * {@link java.io.InterruptedIOException}. If {@code cause} is non-null it is set as the cause of 267 * the returned exception. 268 */ 269 protected IOException newTimeoutException(IOException cause) { 270 InterruptedIOException e = new InterruptedIOException("timeout"); 271 if (cause != null) { 272 e.initCause(cause); 273 } 274 return e; 275 } 276 277 private static final class Watchdog extends Thread { 278 public Watchdog() { 279 super("Okio Watchdog"); 280 setDaemon(true); 281 } 282 283 public void run() { 284 while (true) { 285 try { 286 AsyncTimeout timedOut = awaitTimeout(); 287 288 // Didn't find a node to interrupt. Try again. 289 if (timedOut == null) continue; 290 291 // Close the timed out node. 292 timedOut.timedOut(); 293 } catch (InterruptedException ignored) { 294 } 295 } 296 } 297 } 298 299 /** 300 * Removes and returns the node at the head of the list, waiting for it to 301 * time out if necessary. Returns null if the situation changes while waiting: 302 * either a newer node is inserted at the head, or the node being waited on 303 * has been removed. 304 */ 305 private static synchronized AsyncTimeout awaitTimeout() throws InterruptedException { 306 // Get the next eligible node. 307 AsyncTimeout node = head.next; 308 309 // The queue is empty. Wait for something to be enqueued. 310 if (node == null) { 311 AsyncTimeout.class.wait(); 312 return null; 313 } 314 315 long waitNanos = node.remainingNanos(System.nanoTime()); 316 317 // The head of the queue hasn't timed out yet. Await that. 318 if (waitNanos > 0) { 319 // Waiting is made complicated by the fact that we work in nanoseconds, 320 // but the API wants (millis, nanos) in two arguments. 321 long waitMillis = waitNanos / 1000000L; 322 waitNanos -= (waitMillis * 1000000L); 323 AsyncTimeout.class.wait(waitMillis, (int) waitNanos); 324 return null; 325 } 326 327 // The head of the queue has timed out. Remove it. 328 head.next = node.next; 329 node.next = null; 330 return node; 331 } 332} 333