1/* 2 * Copyright (C) 2011 Google Inc. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 5 * in compliance with the License. You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software distributed under the License 10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 11 * or implied. See the License for the specific language governing permissions and limitations under 12 * the License. 13 */ 14 15package com.google.caliper.runner; 16 17import static java.util.logging.Level.WARNING; 18 19import com.google.caliper.api.ResultProcessor; 20import com.google.caliper.api.SkipThisScenarioException; 21import com.google.caliper.options.CaliperOptions; 22import com.google.caliper.util.Stdout; 23import com.google.common.annotations.VisibleForTesting; 24import com.google.common.base.Function; 25import com.google.common.base.Stopwatch; 26import com.google.common.base.Throwables; 27import com.google.common.collect.FluentIterable; 28import com.google.common.collect.ImmutableList; 29import com.google.common.collect.ImmutableSet; 30import com.google.common.collect.Lists; 31import com.google.common.collect.Queues; 32import com.google.common.util.concurrent.AsyncFunction; 33import com.google.common.util.concurrent.FutureFallback; 34import com.google.common.util.concurrent.Futures; 35import com.google.common.util.concurrent.ListenableFuture; 36import com.google.common.util.concurrent.ListeningExecutorService; 37import com.google.common.util.concurrent.MoreExecutors; 38import com.google.common.util.concurrent.SettableFuture; 39import com.google.common.util.concurrent.Uninterruptibles; 40 41import java.io.IOException; 42import java.io.PrintWriter; 43import java.util.List; 44import java.util.UUID; 45import java.util.concurrent.CancellationException; 46import java.util.concurrent.ConcurrentLinkedQueue; 47import java.util.concurrent.ExecutionException; 48import java.util.logging.Logger; 49 50import javax.inject.Inject; 51import javax.inject.Provider; 52 53/** 54 * An execution of each {@link Experiment} for the configured number of trials. 55 */ 56@VisibleForTesting 57public final class ExperimentingCaliperRun implements CaliperRun { 58 59 private static final Logger logger = Logger.getLogger(ExperimentingCaliperRun.class.getName()); 60 61 private static final FutureFallback<Object> FALLBACK_TO_NULL = new FutureFallback<Object>() { 62 final ListenableFuture<Object> nullFuture = Futures.immediateFuture(null); 63 @Override public ListenableFuture<Object> create(Throwable t) throws Exception { 64 return nullFuture; 65 } 66 }; 67 68 private final MainComponent mainComponent; 69 private final CaliperOptions options; 70 private final PrintWriter stdout; 71 private final BenchmarkClass benchmarkClass; 72 private final ImmutableSet<Instrument> instruments; 73 private final ImmutableSet<ResultProcessor> resultProcessors; 74 private final ExperimentSelector selector; 75 private final Provider<ListeningExecutorService> executorProvider; 76 77 @Inject @VisibleForTesting 78 public ExperimentingCaliperRun( 79 MainComponent mainComponent, 80 CaliperOptions options, 81 @Stdout PrintWriter stdout, 82 BenchmarkClass benchmarkClass, 83 ImmutableSet<Instrument> instruments, 84 ImmutableSet<ResultProcessor> resultProcessors, 85 ExperimentSelector selector, 86 Provider<ListeningExecutorService> executorProvider) { 87 this.mainComponent = mainComponent; 88 this.options = options; 89 this.stdout = stdout; 90 this.benchmarkClass = benchmarkClass; 91 this.instruments = instruments; 92 this.resultProcessors = resultProcessors; 93 this.selector = selector; 94 this.executorProvider = executorProvider; 95 } 96 97 @Override 98 public void run() throws InvalidBenchmarkException { 99 ImmutableSet<Experiment> allExperiments = selector.selectExperiments(); 100 // TODO(lukes): move this standard-out handling into the ConsoleOutput class? 101 stdout.println("Experiment selection: "); 102 stdout.println(" Benchmark Methods: " + FluentIterable.from(allExperiments) 103 .transform(new Function<Experiment, String>() { 104 @Override public String apply(Experiment experiment) { 105 return experiment.instrumentation().benchmarkMethod().getName(); 106 } 107 }).toSet()); 108 stdout.println(" Instruments: " + FluentIterable.from(selector.instruments()) 109 .transform(new Function<Instrument, String>() { 110 @Override public String apply(Instrument instrument) { 111 return instrument.name(); 112 } 113 })); 114 stdout.println(" User parameters: " + selector.userParameters()); 115 stdout.println(" Virtual machines: " + FluentIterable.from(selector.vms()) 116 .transform( 117 new Function<VirtualMachine, String>() { 118 @Override public String apply(VirtualMachine vm) { 119 return vm.name; 120 } 121 })); 122 stdout.println(" Selection type: " + selector.selectionType()); 123 stdout.println(); 124 125 if (allExperiments.isEmpty()) { 126 throw new InvalidBenchmarkException( 127 "There were no experiments to be performed for the class %s using the instruments %s", 128 benchmarkClass.benchmarkClass().getSimpleName(), instruments); 129 } 130 131 stdout.format("This selection yields %s experiments.%n", allExperiments.size()); 132 stdout.flush(); 133 134 // always dry run first. 135 ImmutableSet<Experiment> experimentsToRun = dryRun(allExperiments); 136 if (experimentsToRun.size() != allExperiments.size()) { 137 stdout.format("%d experiments were skipped.%n", 138 allExperiments.size() - experimentsToRun.size()); 139 } 140 141 if (experimentsToRun.isEmpty()) { 142 throw new InvalidBenchmarkException("All experiments were skipped."); 143 } 144 145 if (options.dryRun()) { 146 return; 147 } 148 149 stdout.flush(); 150 151 int totalTrials = experimentsToRun.size() * options.trialsPerScenario(); 152 Stopwatch stopwatch = Stopwatch.createStarted(); 153 List<ScheduledTrial> trials = createScheduledTrials(experimentsToRun, totalTrials); 154 155 final ListeningExecutorService executor = executorProvider.get(); 156 List<ListenableFuture<TrialResult>> pendingTrials = scheduleTrials(trials, executor); 157 ConsoleOutput output = new ConsoleOutput(stdout, totalTrials, stopwatch); 158 try { 159 // Process results as they complete. 160 for (ListenableFuture<TrialResult> trialFuture : inCompletionOrder(pendingTrials)) { 161 try { 162 TrialResult result = trialFuture.get(); 163 output.processTrial(result); 164 for (ResultProcessor resultProcessor : resultProcessors) { 165 resultProcessor.processTrial(result.getTrial()); 166 } 167 } catch (ExecutionException e) { 168 if (e.getCause() instanceof TrialFailureException) { 169 output.processFailedTrial((TrialFailureException) e.getCause()); 170 } else { 171 for (ListenableFuture<?> toCancel : pendingTrials) { 172 toCancel.cancel(true); 173 } 174 throw Throwables.propagate(e.getCause()); 175 } 176 } catch (InterruptedException e) { 177 // be responsive to interruption, cancel outstanding work and exit 178 for (ListenableFuture<?> toCancel : pendingTrials) { 179 // N.B. TrialRunLoop is responsive to interruption. 180 toCancel.cancel(true); 181 } 182 throw new RuntimeException(e); 183 } 184 } 185 } finally { 186 executor.shutdown(); 187 output.close(); 188 } 189 190 for (ResultProcessor resultProcessor : resultProcessors) { 191 try { 192 resultProcessor.close(); 193 } catch (IOException e) { 194 logger.log(WARNING, "Could not close a result processor: " + resultProcessor, e); 195 } 196 } 197 } 198 199 /** 200 * Schedule all the trials. 201 * 202 * <p>This method arranges all the {@link ScheduledTrial trials} to run according to their 203 * scheduling criteria. The executor instance is responsible for enforcing max parallelism. 204 */ 205 private List<ListenableFuture<TrialResult>> scheduleTrials(List<ScheduledTrial> trials, 206 final ListeningExecutorService executor) { 207 List<ListenableFuture<TrialResult>> pendingTrials = Lists.newArrayList(); 208 List<ScheduledTrial> serialTrials = Lists.newArrayList(); 209 for (final ScheduledTrial scheduledTrial : trials) { 210 if (scheduledTrial.policy() == TrialSchedulingPolicy.PARALLEL) { 211 pendingTrials.add(executor.submit(scheduledTrial.trialTask())); 212 } else { 213 serialTrials.add(scheduledTrial); 214 } 215 } 216 // A future representing the completion of all prior tasks. Futures.successfulAsList allows us 217 // to ignore failure. 218 ListenableFuture<?> previous = Futures.successfulAsList(pendingTrials); 219 for (final ScheduledTrial scheduledTrial : serialTrials) { 220 // each of these trials can only start after all prior trials have finished, so we use 221 // Futures.transform to force the sequencing. 222 ListenableFuture<TrialResult> current = 223 Futures.transform( 224 previous, 225 new AsyncFunction<Object, TrialResult>() { 226 @Override public ListenableFuture<TrialResult> apply(Object ignored) { 227 return executor.submit(scheduledTrial.trialTask()); 228 } 229 }); 230 pendingTrials.add(current); 231 // ignore failure of the prior task. 232 previous = Futures.withFallback(current, FALLBACK_TO_NULL); 233 } 234 return pendingTrials; 235 } 236 237 /** Returns all the ScheduledTrials for this run. */ 238 private List<ScheduledTrial> createScheduledTrials(ImmutableSet<Experiment> experimentsToRun, 239 int totalTrials) { 240 List<ScheduledTrial> trials = Lists.newArrayListWithCapacity(totalTrials); 241 /** This is 1-indexed because it's only used for display to users. E.g. "Trial 1 of 27" */ 242 int trialNumber = 1; 243 for (int i = 0; i < options.trialsPerScenario(); i++) { 244 for (Experiment experiment : experimentsToRun) { 245 try { 246 TrialScopeComponent trialScopeComponent = mainComponent.newTrialComponent( 247 new TrialModule(UUID.randomUUID(), trialNumber, experiment)); 248 249 trials.add(trialScopeComponent.getScheduledTrial()); 250 } finally { 251 trialNumber++; 252 } 253 } 254 } 255 return trials; 256 } 257 258 /** 259 * Attempts to run each given scenario once, in the current VM. Returns a set of all of the 260 * scenarios that didn't throw a {@link SkipThisScenarioException}. 261 */ 262 ImmutableSet<Experiment> dryRun(Iterable<Experiment> experiments) 263 throws InvalidBenchmarkException { 264 ImmutableSet.Builder<Experiment> builder = ImmutableSet.builder(); 265 for (Experiment experiment : experiments) { 266 try { 267 ExperimentComponent experimentComponent = 268 mainComponent.newExperimentComponent(ExperimentModule.forExperiment(experiment)); 269 Object benchmark = experimentComponent.getBenchmarkInstance(); 270 benchmarkClass.setUpBenchmark(benchmark); 271 try { 272 experiment.instrumentation().dryRun(benchmark); 273 builder.add(experiment); 274 } finally { 275 // discard 'benchmark' now; the worker will have to instantiate its own anyway 276 benchmarkClass.cleanup(benchmark); 277 } 278 } catch (SkipThisScenarioException innocuous) {} 279 } 280 return builder.build(); 281 } 282 283 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder( 284 Iterable<? extends ListenableFuture<? extends T>> futures) { 285 final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue(); 286 ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder(); 287 for (final ListenableFuture<? extends T> future : futures) { 288 SettableFuture<T> delegate = SettableFuture.create(); 289 // Must make sure to add the delegate to the queue first in case the future is already done 290 delegates.add(delegate); 291 future.addListener(new Runnable() { 292 @Override public void run() { 293 SettableFuture<T> delegate = delegates.remove(); 294 try { 295 delegate.set(Uninterruptibles.getUninterruptibly(future)); 296 } catch (ExecutionException e) { 297 delegate.setException(e.getCause()); 298 } catch (CancellationException e) { 299 delegate.cancel(true); 300 } 301 } 302 }, MoreExecutors.directExecutor()); 303 listBuilder.add(delegate); 304 } 305 return listBuilder.build(); 306 } 307} 308