/*
 * Copyright (C) 2013 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
14
15package com.google.caliper.runner;
16
17import static com.google.common.base.Preconditions.checkArgument;
18import static com.google.common.base.Preconditions.checkState;
19import static org.junit.Assert.assertEquals;
20import static org.junit.Assert.assertNotSame;
21import static org.junit.Assert.assertTrue;
22import static org.junit.Assert.fail;
23
24import com.google.caliper.bridge.LogMessage;
25import com.google.caliper.bridge.OpenedSocket;
26import com.google.caliper.runner.FakeWorkers.DummyLogMessage;
27import com.google.caliper.runner.StreamService.StreamItem;
28import com.google.caliper.runner.StreamService.StreamItem.Kind;
29import com.google.caliper.util.Parser;
30import com.google.common.collect.Sets;
31import com.google.common.util.concurrent.ListenableFuture;
32import com.google.common.util.concurrent.ListenableFutureTask;
33import com.google.common.util.concurrent.MoreExecutors;
34import com.google.common.util.concurrent.Service.Listener;
35import com.google.common.util.concurrent.Service.State;
36
37import org.junit.After;
38import org.junit.Before;
39import org.junit.Test;
40import org.junit.runner.RunWith;
41import org.junit.runners.JUnit4;
42
43import java.io.File;
44import java.io.FileNotFoundException;
45import java.io.IOException;
46import java.io.PrintWriter;
47import java.io.StringWriter;
48import java.net.ServerSocket;
49import java.net.SocketException;
50import java.text.ParseException;
51import java.util.Set;
52import java.util.UUID;
53import java.util.concurrent.Callable;
54import java.util.concurrent.CountDownLatch;
55import java.util.concurrent.TimeUnit;
56
57/**
58 * Tests for {@link StreamService}.
59 */
60@RunWith(JUnit4.class)
61
62public class StreamServiceTest {
63
64  private ServerSocket serverSocket;
65  private final StringWriter writer = new StringWriter();
66  private final PrintWriter stdout = new PrintWriter(writer, true);
67  private final Parser<LogMessage> parser = new Parser<LogMessage>() {
68    @Override public LogMessage parse(final CharSequence text) throws ParseException {
69      return new DummyLogMessage(text.toString());
70    }
71  };
72
73  private StreamService service;
74  private final CountDownLatch terminalLatch = new CountDownLatch(1);
75  private static final int TRIAL_NUMBER = 3;
76
77  @Before public void setUp() throws IOException {
78    serverSocket = new ServerSocket(0);
79  }
80
81  @After public void closeSocket() throws IOException {
82    serverSocket.close();
83  }
84
85  @After public void stopService() {
86    if (service != null && service.state() != State.FAILED && service.state() != State.TERMINATED) {
87      service.stopAsync().awaitTerminated();
88    }
89  }
90
91  @Test public void testReadOutput() throws Exception {
92    makeService(FakeWorkers.PrintClient.class, "foo", "bar");
93    service.startAsync().awaitRunning();
94    StreamItem item1 = readItem();
95    assertEquals(Kind.DATA, item1.kind());
96    Set<String> lines = Sets.newHashSet();
97    lines.add(item1.content().toString());
98    StreamItem item2 = readItem();
99    assertEquals(Kind.DATA, item2.kind());
100    lines.add(item2.content().toString());
101    assertEquals(Sets.newHashSet("foo", "bar"), lines);
102    assertEquals(State.RUNNING, service.state());
103    StreamItem item3 = readItem();
104    assertEquals(Kind.EOF, item3.kind());
105    awaitStopped(100, TimeUnit.MILLISECONDS);
106    assertTerminated();
107  }
108
109  @Test public void failingProcess() throws Exception {
110    makeService(FakeWorkers.Exit.class, "1");
111    service.startAsync().awaitRunning();
112    assertEquals(Kind.EOF, readItem().kind());
113    awaitStopped(100, TimeUnit.MILLISECONDS);
114    assertEquals(State.FAILED, service.state());
115  }
116
117  @Test public void processDoesntExit() throws Exception {
118    // close all fds and then sleep
119    makeService(FakeWorkers.CloseAndSleep.class);
120    service.startAsync().awaitRunning();
121    assertEquals(Kind.EOF, readItem().kind());
122    awaitStopped(200, TimeUnit.MILLISECONDS);  // we
123    assertEquals(State.FAILED, service.state());
124  }
125
126  @Test public void testSocketInputOutput() throws Exception {
127    int localport = serverSocket.getLocalPort();
128    // read from the socket and echo it back
129    makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport));
130
131    service.startAsync().awaitRunning();
132    assertEquals(new DummyLogMessage("start"), readItem().content());
133    service.sendMessage(new DummyLogMessage("hello socket world"));
134    assertEquals(new DummyLogMessage("hello socket world"), readItem().content());
135    service.closeWriter();
136    assertEquals(State.RUNNING, service.state());
137    StreamItem nextItem = readItem();
138    assertEquals("Expected EOF " + nextItem, Kind.EOF, nextItem.kind());
139    awaitStopped(100, TimeUnit.MILLISECONDS);
140    assertTerminated();
141  }
142
143  @Test public void testSocketClosesBeforeProcess() throws Exception {
144    int localport = serverSocket.getLocalPort();
145    // read from the socket and echo it back
146    makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport), "foo");
147    service.startAsync().awaitRunning();
148    assertEquals(new DummyLogMessage("start"), readItem().content());
149    service.sendMessage(new DummyLogMessage("hello socket world"));
150    assertEquals(new DummyLogMessage("hello socket world"), readItem().content());
151    service.closeWriter();
152
153    assertEquals("foo", readItem().content().toString());
154
155    assertEquals(State.RUNNING, service.state());
156    assertEquals(Kind.EOF, readItem().kind());
157    awaitStopped(100, TimeUnit.MILLISECONDS);
158    assertTerminated();
159  }
160
161  @Test public void failsToAcceptConnection() throws Exception {
162    serverSocket.close();  // This will force serverSocket.accept to throw a SocketException
163    makeService(FakeWorkers.Sleeper.class, Long.toString(TimeUnit.MINUTES.toMillis(10)));
164    try {
165      service.startAsync().awaitRunning();
166      fail();
167    } catch (IllegalStateException expected) {}
168    assertEquals(SocketException.class, service.failureCause().getClass());
169  }
170
171  /** Reads an item, asserting that there was no timeout. */
172  private StreamItem readItem() throws InterruptedException {
173    StreamItem item = service.readItem(10, TimeUnit.SECONDS);
174    assertNotSame("Timed out while reading item from worker", Kind.TIMEOUT, item.kind());
175    return item;
176  }
177
178  /**
179   * Wait for the service to reach a terminal state without calling stop.
180   */
181  private void awaitStopped(long time, TimeUnit unit) throws InterruptedException {
182    assertTrue(terminalLatch.await(time, unit));
183  }
184
185  private void assertTerminated() {
186    State state = service.state();
187    if (state != State.TERMINATED) {
188      if (state == State.FAILED) {
189        throw new AssertionError(service.failureCause());
190      }
191      fail("Expected service to be terminated but was: " + state);
192    }
193  }
194
195  @SuppressWarnings("resource")
196  private void makeService(Class<?> main, String ...args) {
197    checkState(service == null, "You can only make one StreamService per test");
198    UUID trialId = UUID.randomUUID();
199    TrialOutputLogger trialOutput = new TrialOutputLogger(new TrialOutputFactory() {
200      @Override public FileAndWriter getTrialOutputFile(int trialNumber)
201          throws FileNotFoundException {
202        checkArgument(trialNumber == TRIAL_NUMBER);
203        return new FileAndWriter(new File("/tmp/not-a-file"), stdout);
204      }
205
206      @Override public void persistFile(File f) {
207        throw new UnsupportedOperationException();
208      }
209
210    }, TRIAL_NUMBER, trialId, null /* experiment */);
211    try {
212      // normally the TrialRunLoop opens/closes the logger
213      trialOutput.open();
214    } catch (IOException e) {
215      throw new RuntimeException(e);
216    }
217    service = new StreamService(
218        new WorkerProcess(FakeWorkers.createProcessBuilder(main, args),
219            trialId,
220            getSocketFuture(),
221            new RuntimeShutdownHookRegistrar()),
222        parser,
223        trialOutput);
224    service.addListener(new Listener() {
225      @Override public void starting() {}
226      @Override public void running() {}
227      @Override public void stopping(State from) {}
228      @Override public void terminated(State from) {
229        terminalLatch.countDown();
230      }
231      @Override public void failed(State from, Throwable failure) {
232        terminalLatch.countDown();
233      }
234    }, MoreExecutors.directExecutor());
235  }
236
237  private ListenableFuture<OpenedSocket> getSocketFuture() {
238    ListenableFutureTask<OpenedSocket> openSocketTask = ListenableFutureTask.create(
239        new Callable<OpenedSocket>() {
240          @Override
241          public OpenedSocket call() throws Exception {
242            return OpenedSocket.fromSocket(serverSocket.accept());
243          }
244        });
245    // N.B. this thread will block on serverSocket.accept until a connection is accepted or the
246    // socket is closed, so no matter what this thread will die with the test.
247    Thread opener = new Thread(openSocketTask, "SocketOpener");
248    opener.setDaemon(true);
249    opener.start();
250    return openSocketTask;
251  }
252}
253