1/*
2 * Copyright (C) 2010 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
12 * the License.
13 */
14
15package com.google.common.collect;
16
17import static com.google.common.base.Preconditions.checkNotNull;
18import static com.google.common.base.Preconditions.checkState;
19
20import com.google.common.base.Equivalence;
21import com.google.common.base.Function;
22import com.google.common.base.Throwables;
23import com.google.common.collect.MapMaker.RemovalCause;
24import com.google.common.collect.MapMaker.RemovalListener;
25
26import java.io.IOException;
27import java.io.ObjectInputStream;
28import java.io.ObjectOutputStream;
29import java.io.Serializable;
30import java.lang.ref.ReferenceQueue;
31import java.util.concurrent.ConcurrentMap;
32import java.util.concurrent.ExecutionException;
33import java.util.concurrent.atomic.AtomicReferenceArray;
34
35import javax.annotation.Nullable;
36import javax.annotation.concurrent.GuardedBy;
37
38/**
39 * Adds computing functionality to {@link MapMakerInternalMap}.
40 *
41 * @author Bob Lee
42 * @author Charles Fry
43 */
44class ComputingConcurrentHashMap<K, V> extends MapMakerInternalMap<K, V> {
45  final Function<? super K, ? extends V> computingFunction;
46
47  /**
48   * Creates a new, empty map with the specified strategy, initial capacity, load factor and
49   * concurrency level.
50   */
51  ComputingConcurrentHashMap(MapMaker builder,
52      Function<? super K, ? extends V> computingFunction) {
53    super(builder);
54    this.computingFunction = checkNotNull(computingFunction);
55  }
56
57  @Override
58  Segment<K, V> createSegment(int initialCapacity, int maxSegmentSize) {
59    return new ComputingSegment<K, V>(this, initialCapacity, maxSegmentSize);
60  }
61
62  @Override
63  ComputingSegment<K, V> segmentFor(int hash) {
64    return (ComputingSegment<K, V>) super.segmentFor(hash);
65  }
66
67  V getOrCompute(K key) throws ExecutionException {
68    int hash = hash(checkNotNull(key));
69    return segmentFor(hash).getOrCompute(key, hash, computingFunction);
70  }
71
72  @SuppressWarnings("serial") // This class is never serialized.
73  static final class ComputingSegment<K, V> extends Segment<K, V> {
74    ComputingSegment(MapMakerInternalMap<K, V> map, int initialCapacity, int maxSegmentSize) {
75      super(map, initialCapacity, maxSegmentSize);
76    }
77
78    V getOrCompute(K key, int hash, Function<? super K, ? extends V> computingFunction)
79        throws ExecutionException {
80      try {
81        outer: while (true) {
82          // don't call getLiveEntry, which would ignore computing values
83          ReferenceEntry<K, V> e = getEntry(key, hash);
84          if (e != null) {
85            V value = getLiveValue(e);
86            if (value != null) {
87              recordRead(e);
88              return value;
89            }
90          }
91
92          // at this point e is either null, computing, or expired;
93          // avoid locking if it's already computing
94          if (e == null || !e.getValueReference().isComputingReference()) {
95            boolean createNewEntry = true;
96            ComputingValueReference<K, V> computingValueReference = null;
97            lock();
98            try {
99              preWriteCleanup();
100
101              int newCount = this.count - 1;
102              AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
103              int index = hash & (table.length() - 1);
104              ReferenceEntry<K, V> first = table.get(index);
105
106              for (e = first; e != null; e = e.getNext()) {
107                K entryKey = e.getKey();
108                if (e.getHash() == hash && entryKey != null
109                    && map.keyEquivalence.equivalent(key, entryKey)) {
110                  ValueReference<K, V> valueReference = e.getValueReference();
111                  if (valueReference.isComputingReference()) {
112                    createNewEntry = false;
113                  } else {
114                    V value = e.getValueReference().get();
115                    if (value == null) {
116                      enqueueNotification(entryKey, hash, value, RemovalCause.COLLECTED);
117                    } else if (map.expires() && map.isExpired(e)) {
118                      // This is a duplicate check, as preWriteCleanup already purged expired
119                      // entries, but let's accomodate an incorrect expiration queue.
120                      enqueueNotification(entryKey, hash, value, RemovalCause.EXPIRED);
121                    } else {
122                      recordLockedRead(e);
123                      return value;
124                    }
125
126                    // immediately reuse invalid entries
127                    evictionQueue.remove(e);
128                    expirationQueue.remove(e);
129                    this.count = newCount; // write-volatile
130                  }
131                  break;
132                }
133              }
134
135              if (createNewEntry) {
136                computingValueReference = new ComputingValueReference<K, V>(computingFunction);
137
138                if (e == null) {
139                  e = newEntry(key, hash, first);
140                  e.setValueReference(computingValueReference);
141                  table.set(index, e);
142                } else {
143                  e.setValueReference(computingValueReference);
144                }
145              }
146            } finally {
147              unlock();
148              postWriteCleanup();
149            }
150
151            if (createNewEntry) {
152              // This thread solely created the entry.
153              return compute(key, hash, e, computingValueReference);
154            }
155          }
156
157          // The entry already exists. Wait for the computation.
158          checkState(!Thread.holdsLock(e), "Recursive computation");
159          // don't consider expiration as we're concurrent with computation
160          V value = e.getValueReference().waitForValue();
161          if (value != null) {
162            recordRead(e);
163            return value;
164          }
165          // else computing thread will clearValue
166          continue outer;
167        }
168      } finally {
169        postReadCleanup();
170      }
171    }
172
173    V compute(K key, int hash, ReferenceEntry<K, V> e,
174        ComputingValueReference<K, V> computingValueReference)
175        throws ExecutionException {
176      V value = null;
177      long start = System.nanoTime();
178      long end = 0;
179      try {
180        // Synchronizes on the entry to allow failing fast when a recursive computation is
181        // detected. This is not fool-proof since the entry may be copied when the segment
182        // is written to.
183        synchronized (e) {
184          value = computingValueReference.compute(key, hash);
185          end = System.nanoTime();
186        }
187        if (value != null) {
188          // putIfAbsent
189          V oldValue = put(key, hash, value, true);
190          if (oldValue != null) {
191            // the computed value was already clobbered
192            enqueueNotification(key, hash, value, RemovalCause.REPLACED);
193          }
194        }
195        return value;
196      } finally {
197        if (end == 0) {
198          end = System.nanoTime();
199        }
200        if (value == null) {
201          clearValue(key, hash, computingValueReference);
202        }
203      }
204    }
205  }
206
207  /**
208   * Used to provide computation exceptions to other threads.
209   */
210  private static final class ComputationExceptionReference<K, V> implements ValueReference<K, V> {
211    final Throwable t;
212
213    ComputationExceptionReference(Throwable t) {
214      this.t = t;
215    }
216
217    @Override
218    public V get() {
219      return null;
220    }
221
222    @Override
223    public ReferenceEntry<K, V> getEntry() {
224      return null;
225    }
226
227    @Override
228    public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
229      return this;
230    }
231
232    @Override
233    public boolean isComputingReference() {
234      return false;
235    }
236
237    @Override
238    public V waitForValue() throws ExecutionException {
239      throw new ExecutionException(t);
240    }
241
242    @Override
243    public void clear(ValueReference<K, V> newValue) {}
244  }
245
246  /**
247   * Used to provide computation result to other threads.
248   */
249  private static final class ComputedReference<K, V> implements ValueReference<K, V> {
250    final V value;
251
252    ComputedReference(@Nullable V value) {
253      this.value = value;
254    }
255
256    @Override
257    public V get() {
258      return value;
259    }
260
261    @Override
262    public ReferenceEntry<K, V> getEntry() {
263      return null;
264    }
265
266    @Override
267    public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
268      return this;
269    }
270
271    @Override
272    public boolean isComputingReference() {
273      return false;
274    }
275
276    @Override
277    public V waitForValue() {
278      return get();
279    }
280
281    @Override
282    public void clear(ValueReference<K, V> newValue) {}
283  }
284
285  private static final class ComputingValueReference<K, V> implements ValueReference<K, V> {
286    final Function<? super K, ? extends V> computingFunction;
287
288    @GuardedBy("ComputingValueReference.this") // writes
289    volatile ValueReference<K, V> computedReference = unset();
290
291    public ComputingValueReference(Function<? super K, ? extends V> computingFunction) {
292      this.computingFunction = computingFunction;
293    }
294
295    @Override
296    public V get() {
297      // All computation lookups go through waitForValue. This method thus is
298      // only used by put, to whom we always want to appear absent.
299      return null;
300    }
301
302    @Override
303    public ReferenceEntry<K, V> getEntry() {
304      return null;
305    }
306
307    @Override
308    public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
309      return this;
310    }
311
312    @Override
313    public boolean isComputingReference() {
314      return true;
315    }
316
317    /**
318     * Waits for a computation to complete. Returns the result of the computation.
319     */
320    @Override
321    public V waitForValue() throws ExecutionException {
322      if (computedReference == UNSET) {
323        boolean interrupted = false;
324        try {
325          synchronized (this) {
326            while (computedReference == UNSET) {
327              try {
328                wait();
329              } catch (InterruptedException ie) {
330                interrupted = true;
331              }
332            }
333          }
334        } finally {
335          if (interrupted) {
336            Thread.currentThread().interrupt();
337          }
338        }
339      }
340      return computedReference.waitForValue();
341    }
342
343    @Override
344    public void clear(ValueReference<K, V> newValue) {
345      // The pending computation was clobbered by a manual write. Unblock all
346      // pending gets, and have them return the new value.
347      setValueReference(newValue);
348
349      // TODO(fry): could also cancel computation if we had a thread handle
350    }
351
352    V compute(K key, int hash) throws ExecutionException {
353      V value;
354      try {
355        value = computingFunction.apply(key);
356      } catch (Throwable t) {
357        setValueReference(new ComputationExceptionReference<K, V>(t));
358        throw new ExecutionException(t);
359      }
360
361      setValueReference(new ComputedReference<K, V>(value));
362      return value;
363    }
364
365    void setValueReference(ValueReference<K, V> valueReference) {
366      synchronized (this) {
367        if (computedReference == UNSET) {
368          computedReference = valueReference;
369          notifyAll();
370        }
371      }
372    }
373  }
374
375  /**
376   * Overrides get() to compute on demand. Also throws an exception when {@code null} is returned
377   * from a computation.
378   */
379  static final class ComputingMapAdapter<K, V>
380      extends ComputingConcurrentHashMap<K, V> implements Serializable {
381    private static final long serialVersionUID = 0;
382
383    ComputingMapAdapter(MapMaker mapMaker,
384        Function<? super K, ? extends V> computingFunction) {
385      super(mapMaker, computingFunction);
386    }
387
388    @SuppressWarnings("unchecked") // unsafe, which is one advantage of Cache over Map
389    @Override
390    public V get(Object key) {
391      V value;
392      try {
393        value = getOrCompute((K) key);
394      } catch (ExecutionException e) {
395        Throwable cause = e.getCause();
396        Throwables.propagateIfInstanceOf(cause, ComputationException.class);
397        throw new ComputationException(cause);
398      }
399
400      if (value == null) {
401        throw new NullPointerException(computingFunction + " returned null for key " + key + ".");
402      }
403      return value;
404    }
405  }
406
407  // Serialization Support
408
409  private static final long serialVersionUID = 4;
410
411  @Override
412  Object writeReplace() {
413    return new ComputingSerializationProxy<K, V>(keyStrength, valueStrength, keyEquivalence,
414        valueEquivalence, expireAfterWriteNanos, expireAfterAccessNanos, maximumSize,
415        concurrencyLevel, removalListener, this, computingFunction);
416  }
417
418  static final class ComputingSerializationProxy<K, V> extends AbstractSerializationProxy<K, V> {
419
420    final Function<? super K, ? extends V> computingFunction;
421
422    ComputingSerializationProxy(Strength keyStrength, Strength valueStrength,
423        Equivalence<Object> keyEquivalence, Equivalence<Object> valueEquivalence,
424        long expireAfterWriteNanos, long expireAfterAccessNanos, int maximumSize,
425        int concurrencyLevel, RemovalListener<? super K, ? super V> removalListener,
426        ConcurrentMap<K, V> delegate, Function<? super K, ? extends V> computingFunction) {
427      super(keyStrength, valueStrength, keyEquivalence, valueEquivalence, expireAfterWriteNanos,
428          expireAfterAccessNanos, maximumSize, concurrencyLevel, removalListener, delegate);
429      this.computingFunction = computingFunction;
430    }
431
432    private void writeObject(ObjectOutputStream out) throws IOException {
433      out.defaultWriteObject();
434      writeMapTo(out);
435    }
436
437    @SuppressWarnings("deprecation") // self-use
438    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
439      in.defaultReadObject();
440      MapMaker mapMaker = readMapMaker(in);
441      delegate = mapMaker.makeComputingMap(computingFunction);
442      readEntries(in);
443    }
444
445    Object readResolve() {
446      return delegate;
447    }
448
449    private static final long serialVersionUID = 4;
450  }
451}
452