ARTSPConnection.cpp revision 6e3fa444c5b3970666707bb2b6d25e2615dafe80
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "ARTSPConnection"
19#include <utils/Log.h>
20
21#include "ARTSPConnection.h"
22
23#include <media/stagefright/foundation/ABuffer.h>
24#include <media/stagefright/foundation/ADebug.h>
25#include <media/stagefright/foundation/AMessage.h>
26#include <media/stagefright/MediaErrors.h>
27
28#include <arpa/inet.h>
29#include <fcntl.h>
30#include <netdb.h>
31#include <sys/socket.h>
32
33namespace android {
34
35// static
36const int64_t ARTSPConnection::kSelectTimeoutUs = 1000ll;
37
38ARTSPConnection::ARTSPConnection()
39    : mState(DISCONNECTED),
40      mSocket(-1),
41      mConnectionID(0),
42      mNextCSeq(0),
43      mReceiveResponseEventPending(false) {
44}
45
46ARTSPConnection::~ARTSPConnection() {
47    if (mSocket >= 0) {
48        LOGE("Connection is still open, closing the socket.");
49        close(mSocket);
50        mSocket = -1;
51    }
52}
53
54void ARTSPConnection::connect(const char *url, const sp<AMessage> &reply) {
55    sp<AMessage> msg = new AMessage(kWhatConnect, id());
56    msg->setString("url", url);
57    msg->setMessage("reply", reply);
58    msg->post();
59}
60
61void ARTSPConnection::disconnect(const sp<AMessage> &reply) {
62    sp<AMessage> msg = new AMessage(kWhatDisconnect, id());
63    msg->setMessage("reply", reply);
64    msg->post();
65}
66
67void ARTSPConnection::sendRequest(
68        const char *request, const sp<AMessage> &reply) {
69    sp<AMessage> msg = new AMessage(kWhatSendRequest, id());
70    msg->setString("request", request);
71    msg->setMessage("reply", reply);
72    msg->post();
73}
74
75void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) {
76    sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id());
77    msg->setMessage("reply", reply);
78    msg->post();
79}
80
81void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
82    switch (msg->what()) {
83        case kWhatConnect:
84            onConnect(msg);
85            break;
86
87        case kWhatDisconnect:
88            onDisconnect(msg);
89            break;
90
91        case kWhatCompleteConnection:
92            onCompleteConnection(msg);
93            break;
94
95        case kWhatSendRequest:
96            onSendRequest(msg);
97            break;
98
99        case kWhatReceiveResponse:
100            onReceiveResponse();
101            break;
102
103        case kWhatObserveBinaryData:
104        {
105            CHECK(msg->findMessage("reply", &mObserveBinaryMessage));
106            break;
107        }
108
109        default:
110            TRESPASS();
111            break;
112    }
113}
114
115// static
116bool ARTSPConnection::ParseURL(
117        const char *url, AString *host, unsigned *port, AString *path) {
118    host->clear();
119    *port = 0;
120    path->clear();
121
122    if (strncasecmp("rtsp://", url, 7)) {
123        return false;
124    }
125
126    const char *slashPos = strchr(&url[7], '/');
127
128    if (slashPos == NULL) {
129        host->setTo(&url[7]);
130        path->setTo("/");
131    } else {
132        host->setTo(&url[7], slashPos - &url[7]);
133        path->setTo(slashPos);
134    }
135
136    char *colonPos = strchr(host->c_str(), ':');
137
138    if (colonPos != NULL) {
139        unsigned long x;
140        if (!ParseSingleUnsignedLong(colonPos + 1, &x) || x >= 65536) {
141            return false;
142        }
143
144        *port = x;
145
146        size_t colonOffset = colonPos - host->c_str();
147        size_t trailing = host->size() - colonOffset;
148        host->erase(colonOffset, trailing);
149    } else {
150        *port = 554;
151    }
152
153    return true;
154}
155
156static void MakeSocketBlocking(int s, bool blocking) {
157    // Make socket non-blocking.
158    int flags = fcntl(s, F_GETFL, 0);
159    CHECK_NE(flags, -1);
160
161    if (blocking) {
162        flags &= ~O_NONBLOCK;
163    } else {
164        flags |= O_NONBLOCK;
165    }
166
167    CHECK_NE(fcntl(s, F_SETFL, flags), -1);
168}
169
170void ARTSPConnection::onConnect(const sp<AMessage> &msg) {
171    ++mConnectionID;
172
173    if (mState != DISCONNECTED) {
174        close(mSocket);
175        mSocket = -1;
176
177        flushPendingRequests();
178    }
179
180    mState = CONNECTING;
181
182    AString url;
183    CHECK(msg->findString("url", &url));
184
185    sp<AMessage> reply;
186    CHECK(msg->findMessage("reply", &reply));
187
188    AString host, path;
189    unsigned port;
190    if (!ParseURL(url.c_str(), &host, &port, &path)) {
191        LOGE("Malformed rtsp url %s", url.c_str());
192
193        reply->setInt32("result", ERROR_MALFORMED);
194        reply->post();
195
196        mState = DISCONNECTED;
197        return;
198    }
199
200    struct hostent *ent = gethostbyname(host.c_str());
201    if (ent == NULL) {
202        LOGE("Unknown host %s", host.c_str());
203
204        reply->setInt32("result", -ENOENT);
205        reply->post();
206
207        mState = DISCONNECTED;
208        return;
209    }
210
211    mSocket = socket(AF_INET, SOCK_STREAM, 0);
212
213    MakeSocketBlocking(mSocket, false);
214
215    struct sockaddr_in remote;
216    memset(remote.sin_zero, 0, sizeof(remote.sin_zero));
217    remote.sin_family = AF_INET;
218    remote.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
219    remote.sin_port = htons(port);
220
221    int err = ::connect(
222            mSocket, (const struct sockaddr *)&remote, sizeof(remote));
223
224    reply->setInt32("server-ip", ntohl(remote.sin_addr.s_addr));
225
226    if (err < 0) {
227        if (errno == EINPROGRESS) {
228            sp<AMessage> msg = new AMessage(kWhatCompleteConnection, id());
229            msg->setMessage("reply", reply);
230            msg->setInt32("connection-id", mConnectionID);
231            msg->post();
232            return;
233        }
234
235        reply->setInt32("result", -errno);
236        mState = DISCONNECTED;
237
238        close(mSocket);
239        mSocket = -1;
240    } else {
241        reply->setInt32("result", OK);
242        mState = CONNECTED;
243        mNextCSeq = 1;
244
245        postReceiveReponseEvent();
246    }
247
248    reply->post();
249}
250
251void ARTSPConnection::onDisconnect(const sp<AMessage> &msg) {
252    if (mState == CONNECTED || mState == CONNECTING) {
253        close(mSocket);
254        mSocket = -1;
255
256        flushPendingRequests();
257    }
258
259    sp<AMessage> reply;
260    CHECK(msg->findMessage("reply", &reply));
261
262    reply->setInt32("result", OK);
263    mState = DISCONNECTED;
264
265    reply->post();
266}
267
268void ARTSPConnection::onCompleteConnection(const sp<AMessage> &msg) {
269    sp<AMessage> reply;
270    CHECK(msg->findMessage("reply", &reply));
271
272    int32_t connectionID;
273    CHECK(msg->findInt32("connection-id", &connectionID));
274
275    if ((connectionID != mConnectionID) || mState != CONNECTING) {
276        // While we were attempting to connect, the attempt was
277        // cancelled.
278        reply->setInt32("result", -ECONNABORTED);
279        reply->post();
280        return;
281    }
282
283    struct timeval tv;
284    tv.tv_sec = 0;
285    tv.tv_usec = kSelectTimeoutUs;
286
287    fd_set ws;
288    FD_ZERO(&ws);
289    FD_SET(mSocket, &ws);
290
291    int res = select(mSocket + 1, NULL, &ws, NULL, &tv);
292    CHECK_GE(res, 0);
293
294    if (res == 0) {
295        // Timed out. Not yet connected.
296
297        msg->post();
298        return;
299    }
300
301    int err;
302    socklen_t optionLen = sizeof(err);
303    CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
304    CHECK_EQ(optionLen, (socklen_t)sizeof(err));
305
306    if (err != 0) {
307        LOGE("err = %d (%s)", err, strerror(err));
308
309        reply->setInt32("result", -err);
310
311        mState = DISCONNECTED;
312        close(mSocket);
313        mSocket = -1;
314    } else {
315        reply->setInt32("result", OK);
316        mState = CONNECTED;
317        mNextCSeq = 1;
318
319        postReceiveReponseEvent();
320    }
321
322    reply->post();
323}
324
325void ARTSPConnection::onSendRequest(const sp<AMessage> &msg) {
326    sp<AMessage> reply;
327    CHECK(msg->findMessage("reply", &reply));
328
329    if (mState != CONNECTED) {
330        reply->setInt32("result", -ENOTCONN);
331        reply->post();
332        return;
333    }
334
335    AString request;
336    CHECK(msg->findString("request", &request));
337
338    // Find the boundary between headers and the body.
339    ssize_t i = request.find("\r\n\r\n");
340    CHECK_GE(i, 0);
341
342    int32_t cseq = mNextCSeq++;
343
344    AString cseqHeader = "CSeq: ";
345    cseqHeader.append(cseq);
346    cseqHeader.append("\r\n");
347
348    request.insert(cseqHeader, i + 2);
349
350    LOGV("%s", request.c_str());
351
352    size_t numBytesSent = 0;
353    while (numBytesSent < request.size()) {
354        ssize_t n =
355            send(mSocket, request.c_str() + numBytesSent,
356                 request.size() - numBytesSent, 0);
357
358        if (n == 0) {
359            // Server closed the connection.
360            LOGE("Server unexpectedly closed the connection.");
361
362            reply->setInt32("result", ERROR_IO);
363            reply->post();
364            return;
365        } else if (n < 0) {
366            if (errno == EINTR) {
367                continue;
368            }
369
370            LOGE("Error sending rtsp request.");
371            reply->setInt32("result", -errno);
372            reply->post();
373            return;
374        }
375
376        numBytesSent += (size_t)n;
377    }
378
379    mPendingRequests.add(cseq, reply);
380}
381
382void ARTSPConnection::onReceiveResponse() {
383    mReceiveResponseEventPending = false;
384
385    if (mState != CONNECTED) {
386        return;
387    }
388
389    struct timeval tv;
390    tv.tv_sec = 0;
391    tv.tv_usec = kSelectTimeoutUs;
392
393    fd_set rs;
394    FD_ZERO(&rs);
395    FD_SET(mSocket, &rs);
396
397    int res = select(mSocket + 1, &rs, NULL, NULL, &tv);
398    CHECK_GE(res, 0);
399
400    if (res == 1) {
401        MakeSocketBlocking(mSocket, true);
402
403        bool success = receiveRTSPReponse();
404
405        MakeSocketBlocking(mSocket, false);
406
407        if (!success) {
408            // Something horrible, irreparable has happened.
409            flushPendingRequests();
410            return;
411        }
412    }
413
414    postReceiveReponseEvent();
415}
416
417void ARTSPConnection::flushPendingRequests() {
418    for (size_t i = 0; i < mPendingRequests.size(); ++i) {
419        sp<AMessage> reply = mPendingRequests.valueAt(i);
420
421        reply->setInt32("result", -ECONNABORTED);
422        reply->post();
423    }
424
425    mPendingRequests.clear();
426}
427
428void ARTSPConnection::postReceiveReponseEvent() {
429    if (mReceiveResponseEventPending) {
430        return;
431    }
432
433    sp<AMessage> msg = new AMessage(kWhatReceiveResponse, id());
434    msg->post();
435
436    mReceiveResponseEventPending = true;
437}
438
439status_t ARTSPConnection::receive(void *data, size_t size) {
440    size_t offset = 0;
441    while (offset < size) {
442        ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0);
443        if (n == 0) {
444            // Server closed the connection.
445            LOGE("Server unexpectedly closed the connection.");
446            return ERROR_IO;
447        } else if (n < 0) {
448            if (errno == EINTR) {
449                continue;
450            }
451
452            LOGE("Error reading rtsp response.");
453            return -errno;
454        }
455
456        offset += (size_t)n;
457    }
458
459    return OK;
460}
461
462bool ARTSPConnection::receiveLine(AString *line) {
463    line->clear();
464
465    bool sawCR = false;
466    for (;;) {
467        char c;
468        if (receive(&c, 1) != OK) {
469            return false;
470        }
471
472        if (sawCR && c == '\n') {
473            line->erase(line->size() - 1, 1);
474            return true;
475        }
476
477        line->append(&c, 1);
478
479        if (c == '$' && line->size() == 1) {
480            // Special-case for interleaved binary data.
481            return true;
482        }
483
484        sawCR = (c == '\r');
485    }
486}
487
488sp<ABuffer> ARTSPConnection::receiveBinaryData() {
489    uint8_t x[3];
490    if (receive(x, 3) != OK) {
491        return NULL;
492    }
493
494    sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]);
495    if (receive(buffer->data(), buffer->size()) != OK) {
496        return NULL;
497    }
498
499    buffer->meta()->setInt32("index", (int32_t)x[0]);
500
501    return buffer;
502}
503
504bool ARTSPConnection::receiveRTSPReponse() {
505    AString statusLine;
506
507    if (!receiveLine(&statusLine)) {
508        return false;
509    }
510
511    if (statusLine == "$") {
512        sp<ABuffer> buffer = receiveBinaryData();
513
514        if (buffer == NULL) {
515            return false;
516        }
517
518        if (mObserveBinaryMessage != NULL) {
519            sp<AMessage> notify = mObserveBinaryMessage->dup();
520            notify->setObject("buffer", buffer);
521            notify->post();
522        } else {
523            LOGW("received binary data, but no one cares.");
524        }
525
526        return true;
527    }
528
529    sp<ARTSPResponse> response = new ARTSPResponse;
530    response->mStatusLine = statusLine;
531
532    LOGI("status: %s", response->mStatusLine.c_str());
533
534    ssize_t space1 = response->mStatusLine.find(" ");
535    if (space1 < 0) {
536        return false;
537    }
538    ssize_t space2 = response->mStatusLine.find(" ", space1 + 1);
539    if (space2 < 0) {
540        return false;
541    }
542
543    AString statusCodeStr(
544            response->mStatusLine, space1 + 1, space2 - space1 - 1);
545
546    if (!ParseSingleUnsignedLong(
547                statusCodeStr.c_str(), &response->mStatusCode)
548            || response->mStatusCode < 100 || response->mStatusCode > 999) {
549        return false;
550    }
551
552    AString line;
553    for (;;) {
554        if (!receiveLine(&line)) {
555            break;
556        }
557
558        if (line.empty()) {
559            break;
560        }
561
562        LOGV("line: %s", line.c_str());
563
564        ssize_t colonPos = line.find(":");
565        if (colonPos < 0) {
566            // Malformed header line.
567            return false;
568        }
569
570        AString key(line, 0, colonPos);
571        key.trim();
572        key.tolower();
573
574        line.erase(0, colonPos + 1);
575        line.trim();
576
577        response->mHeaders.add(key, line);
578    }
579
580    unsigned long contentLength = 0;
581
582    ssize_t i = response->mHeaders.indexOfKey("content-length");
583
584    if (i >= 0) {
585        AString value = response->mHeaders.valueAt(i);
586        if (!ParseSingleUnsignedLong(value.c_str(), &contentLength)) {
587            return false;
588        }
589    }
590
591    if (contentLength > 0) {
592        response->mContent = new ABuffer(contentLength);
593
594        size_t numBytesRead = 0;
595        while (numBytesRead < contentLength) {
596            ssize_t n = recv(
597                    mSocket, response->mContent->data() + numBytesRead,
598                    contentLength - numBytesRead, 0);
599
600            if (n == 0) {
601                // Server closed the connection.
602                TRESPASS();
603            } else if (n < 0) {
604                if (errno == EINTR) {
605                    continue;
606                }
607
608                TRESPASS();
609            }
610
611            numBytesRead += (size_t)n;
612        }
613    }
614
615    return notifyResponseListener(response);
616}
617
618// static
619bool ARTSPConnection::ParseSingleUnsignedLong(
620        const char *from, unsigned long *x) {
621    char *end;
622    *x = strtoul(from, &end, 10);
623
624    if (end == from || *end != '\0') {
625        return false;
626    }
627
628    return true;
629}
630
631bool ARTSPConnection::notifyResponseListener(
632        const sp<ARTSPResponse> &response) {
633    ssize_t i = response->mHeaders.indexOfKey("cseq");
634
635    if (i < 0) {
636        return true;
637    }
638
639    AString value = response->mHeaders.valueAt(i);
640
641    unsigned long cseq;
642    if (!ParseSingleUnsignedLong(value.c_str(), &cseq)) {
643        return false;
644    }
645
646    i = mPendingRequests.indexOfKey(cseq);
647
648    if (i < 0) {
649        // Unsolicited response?
650        TRESPASS();
651    }
652
653    sp<AMessage> reply = mPendingRequests.valueAt(i);
654    mPendingRequests.removeItemsAt(i);
655
656    reply->setInt32("result", OK);
657    reply->setObject("response", response);
658    reply->post();
659
660    return true;
661}
662
663}  // namespace android
664