spawner_communicator.cc revision b2df76ea8fec9e32f6f3718986dba0d95315b29c
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/test/spawned_test_server/spawner_communicator.h" 6 7#include "base/json/json_reader.h" 8#include "base/logging.h" 9#include "base/stringprintf.h" 10#include "base/supports_user_data.h" 11#include "base/test/test_timeouts.h" 12#include "base/time.h" 13#include "base/values.h" 14#include "build/build_config.h" 15#include "googleurl/src/gurl.h" 16#include "net/base/net_util.h" 17#include "net/base/upload_bytes_element_reader.h" 18#include "net/base/upload_data_stream.h" 19#include "net/http/http_response_headers.h" 20#include "net/url_request/url_request_test_util.h" 21 22namespace net { 23 24namespace { 25 26GURL GenerateSpawnerCommandURL(const std::string& command, uint16 port) { 27 // Always performs HTTP request for sending command to the spawner server. 28 return GURL(base::StringPrintf("%s:%u/%s", "http://127.0.0.1", port, 29 command.c_str())); 30} 31 32int kBufferSize = 2048; 33 34// A class to hold all data needed to send a command to spawner server. 35class SpawnerRequestData : public base::SupportsUserData::Data { 36 public: 37 SpawnerRequestData(int id, int* result_code, std::string* data_received) 38 : request_id_(id), 39 buf_(new IOBuffer(kBufferSize)), 40 result_code_(result_code), 41 data_received_(data_received), 42 response_started_count_(0) { 43 DCHECK(result_code); 44 *result_code_ = OK; 45 DCHECK(data_received); 46 data_received_->clear(); 47 } 48 49 virtual ~SpawnerRequestData() {} 50 51 bool DoesRequestIdMatch(int request_id) const { 52 return request_id_ == request_id; 53 } 54 55 IOBuffer* buf() const { return buf_.get(); } 56 57 bool IsResultOK() const { return *result_code_ == OK; } 58 59 void ClearReceivedData() { data_received_->clear(); } 60 61 void SetResultCode(int result_code) { *result_code_ = result_code; } 62 63 void IncreaseResponseStartedCount() { response_started_count_++; } 64 65 int response_started_count() const { return response_started_count_; } 66 67 // Write data read from URLRequest::Read() to |data_received_|. Returns true 68 // if |num_bytes| is great than 0. |num_bytes| is 0 for EOF, < 0 on errors. 69 bool ConsumeBytesRead(int num_bytes) { 70 // Error while reading, or EOF. 71 if (num_bytes <= 0) 72 return false; 73 74 data_received_->append(buf_->data(), num_bytes); 75 return true; 76 } 77 78 private: 79 // Unique ID for the current request. 80 int request_id_; 81 82 // Buffer that URLRequest writes into. 83 scoped_refptr<IOBuffer> buf_; 84 85 // Holds the error condition that was hit on the current request, or OK. 86 int* result_code_; 87 88 // Data received from server; 89 std::string* data_received_; 90 91 // Used to track how many times the OnResponseStarted get called after 92 // sending a command to spawner server. 93 int response_started_count_; 94 95 DISALLOW_COPY_AND_ASSIGN(SpawnerRequestData); 96}; 97 98} // namespace 99 100SpawnerCommunicator::SpawnerCommunicator(uint16 port) 101 : io_thread_("spawner_communicator"), 102 event_(false, false), 103 port_(port), 104 next_id_(0), 105 weak_factory_(this), 106 is_running_(false) {} 107 108SpawnerCommunicator::~SpawnerCommunicator() { 109 DCHECK(!is_running_); 110} 111 112void SpawnerCommunicator::WaitForResponse() { 113 DCHECK_NE(MessageLoop::current(), io_thread_.message_loop()); 114 event_.Wait(); 115 event_.Reset(); 116} 117 118void SpawnerCommunicator::StartIOThread() { 119 DCHECK_NE(MessageLoop::current(), io_thread_.message_loop()); 120 if (is_running_) 121 return; 122 123 allowed_port_.reset(new ScopedPortException(port_)); 124 base::Thread::Options options; 125 options.message_loop_type = MessageLoop::TYPE_IO; 126 is_running_ = io_thread_.StartWithOptions(options); 127 DCHECK(is_running_); 128} 129 130void SpawnerCommunicator::Shutdown() { 131 DCHECK_NE(MessageLoop::current(), io_thread_.message_loop()); 132 DCHECK(is_running_); 133 // The request and its context should be created and destroyed only on the 134 // IO thread. 135 DCHECK(!cur_request_.get()); 136 DCHECK(!context_.get()); 137 is_running_ = false; 138 io_thread_.Stop(); 139 allowed_port_.reset(); 140} 141 142void SpawnerCommunicator::SendCommandAndWaitForResult( 143 const std::string& command, 144 const std::string& post_data, 145 int* result_code, 146 std::string* data_received) { 147 if (!result_code || !data_received) 148 return; 149 // Start the communicator thread to talk to test server spawner. 150 StartIOThread(); 151 DCHECK(io_thread_.message_loop()); 152 153 // Since the method will be blocked until SpawnerCommunicator gets result 154 // from the spawner server or timed-out. It's safe to use base::Unretained 155 // when using base::Bind. 156 io_thread_.message_loop()->PostTask(FROM_HERE, base::Bind( 157 &SpawnerCommunicator::SendCommandAndWaitForResultOnIOThread, 158 base::Unretained(this), command, post_data, result_code, data_received)); 159 WaitForResponse(); 160} 161 162void SpawnerCommunicator::SendCommandAndWaitForResultOnIOThread( 163 const std::string& command, 164 const std::string& post_data, 165 int* result_code, 166 std::string* data_received) { 167 MessageLoop* loop = io_thread_.message_loop(); 168 DCHECK(loop); 169 DCHECK_EQ(MessageLoop::current(), loop); 170 171 // Prepare the URLRequest for sending the command. 172 DCHECK(!cur_request_.get()); 173 context_.reset(new TestURLRequestContext); 174 cur_request_.reset(context_->CreateRequest( 175 GenerateSpawnerCommandURL(command, port_), this)); 176 DCHECK(cur_request_.get()); 177 int current_request_id = ++next_id_; 178 SpawnerRequestData* data = new SpawnerRequestData(current_request_id, 179 result_code, 180 data_received); 181 DCHECK(data); 182 cur_request_->SetUserData(this, data); 183 184 if (post_data.empty()) { 185 cur_request_->set_method("GET"); 186 } else { 187 cur_request_->set_method("POST"); 188 scoped_ptr<UploadElementReader> reader( 189 UploadOwnedBytesElementReader::CreateWithString(post_data)); 190 cur_request_->set_upload(make_scoped_ptr( 191 UploadDataStream::CreateWithReader(reader.Pass(), 0))); 192 net::HttpRequestHeaders headers; 193 headers.SetHeader(net::HttpRequestHeaders::kContentType, 194 "application/json"); 195 cur_request_->SetExtraRequestHeaders(headers); 196 } 197 198 // Post a task to timeout this request if it takes too long. 199 MessageLoop::current()->PostDelayedTask( 200 FROM_HERE, 201 base::Bind(&SpawnerCommunicator::OnTimeout, weak_factory_.GetWeakPtr(), 202 current_request_id), 203 TestTimeouts::action_max_timeout()); 204 205 // Start the request. 206 cur_request_->Start(); 207} 208 209void SpawnerCommunicator::OnTimeout(int id) { 210 // Timeout tasks may outlive the URLRequest they reference. Make sure it 211 // is still applicable. 212 if (!cur_request_.get()) 213 return; 214 SpawnerRequestData* data = 215 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 216 DCHECK(data); 217 218 if (!data->DoesRequestIdMatch(id)) 219 return; 220 // Set the result code and cancel the timed-out task. 221 data->SetResultCode(ERR_TIMED_OUT); 222 cur_request_->Cancel(); 223 OnSpawnerCommandCompleted(cur_request_.get()); 224} 225 226void SpawnerCommunicator::OnSpawnerCommandCompleted(URLRequest* request) { 227 if (!cur_request_.get()) 228 return; 229 DCHECK_EQ(request, cur_request_.get()); 230 SpawnerRequestData* data = 231 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 232 DCHECK(data); 233 234 // If request is faild,return the error code. 235 if (!cur_request_->status().is_success()) 236 data->SetResultCode(cur_request_->status().error()); 237 238 if (!data->IsResultOK()) { 239 LOG(ERROR) << "request failed, status: " 240 << static_cast<int>(request->status().status()) 241 << ", error: " << request->status().error(); 242 // Clear the buffer of received data if any net error happened. 243 data->ClearReceivedData(); 244 } else { 245 DCHECK_EQ(1, data->response_started_count()); 246 } 247 248 // Clear current request to indicate the completion of sending a command 249 // to spawner server and getting the result. 250 cur_request_.reset(); 251 context_.reset(); 252 // Invalidate the weak pointers on the IO thread. 253 weak_factory_.InvalidateWeakPtrs(); 254 255 // Wakeup the caller in user thread. 256 event_.Signal(); 257} 258 259void SpawnerCommunicator::ReadResult(URLRequest* request) { 260 DCHECK_EQ(request, cur_request_.get()); 261 SpawnerRequestData* data = 262 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 263 DCHECK(data); 264 265 IOBuffer* buf = data->buf(); 266 // Read as many bytes as are available synchronously. 267 while (true) { 268 int num_bytes; 269 if (!request->Read(buf, kBufferSize, &num_bytes)) { 270 // Check whether the read failed synchronously. 271 if (!request->status().is_io_pending()) 272 OnSpawnerCommandCompleted(request); 273 return; 274 } 275 if (!data->ConsumeBytesRead(num_bytes)) { 276 OnSpawnerCommandCompleted(request); 277 return; 278 } 279 } 280} 281 282void SpawnerCommunicator::OnResponseStarted(URLRequest* request) { 283 DCHECK_EQ(request, cur_request_.get()); 284 SpawnerRequestData* data = 285 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 286 DCHECK(data); 287 288 data->IncreaseResponseStartedCount(); 289 290 if (!request->status().is_success()) { 291 OnSpawnerCommandCompleted(request); 292 return; 293 } 294 295 // Require HTTP responses to have a success status code. 296 if (request->GetResponseCode() != 200) { 297 LOG(ERROR) << "Spawner server returned bad status: " 298 << request->response_headers()->GetStatusLine(); 299 data->SetResultCode(ERR_FAILED); 300 request->Cancel(); 301 OnSpawnerCommandCompleted(request); 302 return; 303 } 304 305 ReadResult(request); 306} 307 308void SpawnerCommunicator::OnReadCompleted(URLRequest* request, int num_bytes) { 309 if (!cur_request_.get()) 310 return; 311 DCHECK_EQ(request, cur_request_.get()); 312 SpawnerRequestData* data = 313 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 314 DCHECK(data); 315 316 if (data->ConsumeBytesRead(num_bytes)) { 317 // Keep reading. 318 ReadResult(request); 319 } else { 320 OnSpawnerCommandCompleted(request); 321 } 322} 323 324bool SpawnerCommunicator::StartServer(const std::string& arguments, 325 uint16* port) { 326 *port = 0; 327 // Send the start command to spawner server to start the Python test server 328 // on remote machine. 329 std::string server_return_data; 330 int result_code; 331 SendCommandAndWaitForResult("start", arguments, &result_code, 332 &server_return_data); 333 if (OK != result_code || server_return_data.empty()) 334 return false; 335 336 // Check whether the data returned from spawner server is JSON-formatted. 337 scoped_ptr<base::Value> value(base::JSONReader::Read(server_return_data)); 338 if (!value.get() || !value->IsType(base::Value::TYPE_DICTIONARY)) { 339 LOG(ERROR) << "Invalid server data: " << server_return_data.c_str(); 340 return false; 341 } 342 343 // Check whether spawner server returns valid data. 344 DictionaryValue* server_data = static_cast<DictionaryValue*>(value.get()); 345 std::string message; 346 if (!server_data->GetString("message", &message) || message != "started") { 347 LOG(ERROR) << "Invalid message in server data: "; 348 return false; 349 } 350 int int_port; 351 if (!server_data->GetInteger("port", &int_port) || int_port <= 0 || 352 int_port > kuint16max) { 353 LOG(ERROR) << "Invalid port value: " << int_port; 354 return false; 355 } 356 *port = static_cast<uint16>(int_port); 357 return true; 358} 359 360bool SpawnerCommunicator::StopServer() { 361 // It's OK to stop the SpawnerCommunicator without starting it. Some tests 362 // have test server on their test fixture but do not actually use it. 363 if (!is_running_) 364 return true; 365 366 // When the test is done, ask the test server spawner to kill the test server 367 // on the remote machine. 368 std::string server_return_data; 369 int result_code; 370 SendCommandAndWaitForResult("kill", "", &result_code, &server_return_data); 371 Shutdown(); 372 if (OK != result_code || server_return_data != "killed") 373 return false; 374 return true; 375} 376 377} // namespace net 378