MyHandler.h revision e0dd7d396051942ccce0429d7a1fe968d63ac3f7
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21#include "APacketSource.h" 22#include "ARTPConnection.h" 23#include "ARTSPConnection.h" 24#include "ASessionDescription.h" 25 26#include <media/stagefright/foundation/ABuffer.h> 27#include <media/stagefright/foundation/ADebug.h> 28#include <media/stagefright/foundation/ALooper.h> 29#include <media/stagefright/foundation/AMessage.h> 30#include <media/stagefright/MediaErrors.h> 31 32namespace android { 33 34struct MyHandler : public AHandler { 35 MyHandler(const char *url, const sp<ALooper> &looper) 36 : mLooper(looper), 37 mNetLooper(new ALooper), 38 mConn(new ARTSPConnection), 39 mRTPConn(new ARTPConnection), 40 mSessionURL(url), 41 mSetupTracksSuccessful(false), 42 mSeekPending(false), 43 mFirstAccessUnit(true), 44 mFirstAccessUnitNTP(0) { 45 46 mNetLooper->start(false /* runOnCallingThread */, 47 false /* canCallJava */, 48 PRIORITY_HIGHEST); 49 } 50 51 void connect(const sp<AMessage> &doneMsg) { 52 mDoneMsg = doneMsg; 53 54 mLooper->registerHandler(this); 55 mLooper->registerHandler(mConn); 56 (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); 57 58 sp<AMessage> reply = new AMessage('conn', id()); 59 mConn->connect(mSessionURL.c_str(), reply); 60 } 61 62 void disconnect(const sp<AMessage> &doneMsg) { 63 mDoneMsg = doneMsg; 64 65 (new AMessage('abor', id()))->post(); 66 } 67 68 void seek(int64_t timeUs) { 69 sp<AMessage> msg = new AMessage('seek', id()); 70 msg->setInt64("time", timeUs); 71 msg->post(); 72 } 73 74 virtual void onMessageReceived(const sp<AMessage> &msg) { 75 switch (msg->what()) { 76 case 'conn': 77 { 78 int32_t result; 79 CHECK(msg->findInt32("result", &result)); 80 81 LOG(INFO) << "connection request completed with result " 82 << result << " (" << strerror(-result) << ")"; 83 84 if (result == OK) { 85 AString request; 86 request = "DESCRIBE "; 87 request.append(mSessionURL); 88 request.append(" RTSP/1.0\r\n"); 89 request.append("Accept: application/sdp\r\n"); 90 request.append("\r\n"); 91 92 sp<AMessage> reply = new AMessage('desc', id()); 93 mConn->sendRequest(request.c_str(), reply); 94 } 95 break; 96 } 97 98 case 'disc': 99 { 100 (new AMessage('quit', id()))->post(); 101 break; 102 } 103 104 case 'desc': 105 { 106 int32_t result; 107 CHECK(msg->findInt32("result", &result)); 108 109 LOG(INFO) << "DESCRIBE completed with result " 110 << result << " (" << strerror(-result) << ")"; 111 112 if (result == OK) { 113 sp<RefBase> obj; 114 CHECK(msg->findObject("response", &obj)); 115 sp<ARTSPResponse> response = 116 static_cast<ARTSPResponse *>(obj.get()); 117 118 if (response->mStatusCode == 302) { 119 ssize_t i = response->mHeaders.indexOfKey("location"); 120 CHECK_GE(i, 0); 121 122 mSessionURL = response->mHeaders.valueAt(i); 123 124 AString request; 125 request = "DESCRIBE "; 126 request.append(mSessionURL); 127 request.append(" RTSP/1.0\r\n"); 128 request.append("Accept: application/sdp\r\n"); 129 request.append("\r\n"); 130 131 sp<AMessage> reply = new AMessage('desc', id()); 132 mConn->sendRequest(request.c_str(), reply); 133 break; 134 } 135 136 CHECK_EQ(response->mStatusCode, 200u); 137 138 mSessionDesc = new ASessionDescription; 139 140 mSessionDesc->setTo( 141 response->mContent->data(), 142 response->mContent->size()); 143 144 CHECK(mSessionDesc->isValid()); 145 146 ssize_t i = response->mHeaders.indexOfKey("content-base"); 147 if (i >= 0) { 148 mBaseURL = response->mHeaders.valueAt(i); 149 } else { 150 i = response->mHeaders.indexOfKey("content-location"); 151 if (i >= 0) { 152 mBaseURL = response->mHeaders.valueAt(i); 153 } else { 154 mBaseURL = mSessionURL; 155 } 156 } 157 158 CHECK_GT(mSessionDesc->countTracks(), 1u); 159 setupTrack(1); 160 } else { 161 sp<AMessage> reply = new AMessage('disc', id()); 162 mConn->disconnect(reply); 163 } 164 break; 165 } 166 167 case 'setu': 168 { 169 size_t index; 170 CHECK(msg->findSize("index", &index)); 171 172 TrackInfo *track = NULL; 173 size_t trackIndex; 174 if (msg->findSize("track-index", &trackIndex)) { 175 track = &mTracks.editItemAt(trackIndex); 176 } 177 178 int32_t result; 179 CHECK(msg->findInt32("result", &result)); 180 181 LOG(INFO) << "SETUP(" << index << ") completed with result " 182 << result << " (" << strerror(-result) << ")"; 183 184 if (result != OK) { 185 if (track) { 186 close(track->mRTPSocket); 187 close(track->mRTCPSocket); 188 189 mTracks.removeItemsAt(trackIndex); 190 } 191 } else { 192 CHECK(track != NULL); 193 194 sp<RefBase> obj; 195 CHECK(msg->findObject("response", &obj)); 196 sp<ARTSPResponse> response = 197 static_cast<ARTSPResponse *>(obj.get()); 198 199 CHECK_EQ(response->mStatusCode, 200u); 200 201 ssize_t i = response->mHeaders.indexOfKey("session"); 202 CHECK_GE(i, 0); 203 204 if (index == 1) { 205 mSessionID = response->mHeaders.valueAt(i); 206 i = mSessionID.find(";"); 207 if (i >= 0) { 208 // Remove options, i.e. ";timeout=90" 209 mSessionID.erase(i, mSessionID.size() - i); 210 } 211 } 212 213 sp<AMessage> notify = new AMessage('accu', id()); 214 notify->setSize("track-index", trackIndex); 215 216 mRTPConn->addStream( 217 track->mRTPSocket, track->mRTCPSocket, 218 mSessionDesc, index, 219 notify); 220 221 mSetupTracksSuccessful = true; 222 } 223 224 ++index; 225 if (index < mSessionDesc->countTracks()) { 226 setupTrack(index); 227 } else if (mSetupTracksSuccessful) { 228 AString request = "PLAY "; 229 request.append(mSessionURL); 230 request.append(" RTSP/1.0\r\n"); 231 232 request.append("Session: "); 233 request.append(mSessionID); 234 request.append("\r\n"); 235 236 request.append("\r\n"); 237 238 sp<AMessage> reply = new AMessage('play', id()); 239 mConn->sendRequest(request.c_str(), reply); 240 } else { 241 sp<AMessage> reply = new AMessage('disc', id()); 242 mConn->disconnect(reply); 243 } 244 break; 245 } 246 247 case 'play': 248 { 249 int32_t result; 250 CHECK(msg->findInt32("result", &result)); 251 252 LOG(INFO) << "PLAY completed with result " 253 << result << " (" << strerror(-result) << ")"; 254 255 if (result == OK) { 256 sp<RefBase> obj; 257 CHECK(msg->findObject("response", &obj)); 258 sp<ARTSPResponse> response = 259 static_cast<ARTSPResponse *>(obj.get()); 260 261 CHECK_EQ(response->mStatusCode, 200u); 262 263 mDoneMsg->setInt32("result", OK); 264 mDoneMsg->post(); 265 mDoneMsg = NULL; 266 } else { 267 sp<AMessage> reply = new AMessage('disc', id()); 268 mConn->disconnect(reply); 269 } 270 271 break; 272 } 273 274 case 'abor': 275 { 276 for (size_t i = 0; i < mTracks.size(); ++i) { 277 mTracks.editItemAt(i).mPacketSource->signalEOS( 278 ERROR_END_OF_STREAM); 279 } 280 281 sp<AMessage> reply = new AMessage('tear', id()); 282 283 AString request; 284 request = "TEARDOWN "; 285 286 // XXX should use aggregate url from SDP here... 287 request.append(mSessionURL); 288 request.append(" RTSP/1.0\r\n"); 289 290 request.append("Session: "); 291 request.append(mSessionID); 292 request.append("\r\n"); 293 294 request.append("\r\n"); 295 296 mConn->sendRequest(request.c_str(), reply); 297 break; 298 } 299 300 case 'tear': 301 { 302 int32_t result; 303 CHECK(msg->findInt32("result", &result)); 304 305 LOG(INFO) << "TEARDOWN completed with result " 306 << result << " (" << strerror(-result) << ")"; 307 308 sp<AMessage> reply = new AMessage('disc', id()); 309 mConn->disconnect(reply); 310 break; 311 } 312 313 case 'quit': 314 { 315 if (mDoneMsg != NULL) { 316 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 317 mDoneMsg->post(); 318 mDoneMsg = NULL; 319 } 320 break; 321 } 322 323 case 'accu': 324 { 325 size_t trackIndex; 326 CHECK(msg->findSize("track-index", &trackIndex)); 327 328 int32_t eos; 329 if (msg->findInt32("eos", &eos)) { 330 LOG(INFO) << "received BYE on track index " << trackIndex; 331#if 0 332 TrackInfo *track = &mTracks.editItemAt(trackIndex); 333 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 334#endif 335 return; 336 } 337 338 sp<RefBase> obj; 339 CHECK(msg->findObject("access-unit", &obj)); 340 341 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 342 343 uint64_t ntpTime; 344 CHECK(accessUnit->meta()->findInt64( 345 "ntp-time", (int64_t *)&ntpTime)); 346 347 if (mFirstAccessUnit) { 348 mFirstAccessUnit = false; 349 mFirstAccessUnitNTP = ntpTime; 350 } 351 352 if (ntpTime >= mFirstAccessUnitNTP) { 353 ntpTime -= mFirstAccessUnitNTP; 354 } else { 355 ntpTime = 0; 356 } 357 358 int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 359 360 accessUnit->meta()->setInt64("timeUs", timeUs); 361 362#if 0 363 int32_t damaged; 364 if (accessUnit->meta()->findInt32("damaged", &damaged) 365 && damaged != 0) { 366 LOG(INFO) << "ignoring damaged AU"; 367 } else 368#endif 369 { 370 TrackInfo *track = &mTracks.editItemAt(trackIndex); 371 track->mPacketSource->queueAccessUnit(accessUnit); 372 } 373 break; 374 } 375 376 case 'seek': 377 { 378 if (mSeekPending) { 379 break; 380 } 381 382 int64_t timeUs; 383 CHECK(msg->findInt64("time", &timeUs)); 384 385 mSeekPending = true; 386 387 AString request = "PAUSE "; 388 request.append(mSessionURL); 389 request.append(" RTSP/1.0\r\n"); 390 391 request.append("Session: "); 392 request.append(mSessionID); 393 request.append("\r\n"); 394 395 request.append("\r\n"); 396 397 sp<AMessage> reply = new AMessage('see1', id()); 398 reply->setInt64("time", timeUs); 399 mConn->sendRequest(request.c_str(), reply); 400 break; 401 } 402 403 case 'see1': 404 { 405 int64_t timeUs; 406 CHECK(msg->findInt64("time", &timeUs)); 407 408 AString request = "PLAY "; 409 request.append(mSessionURL); 410 request.append(" RTSP/1.0\r\n"); 411 412 request.append("Session: "); 413 request.append(mSessionID); 414 request.append("\r\n"); 415 416 request.append( 417 StringPrintf( 418 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 419 420 request.append("\r\n"); 421 422 sp<AMessage> reply = new AMessage('see2', id()); 423 mConn->sendRequest(request.c_str(), reply); 424 break; 425 } 426 427 case 'see2': 428 { 429 CHECK(mSeekPending); 430 431 LOG(INFO) << "seek completed."; 432 mSeekPending = false; 433 434 int32_t result; 435 CHECK(msg->findInt32("result", &result)); 436 if (result != OK) { 437 LOG(ERROR) << "seek FAILED"; 438 break; 439 } 440 441 sp<RefBase> obj; 442 CHECK(msg->findObject("response", &obj)); 443 sp<ARTSPResponse> response = 444 static_cast<ARTSPResponse *>(obj.get()); 445 446 CHECK_EQ(response->mStatusCode, 200u); 447 448 for (size_t i = 0; i < mTracks.size(); ++i) { 449 mTracks.editItemAt(i).mPacketSource->flushQueue(); 450 } 451 break; 452 } 453 454 default: 455 TRESPASS(); 456 break; 457 } 458 } 459 460 sp<APacketSource> getPacketSource(size_t index) { 461 CHECK_GE(index, 0u); 462 CHECK_LT(index, mTracks.size()); 463 464 return mTracks.editItemAt(index).mPacketSource; 465 } 466 467 size_t countTracks() const { 468 return mTracks.size(); 469 } 470 471private: 472 sp<ALooper> mLooper; 473 sp<ALooper> mNetLooper; 474 sp<ARTSPConnection> mConn; 475 sp<ARTPConnection> mRTPConn; 476 sp<ASessionDescription> mSessionDesc; 477 AString mSessionURL; 478 AString mBaseURL; 479 AString mSessionID; 480 bool mSetupTracksSuccessful; 481 bool mSeekPending; 482 bool mFirstAccessUnit; 483 uint64_t mFirstAccessUnitNTP; 484 485 struct TrackInfo { 486 int mRTPSocket; 487 int mRTCPSocket; 488 489 sp<APacketSource> mPacketSource; 490 }; 491 Vector<TrackInfo> mTracks; 492 493 sp<AMessage> mDoneMsg; 494 495 void setupTrack(size_t index) { 496 sp<APacketSource> source = 497 new APacketSource(mSessionDesc, index); 498 if (source->initCheck() != OK) { 499 LOG(WARNING) << "Unsupported format. Ignoring track #" 500 << index << "."; 501 502 sp<AMessage> reply = new AMessage('setu', id()); 503 reply->setSize("index", index); 504 reply->setInt32("result", ERROR_UNSUPPORTED); 505 reply->post(); 506 return; 507 } 508 509 AString url; 510 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 511 512 AString trackURL; 513 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 514 515 mTracks.push(TrackInfo()); 516 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 517 info->mPacketSource = source; 518 519 unsigned rtpPort; 520 ARTPConnection::MakePortPair( 521 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 522 523 AString request = "SETUP "; 524 request.append(trackURL); 525 request.append(" RTSP/1.0\r\n"); 526 527 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 528 request.append(rtpPort); 529 request.append("-"); 530 request.append(rtpPort + 1); 531 request.append("\r\n"); 532 533 if (index > 1) { 534 request.append("Session: "); 535 request.append(mSessionID); 536 request.append("\r\n"); 537 } 538 539 request.append("\r\n"); 540 541 sp<AMessage> reply = new AMessage('setu', id()); 542 reply->setSize("index", index); 543 reply->setSize("track-index", mTracks.size() - 1); 544 mConn->sendRequest(request.c_str(), reply); 545 } 546 547 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 548 out->clear(); 549 550 if (strncasecmp("rtsp://", baseURL, 7)) { 551 // Base URL must be absolute 552 return false; 553 } 554 555 if (!strncasecmp("rtsp://", url, 7)) { 556 // "url" is already an absolute URL, ignore base URL. 557 out->setTo(url); 558 return true; 559 } 560 561 size_t n = strlen(baseURL); 562 if (baseURL[n - 1] == '/') { 563 out->setTo(baseURL); 564 out->append(url); 565 } else { 566 char *slashPos = strrchr(baseURL, '/'); 567 568 if (slashPos > &baseURL[6]) { 569 out->setTo(baseURL, slashPos - baseURL); 570 } else { 571 out->setTo(baseURL); 572 } 573 574 out->append("/"); 575 out->append(url); 576 } 577 578 return true; 579 } 580 581 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 582}; 583 584} // namespace android 585 586#endif // MY_HANDLER_H_ 587