1/*
2 * Copyright (C) 2014 Square, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package okio;
17
18import java.io.IOException;
19import java.io.InterruptedIOException;
20
21/**
22 * This timeout uses a background thread to take action exactly when the timeout
23 * occurs. Use this to implement timeouts where they aren't supported natively,
24 * such as to sockets that are blocked on writing.
25 *
26 * <p>Subclasses should override {@link #timedOut} to take action when a timeout
27 * occurs. This method will be invoked by the shared watchdog thread so it
28 * should not do any long-running operations. Otherwise we risk starving other
29 * timeouts from being triggered.
30 *
31 * <p>Use {@link #sink} and {@link #source} to apply this timeout to a stream.
32 * The returned value will apply the timeout to each operation on the wrapped
33 * stream.
34 *
35 * <p>Callers should call {@link #enter} before doing work that is subject to
36 * timeouts, and {@link #exit} afterwards. The return value of {@link #exit}
37 * indicates whether a timeout was triggered. Note that the call to {@link
38 * #timedOut} is asynchronous, and may be called after {@link #exit}.
39 */
40public class AsyncTimeout extends Timeout {
41  /**
42   * The watchdog thread processes a linked list of pending timeouts, sorted in
43   * the order to be triggered. This class synchronizes on AsyncTimeout.class.
44   * This lock guards the queue.
45   *
46   * <p>Head's 'next' points to the first element of the linked list. The first
47   * element is the next node to time out, or null if the queue is empty. The
48   * head is null until the watchdog thread is started.
49   */
50  private static AsyncTimeout head;
51
52  /** True if this node is currently in the queue. */
53  private boolean inQueue;
54
55  /** The next node in the linked list. */
56  private AsyncTimeout next;
57
58  /** If scheduled, this is the time that the watchdog should time this out. */
59  private long timeoutAt;
60
61  public final void enter() {
62    if (inQueue) throw new IllegalStateException("Unbalanced enter/exit");
63    long timeoutNanos = timeoutNanos();
64    boolean hasDeadline = hasDeadline();
65    if (timeoutNanos == 0 && !hasDeadline) {
66      return; // No timeout and no deadline? Don't bother with the queue.
67    }
68    inQueue = true;
69    scheduleTimeout(this, timeoutNanos, hasDeadline);
70  }
71
72  private static synchronized void scheduleTimeout(
73      AsyncTimeout node, long timeoutNanos, boolean hasDeadline) {
74    // Start the watchdog thread and create the head node when the first timeout is scheduled.
75    if (head == null) {
76      head = new AsyncTimeout();
77      new Watchdog().start();
78    }
79
80    long now = System.nanoTime();
81    if (timeoutNanos != 0 && hasDeadline) {
82      // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap around,
83      // Math.min() is undefined for absolute values, but meaningful for relative ones.
84      node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now);
85    } else if (timeoutNanos != 0) {
86      node.timeoutAt = now + timeoutNanos;
87    } else if (hasDeadline) {
88      node.timeoutAt = node.deadlineNanoTime();
89    } else {
90      throw new AssertionError();
91    }
92
93    // Insert the node in sorted order.
94    long remainingNanos = node.remainingNanos(now);
95    for (AsyncTimeout prev = head; true; prev = prev.next) {
96      if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) {
97        node.next = prev.next;
98        prev.next = node;
99        if (prev == head) {
100          AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front.
101        }
102        break;
103      }
104    }
105  }
106
107  /** Returns true if the timeout occurred. */
108  public final boolean exit() {
109    if (!inQueue) return false;
110    inQueue = false;
111    return cancelScheduledTimeout(this);
112  }
113
114  /** Returns true if the timeout occurred. */
115  private static synchronized boolean cancelScheduledTimeout(AsyncTimeout node) {
116    // Remove the node from the linked list.
117    for (AsyncTimeout prev = head; prev != null; prev = prev.next) {
118      if (prev.next == node) {
119        prev.next = node.next;
120        node.next = null;
121        return false;
122      }
123    }
124
125    // The node wasn't found in the linked list: it must have timed out!
126    return true;
127  }
128
129  /**
130   * Returns the amount of time left until the time out. This will be negative
131   * if the timeout has elapsed and the timeout should occur immediately.
132   */
133  private long remainingNanos(long now) {
134    return timeoutAt - now;
135  }
136
137  /**
138   * Invoked by the watchdog thread when the time between calls to {@link
139   * #enter()} and {@link #exit()} has exceeded the timeout.
140   */
141  protected void timedOut() {
142  }
143
144  /**
145   * Returns a new sink that delegates to {@code sink}, using this to implement
146   * timeouts. This works best if {@link #timedOut} is overridden to interrupt
147   * {@code sink}'s current operation.
148   */
149  public final Sink sink(final Sink sink) {
150    return new Sink() {
151      @Override public void write(Buffer source, long byteCount) throws IOException {
152        boolean throwOnTimeout = false;
153        enter();
154        try {
155          sink.write(source, byteCount);
156          throwOnTimeout = true;
157        } catch (IOException e) {
158          throw exit(e);
159        } finally {
160          exit(throwOnTimeout);
161        }
162      }
163
164      @Override public void flush() throws IOException {
165        boolean throwOnTimeout = false;
166        enter();
167        try {
168          sink.flush();
169          throwOnTimeout = true;
170        } catch (IOException e) {
171          throw exit(e);
172        } finally {
173          exit(throwOnTimeout);
174        }
175      }
176
177      @Override public void close() throws IOException {
178        boolean throwOnTimeout = false;
179        enter();
180        try {
181          sink.close();
182          throwOnTimeout = true;
183        } catch (IOException e) {
184          throw exit(e);
185        } finally {
186          exit(throwOnTimeout);
187        }
188      }
189
190      @Override public Timeout timeout() {
191        return AsyncTimeout.this;
192      }
193
194      @Override public String toString() {
195        return "AsyncTimeout.sink(" + sink + ")";
196      }
197    };
198  }
199
200  /**
201   * Returns a new source that delegates to {@code source}, using this to
202   * implement timeouts. This works best if {@link #timedOut} is overridden to
203   * interrupt {@code sink}'s current operation.
204   */
205  public final Source source(final Source source) {
206    return new Source() {
207      @Override public long read(Buffer sink, long byteCount) throws IOException {
208        boolean throwOnTimeout = false;
209        enter();
210        try {
211          long result = source.read(sink, byteCount);
212          throwOnTimeout = true;
213          return result;
214        } catch (IOException e) {
215          throw exit(e);
216        } finally {
217          exit(throwOnTimeout);
218        }
219      }
220
221      @Override public void close() throws IOException {
222        boolean throwOnTimeout = false;
223        try {
224          source.close();
225          throwOnTimeout = true;
226        } catch (IOException e) {
227          throw exit(e);
228        } finally {
229          exit(throwOnTimeout);
230        }
231      }
232
233      @Override public Timeout timeout() {
234        return AsyncTimeout.this;
235      }
236
237      @Override public String toString() {
238        return "AsyncTimeout.source(" + source + ")";
239      }
240    };
241  }
242
243  /**
244   * Throws an IOException if {@code throwOnTimeout} is {@code true} and a
245   * timeout occurred. See {@link #newTimeoutException(java.io.IOException)}
246   * for the type of exception thrown.
247   */
248  final void exit(boolean throwOnTimeout) throws IOException {
249    boolean timedOut = exit();
250    if (timedOut && throwOnTimeout) throw newTimeoutException(null);
251  }
252
253  /**
254   * Returns either {@code cause} or an IOException that's caused by
255   * {@code cause} if a timeout occurred. See
256   * {@link #newTimeoutException(java.io.IOException)} for the type of
257   * exception returned.
258   */
259  final IOException exit(IOException cause) throws IOException {
260    if (!exit()) return cause;
261    return newTimeoutException(cause);
262  }
263
264  /**
265   * Returns an {@link IOException} to represent a timeout. By default this method returns
266   * {@link java.io.InterruptedIOException}. If {@code cause} is non-null it is set as the cause of
267   * the returned exception.
268   */
269  protected IOException newTimeoutException(IOException cause) {
270    InterruptedIOException e = new InterruptedIOException("timeout");
271    if (cause != null) {
272      e.initCause(cause);
273    }
274    return e;
275  }
276
277  private static final class Watchdog extends Thread {
278    public Watchdog() {
279      super("Okio Watchdog");
280      setDaemon(true);
281    }
282
283    public void run() {
284      while (true) {
285        try {
286          AsyncTimeout timedOut = awaitTimeout();
287
288          // Didn't find a node to interrupt. Try again.
289          if (timedOut == null) continue;
290
291          // Close the timed out node.
292          timedOut.timedOut();
293        } catch (InterruptedException ignored) {
294        }
295      }
296    }
297  }
298
299  /**
300   * Removes and returns the node at the head of the list, waiting for it to
301   * time out if necessary. Returns null if the situation changes while waiting:
302   * either a newer node is inserted at the head, or the node being waited on
303   * has been removed.
304   */
305  private static synchronized AsyncTimeout awaitTimeout() throws InterruptedException {
306    // Get the next eligible node.
307    AsyncTimeout node = head.next;
308
309    // The queue is empty. Wait for something to be enqueued.
310    if (node == null) {
311      AsyncTimeout.class.wait();
312      return null;
313    }
314
315    long waitNanos = node.remainingNanos(System.nanoTime());
316
317    // The head of the queue hasn't timed out yet. Await that.
318    if (waitNanos > 0) {
319      // Waiting is made complicated by the fact that we work in nanoseconds,
320      // but the API wants (millis, nanos) in two arguments.
321      long waitMillis = waitNanos / 1000000L;
322      waitNanos -= (waitMillis * 1000000L);
323      AsyncTimeout.class.wait(waitMillis, (int) waitNanos);
324      return null;
325    }
326
327    // The head of the queue has timed out. Remove it.
328    head.next = node.next;
329    node.next = null;
330    return node;
331  }
332}
333