// MyHandler.h revision 39ddf8e0f18766f7ba1e3246b774aa6ebd93eea8
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21#include "APacketSource.h" 22#include "ARTPConnection.h" 23#include "ARTSPConnection.h" 24#include "ASessionDescription.h" 25 26#include <media/stagefright/foundation/ABuffer.h> 27#include <media/stagefright/foundation/ADebug.h> 28#include <media/stagefright/foundation/ALooper.h> 29#include <media/stagefright/foundation/AMessage.h> 30#include <media/stagefright/MediaErrors.h> 31 32namespace android { 33 34struct MyHandler : public AHandler { 35 MyHandler(const char *url, const sp<ALooper> &looper) 36 : mLooper(looper), 37 mNetLooper(new ALooper), 38 mConn(new ARTSPConnection), 39 mRTPConn(new ARTPConnection), 40 mSessionURL(url), 41 mSetupTracksSuccessful(false) { 42 43 mNetLooper->start(false /* runOnCallingThread */, 44 false /* canCallJava */, 45 PRIORITY_HIGHEST); 46 } 47 48 void connect() { 49 mLooper->registerHandler(this); 50 mLooper->registerHandler(mConn); 51 (1 ? 
mNetLooper : mLooper)->registerHandler(mRTPConn); 52 sp<AMessage> reply = new AMessage('conn', id()); 53 54 mConn->connect(mSessionURL.c_str(), reply); 55 } 56 57 void disconnect() { 58 sp<AMessage> reply = new AMessage('disc', id()); 59 mConn->disconnect(reply); 60 } 61 62 virtual void onMessageReceived(const sp<AMessage> &msg) { 63 switch (msg->what()) { 64 case 'conn': 65 { 66 int32_t result; 67 CHECK(msg->findInt32("result", &result)); 68 69 LOG(INFO) << "connection request completed with result " 70 << result << " (" << strerror(-result) << ")"; 71 72 if (result == OK) { 73 AString request; 74 request = "DESCRIBE "; 75 request.append(mSessionURL); 76 request.append(" RTSP/1.0\r\n"); 77 request.append("Accept: application/sdp\r\n"); 78 request.append("\r\n"); 79 80 sp<AMessage> reply = new AMessage('desc', id()); 81 mConn->sendRequest(request.c_str(), reply); 82 } 83 break; 84 } 85 86 case 'disc': 87 { 88 LOG(INFO) << "disconnect completed"; 89 90 (new AMessage('quit', id()))->post(); 91 break; 92 } 93 94 case 'desc': 95 { 96 int32_t result; 97 CHECK(msg->findInt32("result", &result)); 98 99 LOG(INFO) << "DESCRIBE completed with result " 100 << result << " (" << strerror(-result) << ")"; 101 102 if (result == OK) { 103 sp<RefBase> obj; 104 CHECK(msg->findObject("response", &obj)); 105 sp<ARTSPResponse> response = 106 static_cast<ARTSPResponse *>(obj.get()); 107 108 if (response->mStatusCode == 302) { 109 ssize_t i = response->mHeaders.indexOfKey("location"); 110 CHECK_GE(i, 0); 111 112 mSessionURL = response->mHeaders.valueAt(i); 113 114 AString request; 115 request = "DESCRIBE "; 116 request.append(mSessionURL); 117 request.append(" RTSP/1.0\r\n"); 118 request.append("Accept: application/sdp\r\n"); 119 request.append("\r\n"); 120 121 sp<AMessage> reply = new AMessage('desc', id()); 122 mConn->sendRequest(request.c_str(), reply); 123 break; 124 } 125 126 CHECK_EQ(response->mStatusCode, 200u); 127 128 mSessionDesc = new ASessionDescription; 129 130 
mSessionDesc->setTo( 131 response->mContent->data(), 132 response->mContent->size()); 133 134 CHECK(mSessionDesc->isValid()); 135 136 ssize_t i = response->mHeaders.indexOfKey("content-base"); 137 if (i >= 0) { 138 mBaseURL = response->mHeaders.valueAt(i); 139 } else { 140 i = response->mHeaders.indexOfKey("content-location"); 141 if (i >= 0) { 142 mBaseURL = response->mHeaders.valueAt(i); 143 } else { 144 mBaseURL = mSessionURL; 145 } 146 } 147 148 CHECK_GT(mSessionDesc->countTracks(), 1u); 149 setupTrack(1); 150 } else { 151 sp<AMessage> reply = new AMessage('disc', id()); 152 mConn->disconnect(reply); 153 } 154 break; 155 } 156 157 case 'setu': 158 { 159 size_t index; 160 CHECK(msg->findSize("index", &index)); 161 162 TrackInfo *track = NULL; 163 size_t trackIndex; 164 if (msg->findSize("track-index", &trackIndex)) { 165 track = &mTracks.editItemAt(trackIndex); 166 } 167 168 int32_t result; 169 CHECK(msg->findInt32("result", &result)); 170 171 LOG(INFO) << "SETUP(" << index << ") completed with result " 172 << result << " (" << strerror(-result) << ")"; 173 174 if (result != OK) { 175 if (track) { 176 close(track->mRTPSocket); 177 close(track->mRTCPSocket); 178 179 mTracks.removeItemsAt(trackIndex); 180 } 181 } else { 182 CHECK(track != NULL); 183 184 sp<RefBase> obj; 185 CHECK(msg->findObject("response", &obj)); 186 sp<ARTSPResponse> response = 187 static_cast<ARTSPResponse *>(obj.get()); 188 189 CHECK_EQ(response->mStatusCode, 200u); 190 191 ssize_t i = response->mHeaders.indexOfKey("session"); 192 CHECK_GE(i, 0); 193 194 if (index == 1) { 195 mSessionID = response->mHeaders.valueAt(i); 196 i = mSessionID.find(";"); 197 if (i >= 0) { 198 // Remove options, i.e. 
";timeout=90" 199 mSessionID.erase(i, mSessionID.size() - i); 200 } 201 } 202 203 sp<AMessage> notify = new AMessage('accu', id()); 204 notify->setSize("track-index", trackIndex); 205 206 mRTPConn->addStream( 207 track->mRTPSocket, track->mRTCPSocket, 208 mSessionDesc, index, 209 notify); 210 211 mSetupTracksSuccessful = true; 212 } 213 214 ++index; 215 if (index < mSessionDesc->countTracks()) { 216 setupTrack(index); 217 } else if (mSetupTracksSuccessful) { 218 AString request = "PLAY "; 219 request.append(mSessionURL); 220 request.append(" RTSP/1.0\r\n"); 221 222 request.append("Session: "); 223 request.append(mSessionID); 224 request.append("\r\n"); 225 226 request.append("\r\n"); 227 228 sp<AMessage> reply = new AMessage('play', id()); 229 mConn->sendRequest(request.c_str(), reply); 230 } else { 231 sp<AMessage> reply = new AMessage('disc', id()); 232 mConn->disconnect(reply); 233 } 234 break; 235 } 236 237 case 'play': 238 { 239 int32_t result; 240 CHECK(msg->findInt32("result", &result)); 241 242 LOG(INFO) << "PLAY completed with result " 243 << result << " (" << strerror(-result) << ")"; 244 245 if (result == OK) { 246 sp<RefBase> obj; 247 CHECK(msg->findObject("response", &obj)); 248 sp<ARTSPResponse> response = 249 static_cast<ARTSPResponse *>(obj.get()); 250 251 CHECK_EQ(response->mStatusCode, 200u); 252 253 sp<AMessage> msg = new AMessage('abor', id()); 254 msg->post(60000000ll); 255 } else { 256 sp<AMessage> reply = new AMessage('disc', id()); 257 mConn->disconnect(reply); 258 } 259 260 break; 261 } 262 263 case 'abor': 264 { 265 for (size_t i = 0; i < mTracks.size(); ++i) { 266 mTracks.editItemAt(i).mPacketSource->signalEOS( 267 ERROR_END_OF_STREAM); 268 } 269 270 sp<AMessage> reply = new AMessage('tear', id()); 271 272 AString request; 273 request = "TEARDOWN "; 274 275 // XXX should use aggregate url from SDP here... 
276 request.append(mSessionURL); 277 request.append(" RTSP/1.0\r\n"); 278 279 request.append("Session: "); 280 request.append(mSessionID); 281 request.append("\r\n"); 282 283 request.append("\r\n"); 284 285 mConn->sendRequest(request.c_str(), reply); 286 break; 287 } 288 289 case 'tear': 290 { 291 int32_t result; 292 CHECK(msg->findInt32("result", &result)); 293 294 LOG(INFO) << "TEARDOWN completed with result " 295 << result << " (" << strerror(-result) << ")"; 296 297 sp<AMessage> reply = new AMessage('disc', id()); 298 mConn->disconnect(reply); 299 break; 300 } 301 302 case 'quit': 303 { 304 break; 305 } 306 307 case 'accu': 308 { 309 size_t trackIndex; 310 CHECK(msg->findSize("track-index", &trackIndex)); 311 312 sp<RefBase> obj; 313 CHECK(msg->findObject("access-unit", &obj)); 314 315 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 316 317 uint64_t ntpTime; 318 CHECK(accessUnit->meta()->findInt64( 319 "ntp-time", (int64_t *)&ntpTime)); 320 321 accessUnit->meta()->setInt64("ntp-time", ntpTime); 322 323#if 0 324 int32_t damaged; 325 if (accessUnit->meta()->findInt32("damaged", &damaged) 326 && damaged != 0) { 327 LOG(INFO) << "ignoring damaged AU"; 328 } else 329#endif 330 { 331 TrackInfo *track = &mTracks.editItemAt(trackIndex); 332 track->mPacketSource->queueAccessUnit(accessUnit); 333 } 334 break; 335 } 336 337 default: 338 TRESPASS(); 339 break; 340 } 341 } 342 343 sp<APacketSource> getPacketSource(size_t index) { 344 CHECK_GE(index, 0u); 345 CHECK_LT(index, mTracks.size()); 346 347 return mTracks.editItemAt(index).mPacketSource; 348 } 349 350 size_t countTracks() const { 351 return mTracks.size(); 352 } 353 354private: 355 sp<ALooper> mLooper; 356 sp<ALooper> mNetLooper; 357 sp<ARTSPConnection> mConn; 358 sp<ARTPConnection> mRTPConn; 359 sp<ASessionDescription> mSessionDesc; 360 AString mSessionURL; 361 AString mBaseURL; 362 AString mSessionID; 363 bool mSetupTracksSuccessful; 364 365 struct TrackInfo { 366 int mRTPSocket; 367 int mRTCPSocket; 
368 369 sp<APacketSource> mPacketSource; 370 }; 371 Vector<TrackInfo> mTracks; 372 373 void setupTrack(size_t index) { 374 sp<APacketSource> source = 375 new APacketSource(mSessionDesc, index); 376 if (source->initCheck() != OK) { 377 LOG(WARNING) << "Unsupported format. Ignoring track #" 378 << index << "."; 379 380 sp<AMessage> reply = new AMessage('setu', id()); 381 reply->setSize("index", index); 382 reply->setInt32("result", ERROR_UNSUPPORTED); 383 reply->post(); 384 return; 385 } 386 387 AString url; 388 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 389 390 AString trackURL; 391 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 392 393 mTracks.push(TrackInfo()); 394 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 395 info->mPacketSource = source; 396 397 unsigned rtpPort; 398 ARTPConnection::MakePortPair( 399 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 400 401 AString request = "SETUP "; 402 request.append(trackURL); 403 request.append(" RTSP/1.0\r\n"); 404 405 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 406 request.append(rtpPort); 407 request.append("-"); 408 request.append(rtpPort + 1); 409 request.append("\r\n"); 410 411 if (index > 1) { 412 request.append("Session: "); 413 request.append(mSessionID); 414 request.append("\r\n"); 415 } 416 417 request.append("\r\n"); 418 419 sp<AMessage> reply = new AMessage('setu', id()); 420 reply->setSize("index", index); 421 reply->setSize("track-index", mTracks.size() - 1); 422 mConn->sendRequest(request.c_str(), reply); 423 } 424 425 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 426 out->clear(); 427 428 if (strncasecmp("rtsp://", baseURL, 7)) { 429 // Base URL must be absolute 430 return false; 431 } 432 433 if (!strncasecmp("rtsp://", url, 7)) { 434 // "url" is already an absolute URL, ignore base URL. 
435 out->setTo(url); 436 return true; 437 } 438 439 size_t n = strlen(baseURL); 440 if (baseURL[n - 1] == '/') { 441 out->setTo(baseURL); 442 out->append(url); 443 } else { 444 char *slashPos = strrchr(baseURL, '/'); 445 446 if (slashPos > &baseURL[6]) { 447 out->setTo(baseURL, slashPos - baseURL); 448 } else { 449 out->setTo(baseURL); 450 } 451 452 out->append("/"); 453 out->append(url); 454 } 455 456 return true; 457 } 458 459 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 460}; 461 462} // namespace android 463 464#endif // MY_HANDLER_H_ 465