// MyHandler.h revision 437ab8c4b66a6c9dc47faa257df90089ebef10a9
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21#include "APacketSource.h" 22#include "ARTPConnection.h" 23#include "ARTSPConnection.h" 24#include "ASessionDescription.h" 25 26#include <media/stagefright/foundation/ABuffer.h> 27#include <media/stagefright/foundation/ADebug.h> 28#include <media/stagefright/foundation/ALooper.h> 29#include <media/stagefright/foundation/AMessage.h> 30#include <media/stagefright/MediaErrors.h> 31 32#define USE_TCP_INTERLEAVED 0 33 34namespace android { 35 36struct MyHandler : public AHandler { 37 MyHandler(const char *url, const sp<ALooper> &looper) 38 : mLooper(looper), 39 mNetLooper(new ALooper), 40 mConn(new ARTSPConnection), 41 mRTPConn(new ARTPConnection), 42 mSessionURL(url), 43 mSetupTracksSuccessful(false), 44 mSeekPending(false), 45 mFirstAccessUnit(true), 46 mFirstAccessUnitNTP(0) { 47 48 mNetLooper->start(false /* runOnCallingThread */, 49 false /* canCallJava */, 50 PRIORITY_HIGHEST); 51 } 52 53 void connect(const sp<AMessage> &doneMsg) { 54 mDoneMsg = doneMsg; 55 56 mLooper->registerHandler(this); 57 mLooper->registerHandler(mConn); 58 (1 ? 
mNetLooper : mLooper)->registerHandler(mRTPConn); 59 60 sp<AMessage> notify = new AMessage('biny', id()); 61 mConn->observeBinaryData(notify); 62 63 sp<AMessage> reply = new AMessage('conn', id()); 64 mConn->connect(mSessionURL.c_str(), reply); 65 } 66 67 void disconnect(const sp<AMessage> &doneMsg) { 68 mDoneMsg = doneMsg; 69 70 (new AMessage('abor', id()))->post(); 71 } 72 73 void seek(int64_t timeUs) { 74 sp<AMessage> msg = new AMessage('seek', id()); 75 msg->setInt64("time", timeUs); 76 msg->post(); 77 } 78 79 virtual void onMessageReceived(const sp<AMessage> &msg) { 80 switch (msg->what()) { 81 case 'conn': 82 { 83 int32_t result; 84 CHECK(msg->findInt32("result", &result)); 85 86 LOG(INFO) << "connection request completed with result " 87 << result << " (" << strerror(-result) << ")"; 88 89 if (result == OK) { 90 AString request; 91 request = "DESCRIBE "; 92 request.append(mSessionURL); 93 request.append(" RTSP/1.0\r\n"); 94 request.append("Accept: application/sdp\r\n"); 95 request.append("\r\n"); 96 97 sp<AMessage> reply = new AMessage('desc', id()); 98 mConn->sendRequest(request.c_str(), reply); 99 } else { 100 (new AMessage('disc', id()))->post(); 101 } 102 break; 103 } 104 105 case 'disc': 106 { 107 (new AMessage('quit', id()))->post(); 108 break; 109 } 110 111 case 'desc': 112 { 113 int32_t result; 114 CHECK(msg->findInt32("result", &result)); 115 116 LOG(INFO) << "DESCRIBE completed with result " 117 << result << " (" << strerror(-result) << ")"; 118 119 if (result == OK) { 120 sp<RefBase> obj; 121 CHECK(msg->findObject("response", &obj)); 122 sp<ARTSPResponse> response = 123 static_cast<ARTSPResponse *>(obj.get()); 124 125 if (response->mStatusCode == 302) { 126 ssize_t i = response->mHeaders.indexOfKey("location"); 127 CHECK_GE(i, 0); 128 129 mSessionURL = response->mHeaders.valueAt(i); 130 131 AString request; 132 request = "DESCRIBE "; 133 request.append(mSessionURL); 134 request.append(" RTSP/1.0\r\n"); 135 request.append("Accept: 
application/sdp\r\n"); 136 request.append("\r\n"); 137 138 sp<AMessage> reply = new AMessage('desc', id()); 139 mConn->sendRequest(request.c_str(), reply); 140 break; 141 } 142 143 CHECK_EQ(response->mStatusCode, 200u); 144 145 mSessionDesc = new ASessionDescription; 146 147 mSessionDesc->setTo( 148 response->mContent->data(), 149 response->mContent->size()); 150 151 CHECK(mSessionDesc->isValid()); 152 153 ssize_t i = response->mHeaders.indexOfKey("content-base"); 154 if (i >= 0) { 155 mBaseURL = response->mHeaders.valueAt(i); 156 } else { 157 i = response->mHeaders.indexOfKey("content-location"); 158 if (i >= 0) { 159 mBaseURL = response->mHeaders.valueAt(i); 160 } else { 161 mBaseURL = mSessionURL; 162 } 163 } 164 165 CHECK_GT(mSessionDesc->countTracks(), 1u); 166 setupTrack(1); 167 } else { 168 sp<AMessage> reply = new AMessage('disc', id()); 169 mConn->disconnect(reply); 170 } 171 break; 172 } 173 174 case 'setu': 175 { 176 size_t index; 177 CHECK(msg->findSize("index", &index)); 178 179 TrackInfo *track = NULL; 180 size_t trackIndex; 181 if (msg->findSize("track-index", &trackIndex)) { 182 track = &mTracks.editItemAt(trackIndex); 183 } 184 185 int32_t result; 186 CHECK(msg->findInt32("result", &result)); 187 188 LOG(INFO) << "SETUP(" << index << ") completed with result " 189 << result << " (" << strerror(-result) << ")"; 190 191 if (result != OK) { 192 if (track) { 193 if (!track->mUsingInterleavedTCP) { 194 close(track->mRTPSocket); 195 close(track->mRTCPSocket); 196 } 197 198 mTracks.removeItemsAt(trackIndex); 199 } 200 } else { 201 CHECK(track != NULL); 202 203 sp<RefBase> obj; 204 CHECK(msg->findObject("response", &obj)); 205 sp<ARTSPResponse> response = 206 static_cast<ARTSPResponse *>(obj.get()); 207 208 CHECK_EQ(response->mStatusCode, 200u); 209 210 ssize_t i = response->mHeaders.indexOfKey("session"); 211 CHECK_GE(i, 0); 212 213 if (index == 1) { 214 mSessionID = response->mHeaders.valueAt(i); 215 i = mSessionID.find(";"); 216 if (i >= 0) { 217 // 
Remove options, i.e. ";timeout=90" 218 mSessionID.erase(i, mSessionID.size() - i); 219 } 220 } 221 222 sp<AMessage> notify = new AMessage('accu', id()); 223 notify->setSize("track-index", trackIndex); 224 225 mRTPConn->addStream( 226 track->mRTPSocket, track->mRTCPSocket, 227 mSessionDesc, index, 228 notify, track->mUsingInterleavedTCP); 229 230 mSetupTracksSuccessful = true; 231 } 232 233 ++index; 234 if (index < mSessionDesc->countTracks()) { 235 setupTrack(index); 236 } else if (mSetupTracksSuccessful) { 237 AString request = "PLAY "; 238 request.append(mSessionURL); 239 request.append(" RTSP/1.0\r\n"); 240 241 request.append("Session: "); 242 request.append(mSessionID); 243 request.append("\r\n"); 244 245 request.append("\r\n"); 246 247 sp<AMessage> reply = new AMessage('play', id()); 248 mConn->sendRequest(request.c_str(), reply); 249 } else { 250 sp<AMessage> reply = new AMessage('disc', id()); 251 mConn->disconnect(reply); 252 } 253 break; 254 } 255 256 case 'play': 257 { 258 int32_t result; 259 CHECK(msg->findInt32("result", &result)); 260 261 LOG(INFO) << "PLAY completed with result " 262 << result << " (" << strerror(-result) << ")"; 263 264 if (result == OK) { 265 sp<RefBase> obj; 266 CHECK(msg->findObject("response", &obj)); 267 sp<ARTSPResponse> response = 268 static_cast<ARTSPResponse *>(obj.get()); 269 270 CHECK_EQ(response->mStatusCode, 200u); 271 272 mDoneMsg->setInt32("result", OK); 273 mDoneMsg->post(); 274 mDoneMsg = NULL; 275 276 sp<AMessage> timeout = new AMessage('tiou', id()); 277 timeout->post(10000000ll); 278 } else { 279 sp<AMessage> reply = new AMessage('disc', id()); 280 mConn->disconnect(reply); 281 } 282 283 break; 284 } 285 286 case 'abor': 287 { 288 for (size_t i = 0; i < mTracks.size(); ++i) { 289 mTracks.editItemAt(i).mPacketSource->signalEOS( 290 ERROR_END_OF_STREAM); 291 } 292 293 sp<AMessage> reply = new AMessage('tear', id()); 294 295 AString request; 296 request = "TEARDOWN "; 297 298 // XXX should use aggregate url from SDP 
here... 299 request.append(mSessionURL); 300 request.append(" RTSP/1.0\r\n"); 301 302 request.append("Session: "); 303 request.append(mSessionID); 304 request.append("\r\n"); 305 306 request.append("\r\n"); 307 308 mConn->sendRequest(request.c_str(), reply); 309 break; 310 } 311 312 case 'tear': 313 { 314 int32_t result; 315 CHECK(msg->findInt32("result", &result)); 316 317 LOG(INFO) << "TEARDOWN completed with result " 318 << result << " (" << strerror(-result) << ")"; 319 320 sp<AMessage> reply = new AMessage('disc', id()); 321 mConn->disconnect(reply); 322 break; 323 } 324 325 case 'quit': 326 { 327 if (mDoneMsg != NULL) { 328 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 329 mDoneMsg->post(); 330 mDoneMsg = NULL; 331 } 332 break; 333 } 334 335 case 'accu': 336 { 337 size_t trackIndex; 338 CHECK(msg->findSize("track-index", &trackIndex)); 339 340 int32_t eos; 341 if (msg->findInt32("eos", &eos)) { 342 LOG(INFO) << "received BYE on track index " << trackIndex; 343#if 0 344 TrackInfo *track = &mTracks.editItemAt(trackIndex); 345 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 346#endif 347 return; 348 } 349 350 sp<RefBase> obj; 351 CHECK(msg->findObject("access-unit", &obj)); 352 353 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 354 355 uint64_t ntpTime; 356 CHECK(accessUnit->meta()->findInt64( 357 "ntp-time", (int64_t *)&ntpTime)); 358 359 if (mFirstAccessUnit) { 360 mFirstAccessUnit = false; 361 mFirstAccessUnitNTP = ntpTime; 362 } 363 364 if (ntpTime >= mFirstAccessUnitNTP) { 365 ntpTime -= mFirstAccessUnitNTP; 366 } else { 367 ntpTime = 0; 368 } 369 370 int64_t timeUs = (int64_t)(ntpTime * 1E6 / (1ll << 32)); 371 372 accessUnit->meta()->setInt64("timeUs", timeUs); 373 374#if 0 375 int32_t damaged; 376 if (accessUnit->meta()->findInt32("damaged", &damaged) 377 && damaged != 0) { 378 LOG(INFO) << "ignoring damaged AU"; 379 } else 380#endif 381 { 382 TrackInfo *track = &mTracks.editItemAt(trackIndex); 383 
track->mPacketSource->queueAccessUnit(accessUnit); 384 } 385 break; 386 } 387 388 case 'seek': 389 { 390 if (mSeekPending) { 391 break; 392 } 393 394 int64_t timeUs; 395 CHECK(msg->findInt64("time", &timeUs)); 396 397 mSeekPending = true; 398 399 AString request = "PAUSE "; 400 request.append(mSessionURL); 401 request.append(" RTSP/1.0\r\n"); 402 403 request.append("Session: "); 404 request.append(mSessionID); 405 request.append("\r\n"); 406 407 request.append("\r\n"); 408 409 sp<AMessage> reply = new AMessage('see1', id()); 410 reply->setInt64("time", timeUs); 411 mConn->sendRequest(request.c_str(), reply); 412 break; 413 } 414 415 case 'see1': 416 { 417 int64_t timeUs; 418 CHECK(msg->findInt64("time", &timeUs)); 419 420 AString request = "PLAY "; 421 request.append(mSessionURL); 422 request.append(" RTSP/1.0\r\n"); 423 424 request.append("Session: "); 425 request.append(mSessionID); 426 request.append("\r\n"); 427 428 request.append( 429 StringPrintf( 430 "Range: npt=%lld-\r\n", timeUs / 1000000ll)); 431 432 request.append("\r\n"); 433 434 sp<AMessage> reply = new AMessage('see2', id()); 435 mConn->sendRequest(request.c_str(), reply); 436 break; 437 } 438 439 case 'see2': 440 { 441 CHECK(mSeekPending); 442 443 LOG(INFO) << "seek completed."; 444 mSeekPending = false; 445 446 int32_t result; 447 CHECK(msg->findInt32("result", &result)); 448 if (result != OK) { 449 LOG(ERROR) << "seek FAILED"; 450 break; 451 } 452 453 sp<RefBase> obj; 454 CHECK(msg->findObject("response", &obj)); 455 sp<ARTSPResponse> response = 456 static_cast<ARTSPResponse *>(obj.get()); 457 458 CHECK_EQ(response->mStatusCode, 200u); 459 460 for (size_t i = 0; i < mTracks.size(); ++i) { 461 mTracks.editItemAt(i).mPacketSource->flushQueue(); 462 } 463 break; 464 } 465 466 case 'biny': 467 { 468 sp<RefBase> obj; 469 CHECK(msg->findObject("buffer", &obj)); 470 sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get()); 471 472 int32_t index; 473 CHECK(buffer->meta()->findInt32("index", &index)); 474 475 
mRTPConn->injectPacket(index, buffer); 476 break; 477 } 478 479 case 'tiou': 480 { 481 if (mFirstAccessUnit) { 482 LOG(WARNING) << "Never received any data, disconnecting."; 483 (new AMessage('abor', id()))->post(); 484 } 485 break; 486 } 487 488 default: 489 TRESPASS(); 490 break; 491 } 492 } 493 494 sp<APacketSource> getPacketSource(size_t index) { 495 CHECK_GE(index, 0u); 496 CHECK_LT(index, mTracks.size()); 497 498 return mTracks.editItemAt(index).mPacketSource; 499 } 500 501 size_t countTracks() const { 502 return mTracks.size(); 503 } 504 505private: 506 sp<ALooper> mLooper; 507 sp<ALooper> mNetLooper; 508 sp<ARTSPConnection> mConn; 509 sp<ARTPConnection> mRTPConn; 510 sp<ASessionDescription> mSessionDesc; 511 AString mSessionURL; 512 AString mBaseURL; 513 AString mSessionID; 514 bool mSetupTracksSuccessful; 515 bool mSeekPending; 516 bool mFirstAccessUnit; 517 uint64_t mFirstAccessUnitNTP; 518 519 struct TrackInfo { 520 int mRTPSocket; 521 int mRTCPSocket; 522 bool mUsingInterleavedTCP; 523 524 sp<APacketSource> mPacketSource; 525 }; 526 Vector<TrackInfo> mTracks; 527 528 sp<AMessage> mDoneMsg; 529 530 void setupTrack(size_t index) { 531 sp<APacketSource> source = 532 new APacketSource(mSessionDesc, index); 533 if (source->initCheck() != OK) { 534 LOG(WARNING) << "Unsupported format. 
Ignoring track #" 535 << index << "."; 536 537 sp<AMessage> reply = new AMessage('setu', id()); 538 reply->setSize("index", index); 539 reply->setInt32("result", ERROR_UNSUPPORTED); 540 reply->post(); 541 return; 542 } 543 544 AString url; 545 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 546 547 AString trackURL; 548 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 549 550 mTracks.push(TrackInfo()); 551 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 552 info->mPacketSource = source; 553 info->mUsingInterleavedTCP = false; 554 555 AString request = "SETUP "; 556 request.append(trackURL); 557 request.append(" RTSP/1.0\r\n"); 558 559#if USE_TCP_INTERLEAVED 560 size_t interleaveIndex = 2 * (mTracks.size() - 1); 561 info->mUsingInterleavedTCP = true; 562 info->mRTPSocket = interleaveIndex; 563 info->mRTCPSocket = interleaveIndex + 1; 564 565 request.append("Transport: RTP/AVP/TCP;interleaved="); 566 request.append(interleaveIndex); 567 request.append("-"); 568 request.append(interleaveIndex + 1); 569#else 570 unsigned rtpPort; 571 ARTPConnection::MakePortPair( 572 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 573 574 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 575 request.append(rtpPort); 576 request.append("-"); 577 request.append(rtpPort + 1); 578#endif 579 580 request.append("\r\n"); 581 582 if (index > 1) { 583 request.append("Session: "); 584 request.append(mSessionID); 585 request.append("\r\n"); 586 } 587 588 request.append("\r\n"); 589 590 sp<AMessage> reply = new AMessage('setu', id()); 591 reply->setSize("index", index); 592 reply->setSize("track-index", mTracks.size() - 1); 593 mConn->sendRequest(request.c_str(), reply); 594 } 595 596 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 597 out->clear(); 598 599 if (strncasecmp("rtsp://", baseURL, 7)) { 600 // Base URL must be absolute 601 return false; 602 } 603 604 if (!strncasecmp("rtsp://", url, 7)) { 605 // "url" is already 
an absolute URL, ignore base URL. 606 out->setTo(url); 607 return true; 608 } 609 610 size_t n = strlen(baseURL); 611 if (baseURL[n - 1] == '/') { 612 out->setTo(baseURL); 613 out->append(url); 614 } else { 615 const char *slashPos = strrchr(baseURL, '/'); 616 617 if (slashPos > &baseURL[6]) { 618 out->setTo(baseURL, slashPos - baseURL); 619 } else { 620 out->setTo(baseURL); 621 } 622 623 out->append("/"); 624 out->append(url); 625 } 626 627 return true; 628 } 629 630 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 631}; 632 633} // namespace android 634 635#endif // MY_HANDLER_H_ 636