MyHandler.h revision 8370be11debc574b4a9fee62009009d999e29fa3
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21#include "APacketSource.h" 22#include "ARTPConnection.h" 23#include "ARTSPConnection.h" 24#include "ASessionDescription.h" 25 26#include <media/stagefright/foundation/ABuffer.h> 27#include <media/stagefright/foundation/ADebug.h> 28#include <media/stagefright/foundation/ALooper.h> 29#include <media/stagefright/foundation/AMessage.h> 30#include <media/stagefright/MediaErrors.h> 31 32namespace android { 33 34struct MyHandler : public AHandler { 35 MyHandler(const char *url, const sp<ALooper> &looper) 36 : mLooper(looper), 37 mNetLooper(new ALooper), 38 mConn(new ARTSPConnection), 39 mRTPConn(new ARTPConnection), 40 mSessionURL(url), 41 mSetupTracksSuccessful(false) { 42 43 mNetLooper->start(false /* runOnCallingThread */, 44 false /* canCallJava */, 45 PRIORITY_HIGHEST); 46 } 47 48 void connect(const sp<AMessage> &doneMsg) { 49 mDoneMsg = doneMsg; 50 51 mLooper->registerHandler(this); 52 mLooper->registerHandler(mConn); 53 (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); 54 55 sp<AMessage> reply = new AMessage('conn', id()); 56 mConn->connect(mSessionURL.c_str(), reply); 57 } 58 59 void disconnect(const sp<AMessage> &doneMsg) { 60 mDoneMsg = doneMsg; 61 62 (new AMessage('abor', id()))->post(); 63 } 64 65 virtual void onMessageReceived(const sp<AMessage> &msg) { 66 switch (msg->what()) { 67 case 'conn': 68 { 69 int32_t result; 70 CHECK(msg->findInt32("result", &result)); 71 72 LOG(INFO) << "connection request completed with result " 73 << result << " (" << strerror(-result) << ")"; 74 75 if (result == OK) { 76 AString request; 77 request = "DESCRIBE "; 78 request.append(mSessionURL); 79 request.append(" RTSP/1.0\r\n"); 80 request.append("Accept: application/sdp\r\n"); 81 request.append("\r\n"); 82 83 sp<AMessage> reply = new AMessage('desc', id()); 84 mConn->sendRequest(request.c_str(), reply); 85 } 86 break; 87 } 88 89 case 'disc': 90 { 91 LOG(INFO) << "disconnect completed"; 92 93 (new AMessage('quit', id()))->post(); 94 break; 95 } 96 97 case 'desc': 98 { 99 int32_t result; 100 CHECK(msg->findInt32("result", &result)); 101 102 LOG(INFO) << "DESCRIBE completed with result " 103 << result << " (" << strerror(-result) << ")"; 104 105 if (result == OK) { 106 sp<RefBase> obj; 107 CHECK(msg->findObject("response", &obj)); 108 sp<ARTSPResponse> response = 109 static_cast<ARTSPResponse *>(obj.get()); 110 111 if (response->mStatusCode == 302) { 112 ssize_t i = response->mHeaders.indexOfKey("location"); 113 CHECK_GE(i, 0); 114 115 mSessionURL = response->mHeaders.valueAt(i); 116 117 AString request; 118 request = "DESCRIBE "; 119 request.append(mSessionURL); 120 request.append(" RTSP/1.0\r\n"); 121 request.append("Accept: application/sdp\r\n"); 122 request.append("\r\n"); 123 124 sp<AMessage> reply = new AMessage('desc', id()); 125 mConn->sendRequest(request.c_str(), reply); 126 break; 127 } 128 129 CHECK_EQ(response->mStatusCode, 200u); 130 131 mSessionDesc = new ASessionDescription; 132 133 mSessionDesc->setTo( 134 response->mContent->data(), 135 response->mContent->size()); 136 137 CHECK(mSessionDesc->isValid()); 138 139 ssize_t i = response->mHeaders.indexOfKey("content-base"); 140 if (i >= 0) { 141 mBaseURL = response->mHeaders.valueAt(i); 142 } else { 143 i = response->mHeaders.indexOfKey("content-location"); 144 if (i >= 0) { 145 mBaseURL = response->mHeaders.valueAt(i); 146 } else { 147 mBaseURL = mSessionURL; 148 } 149 } 150 151 CHECK_GT(mSessionDesc->countTracks(), 1u); 152 setupTrack(1); 153 } else { 154 sp<AMessage> reply = new AMessage('disc', id()); 155 mConn->disconnect(reply); 156 } 157 break; 158 } 159 160 case 'setu': 161 { 162 size_t index; 163 CHECK(msg->findSize("index", &index)); 164 165 TrackInfo *track = NULL; 166 size_t trackIndex; 167 if (msg->findSize("track-index", &trackIndex)) { 168 track = &mTracks.editItemAt(trackIndex); 169 } 170 171 int32_t result; 172 CHECK(msg->findInt32("result", &result)); 173 174 LOG(INFO) << "SETUP(" << index << ") completed with result " 175 << result << " (" << strerror(-result) << ")"; 176 177 if (result != OK) { 178 if (track) { 179 close(track->mRTPSocket); 180 close(track->mRTCPSocket); 181 182 mTracks.removeItemsAt(trackIndex); 183 } 184 } else { 185 CHECK(track != NULL); 186 187 sp<RefBase> obj; 188 CHECK(msg->findObject("response", &obj)); 189 sp<ARTSPResponse> response = 190 static_cast<ARTSPResponse *>(obj.get()); 191 192 CHECK_EQ(response->mStatusCode, 200u); 193 194 ssize_t i = response->mHeaders.indexOfKey("session"); 195 CHECK_GE(i, 0); 196 197 if (index == 1) { 198 mSessionID = response->mHeaders.valueAt(i); 199 i = mSessionID.find(";"); 200 if (i >= 0) { 201 // Remove options, i.e. ";timeout=90" 202 mSessionID.erase(i, mSessionID.size() - i); 203 } 204 } 205 206 sp<AMessage> notify = new AMessage('accu', id()); 207 notify->setSize("track-index", trackIndex); 208 209 mRTPConn->addStream( 210 track->mRTPSocket, track->mRTCPSocket, 211 mSessionDesc, index, 212 notify); 213 214 mSetupTracksSuccessful = true; 215 } 216 217 ++index; 218 if (index < mSessionDesc->countTracks()) { 219 setupTrack(index); 220 } else if (mSetupTracksSuccessful) { 221 AString request = "PLAY "; 222 request.append(mSessionURL); 223 request.append(" RTSP/1.0\r\n"); 224 225 request.append("Session: "); 226 request.append(mSessionID); 227 request.append("\r\n"); 228 229 request.append("\r\n"); 230 231 sp<AMessage> reply = new AMessage('play', id()); 232 mConn->sendRequest(request.c_str(), reply); 233 } else { 234 sp<AMessage> reply = new AMessage('disc', id()); 235 mConn->disconnect(reply); 236 } 237 break; 238 } 239 240 case 'play': 241 { 242 int32_t result; 243 CHECK(msg->findInt32("result", &result)); 244 245 LOG(INFO) << "PLAY completed with result " 246 << result << " (" << strerror(-result) << ")"; 247 248 if (result == OK) { 249 sp<RefBase> obj; 250 CHECK(msg->findObject("response", &obj)); 251 sp<ARTSPResponse> response = 252 static_cast<ARTSPResponse *>(obj.get()); 253 254 CHECK_EQ(response->mStatusCode, 200u); 255 256 mDoneMsg->setInt32("result", OK); 257 mDoneMsg->post(); 258 mDoneMsg = NULL; 259 } else { 260 sp<AMessage> reply = new AMessage('disc', id()); 261 mConn->disconnect(reply); 262 } 263 264 break; 265 } 266 267 case 'abor': 268 { 269 for (size_t i = 0; i < mTracks.size(); ++i) { 270 mTracks.editItemAt(i).mPacketSource->signalEOS( 271 ERROR_END_OF_STREAM); 272 } 273 274 sp<AMessage> reply = new AMessage('tear', id()); 275 276 AString request; 277 request = "TEARDOWN "; 278 279 // XXX should use aggregate url from SDP here... 280 request.append(mSessionURL); 281 request.append(" RTSP/1.0\r\n"); 282 283 request.append("Session: "); 284 request.append(mSessionID); 285 request.append("\r\n"); 286 287 request.append("\r\n"); 288 289 mConn->sendRequest(request.c_str(), reply); 290 break; 291 } 292 293 case 'tear': 294 { 295 int32_t result; 296 CHECK(msg->findInt32("result", &result)); 297 298 LOG(INFO) << "TEARDOWN completed with result " 299 << result << " (" << strerror(-result) << ")"; 300 301 sp<AMessage> reply = new AMessage('disc', id()); 302 mConn->disconnect(reply); 303 break; 304 } 305 306 case 'quit': 307 { 308 if (mDoneMsg != NULL) { 309 mDoneMsg->setInt32("result", UNKNOWN_ERROR); 310 mDoneMsg->post(); 311 mDoneMsg = NULL; 312 } 313 break; 314 } 315 316 case 'accu': 317 { 318 size_t trackIndex; 319 CHECK(msg->findSize("track-index", &trackIndex)); 320 321 int32_t eos; 322 if (msg->findInt32("eos", &eos)) { 323 LOG(INFO) << "received BYE on track index " << trackIndex; 324#if 0 325 TrackInfo *track = &mTracks.editItemAt(trackIndex); 326 track->mPacketSource->signalEOS(ERROR_END_OF_STREAM); 327#endif 328 return; 329 } 330 331 sp<RefBase> obj; 332 CHECK(msg->findObject("access-unit", &obj)); 333 334 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 335 336 uint64_t ntpTime; 337 CHECK(accessUnit->meta()->findInt64( 338 "ntp-time", (int64_t *)&ntpTime)); 339 340 accessUnit->meta()->setInt64("ntp-time", ntpTime); 341 342#if 0 343 int32_t damaged; 344 if (accessUnit->meta()->findInt32("damaged", &damaged) 345 && damaged != 0) { 346 LOG(INFO) << "ignoring damaged AU"; 347 } else 348#endif 349 { 350 TrackInfo *track = &mTracks.editItemAt(trackIndex); 351 track->mPacketSource->queueAccessUnit(accessUnit); 352 } 353 break; 354 } 355 356 default: 357 TRESPASS(); 358 break; 359 } 360 } 361 362 sp<APacketSource> getPacketSource(size_t index) { 363 CHECK_GE(index, 0u); 364 CHECK_LT(index, mTracks.size()); 365 366 return mTracks.editItemAt(index).mPacketSource; 367 } 368 369 size_t countTracks() const { 370 return mTracks.size(); 371 } 372 373private: 374 sp<ALooper> mLooper; 375 sp<ALooper> mNetLooper; 376 sp<ARTSPConnection> mConn; 377 sp<ARTPConnection> mRTPConn; 378 sp<ASessionDescription> mSessionDesc; 379 AString mSessionURL; 380 AString mBaseURL; 381 AString mSessionID; 382 bool mSetupTracksSuccessful; 383 384 struct TrackInfo { 385 int mRTPSocket; 386 int mRTCPSocket; 387 388 sp<APacketSource> mPacketSource; 389 }; 390 Vector<TrackInfo> mTracks; 391 392 sp<AMessage> mDoneMsg; 393 394 void setupTrack(size_t index) { 395 sp<APacketSource> source = 396 new APacketSource(mSessionDesc, index); 397 if (source->initCheck() != OK) { 398 LOG(WARNING) << "Unsupported format. Ignoring track #" 399 << index << "."; 400 401 sp<AMessage> reply = new AMessage('setu', id()); 402 reply->setSize("index", index); 403 reply->setInt32("result", ERROR_UNSUPPORTED); 404 reply->post(); 405 return; 406 } 407 408 AString url; 409 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 410 411 AString trackURL; 412 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 413 414 mTracks.push(TrackInfo()); 415 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 416 info->mPacketSource = source; 417 418 unsigned rtpPort; 419 ARTPConnection::MakePortPair( 420 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 421 422 AString request = "SETUP "; 423 request.append(trackURL); 424 request.append(" RTSP/1.0\r\n"); 425 426 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 427 request.append(rtpPort); 428 request.append("-"); 429 request.append(rtpPort + 1); 430 request.append("\r\n"); 431 432 if (index > 1) { 433 request.append("Session: "); 434 request.append(mSessionID); 435 request.append("\r\n"); 436 } 437 438 request.append("\r\n"); 439 440 sp<AMessage> reply = new AMessage('setu', id()); 441 reply->setSize("index", index); 442 reply->setSize("track-index", mTracks.size() - 1); 443 mConn->sendRequest(request.c_str(), reply); 444 } 445 446 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 447 out->clear(); 448 449 if (strncasecmp("rtsp://", baseURL, 7)) { 450 // Base URL must be absolute 451 return false; 452 } 453 454 if (!strncasecmp("rtsp://", url, 7)) { 455 // "url" is already an absolute URL, ignore base URL. 456 out->setTo(url); 457 return true; 458 } 459 460 size_t n = strlen(baseURL); 461 if (baseURL[n - 1] == '/') { 462 out->setTo(baseURL); 463 out->append(url); 464 } else { 465 char *slashPos = strrchr(baseURL, '/'); 466 467 if (slashPos > &baseURL[6]) { 468 out->setTo(baseURL, slashPos - baseURL); 469 } else { 470 out->setTo(baseURL); 471 } 472 473 out->append("/"); 474 out->append(url); 475 } 476 477 return true; 478 } 479 480 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 481}; 482 483} // namespace android 484 485#endif // MY_HANDLER_H_ 486