1/*
2 * Copyright (C) 2011 Google Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
12 * the License.
13 */
14
15package com.google.caliper.runner;
16
17import static java.util.logging.Level.WARNING;
18
19import com.google.caliper.api.ResultProcessor;
20import com.google.caliper.api.SkipThisScenarioException;
21import com.google.caliper.options.CaliperOptions;
22import com.google.caliper.util.Stdout;
23import com.google.common.annotations.VisibleForTesting;
24import com.google.common.base.Function;
25import com.google.common.base.Stopwatch;
26import com.google.common.base.Throwables;
27import com.google.common.collect.FluentIterable;
28import com.google.common.collect.ImmutableList;
29import com.google.common.collect.ImmutableSet;
30import com.google.common.collect.Lists;
31import com.google.common.collect.Queues;
32import com.google.common.util.concurrent.AsyncFunction;
33import com.google.common.util.concurrent.FutureFallback;
34import com.google.common.util.concurrent.Futures;
35import com.google.common.util.concurrent.ListenableFuture;
36import com.google.common.util.concurrent.ListeningExecutorService;
37import com.google.common.util.concurrent.MoreExecutors;
38import com.google.common.util.concurrent.SettableFuture;
39import com.google.common.util.concurrent.Uninterruptibles;
40
41import java.io.IOException;
42import java.io.PrintWriter;
43import java.util.List;
44import java.util.UUID;
45import java.util.concurrent.CancellationException;
46import java.util.concurrent.ConcurrentLinkedQueue;
47import java.util.concurrent.ExecutionException;
48import java.util.logging.Logger;
49
50import javax.inject.Inject;
51import javax.inject.Provider;
52
53/**
54 * An execution of each {@link Experiment} for the configured number of trials.
55 */
56@VisibleForTesting
57public final class ExperimentingCaliperRun implements CaliperRun {
58
59  private static final Logger logger = Logger.getLogger(ExperimentingCaliperRun.class.getName());
60
61  private static final FutureFallback<Object> FALLBACK_TO_NULL = new FutureFallback<Object>() {
62    final ListenableFuture<Object> nullFuture = Futures.immediateFuture(null);
63    @Override public ListenableFuture<Object> create(Throwable t) throws Exception {
64      return nullFuture;
65    }
66  };
67
68  private final MainComponent mainComponent;
69  private final CaliperOptions options;
70  private final PrintWriter stdout;
71  private final BenchmarkClass benchmarkClass;
72  private final ImmutableSet<Instrument> instruments;
73  private final ImmutableSet<ResultProcessor> resultProcessors;
74  private final ExperimentSelector selector;
75  private final Provider<ListeningExecutorService> executorProvider;
76
77  @Inject @VisibleForTesting
78  public ExperimentingCaliperRun(
79      MainComponent mainComponent,
80      CaliperOptions options,
81      @Stdout PrintWriter stdout,
82      BenchmarkClass benchmarkClass,
83      ImmutableSet<Instrument> instruments,
84      ImmutableSet<ResultProcessor> resultProcessors,
85      ExperimentSelector selector,
86      Provider<ListeningExecutorService> executorProvider) {
87    this.mainComponent = mainComponent;
88    this.options = options;
89    this.stdout = stdout;
90    this.benchmarkClass = benchmarkClass;
91    this.instruments = instruments;
92    this.resultProcessors = resultProcessors;
93    this.selector = selector;
94    this.executorProvider = executorProvider;
95  }
96
97  @Override
98  public void run() throws InvalidBenchmarkException {
99    ImmutableSet<Experiment> allExperiments = selector.selectExperiments();
100    // TODO(lukes): move this standard-out handling into the ConsoleOutput class?
101    stdout.println("Experiment selection: ");
102    stdout.println("  Benchmark Methods:   " + FluentIterable.from(allExperiments)
103        .transform(new Function<Experiment, String>() {
104          @Override public String apply(Experiment experiment) {
105            return experiment.instrumentation().benchmarkMethod().getName();
106          }
107        }).toSet());
108    stdout.println("  Instruments:   " + FluentIterable.from(selector.instruments())
109        .transform(new Function<Instrument, String>() {
110              @Override public String apply(Instrument instrument) {
111                return instrument.name();
112              }
113            }));
114    stdout.println("  User parameters:   " + selector.userParameters());
115    stdout.println("  Virtual machines:  " + FluentIterable.from(selector.vms())
116        .transform(
117            new Function<VirtualMachine, String>() {
118              @Override public String apply(VirtualMachine vm) {
119                return vm.name;
120              }
121            }));
122    stdout.println("  Selection type:    " + selector.selectionType());
123    stdout.println();
124
125    if (allExperiments.isEmpty()) {
126      throw new InvalidBenchmarkException(
127          "There were no experiments to be performed for the class %s using the instruments %s",
128          benchmarkClass.benchmarkClass().getSimpleName(), instruments);
129    }
130
131    stdout.format("This selection yields %s experiments.%n", allExperiments.size());
132    stdout.flush();
133
134    // always dry run first.
135    ImmutableSet<Experiment> experimentsToRun = dryRun(allExperiments);
136    if (experimentsToRun.size() != allExperiments.size()) {
137      stdout.format("%d experiments were skipped.%n",
138          allExperiments.size() - experimentsToRun.size());
139    }
140
141    if (experimentsToRun.isEmpty()) {
142      throw new InvalidBenchmarkException("All experiments were skipped.");
143    }
144
145    if (options.dryRun()) {
146      return;
147    }
148
149    stdout.flush();
150
151    int totalTrials = experimentsToRun.size() * options.trialsPerScenario();
152    Stopwatch stopwatch = Stopwatch.createStarted();
153    List<ScheduledTrial> trials = createScheduledTrials(experimentsToRun, totalTrials);
154
155    final ListeningExecutorService executor = executorProvider.get();
156    List<ListenableFuture<TrialResult>> pendingTrials = scheduleTrials(trials, executor);
157    ConsoleOutput output = new ConsoleOutput(stdout, totalTrials, stopwatch);
158    try {
159      // Process results as they complete.
160      for (ListenableFuture<TrialResult> trialFuture : inCompletionOrder(pendingTrials)) {
161        try {
162          TrialResult result = trialFuture.get();
163          output.processTrial(result);
164          for (ResultProcessor resultProcessor : resultProcessors) {
165            resultProcessor.processTrial(result.getTrial());
166          }
167        } catch (ExecutionException e) {
168          if (e.getCause() instanceof TrialFailureException) {
169            output.processFailedTrial((TrialFailureException) e.getCause());
170          } else {
171            for (ListenableFuture<?> toCancel : pendingTrials) {
172              toCancel.cancel(true);
173            }
174            throw Throwables.propagate(e.getCause());
175          }
176        } catch (InterruptedException e) {
177          // be responsive to interruption, cancel outstanding work and exit
178          for (ListenableFuture<?> toCancel : pendingTrials) {
179            // N.B. TrialRunLoop is responsive to interruption.
180            toCancel.cancel(true);
181          }
182          throw new RuntimeException(e);
183        }
184      }
185    } finally {
186      executor.shutdown();
187      output.close();
188    }
189
190    for (ResultProcessor resultProcessor : resultProcessors) {
191      try {
192        resultProcessor.close();
193      } catch (IOException e) {
194        logger.log(WARNING, "Could not close a result processor: " + resultProcessor, e);
195      }
196    }
197  }
198
199  /**
200   * Schedule all the trials.
201   *
202   * <p>This method arranges all the {@link ScheduledTrial trials} to run according to their
203   * scheduling criteria.  The executor instance is responsible for enforcing max parallelism.
204   */
205  private List<ListenableFuture<TrialResult>> scheduleTrials(List<ScheduledTrial> trials,
206      final ListeningExecutorService executor) {
207    List<ListenableFuture<TrialResult>> pendingTrials = Lists.newArrayList();
208    List<ScheduledTrial> serialTrials = Lists.newArrayList();
209    for (final ScheduledTrial scheduledTrial : trials) {
210      if (scheduledTrial.policy() == TrialSchedulingPolicy.PARALLEL) {
211        pendingTrials.add(executor.submit(scheduledTrial.trialTask()));
212      } else {
213        serialTrials.add(scheduledTrial);
214      }
215    }
216    // A future representing the completion of all prior tasks. Futures.successfulAsList allows us
217    // to ignore failure.
218    ListenableFuture<?> previous = Futures.successfulAsList(pendingTrials);
219    for (final ScheduledTrial scheduledTrial : serialTrials) {
220      // each of these trials can only start after all prior trials have finished, so we use
221      // Futures.transform to force the sequencing.
222      ListenableFuture<TrialResult> current =
223          Futures.transform(
224              previous,
225              new AsyncFunction<Object, TrialResult>() {
226                @Override public ListenableFuture<TrialResult> apply(Object ignored) {
227                  return executor.submit(scheduledTrial.trialTask());
228                }
229              });
230      pendingTrials.add(current);
231      // ignore failure of the prior task.
232      previous = Futures.withFallback(current, FALLBACK_TO_NULL);
233    }
234    return pendingTrials;
235  }
236
237  /** Returns all the ScheduledTrials for this run. */
238  private List<ScheduledTrial> createScheduledTrials(ImmutableSet<Experiment> experimentsToRun,
239      int totalTrials) {
240    List<ScheduledTrial> trials = Lists.newArrayListWithCapacity(totalTrials);
241    /** This is 1-indexed because it's only used for display to users.  E.g. "Trial 1 of 27" */
242    int trialNumber = 1;
243    for (int i = 0; i < options.trialsPerScenario(); i++) {
244      for (Experiment experiment : experimentsToRun) {
245        try {
246          TrialScopeComponent trialScopeComponent = mainComponent.newTrialComponent(
247              new TrialModule(UUID.randomUUID(), trialNumber, experiment));
248
249          trials.add(trialScopeComponent.getScheduledTrial());
250        } finally {
251          trialNumber++;
252        }
253      }
254    }
255    return trials;
256  }
257
258  /**
259   * Attempts to run each given scenario once, in the current VM. Returns a set of all of the
260   * scenarios that didn't throw a {@link SkipThisScenarioException}.
261   */
262  ImmutableSet<Experiment> dryRun(Iterable<Experiment> experiments)
263      throws InvalidBenchmarkException {
264    ImmutableSet.Builder<Experiment> builder = ImmutableSet.builder();
265    for (Experiment experiment : experiments) {
266      try {
267        ExperimentComponent experimentComponent =
268            mainComponent.newExperimentComponent(ExperimentModule.forExperiment(experiment));
269        Object benchmark = experimentComponent.getBenchmarkInstance();
270        benchmarkClass.setUpBenchmark(benchmark);
271        try {
272          experiment.instrumentation().dryRun(benchmark);
273          builder.add(experiment);
274        } finally {
275          // discard 'benchmark' now; the worker will have to instantiate its own anyway
276          benchmarkClass.cleanup(benchmark);
277        }
278      } catch (SkipThisScenarioException innocuous) {}
279    }
280    return builder.build();
281  }
282
283  public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
284      Iterable<? extends ListenableFuture<? extends T>> futures) {
285    final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue();
286    ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
287    for (final ListenableFuture<? extends T> future : futures) {
288      SettableFuture<T> delegate = SettableFuture.create();
289      // Must make sure to add the delegate to the queue first in case the future is already done
290      delegates.add(delegate);
291      future.addListener(new Runnable() {
292        @Override public void run() {
293          SettableFuture<T> delegate = delegates.remove();
294          try {
295            delegate.set(Uninterruptibles.getUninterruptibly(future));
296          } catch (ExecutionException e) {
297            delegate.setException(e.getCause());
298          } catch (CancellationException e) {
299            delegate.cancel(true);
300          }
301        }
302      }, MoreExecutors.directExecutor());
303      listBuilder.add(delegate);
304    }
305    return listBuilder.build();
306  }
307}
308