1/*
2 * Copyright (C) 2012 Google Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.google.caliper.runner;
18
19import com.google.caliper.bridge.CommandLineSerializer;
20import com.google.caliper.bridge.OpenedSocket;
21import com.google.caliper.bridge.WorkerSpec;
22import com.google.caliper.config.VmConfig;
23import com.google.caliper.model.BenchmarkSpec;
24import com.google.caliper.runner.Instrument.Instrumentation;
25import com.google.caliper.worker.WorkerMain;
26import com.google.common.annotations.VisibleForTesting;
27import com.google.common.collect.ImmutableList;
28import com.google.common.collect.ImmutableSet;
29import com.google.common.collect.Iterables;
30import com.google.common.collect.Lists;
31import com.google.common.util.concurrent.ListenableFuture;
32
33import java.io.IOException;
34import java.io.InputStream;
35import java.io.OutputStream;
36import java.util.Collections;
37import java.util.List;
38import java.util.UUID;
39import java.util.logging.Logger;
40
41import javax.annotation.concurrent.GuardedBy;
42import javax.inject.Inject;
43
44/**
45 * A representation of an unstarted worker.
46 *
47 * <p>A worker is a sub process that runs a benchmark trial.  Specifically it is a JVM running
48 * {@link com.google.caliper.worker.WorkerMain}.  Because of this we can make certain assumptions
49 * about its behavior, including but not limited to:
50 *
51 * <ul>
52 *   <li>The worker will connect back to us over a socket connection and send us UTF-8 json
53 *       messages in a line oriented protocol.
54 *   <li>TODO(lukes,gak): This is probably as good a place as any to specify the entire protocol.
55 * </ul>
56 */
57@TrialScoped final class WorkerProcess {
58  private static final Logger logger = Logger.getLogger(WorkerProcess.class.getName());
59
60  @GuardedBy("this")
61  private Process worker;
62  private final ProcessBuilder workerBuilder;
63  private final ShutdownHookRegistrar shutdownHookRegistrar;
64  private final ListenableFuture<OpenedSocket> openedSocket;
65  private final UUID trialId;
66
67  @VisibleForTesting WorkerProcess(ProcessBuilder workerBuilder,
68      UUID trialId,
69      ListenableFuture<OpenedSocket> openedSocket,
70      ShutdownHookRegistrar shutdownHookRegistrar) {
71    this.trialId = trialId;
72    this.workerBuilder = workerBuilder;
73    this.openedSocket = openedSocket;
74    this.shutdownHookRegistrar = shutdownHookRegistrar;
75  }
76
77  @Inject WorkerProcess(@TrialId UUID trialId,
78      ListenableFuture<OpenedSocket> openedSocket,
79      Experiment experiment,
80      BenchmarkSpec benchmarkSpec,
81      @LocalPort int localPort,
82      BenchmarkClass benchmarkClass,
83      ShutdownHookRegistrar shutdownHookRegistrar) {
84    this.trialId = trialId;
85    this.workerBuilder =
86        buildProcess(trialId, experiment, benchmarkSpec, localPort, benchmarkClass);
87    this.openedSocket = openedSocket;
88    this.shutdownHookRegistrar = shutdownHookRegistrar;
89  }
90
91  ListenableFuture<OpenedSocket> socketFuture() {
92    return openedSocket;
93  }
94
95  /**
96   * Returns a {@link Process} representing this worker.  The process will be started if it hasn't
97   * already.
98   */
99  synchronized Process startWorker() throws IOException {
100    if (worker == null) {
101      final Process delegate = workerBuilder.start();
102      final Thread shutdownHook = new Thread("worker-shutdown-hook-" + trialId) {
103        @Override public void run() {
104          delegate.destroy();
105        }
106      };
107      shutdownHookRegistrar.addShutdownHook(shutdownHook);
108      worker = new Process() {
109        @Override public OutputStream getOutputStream() {
110          return delegate.getOutputStream();
111        }
112
113        @Override public InputStream getInputStream() {
114          return delegate.getInputStream();
115        }
116
117        @Override public InputStream getErrorStream() {
118          return delegate.getErrorStream();
119        }
120
121        @Override public int waitFor() throws InterruptedException {
122          int waitFor = delegate.waitFor();
123          shutdownHookRegistrar.removeShutdownHook(shutdownHook);
124          return waitFor;
125        }
126
127        @Override public int exitValue() {
128          int exitValue = delegate.exitValue();
129          // if it hasn't thrown, the process is done
130          shutdownHookRegistrar.removeShutdownHook(shutdownHook);
131          return exitValue;
132        }
133
134        @Override public void destroy() {
135          delegate.destroy();
136          shutdownHookRegistrar.removeShutdownHook(shutdownHook);
137        }
138      };
139    }
140    return worker;
141  }
142
143  @VisibleForTesting static ProcessBuilder buildProcess(
144      UUID trialId,
145      Experiment experiment,
146      BenchmarkSpec benchmarkSpec,
147      int localPort,
148      BenchmarkClass benchmarkClass) {
149    // TODO(lukes): it would be nice to split this method into a few smaller more targeted methods
150    Instrumentation instrumentation = experiment.instrumentation();
151    Instrument instrument = instrumentation.instrument();
152    WorkerSpec request = new WorkerSpec(
153        trialId,
154        instrumentation.workerClass(),
155        instrumentation.workerOptions(),
156        benchmarkSpec,
157        ImmutableList.copyOf(instrumentation.benchmarkMethod.getParameterTypes()),
158        localPort);
159
160    ProcessBuilder processBuilder = new ProcessBuilder().redirectErrorStream(false);
161
162    List<String> args = processBuilder.command();
163
164    VirtualMachine vm = experiment.vm();
165    VmConfig vmConfig = vm.config;
166    args.addAll(getJvmArgs(vm, benchmarkClass));
167
168    Iterable<String> instrumentJvmOptions = instrument.getExtraCommandLineArgs(vmConfig);
169    logger.fine(String.format("Instrument(%s) Java args: %s", instrument.getClass().getName(),
170        instrumentJvmOptions));
171    Iterables.addAll(args, instrumentJvmOptions);
172
173    // last to ensure that they're always applied
174    args.addAll(vmConfig.workerProcessArgs());
175
176    args.add(WorkerMain.class.getName());
177    args.add(CommandLineSerializer.render(request));
178
179    logger.finest(String.format("Full JVM (%s) args: %s", vm.name, args));
180    return processBuilder;
181  }
182
183  @VisibleForTesting static List<String> getJvmArgs(
184      VirtualMachine vm,
185      BenchmarkClass benchmarkClass) {
186
187    VmConfig vmConfig = vm.config;
188    String platformName = vmConfig.platformName();
189
190    List<String> args = Lists.newArrayList();
191    String jdkPath = vmConfig.vmExecutable().getAbsolutePath();
192    args.add(jdkPath);
193    logger.fine(String.format("%s(%s) Path: %s", platformName, vm.name, jdkPath));
194
195    ImmutableList<String> jvmOptions = vmConfig.options();
196    args.addAll(jvmOptions);
197    logger.fine(String.format("%s(%s) args: %s", platformName, vm.name, jvmOptions));
198
199    ImmutableSet<String> benchmarkJvmOptions = benchmarkClass.vmOptions();
200    args.addAll(benchmarkJvmOptions);
201    logger.fine(String.format("Benchmark(%s) %s args: %s", benchmarkClass.name(), platformName,
202        benchmarkJvmOptions));
203
204    ImmutableList<String> classPathArgs = vmConfig.workerClassPathArgs();
205    args.addAll(classPathArgs);
206    logger.finer(String.format("Class path args: %s", classPathArgs));
207
208    // TODO(iam): consider forwarding -Djava.library.path= for JNI library support.
209    return args;
210  }
211}
212