spawner_communicator.cc revision 7d4cd473f85ac64c3747c96c277f9e506a0d2246
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/test/spawned_test_server/spawner_communicator.h"
6
7#include "base/json/json_reader.h"
8#include "base/logging.h"
9#include "base/strings/stringprintf.h"
10#include "base/supports_user_data.h"
11#include "base/test/test_timeouts.h"
12#include "base/time.h"
13#include "base/values.h"
14#include "build/build_config.h"
15#include "googleurl/src/gurl.h"
16#include "net/base/net_util.h"
17#include "net/base/upload_bytes_element_reader.h"
18#include "net/base/upload_data_stream.h"
19#include "net/http/http_response_headers.h"
20#include "net/url_request/url_request_test_util.h"
21
22namespace net {
23
24namespace {
25
26GURL GenerateSpawnerCommandURL(const std::string& command, uint16 port) {
27  // Always performs HTTP request for sending command to the spawner server.
28  return GURL(base::StringPrintf("%s:%u/%s", "http://127.0.0.1", port,
29                                 command.c_str()));
30}
31
32int kBufferSize = 2048;
33
34// A class to hold all data needed to send a command to spawner server.
35class SpawnerRequestData : public base::SupportsUserData::Data {
36 public:
37  SpawnerRequestData(int id, int* result_code, std::string* data_received)
38      : request_id_(id),
39        buf_(new IOBuffer(kBufferSize)),
40        result_code_(result_code),
41        data_received_(data_received),
42        response_started_count_(0) {
43    DCHECK(result_code);
44    *result_code_ = OK;
45    DCHECK(data_received);
46    data_received_->clear();
47  }
48
49  virtual ~SpawnerRequestData() {}
50
51  bool DoesRequestIdMatch(int request_id) const {
52    return request_id_ == request_id;
53  }
54
55  IOBuffer* buf() const { return buf_.get(); }
56
57  bool IsResultOK() const { return *result_code_ == OK; }
58
59  void ClearReceivedData() { data_received_->clear(); }
60
61  void SetResultCode(int result_code) { *result_code_ = result_code; }
62
63  void IncreaseResponseStartedCount() { response_started_count_++; }
64
65  int response_started_count() const { return response_started_count_; }
66
67  // Write data read from URLRequest::Read() to |data_received_|. Returns true
68  // if |num_bytes| is great than 0. |num_bytes| is 0 for EOF, < 0 on errors.
69  bool ConsumeBytesRead(int num_bytes) {
70    // Error while reading, or EOF.
71    if (num_bytes <= 0)
72      return false;
73
74    data_received_->append(buf_->data(), num_bytes);
75    return true;
76  }
77
78 private:
79  // Unique ID for the current request.
80  int request_id_;
81
82  // Buffer that URLRequest writes into.
83  scoped_refptr<IOBuffer> buf_;
84
85  // Holds the error condition that was hit on the current request, or OK.
86  int* result_code_;
87
88  // Data received from server;
89  std::string* data_received_;
90
91  // Used to track how many times the OnResponseStarted get called after
92  // sending a command to spawner server.
93  int response_started_count_;
94
95  DISALLOW_COPY_AND_ASSIGN(SpawnerRequestData);
96};
97
98}  // namespace
99
100SpawnerCommunicator::SpawnerCommunicator(uint16 port)
101    : io_thread_("spawner_communicator"),
102      event_(false, false),
103      port_(port),
104      next_id_(0),
105      weak_factory_(this),
106      is_running_(false) {}
107
108SpawnerCommunicator::~SpawnerCommunicator() {
109  DCHECK(!is_running_);
110}
111
112void SpawnerCommunicator::WaitForResponse() {
113  DCHECK_NE(base::MessageLoop::current(), io_thread_.message_loop());
114  event_.Wait();
115  event_.Reset();
116}
117
118void SpawnerCommunicator::StartIOThread() {
119  DCHECK_NE(base::MessageLoop::current(), io_thread_.message_loop());
120  if (is_running_)
121    return;
122
123  allowed_port_.reset(new ScopedPortException(port_));
124  base::Thread::Options options;
125  options.message_loop_type = base::MessageLoop::TYPE_IO;
126  is_running_ = io_thread_.StartWithOptions(options);
127  DCHECK(is_running_);
128}
129
130void SpawnerCommunicator::Shutdown() {
131  DCHECK_NE(base::MessageLoop::current(), io_thread_.message_loop());
132  DCHECK(is_running_);
133  // The request and its context should be created and destroyed only on the
134  // IO thread.
135  DCHECK(!cur_request_.get());
136  DCHECK(!context_.get());
137  is_running_ = false;
138  io_thread_.Stop();
139  allowed_port_.reset();
140}
141
142void SpawnerCommunicator::SendCommandAndWaitForResult(
143    const std::string& command,
144    const std::string& post_data,
145    int* result_code,
146    std::string* data_received) {
147  if (!result_code || !data_received)
148    return;
149  // Start the communicator thread to talk to test server spawner.
150  StartIOThread();
151  DCHECK(io_thread_.message_loop());
152
153  // Since the method will be blocked until SpawnerCommunicator gets result
154  // from the spawner server or timed-out. It's safe to use base::Unretained
155  // when using base::Bind.
156  io_thread_.message_loop()->PostTask(FROM_HERE, base::Bind(
157      &SpawnerCommunicator::SendCommandAndWaitForResultOnIOThread,
158      base::Unretained(this), command, post_data, result_code, data_received));
159  WaitForResponse();
160}
161
162void SpawnerCommunicator::SendCommandAndWaitForResultOnIOThread(
163    const std::string& command,
164    const std::string& post_data,
165    int* result_code,
166    std::string* data_received) {
167  base::MessageLoop* loop = io_thread_.message_loop();
168  DCHECK(loop);
169  DCHECK_EQ(base::MessageLoop::current(), loop);
170
171  // Prepare the URLRequest for sending the command.
172  DCHECK(!cur_request_.get());
173  context_.reset(new TestURLRequestContext);
174  cur_request_.reset(context_->CreateRequest(
175      GenerateSpawnerCommandURL(command, port_), this));
176  DCHECK(cur_request_.get());
177  int current_request_id = ++next_id_;
178  SpawnerRequestData* data = new SpawnerRequestData(current_request_id,
179                                                    result_code,
180                                                    data_received);
181  DCHECK(data);
182  cur_request_->SetUserData(this, data);
183
184  if (post_data.empty()) {
185    cur_request_->set_method("GET");
186  } else {
187    cur_request_->set_method("POST");
188    scoped_ptr<UploadElementReader> reader(
189        UploadOwnedBytesElementReader::CreateWithString(post_data));
190    cur_request_->set_upload(make_scoped_ptr(
191        UploadDataStream::CreateWithReader(reader.Pass(), 0)));
192    net::HttpRequestHeaders headers;
193    headers.SetHeader(net::HttpRequestHeaders::kContentType,
194                      "application/json");
195    cur_request_->SetExtraRequestHeaders(headers);
196  }
197
198  // Post a task to timeout this request if it takes too long.
199  base::MessageLoop::current()->PostDelayedTask(
200      FROM_HERE,
201      base::Bind(&SpawnerCommunicator::OnTimeout,
202                 weak_factory_.GetWeakPtr(),
203                 current_request_id),
204      TestTimeouts::action_max_timeout());
205
206  // Start the request.
207  cur_request_->Start();
208}
209
210void SpawnerCommunicator::OnTimeout(int id) {
211  // Timeout tasks may outlive the URLRequest they reference. Make sure it
212  // is still applicable.
213  if (!cur_request_.get())
214    return;
215  SpawnerRequestData* data =
216      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
217  DCHECK(data);
218
219  if (!data->DoesRequestIdMatch(id))
220    return;
221  // Set the result code and cancel the timed-out task.
222  data->SetResultCode(ERR_TIMED_OUT);
223  cur_request_->Cancel();
224  OnSpawnerCommandCompleted(cur_request_.get());
225}
226
227void SpawnerCommunicator::OnSpawnerCommandCompleted(URLRequest* request) {
228  if (!cur_request_.get())
229    return;
230  DCHECK_EQ(request, cur_request_.get());
231  SpawnerRequestData* data =
232      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
233  DCHECK(data);
234
235  // If request is faild,return the error code.
236  if (!cur_request_->status().is_success())
237    data->SetResultCode(cur_request_->status().error());
238
239  if (!data->IsResultOK()) {
240    LOG(ERROR) << "request failed, status: "
241               << static_cast<int>(request->status().status())
242               << ", error: " << request->status().error();
243    // Clear the buffer of received data if any net error happened.
244    data->ClearReceivedData();
245  } else {
246    DCHECK_EQ(1, data->response_started_count());
247  }
248
249  // Clear current request to indicate the completion of sending a command
250  // to spawner server and getting the result.
251  cur_request_.reset();
252  context_.reset();
253  // Invalidate the weak pointers on the IO thread.
254  weak_factory_.InvalidateWeakPtrs();
255
256  // Wakeup the caller in user thread.
257  event_.Signal();
258}
259
260void SpawnerCommunicator::ReadResult(URLRequest* request) {
261  DCHECK_EQ(request, cur_request_.get());
262  SpawnerRequestData* data =
263      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
264  DCHECK(data);
265
266  IOBuffer* buf = data->buf();
267  // Read as many bytes as are available synchronously.
268  while (true) {
269    int num_bytes;
270    if (!request->Read(buf, kBufferSize, &num_bytes)) {
271      // Check whether the read failed synchronously.
272      if (!request->status().is_io_pending())
273        OnSpawnerCommandCompleted(request);
274      return;
275    }
276    if (!data->ConsumeBytesRead(num_bytes)) {
277      OnSpawnerCommandCompleted(request);
278      return;
279    }
280  }
281}
282
283void SpawnerCommunicator::OnResponseStarted(URLRequest* request) {
284  DCHECK_EQ(request, cur_request_.get());
285  SpawnerRequestData* data =
286      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
287  DCHECK(data);
288
289  data->IncreaseResponseStartedCount();
290
291  if (!request->status().is_success()) {
292    OnSpawnerCommandCompleted(request);
293    return;
294  }
295
296  // Require HTTP responses to have a success status code.
297  if (request->GetResponseCode() != 200) {
298    LOG(ERROR) << "Spawner server returned bad status: "
299               << request->response_headers()->GetStatusLine();
300    data->SetResultCode(ERR_FAILED);
301    request->Cancel();
302    OnSpawnerCommandCompleted(request);
303    return;
304  }
305
306  ReadResult(request);
307}
308
309void SpawnerCommunicator::OnReadCompleted(URLRequest* request, int num_bytes) {
310  if (!cur_request_.get())
311    return;
312  DCHECK_EQ(request, cur_request_.get());
313  SpawnerRequestData* data =
314      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
315  DCHECK(data);
316
317  if (data->ConsumeBytesRead(num_bytes)) {
318    // Keep reading.
319    ReadResult(request);
320  } else {
321    OnSpawnerCommandCompleted(request);
322  }
323}
324
325bool SpawnerCommunicator::StartServer(const std::string& arguments,
326                                      uint16* port) {
327  *port = 0;
328  // Send the start command to spawner server to start the Python test server
329  // on remote machine.
330  std::string server_return_data;
331  int result_code;
332  SendCommandAndWaitForResult("start", arguments, &result_code,
333                              &server_return_data);
334  if (OK != result_code || server_return_data.empty())
335    return false;
336
337  // Check whether the data returned from spawner server is JSON-formatted.
338  scoped_ptr<base::Value> value(base::JSONReader::Read(server_return_data));
339  if (!value.get() || !value->IsType(base::Value::TYPE_DICTIONARY)) {
340    LOG(ERROR) << "Invalid server data: " << server_return_data.c_str();
341    return false;
342  }
343
344  // Check whether spawner server returns valid data.
345  base::DictionaryValue* server_data =
346      static_cast<base::DictionaryValue*>(value.get());
347  std::string message;
348  if (!server_data->GetString("message", &message) || message != "started") {
349    LOG(ERROR) << "Invalid message in server data: ";
350    return false;
351  }
352  int int_port;
353  if (!server_data->GetInteger("port", &int_port) || int_port <= 0 ||
354      int_port > kuint16max) {
355    LOG(ERROR) << "Invalid port value: " << int_port;
356    return false;
357  }
358  *port = static_cast<uint16>(int_port);
359  return true;
360}
361
362bool SpawnerCommunicator::StopServer() {
363  // It's OK to stop the SpawnerCommunicator without starting it. Some tests
364  // have test server on their test fixture but do not actually use it.
365  if (!is_running_)
366    return true;
367
368  // When the test is done, ask the test server spawner to kill the test server
369  // on the remote machine.
370  std::string server_return_data;
371  int result_code;
372  SendCommandAndWaitForResult("kill", "", &result_code, &server_return_data);
373  Shutdown();
374  if (OK != result_code || server_return_data != "killed")
375    return false;
376  return true;
377}
378
379}  // namespace net
380