spawner_communicator.cc revision 7d4cd473f85ac64c3747c96c277f9e506a0d2246
1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "net/test/spawned_test_server/spawner_communicator.h" 6 7#include "base/json/json_reader.h" 8#include "base/logging.h" 9#include "base/strings/stringprintf.h" 10#include "base/supports_user_data.h" 11#include "base/test/test_timeouts.h" 12#include "base/time.h" 13#include "base/values.h" 14#include "build/build_config.h" 15#include "googleurl/src/gurl.h" 16#include "net/base/net_util.h" 17#include "net/base/upload_bytes_element_reader.h" 18#include "net/base/upload_data_stream.h" 19#include "net/http/http_response_headers.h" 20#include "net/url_request/url_request_test_util.h" 21 22namespace net { 23 24namespace { 25 26GURL GenerateSpawnerCommandURL(const std::string& command, uint16 port) { 27 // Always performs HTTP request for sending command to the spawner server. 28 return GURL(base::StringPrintf("%s:%u/%s", "http://127.0.0.1", port, 29 command.c_str())); 30} 31 32int kBufferSize = 2048; 33 34// A class to hold all data needed to send a command to spawner server. 35class SpawnerRequestData : public base::SupportsUserData::Data { 36 public: 37 SpawnerRequestData(int id, int* result_code, std::string* data_received) 38 : request_id_(id), 39 buf_(new IOBuffer(kBufferSize)), 40 result_code_(result_code), 41 data_received_(data_received), 42 response_started_count_(0) { 43 DCHECK(result_code); 44 *result_code_ = OK; 45 DCHECK(data_received); 46 data_received_->clear(); 47 } 48 49 virtual ~SpawnerRequestData() {} 50 51 bool DoesRequestIdMatch(int request_id) const { 52 return request_id_ == request_id; 53 } 54 55 IOBuffer* buf() const { return buf_.get(); } 56 57 bool IsResultOK() const { return *result_code_ == OK; } 58 59 void ClearReceivedData() { data_received_->clear(); } 60 61 void SetResultCode(int result_code) { *result_code_ = result_code; } 62 63 void IncreaseResponseStartedCount() { response_started_count_++; } 64 65 int response_started_count() const { return response_started_count_; } 66 67 // Write data read from URLRequest::Read() to |data_received_|. Returns true 68 // if |num_bytes| is great than 0. |num_bytes| is 0 for EOF, < 0 on errors. 69 bool ConsumeBytesRead(int num_bytes) { 70 // Error while reading, or EOF. 71 if (num_bytes <= 0) 72 return false; 73 74 data_received_->append(buf_->data(), num_bytes); 75 return true; 76 } 77 78 private: 79 // Unique ID for the current request. 80 int request_id_; 81 82 // Buffer that URLRequest writes into. 83 scoped_refptr<IOBuffer> buf_; 84 85 // Holds the error condition that was hit on the current request, or OK. 86 int* result_code_; 87 88 // Data received from server; 89 std::string* data_received_; 90 91 // Used to track how many times the OnResponseStarted get called after 92 // sending a command to spawner server. 93 int response_started_count_; 94 95 DISALLOW_COPY_AND_ASSIGN(SpawnerRequestData); 96}; 97 98} // namespace 99 100SpawnerCommunicator::SpawnerCommunicator(uint16 port) 101 : io_thread_("spawner_communicator"), 102 event_(false, false), 103 port_(port), 104 next_id_(0), 105 weak_factory_(this), 106 is_running_(false) {} 107 108SpawnerCommunicator::~SpawnerCommunicator() { 109 DCHECK(!is_running_); 110} 111 112void SpawnerCommunicator::WaitForResponse() { 113 DCHECK_NE(base::MessageLoop::current(), io_thread_.message_loop()); 114 event_.Wait(); 115 event_.Reset(); 116} 117 118void SpawnerCommunicator::StartIOThread() { 119 DCHECK_NE(base::MessageLoop::current(), io_thread_.message_loop()); 120 if (is_running_) 121 return; 122 123 allowed_port_.reset(new ScopedPortException(port_)); 124 base::Thread::Options options; 125 options.message_loop_type = base::MessageLoop::TYPE_IO; 126 is_running_ = io_thread_.StartWithOptions(options); 127 DCHECK(is_running_); 128} 129 130void SpawnerCommunicator::Shutdown() { 131 DCHECK_NE(base::MessageLoop::current(), io_thread_.message_loop()); 132 DCHECK(is_running_); 133 // The request and its context should be created and destroyed only on the 134 // IO thread. 135 DCHECK(!cur_request_.get()); 136 DCHECK(!context_.get()); 137 is_running_ = false; 138 io_thread_.Stop(); 139 allowed_port_.reset(); 140} 141 142void SpawnerCommunicator::SendCommandAndWaitForResult( 143 const std::string& command, 144 const std::string& post_data, 145 int* result_code, 146 std::string* data_received) { 147 if (!result_code || !data_received) 148 return; 149 // Start the communicator thread to talk to test server spawner. 150 StartIOThread(); 151 DCHECK(io_thread_.message_loop()); 152 153 // Since the method will be blocked until SpawnerCommunicator gets result 154 // from the spawner server or timed-out. It's safe to use base::Unretained 155 // when using base::Bind. 156 io_thread_.message_loop()->PostTask(FROM_HERE, base::Bind( 157 &SpawnerCommunicator::SendCommandAndWaitForResultOnIOThread, 158 base::Unretained(this), command, post_data, result_code, data_received)); 159 WaitForResponse(); 160} 161 162void SpawnerCommunicator::SendCommandAndWaitForResultOnIOThread( 163 const std::string& command, 164 const std::string& post_data, 165 int* result_code, 166 std::string* data_received) { 167 base::MessageLoop* loop = io_thread_.message_loop(); 168 DCHECK(loop); 169 DCHECK_EQ(base::MessageLoop::current(), loop); 170 171 // Prepare the URLRequest for sending the command. 172 DCHECK(!cur_request_.get()); 173 context_.reset(new TestURLRequestContext); 174 cur_request_.reset(context_->CreateRequest( 175 GenerateSpawnerCommandURL(command, port_), this)); 176 DCHECK(cur_request_.get()); 177 int current_request_id = ++next_id_; 178 SpawnerRequestData* data = new SpawnerRequestData(current_request_id, 179 result_code, 180 data_received); 181 DCHECK(data); 182 cur_request_->SetUserData(this, data); 183 184 if (post_data.empty()) { 185 cur_request_->set_method("GET"); 186 } else { 187 cur_request_->set_method("POST"); 188 scoped_ptr<UploadElementReader> reader( 189 UploadOwnedBytesElementReader::CreateWithString(post_data)); 190 cur_request_->set_upload(make_scoped_ptr( 191 UploadDataStream::CreateWithReader(reader.Pass(), 0))); 192 net::HttpRequestHeaders headers; 193 headers.SetHeader(net::HttpRequestHeaders::kContentType, 194 "application/json"); 195 cur_request_->SetExtraRequestHeaders(headers); 196 } 197 198 // Post a task to timeout this request if it takes too long. 199 base::MessageLoop::current()->PostDelayedTask( 200 FROM_HERE, 201 base::Bind(&SpawnerCommunicator::OnTimeout, 202 weak_factory_.GetWeakPtr(), 203 current_request_id), 204 TestTimeouts::action_max_timeout()); 205 206 // Start the request. 207 cur_request_->Start(); 208} 209 210void SpawnerCommunicator::OnTimeout(int id) { 211 // Timeout tasks may outlive the URLRequest they reference. Make sure it 212 // is still applicable. 213 if (!cur_request_.get()) 214 return; 215 SpawnerRequestData* data = 216 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 217 DCHECK(data); 218 219 if (!data->DoesRequestIdMatch(id)) 220 return; 221 // Set the result code and cancel the timed-out task. 222 data->SetResultCode(ERR_TIMED_OUT); 223 cur_request_->Cancel(); 224 OnSpawnerCommandCompleted(cur_request_.get()); 225} 226 227void SpawnerCommunicator::OnSpawnerCommandCompleted(URLRequest* request) { 228 if (!cur_request_.get()) 229 return; 230 DCHECK_EQ(request, cur_request_.get()); 231 SpawnerRequestData* data = 232 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 233 DCHECK(data); 234 235 // If request is faild,return the error code. 236 if (!cur_request_->status().is_success()) 237 data->SetResultCode(cur_request_->status().error()); 238 239 if (!data->IsResultOK()) { 240 LOG(ERROR) << "request failed, status: " 241 << static_cast<int>(request->status().status()) 242 << ", error: " << request->status().error(); 243 // Clear the buffer of received data if any net error happened. 244 data->ClearReceivedData(); 245 } else { 246 DCHECK_EQ(1, data->response_started_count()); 247 } 248 249 // Clear current request to indicate the completion of sending a command 250 // to spawner server and getting the result. 251 cur_request_.reset(); 252 context_.reset(); 253 // Invalidate the weak pointers on the IO thread. 254 weak_factory_.InvalidateWeakPtrs(); 255 256 // Wakeup the caller in user thread. 257 event_.Signal(); 258} 259 260void SpawnerCommunicator::ReadResult(URLRequest* request) { 261 DCHECK_EQ(request, cur_request_.get()); 262 SpawnerRequestData* data = 263 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 264 DCHECK(data); 265 266 IOBuffer* buf = data->buf(); 267 // Read as many bytes as are available synchronously. 268 while (true) { 269 int num_bytes; 270 if (!request->Read(buf, kBufferSize, &num_bytes)) { 271 // Check whether the read failed synchronously. 272 if (!request->status().is_io_pending()) 273 OnSpawnerCommandCompleted(request); 274 return; 275 } 276 if (!data->ConsumeBytesRead(num_bytes)) { 277 OnSpawnerCommandCompleted(request); 278 return; 279 } 280 } 281} 282 283void SpawnerCommunicator::OnResponseStarted(URLRequest* request) { 284 DCHECK_EQ(request, cur_request_.get()); 285 SpawnerRequestData* data = 286 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 287 DCHECK(data); 288 289 data->IncreaseResponseStartedCount(); 290 291 if (!request->status().is_success()) { 292 OnSpawnerCommandCompleted(request); 293 return; 294 } 295 296 // Require HTTP responses to have a success status code. 297 if (request->GetResponseCode() != 200) { 298 LOG(ERROR) << "Spawner server returned bad status: " 299 << request->response_headers()->GetStatusLine(); 300 data->SetResultCode(ERR_FAILED); 301 request->Cancel(); 302 OnSpawnerCommandCompleted(request); 303 return; 304 } 305 306 ReadResult(request); 307} 308 309void SpawnerCommunicator::OnReadCompleted(URLRequest* request, int num_bytes) { 310 if (!cur_request_.get()) 311 return; 312 DCHECK_EQ(request, cur_request_.get()); 313 SpawnerRequestData* data = 314 static_cast<SpawnerRequestData*>(cur_request_->GetUserData(this)); 315 DCHECK(data); 316 317 if (data->ConsumeBytesRead(num_bytes)) { 318 // Keep reading. 319 ReadResult(request); 320 } else { 321 OnSpawnerCommandCompleted(request); 322 } 323} 324 325bool SpawnerCommunicator::StartServer(const std::string& arguments, 326 uint16* port) { 327 *port = 0; 328 // Send the start command to spawner server to start the Python test server 329 // on remote machine. 330 std::string server_return_data; 331 int result_code; 332 SendCommandAndWaitForResult("start", arguments, &result_code, 333 &server_return_data); 334 if (OK != result_code || server_return_data.empty()) 335 return false; 336 337 // Check whether the data returned from spawner server is JSON-formatted. 338 scoped_ptr<base::Value> value(base::JSONReader::Read(server_return_data)); 339 if (!value.get() || !value->IsType(base::Value::TYPE_DICTIONARY)) { 340 LOG(ERROR) << "Invalid server data: " << server_return_data.c_str(); 341 return false; 342 } 343 344 // Check whether spawner server returns valid data. 345 base::DictionaryValue* server_data = 346 static_cast<base::DictionaryValue*>(value.get()); 347 std::string message; 348 if (!server_data->GetString("message", &message) || message != "started") { 349 LOG(ERROR) << "Invalid message in server data: "; 350 return false; 351 } 352 int int_port; 353 if (!server_data->GetInteger("port", &int_port) || int_port <= 0 || 354 int_port > kuint16max) { 355 LOG(ERROR) << "Invalid port value: " << int_port; 356 return false; 357 } 358 *port = static_cast<uint16>(int_port); 359 return true; 360} 361 362bool SpawnerCommunicator::StopServer() { 363 // It's OK to stop the SpawnerCommunicator without starting it. Some tests 364 // have test server on their test fixture but do not actually use it. 365 if (!is_running_) 366 return true; 367 368 // When the test is done, ask the test server spawner to kill the test server 369 // on the remote machine. 370 std::string server_return_data; 371 int result_code; 372 SendCommandAndWaitForResult("kill", "", &result_code, &server_return_data); 373 Shutdown(); 374 if (OK != result_code || server_return_data != "killed") 375 return false; 376 return true; 377} 378 379} // namespace net 380