1/*
2 * Copyright (C) 2011 Google Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
12 * the License.
13 */
14
15package com.google.caliper.runner;
16
17import static java.util.concurrent.TimeUnit.MILLISECONDS;
18import static java.util.concurrent.TimeUnit.NANOSECONDS;
19
20import com.google.caliper.bridge.LogMessage;
21import com.google.caliper.bridge.ShouldContinueMessage;
22import com.google.caliper.bridge.StopMeasurementLogMessage;
23import com.google.caliper.model.Trial;
24import com.google.caliper.options.CaliperOptions;
25import com.google.caliper.runner.Instrument.MeasurementCollectingVisitor;
26import com.google.caliper.runner.StreamService.StreamItem;
27import com.google.caliper.util.ShortDuration;
28import com.google.common.base.Stopwatch;
29import com.google.common.base.Throwables;
30import com.google.common.util.concurrent.Service.State;
31
32import org.joda.time.Duration;
33
34import java.io.IOException;
35import java.util.concurrent.Callable;
36import java.util.logging.Level;
37import java.util.logging.Logger;
38
39import javax.inject.Inject;
40
41/**
 * The main data-gathering control loop for a Trial.
43 *
44 * <p>This class starts the worker process, reads all the data from it and constructs the
45 * {@link Trial} while enforcing the trial timeout.
46 */
47@TrialScoped class TrialRunLoop implements Callable<TrialResult> {
48  private static final Logger logger = Logger.getLogger(TrialRunLoop.class.getName());
49
50  /** The time that the worker has to clean up after an experiment. */
51  private static final Duration WORKER_CLEANUP_DURATION = Duration.standardSeconds(2);
52
53  private final CaliperOptions options;
54  private final StreamService streamService;
55  private final TrialResultFactory trialFactory;
56
57  // TODO(lukes): The VmDataCollectingVisitor should be able to tell us when it has collected all
58  // its data.
59  private final VmDataCollectingVisitor dataCollectingVisitor;
60  private final Stopwatch trialStopwatch = Stopwatch.createUnstarted();
61  private final MeasurementCollectingVisitor measurementCollectingVisitor;
62  private final TrialOutputLogger trialOutput;
63
64  @Inject TrialRunLoop(
65      MeasurementCollectingVisitor measurementCollectingVisitor,
66      CaliperOptions options,
67      TrialResultFactory trialFactory,
68      TrialOutputLogger trialOutput,
69      StreamService streamService,
70      VmDataCollectingVisitor dataCollectingVisitor) {
71    this.options = options;
72    this.trialFactory = trialFactory;
73    this.streamService = streamService;
74    this.measurementCollectingVisitor = measurementCollectingVisitor;
75    this.trialOutput = trialOutput;
76    this.dataCollectingVisitor = dataCollectingVisitor;
77  }
78
79  @Override public TrialResult call() throws TrialFailureException, IOException {
80    if (streamService.state() != State.NEW) {
81      throw new IllegalStateException("You can only invoke the run loop once");
82    }
83    trialOutput.open();
84    trialOutput.printHeader();
85    streamService.startAsync().awaitRunning();
86    try {
87      long timeLimitNanos = getTrialTimeLimitTrialNanos();
88      boolean doneCollecting = false;
89      boolean done = false;
90      while (!done) {
91        StreamItem item;
92        try {
93          item = streamService.readItem(
94              timeLimitNanos - trialStopwatch.elapsed(NANOSECONDS),
95              NANOSECONDS);
96        } catch (InterruptedException e) {
97          trialOutput.ensureFileIsSaved();
98          // Someone has asked us to stop (via Futures.cancel?).
99          if (doneCollecting) {
100            logger.log(Level.WARNING, "Trial cancelled before completing normally (but after "
101                + "collecting sufficient data). Inspect {0} to see any worker output",
102                trialOutput.trialOutputFile());
103            done = true;
104            break;
105          }
106          // We were asked to stop but we didn't actually finish (the normal case).  Fail the trial.
107          throw new TrialFailureException(
108              String.format("Trial cancelled.  Inspect %s to see any worker output.",
109                trialOutput.trialOutputFile()));
110        }
111        switch (item.kind()) {
112          case DATA:
113            LogMessage logMessage = item.content();
114            logMessage.accept(measurementCollectingVisitor);
115            logMessage.accept(dataCollectingVisitor);
116            if (!doneCollecting && measurementCollectingVisitor.isDoneCollecting()) {
117              doneCollecting = true;
118              // We have received all the measurements we need and are about to tell the worker to
119              // shut down.  At this point the worker should shutdown soon, but we don't want to
120              // wait too long, so decrease the time limit so that we wait no more than
121              // WORKER_CLEANUP_DURATION.
122              long cleanupTimeNanos = MILLISECONDS.toNanos(WORKER_CLEANUP_DURATION.getMillis());
123              // TODO(lukes): Does the min operation make sense here? should we just use the
124              // cleanupTimeNanos?
125              timeLimitNanos = trialStopwatch.elapsed(NANOSECONDS) + cleanupTimeNanos;
126            }
127            // If it is a stop measurement message we need to tell the worker to either stop or keep
128            // going with a WorkerContinueMessage.  This needs to be done after the
129            // measurementCollecting visitor sees the message so that isDoneCollection will be up to
130            // date.
131            if (logMessage instanceof StopMeasurementLogMessage) {
132              // TODO(lukes): this is a blocking write, perhaps we should perform it in a non
133              // blocking manner to keep this thread only blocking in one place.  This would
134              // complicate error handling, but may increase performance since it would free this
135              // thread up to handle other messages
136              streamService.sendMessage(
137                  new ShouldContinueMessage(
138                      !doneCollecting,
139                      measurementCollectingVisitor.isWarmupComplete()));
140              if (doneCollecting) {
141                streamService.closeWriter();
142              }
143            }
144            break;
145          case EOF:
146            // We consider EOF to be synonymous with worker shutdown
147            if (!doneCollecting) {
148              trialOutput.ensureFileIsSaved();
149              throw new TrialFailureException(String.format("The worker exited without producing "
150                  + "data. It has likely crashed. Inspect %s to see any worker output.",
151                  trialOutput.trialOutputFile()));
152            }
153            done = true;
154            break;
155          case TIMEOUT:
156            trialOutput.ensureFileIsSaved();
157            if (doneCollecting) {
158              // Should this be an error?
159              logger.log(Level.WARNING, "Worker failed to exit cleanly within the alloted time. "
160                  + "Inspect {0} to see any worker output", trialOutput.trialOutputFile());
161              done = true;
162            } else {
163              throw new TrialFailureException(String.format(
164                  "Trial exceeded the total allowable runtime (%s). "
165                      + "The limit may be adjusted using the --time-limit flag.  Inspect %s to "
166                      + "see any worker output",
167                      options.timeLimit(), trialOutput.trialOutputFile()));
168            }
169            break;
170          default:
171            throw new AssertionError("Impossible item: " + item);
172        }
173      }
174      return trialFactory.newTrialResult(dataCollectingVisitor, measurementCollectingVisitor);
175    } catch (Throwable e) {
176      Throwables.propagateIfInstanceOf(e, TrialFailureException.class);
177      // This is some failure that is not a TrialFailureException, let the exception propagate but
178      // log the filename for the user.
179      trialOutput.ensureFileIsSaved();
180      logger.severe(
181          String.format(
182              "Unexpected error while executing trial. Inspect %s to see any worker output.",
183              trialOutput.trialOutputFile()));
184      throw Throwables.propagate(e);
185    } finally {
186      trialStopwatch.reset();
187      streamService.stopAsync();
188      trialOutput.close();
189    }
190  }
191
192  private long getTrialTimeLimitTrialNanos() {
193    ShortDuration timeLimit = options.timeLimit();
194    if (ShortDuration.zero().equals(timeLimit)) {
195      return Long.MAX_VALUE;
196    }
197    return timeLimit.to(NANOSECONDS);
198  }
199}
200