MyHandler.h revision cf7b9c7aae758ac0b99833915053c63c2ac46e09
1/* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#ifndef MY_HANDLER_H_ 18 19#define MY_HANDLER_H_ 20 21#include "APacketSource.h" 22#include "ARTPConnection.h" 23#include "ARTSPConnection.h" 24#include "ASessionDescription.h" 25 26#include <media/stagefright/foundation/ABuffer.h> 27#include <media/stagefright/foundation/ADebug.h> 28#include <media/stagefright/foundation/ALooper.h> 29#include <media/stagefright/foundation/AMessage.h> 30#include <media/stagefright/MediaErrors.h> 31 32namespace android { 33 34struct MyHandler : public AHandler { 35 MyHandler(const char *url, const sp<ALooper> &looper) 36 : mLooper(looper), 37 mConn(new ARTSPConnection), 38 mRTPConn(new ARTPConnection), 39 mSessionURL(url), 40 mSetupTracksSuccessful(false), 41 mFirstAccessUnit(true), 42 mFirstAccessUnitNTP(-1) { 43 mLooper->registerHandler(this); 44 mLooper->registerHandler(mConn); 45 mLooper->registerHandler(mRTPConn); 46 sp<AMessage> reply = new AMessage('conn', id()); 47 mConn->connect(mSessionURL.c_str(), reply); 48 } 49 50 virtual void onMessageReceived(const sp<AMessage> &msg) { 51 switch (msg->what()) { 52 case 'conn': 53 { 54 int32_t result; 55 CHECK(msg->findInt32("result", &result)); 56 57 LOG(INFO) << "connection request completed with result " 58 << result << " (" << strerror(-result) << ")"; 59 60 if (result == OK) { 61 AString request; 62 request = "DESCRIBE "; 63 request.append(mSessionURL); 64 request.append(" RTSP/1.0\r\n"); 65 request.append("Accept: application/sdp\r\n"); 66 request.append("\r\n"); 67 68 sp<AMessage> reply = new AMessage('desc', id()); 69 mConn->sendRequest(request.c_str(), reply); 70 } 71 break; 72 } 73 74 case 'disc': 75 { 76 LOG(INFO) << "disconnect completed"; 77 78 (new AMessage('quit', id()))->post(); 79 break; 80 } 81 82 case 'desc': 83 { 84 int32_t result; 85 CHECK(msg->findInt32("result", &result)); 86 87 LOG(INFO) << "DESCRIBE completed with result " 88 << result << " (" << strerror(-result) << ")"; 89 90 if (result == OK) { 91 sp<RefBase> obj; 92 CHECK(msg->findObject("response", &obj)); 93 sp<ARTSPResponse> response = 94 static_cast<ARTSPResponse *>(obj.get()); 95 96 if (response->mStatusCode == 302) { 97 ssize_t i = response->mHeaders.indexOfKey("location"); 98 CHECK_GE(i, 0); 99 100 mSessionURL = response->mHeaders.valueAt(i); 101 102 AString request; 103 request = "DESCRIBE "; 104 request.append(mSessionURL); 105 request.append(" RTSP/1.0\r\n"); 106 request.append("Accept: application/sdp\r\n"); 107 request.append("\r\n"); 108 109 sp<AMessage> reply = new AMessage('desc', id()); 110 mConn->sendRequest(request.c_str(), reply); 111 break; 112 } 113 114 CHECK_EQ(response->mStatusCode, 200u); 115 116 mSessionDesc = new ASessionDescription; 117 118 mSessionDesc->setTo( 119 response->mContent->data(), 120 response->mContent->size()); 121 122 CHECK(mSessionDesc->isValid()); 123 124 ssize_t i = response->mHeaders.indexOfKey("content-base"); 125 if (i >= 0) { 126 mBaseURL = response->mHeaders.valueAt(i); 127 } else { 128 i = response->mHeaders.indexOfKey("content-location"); 129 if (i >= 0) { 130 mBaseURL = response->mHeaders.valueAt(i); 131 } else { 132 mBaseURL = mSessionURL; 133 } 134 } 135 136 CHECK_GT(mSessionDesc->countTracks(), 1u); 137 setupTrack(1); 138 } else { 139 sp<AMessage> reply = new AMessage('disc', id()); 140 mConn->disconnect(reply); 141 } 142 break; 143 } 144 145 case 'setu': 146 { 147 size_t index; 148 CHECK(msg->findSize("index", &index)); 149 150 size_t trackIndex; 151 CHECK(msg->findSize("track-index", &trackIndex)); 152 153 int32_t result; 154 CHECK(msg->findInt32("result", &result)); 155 156 LOG(INFO) << "SETUP(" << index << ") completed with result " 157 << result << " (" << strerror(-result) << ")"; 158 159 TrackInfo *track = &mTracks.editItemAt(trackIndex); 160 161 if (result == OK) { 162 sp<RefBase> obj; 163 CHECK(msg->findObject("response", &obj)); 164 sp<ARTSPResponse> response = 165 static_cast<ARTSPResponse *>(obj.get()); 166 167 CHECK_EQ(response->mStatusCode, 200u); 168 169 ssize_t i = response->mHeaders.indexOfKey("session"); 170 CHECK_GE(i, 0); 171 172 if (index == 1) { 173 mSessionID = response->mHeaders.valueAt(i); 174 i = mSessionID.find(";"); 175 if (i >= 0) { 176 // Remove options, i.e. ";timeout=90" 177 mSessionID.erase(i, mSessionID.size() - i); 178 } 179 } 180 181 sp<AMessage> notify = new AMessage('accu', id()); 182 notify->setSize("track-index", trackIndex); 183 184 mRTPConn->addStream( 185 track->mRTPSocket, track->mRTCPSocket, 186 mSessionDesc, index, 187 notify); 188 189 track->mPacketSource = 190 new APacketSource(mSessionDesc, index); 191 192 mSetupTracksSuccessful = true; 193 194 ++index; 195 if (index < mSessionDesc->countTracks()) { 196 setupTrack(index); 197 break; 198 } 199 } else { 200 close(track->mRTPSocket); 201 close(track->mRTCPSocket); 202 203 mTracks.removeItemsAt(mTracks.size() - 1); 204 } 205 206 if (mSetupTracksSuccessful) { 207 AString request = "PLAY "; 208 request.append(mSessionURL); 209 request.append(" RTSP/1.0\r\n"); 210 211 request.append("Session: "); 212 request.append(mSessionID); 213 request.append("\r\n"); 214 215 request.append("\r\n"); 216 217 sp<AMessage> reply = new AMessage('play', id()); 218 mConn->sendRequest(request.c_str(), reply); 219 } else { 220 sp<AMessage> reply = new AMessage('disc', id()); 221 mConn->disconnect(reply); 222 } 223 break; 224 } 225 226 case 'play': 227 { 228 int32_t result; 229 CHECK(msg->findInt32("result", &result)); 230 231 LOG(INFO) << "PLAY completed with result " 232 << result << " (" << strerror(-result) << ")"; 233 234 if (result == OK) { 235 sp<RefBase> obj; 236 CHECK(msg->findObject("response", &obj)); 237 sp<ARTSPResponse> response = 238 static_cast<ARTSPResponse *>(obj.get()); 239 240 CHECK_EQ(response->mStatusCode, 200u); 241 242 sp<AMessage> msg = new AMessage('abor', id()); 243 msg->post(60000000ll); 244 } else { 245 sp<AMessage> reply = new AMessage('disc', id()); 246 mConn->disconnect(reply); 247 } 248 249 break; 250 } 251 252 case 'abor': 253 { 254 for (size_t i = 0; i < mTracks.size(); ++i) { 255 mTracks.editItemAt(i).mPacketSource->signalEOS( 256 ERROR_END_OF_STREAM); 257 } 258 259 sp<AMessage> reply = new AMessage('tear', id()); 260 261 AString request; 262 request = "TEARDOWN "; 263 264 // XXX should use aggregate url from SDP here... 265 request.append(mSessionURL); 266 request.append(" RTSP/1.0\r\n"); 267 268 request.append("Session: "); 269 request.append(mSessionID); 270 request.append("\r\n"); 271 272 request.append("\r\n"); 273 274 mConn->sendRequest(request.c_str(), reply); 275 break; 276 } 277 278 case 'tear': 279 { 280 int32_t result; 281 CHECK(msg->findInt32("result", &result)); 282 283 LOG(INFO) << "TEARDOWN completed with result " 284 << result << " (" << strerror(-result) << ")"; 285 286 sp<AMessage> reply = new AMessage('disc', id()); 287 mConn->disconnect(reply); 288 break; 289 } 290 291 case 'quit': 292 { 293 mLooper->stop(); 294 break; 295 } 296 297 case 'accu': 298 { 299 size_t trackIndex; 300 CHECK(msg->findSize("track-index", &trackIndex)); 301 302 sp<RefBase> obj; 303 CHECK(msg->findObject("access-unit", &obj)); 304 305 sp<ABuffer> accessUnit = static_cast<ABuffer *>(obj.get()); 306 307 uint64_t ntpTime; 308 CHECK(accessUnit->meta()->findInt64( 309 "ntp-time", (int64_t *)&ntpTime)); 310 311 if (mFirstAccessUnit) { 312 mFirstAccessUnit = false; 313 mFirstAccessUnitNTP = ntpTime; 314 } 315 if (ntpTime > mFirstAccessUnitNTP) { 316 ntpTime -= mFirstAccessUnitNTP; 317 } else { 318 ntpTime = 0; 319 } 320 321 accessUnit->meta()->setInt64("ntp-time", ntpTime); 322 323 TrackInfo *track = &mTracks.editItemAt(trackIndex); 324 track->mPacketSource->queueAccessUnit(accessUnit); 325 break; 326 } 327 328 default: 329 TRESPASS(); 330 break; 331 } 332 } 333 334 sp<APacketSource> getPacketSource(size_t index) { 335 CHECK_GE(index, 0u); 336 CHECK_LT(index, mTracks.size()); 337 338 return mTracks.editItemAt(index).mPacketSource; 339 } 340 341 size_t countTracks() const { 342 return mTracks.size(); 343 } 344 345private: 346 sp<ALooper> mLooper; 347 sp<ARTSPConnection> mConn; 348 sp<ARTPConnection> mRTPConn; 349 sp<ASessionDescription> mSessionDesc; 350 AString mSessionURL; 351 AString mBaseURL; 352 AString mSessionID; 353 bool mSetupTracksSuccessful; 354 bool mFirstAccessUnit; 355 uint64_t mFirstAccessUnitNTP; 356 357 struct TrackInfo { 358 int mRTPSocket; 359 int mRTCPSocket; 360 361 sp<APacketSource> mPacketSource; 362 }; 363 Vector<TrackInfo> mTracks; 364 365 void setupTrack(size_t index) { 366 AString url; 367 CHECK(mSessionDesc->findAttribute(index, "a=control", &url)); 368 369 AString trackURL; 370 CHECK(MakeURL(mBaseURL.c_str(), url.c_str(), &trackURL)); 371 372 mTracks.push(TrackInfo()); 373 TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); 374 375 unsigned rtpPort; 376 ARTPConnection::MakePortPair( 377 &info->mRTPSocket, &info->mRTCPSocket, &rtpPort); 378 379 AString request = "SETUP "; 380 request.append(trackURL); 381 request.append(" RTSP/1.0\r\n"); 382 383 request.append("Transport: RTP/AVP/UDP;unicast;client_port="); 384 request.append(rtpPort); 385 request.append("-"); 386 request.append(rtpPort + 1); 387 request.append("\r\n"); 388 389 if (index > 1) { 390 request.append("Session: "); 391 request.append(mSessionID); 392 request.append("\r\n"); 393 } 394 395 request.append("\r\n"); 396 397 sp<AMessage> reply = new AMessage('setu', id()); 398 reply->setSize("index", index); 399 reply->setSize("track-index", mTracks.size() - 1); 400 mConn->sendRequest(request.c_str(), reply); 401 } 402 403 static bool MakeURL(const char *baseURL, const char *url, AString *out) { 404 out->clear(); 405 406 if (strncasecmp("rtsp://", baseURL, 7)) { 407 // Base URL must be absolute 408 return false; 409 } 410 411 if (!strncasecmp("rtsp://", url, 7)) { 412 // "url" is already an absolute URL, ignore base URL. 413 out->setTo(url); 414 return true; 415 } 416 417 size_t n = strlen(baseURL); 418 if (baseURL[n - 1] == '/') { 419 out->setTo(baseURL); 420 out->append(url); 421 } else { 422 char *slashPos = strrchr(baseURL, '/'); 423 424 if (slashPos > &baseURL[6]) { 425 out->setTo(baseURL, slashPos - baseURL); 426 } else { 427 out->setTo(baseURL); 428 } 429 430 out->append("/"); 431 out->append(url); 432 } 433 434 return true; 435 } 436 437 DISALLOW_EVIL_CONSTRUCTORS(MyHandler); 438}; 439 440} // namespace android 441 442#endif // MY_HANDLER_H_ 443