spawner_communicator.cc revision b2df76ea8fec9e32f6f3718986dba0d95315b29c
1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/test/spawned_test_server/spawner_communicator.h"
6
7#include "base/json/json_reader.h"
8#include "base/logging.h"
9#include "base/stringprintf.h"
10#include "base/supports_user_data.h"
11#include "base/test/test_timeouts.h"
12#include "base/time.h"
13#include "base/values.h"
14#include "build/build_config.h"
15#include "googleurl/src/gurl.h"
16#include "net/base/net_util.h"
17#include "net/base/upload_bytes_element_reader.h"
18#include "net/base/upload_data_stream.h"
19#include "net/http/http_response_headers.h"
20#include "net/url_request/url_request_test_util.h"
21
22namespace net {
23
24namespace {
25
26GURL GenerateSpawnerCommandURL(const std::string& command, uint16 port) {
27  // Always performs HTTP request for sending command to the spawner server.
28  return GURL(base::StringPrintf("%s:%u/%s", "http://127.0.0.1", port,
29                                 command.c_str()));
30}
31
32int kBufferSize = 2048;
33
34// A class to hold all data needed to send a command to spawner server.
35class SpawnerRequestData : public base::SupportsUserData::Data {
36 public:
37  SpawnerRequestData(int id, int* result_code, std::string* data_received)
38      : request_id_(id),
39        buf_(new IOBuffer(kBufferSize)),
40        result_code_(result_code),
41        data_received_(data_received),
42        response_started_count_(0) {
43    DCHECK(result_code);
44    *result_code_ = OK;
45    DCHECK(data_received);
46    data_received_->clear();
47  }
48
49  virtual ~SpawnerRequestData() {}
50
51  bool DoesRequestIdMatch(int request_id) const {
52    return request_id_ == request_id;
53  }
54
55  IOBuffer* buf() const { return buf_.get(); }
56
57  bool IsResultOK() const { return *result_code_ == OK; }
58
59  void ClearReceivedData() { data_received_->clear(); }
60
61  void SetResultCode(int result_code) { *result_code_ = result_code; }
62
63  void IncreaseResponseStartedCount() { response_started_count_++; }
64
65  int response_started_count() const { return response_started_count_; }
66
67  // Write data read from URLRequest::Read() to |data_received_|. Returns true
68  // if |num_bytes| is great than 0. |num_bytes| is 0 for EOF, < 0 on errors.
69  bool ConsumeBytesRead(int num_bytes) {
70    // Error while reading, or EOF.
71    if (num_bytes <= 0)
72      return false;
73
74    data_received_->append(buf_->data(), num_bytes);
75    return true;
76  }
77
78 private:
79  // Unique ID for the current request.
80  int request_id_;
81
82  // Buffer that URLRequest writes into.
83  scoped_refptr<IOBuffer> buf_;
84
85  // Holds the error condition that was hit on the current request, or OK.
86  int* result_code_;
87
88  // Data received from server;
89  std::string* data_received_;
90
91  // Used to track how many times the OnResponseStarted get called after
92  // sending a command to spawner server.
93  int response_started_count_;
94
95  DISALLOW_COPY_AND_ASSIGN(SpawnerRequestData);
96};
97
98}  // namespace
99
100SpawnerCommunicator::SpawnerCommunicator(uint16 port)
101    : io_thread_("spawner_communicator"),
102      event_(false, false),
103      port_(port),
104      next_id_(0),
105      weak_factory_(this),
106      is_running_(false) {}
107
108SpawnerCommunicator::~SpawnerCommunicator() {
109  DCHECK(!is_running_);
110}
111
112void SpawnerCommunicator::WaitForResponse() {
113  DCHECK_NE(MessageLoop::current(), io_thread_.message_loop());
114  event_.Wait();
115  event_.Reset();
116}
117
118void SpawnerCommunicator::StartIOThread() {
119  DCHECK_NE(MessageLoop::current(), io_thread_.message_loop());
120  if (is_running_)
121    return;
122
123  allowed_port_.reset(new ScopedPortException(port_));
124  base::Thread::Options options;
125  options.message_loop_type = MessageLoop::TYPE_IO;
126  is_running_ = io_thread_.StartWithOptions(options);
127  DCHECK(is_running_);
128}
129
130void SpawnerCommunicator::Shutdown() {
131  DCHECK_NE(MessageLoop::current(), io_thread_.message_loop());
132  DCHECK(is_running_);
133  // The request and its context should be created and destroyed only on the
134  // IO thread.
135  DCHECK(!cur_request_.get());
136  DCHECK(!context_.get());
137  is_running_ = false;
138  io_thread_.Stop();
139  allowed_port_.reset();
140}
141
142void SpawnerCommunicator::SendCommandAndWaitForResult(
143    const std::string& command,
144    const std::string& post_data,
145    int* result_code,
146    std::string* data_received) {
147  if (!result_code || !data_received)
148    return;
149  // Start the communicator thread to talk to test server spawner.
150  StartIOThread();
151  DCHECK(io_thread_.message_loop());
152
153  // Since the method will be blocked until SpawnerCommunicator gets result
154  // from the spawner server or timed-out. It's safe to use base::Unretained
155  // when using base::Bind.
156  io_thread_.message_loop()->PostTask(FROM_HERE, base::Bind(
157      &SpawnerCommunicator::SendCommandAndWaitForResultOnIOThread,
158      base::Unretained(this), command, post_data, result_code, data_received));
159  WaitForResponse();
160}
161
162void SpawnerCommunicator::SendCommandAndWaitForResultOnIOThread(
163    const std::string& command,
164    const std::string& post_data,
165    int* result_code,
166    std::string* data_received) {
167  MessageLoop* loop = io_thread_.message_loop();
168  DCHECK(loop);
169  DCHECK_EQ(MessageLoop::current(), loop);
170
171  // Prepare the URLRequest for sending the command.
172  DCHECK(!cur_request_.get());
173  context_.reset(new TestURLRequestContext);
174  cur_request_.reset(context_->CreateRequest(
175      GenerateSpawnerCommandURL(command, port_), this));
176  DCHECK(cur_request_.get());
177  int current_request_id = ++next_id_;
178  SpawnerRequestData* data = new SpawnerRequestData(current_request_id,
179                                                    result_code,
180                                                    data_received);
181  DCHECK(data);
182  cur_request_->SetUserData(this, data);
183
184  if (post_data.empty()) {
185    cur_request_->set_method("GET");
186  } else {
187    cur_request_->set_method("POST");
188    scoped_ptr<UploadElementReader> reader(
189        UploadOwnedBytesElementReader::CreateWithString(post_data));
190    cur_request_->set_upload(make_scoped_ptr(
191        UploadDataStream::CreateWithReader(reader.Pass(), 0)));
192    net::HttpRequestHeaders headers;
193    headers.SetHeader(net::HttpRequestHeaders::kContentType,
194                      "application/json");
195    cur_request_->SetExtraRequestHeaders(headers);
196  }
197
198  // Post a task to timeout this request if it takes too long.
199  MessageLoop::current()->PostDelayedTask(
200      FROM_HERE,
201      base::Bind(&SpawnerCommunicator::OnTimeout, weak_factory_.GetWeakPtr(),
202                 current_request_id),
203      TestTimeouts::action_max_timeout());
204
205  // Start the request.
206  cur_request_->Start();
207}
208
209void SpawnerCommunicator::OnTimeout(int id) {
210  // Timeout tasks may outlive the URLRequest they reference. Make sure it
211  // is still applicable.
212  if (!cur_request_.get())
213    return;
214  SpawnerRequestData* data =
215      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
216  DCHECK(data);
217
218  if (!data->DoesRequestIdMatch(id))
219    return;
220  // Set the result code and cancel the timed-out task.
221  data->SetResultCode(ERR_TIMED_OUT);
222  cur_request_->Cancel();
223  OnSpawnerCommandCompleted(cur_request_.get());
224}
225
226void SpawnerCommunicator::OnSpawnerCommandCompleted(URLRequest* request) {
227  if (!cur_request_.get())
228    return;
229  DCHECK_EQ(request, cur_request_.get());
230  SpawnerRequestData* data =
231      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
232  DCHECK(data);
233
234  // If request is faild,return the error code.
235  if (!cur_request_->status().is_success())
236    data->SetResultCode(cur_request_->status().error());
237
238  if (!data->IsResultOK()) {
239    LOG(ERROR) << "request failed, status: "
240               << static_cast<int>(request->status().status())
241               << ", error: " << request->status().error();
242    // Clear the buffer of received data if any net error happened.
243    data->ClearReceivedData();
244  } else {
245    DCHECK_EQ(1, data->response_started_count());
246  }
247
248  // Clear current request to indicate the completion of sending a command
249  // to spawner server and getting the result.
250  cur_request_.reset();
251  context_.reset();
252  // Invalidate the weak pointers on the IO thread.
253  weak_factory_.InvalidateWeakPtrs();
254
255  // Wakeup the caller in user thread.
256  event_.Signal();
257}
258
259void SpawnerCommunicator::ReadResult(URLRequest* request) {
260  DCHECK_EQ(request, cur_request_.get());
261  SpawnerRequestData* data =
262      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
263  DCHECK(data);
264
265  IOBuffer* buf = data->buf();
266  // Read as many bytes as are available synchronously.
267  while (true) {
268    int num_bytes;
269    if (!request->Read(buf, kBufferSize, &num_bytes)) {
270      // Check whether the read failed synchronously.
271      if (!request->status().is_io_pending())
272        OnSpawnerCommandCompleted(request);
273      return;
274    }
275    if (!data->ConsumeBytesRead(num_bytes)) {
276      OnSpawnerCommandCompleted(request);
277      return;
278    }
279  }
280}
281
282void SpawnerCommunicator::OnResponseStarted(URLRequest* request) {
283  DCHECK_EQ(request, cur_request_.get());
284  SpawnerRequestData* data =
285      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
286  DCHECK(data);
287
288  data->IncreaseResponseStartedCount();
289
290  if (!request->status().is_success()) {
291    OnSpawnerCommandCompleted(request);
292    return;
293  }
294
295  // Require HTTP responses to have a success status code.
296  if (request->GetResponseCode() != 200) {
297    LOG(ERROR) << "Spawner server returned bad status: "
298               << request->response_headers()->GetStatusLine();
299    data->SetResultCode(ERR_FAILED);
300    request->Cancel();
301    OnSpawnerCommandCompleted(request);
302    return;
303  }
304
305  ReadResult(request);
306}
307
308void SpawnerCommunicator::OnReadCompleted(URLRequest* request, int num_bytes) {
309  if (!cur_request_.get())
310    return;
311  DCHECK_EQ(request, cur_request_.get());
312  SpawnerRequestData* data =
313      static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this));
314  DCHECK(data);
315
316  if (data->ConsumeBytesRead(num_bytes)) {
317    // Keep reading.
318    ReadResult(request);
319  } else {
320    OnSpawnerCommandCompleted(request);
321  }
322}
323
324bool SpawnerCommunicator::StartServer(const std::string& arguments,
325                                      uint16* port) {
326  *port = 0;
327  // Send the start command to spawner server to start the Python test server
328  // on remote machine.
329  std::string server_return_data;
330  int result_code;
331  SendCommandAndWaitForResult("start", arguments, &result_code,
332                              &server_return_data);
333  if (OK != result_code || server_return_data.empty())
334    return false;
335
336  // Check whether the data returned from spawner server is JSON-formatted.
337  scoped_ptr<base::Value> value(base::JSONReader::Read(server_return_data));
338  if (!value.get() || !value->IsType(base::Value::TYPE_DICTIONARY)) {
339    LOG(ERROR) << "Invalid server data: " << server_return_data.c_str();
340    return false;
341  }
342
343  // Check whether spawner server returns valid data.
344  DictionaryValue* server_data = static_cast<DictionaryValue*>(value.get());
345  std::string message;
346  if (!server_data->GetString("message", &message) || message != "started") {
347    LOG(ERROR) << "Invalid message in server data: ";
348    return false;
349  }
350  int int_port;
351  if (!server_data->GetInteger("port", &int_port) || int_port <= 0 ||
352      int_port > kuint16max) {
353    LOG(ERROR) << "Invalid port value: " << int_port;
354    return false;
355  }
356  *port = static_cast<uint16>(int_port);
357  return true;
358}
359
360bool SpawnerCommunicator::StopServer() {
361  // It's OK to stop the SpawnerCommunicator without starting it. Some tests
362  // have test server on their test fixture but do not actually use it.
363  if (!is_running_)
364    return true;
365
366  // When the test is done, ask the test server spawner to kill the test server
367  // on the remote machine.
368  std::string server_return_data;
369  int result_code;
370  SendCommandAndWaitForResult("kill", "", &result_code, &server_return_data);
371  Shutdown();
372  if (OK != result_code || server_return_data != "killed")
373    return false;
374  return true;
375}
376
377}  // namespace net
378