1/*
2 * Copyright 2012, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NetworkSession"
19#include <utils/Log.h>
20
21#include "ANetworkSession.h"
22#include "ParsedMessage.h"
23
24#include <arpa/inet.h>
25#include <fcntl.h>
26#include <linux/tcp.h>
27#include <net/if.h>
28#include <netdb.h>
29#include <netinet/in.h>
30#include <sys/ioctl.h>
31#include <sys/socket.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/foundation/ByteUtils.h>
37#include <media/stagefright/foundation/hexdump.h>
38
39namespace android {
40
41static const size_t kMaxUDPSize = 1500;
42static const int32_t kMaxUDPRetries = 200;
43
44struct ANetworkSession::NetworkThread : public Thread {
45    explicit NetworkThread(ANetworkSession *session);
46
47protected:
48    virtual ~NetworkThread();
49
50private:
51    ANetworkSession *mSession;
52
53    virtual bool threadLoop();
54
55    DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
56};
57
58struct ANetworkSession::Session : public RefBase {
59    enum Mode {
60        MODE_RTSP,
61        MODE_DATAGRAM,
62        MODE_WEBSOCKET,
63    };
64
65    enum State {
66        CONNECTING,
67        CONNECTED,
68        LISTENING_RTSP,
69        LISTENING_TCP_DGRAMS,
70        DATAGRAM,
71    };
72
73    Session(int32_t sessionID,
74            State state,
75            int s,
76            const sp<AMessage> &notify);
77
78    int32_t sessionID() const;
79    int socket() const;
80    sp<AMessage> getNotificationMessage() const;
81
82    bool isRTSPServer() const;
83    bool isTCPDatagramServer() const;
84
85    bool wantsToRead();
86    bool wantsToWrite();
87
88    status_t readMore();
89    status_t writeMore();
90
91    status_t sendRequest(
92            const void *data, ssize_t size, bool timeValid, int64_t timeUs);
93
94    void setMode(Mode mode);
95
96    status_t switchToWebSocketMode();
97
98protected:
99    virtual ~Session();
100
101private:
102    enum {
103        FRAGMENT_FLAG_TIME_VALID = 1,
104    };
105    struct Fragment {
106        uint32_t mFlags;
107        int64_t mTimeUs;
108        sp<ABuffer> mBuffer;
109    };
110
111    int32_t mSessionID;
112    State mState;
113    Mode mMode;
114    int mSocket;
115    sp<AMessage> mNotify;
116    bool mSawReceiveFailure, mSawSendFailure;
117    int32_t mUDPRetries;
118
119    List<Fragment> mOutFragments;
120
121    AString mInBuffer;
122
123    int64_t mLastStallReportUs;
124
125    void notifyError(bool send, status_t err, const char *detail);
126    void notify(NotificationReason reason);
127
128    void dumpFragmentStats(const Fragment &frag);
129
130    DISALLOW_EVIL_CONSTRUCTORS(Session);
131};
132////////////////////////////////////////////////////////////////////////////////
133
134ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
135    : mSession(session) {
136}
137
138ANetworkSession::NetworkThread::~NetworkThread() {
139}
140
141bool ANetworkSession::NetworkThread::threadLoop() {
142    mSession->threadLoop();
143
144    return true;
145}
146
147////////////////////////////////////////////////////////////////////////////////
148
149ANetworkSession::Session::Session(
150        int32_t sessionID,
151        State state,
152        int s,
153        const sp<AMessage> &notify)
154    : mSessionID(sessionID),
155      mState(state),
156      mMode(MODE_DATAGRAM),
157      mSocket(s),
158      mNotify(notify),
159      mSawReceiveFailure(false),
160      mSawSendFailure(false),
161      mUDPRetries(kMaxUDPRetries),
162      mLastStallReportUs(-1ll) {
163    if (mState == CONNECTED) {
164        struct sockaddr_in localAddr;
165        socklen_t localAddrLen = sizeof(localAddr);
166
167        int res = getsockname(
168                mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
169        CHECK_GE(res, 0);
170
171        struct sockaddr_in remoteAddr;
172        socklen_t remoteAddrLen = sizeof(remoteAddr);
173
174        res = getpeername(
175                mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
176        CHECK_GE(res, 0);
177
178        in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
179        AString localAddrString = AStringPrintf(
180                "%d.%d.%d.%d",
181                (addr >> 24),
182                (addr >> 16) & 0xff,
183                (addr >> 8) & 0xff,
184                addr & 0xff);
185
186        addr = ntohl(remoteAddr.sin_addr.s_addr);
187        AString remoteAddrString = AStringPrintf(
188                "%d.%d.%d.%d",
189                (addr >> 24),
190                (addr >> 16) & 0xff,
191                (addr >> 8) & 0xff,
192                addr & 0xff);
193
194        sp<AMessage> msg = mNotify->dup();
195        msg->setInt32("sessionID", mSessionID);
196        msg->setInt32("reason", kWhatClientConnected);
197        msg->setString("server-ip", localAddrString.c_str());
198        msg->setInt32("server-port", ntohs(localAddr.sin_port));
199        msg->setString("client-ip", remoteAddrString.c_str());
200        msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
201        msg->post();
202    }
203}
204
205ANetworkSession::Session::~Session() {
206    ALOGV("Session %d gone", mSessionID);
207
208    close(mSocket);
209    mSocket = -1;
210}
211
212int32_t ANetworkSession::Session::sessionID() const {
213    return mSessionID;
214}
215
216int ANetworkSession::Session::socket() const {
217    return mSocket;
218}
219
220void ANetworkSession::Session::setMode(Mode mode) {
221    mMode = mode;
222}
223
224status_t ANetworkSession::Session::switchToWebSocketMode() {
225    if (mState != CONNECTED || mMode != MODE_RTSP) {
226        return INVALID_OPERATION;
227    }
228
229    mMode = MODE_WEBSOCKET;
230
231    return OK;
232}
233
234sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
235    return mNotify;
236}
237
238bool ANetworkSession::Session::isRTSPServer() const {
239    return mState == LISTENING_RTSP;
240}
241
242bool ANetworkSession::Session::isTCPDatagramServer() const {
243    return mState == LISTENING_TCP_DGRAMS;
244}
245
246bool ANetworkSession::Session::wantsToRead() {
247    return !mSawReceiveFailure && mState != CONNECTING;
248}
249
250bool ANetworkSession::Session::wantsToWrite() {
251    return !mSawSendFailure
252        && (mState == CONNECTING
253            || (mState == CONNECTED && !mOutFragments.empty())
254            || (mState == DATAGRAM && !mOutFragments.empty()));
255}
256
257status_t ANetworkSession::Session::readMore() {
258    if (mState == DATAGRAM) {
259        CHECK_EQ(mMode, MODE_DATAGRAM);
260
261        status_t err;
262        do {
263            sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
264
265            struct sockaddr_in remoteAddr;
266            socklen_t remoteAddrLen = sizeof(remoteAddr);
267
268            ssize_t n;
269            do {
270                n = recvfrom(
271                        mSocket, buf->data(), buf->capacity(), 0,
272                        (struct sockaddr *)&remoteAddr, &remoteAddrLen);
273            } while (n < 0 && errno == EINTR);
274
275            err = OK;
276            if (n < 0) {
277                err = -errno;
278            } else if (n == 0) {
279                err = -ECONNRESET;
280            } else {
281                buf->setRange(0, n);
282
283                int64_t nowUs = ALooper::GetNowUs();
284                buf->meta()->setInt64("arrivalTimeUs", nowUs);
285
286                sp<AMessage> notify = mNotify->dup();
287                notify->setInt32("sessionID", mSessionID);
288                notify->setInt32("reason", kWhatDatagram);
289
290                uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
291                notify->setString(
292                        "fromAddr",
293                        AStringPrintf(
294                            "%u.%u.%u.%u",
295                            ip >> 24,
296                            (ip >> 16) & 0xff,
297                            (ip >> 8) & 0xff,
298                            ip & 0xff).c_str());
299
300                notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
301
302                notify->setBuffer("data", buf);
303                notify->post();
304            }
305        } while (err == OK);
306
307        if (err == -EAGAIN) {
308            err = OK;
309        }
310
311        if (err != OK) {
312            if (!mUDPRetries) {
313                notifyError(false /* send */, err, "Recvfrom failed.");
314                mSawReceiveFailure = true;
315            } else {
316                mUDPRetries--;
317                ALOGE("Recvfrom failed, %d/%d retries left",
318                        mUDPRetries, kMaxUDPRetries);
319                err = OK;
320            }
321        } else {
322            mUDPRetries = kMaxUDPRetries;
323        }
324
325        return err;
326    }
327
328    char tmp[512];
329    ssize_t n;
330    do {
331        n = recv(mSocket, tmp, sizeof(tmp), 0);
332    } while (n < 0 && errno == EINTR);
333
334    status_t err = OK;
335
336    if (n > 0) {
337        mInBuffer.append(tmp, n);
338
339#if 0
340        ALOGI("in:");
341        hexdump(tmp, n);
342#endif
343    } else if (n < 0) {
344        err = -errno;
345    } else {
346        err = -ECONNRESET;
347    }
348
349    if (mMode == MODE_DATAGRAM) {
350        // TCP stream carrying 16-bit length-prefixed datagrams.
351
352        while (mInBuffer.size() >= 2) {
353            size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
354
355            if (mInBuffer.size() < packetSize + 2) {
356                break;
357            }
358
359            sp<ABuffer> packet = new ABuffer(packetSize);
360            memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
361
362            int64_t nowUs = ALooper::GetNowUs();
363            packet->meta()->setInt64("arrivalTimeUs", nowUs);
364
365            sp<AMessage> notify = mNotify->dup();
366            notify->setInt32("sessionID", mSessionID);
367            notify->setInt32("reason", kWhatDatagram);
368            notify->setBuffer("data", packet);
369            notify->post();
370
371            mInBuffer.erase(0, packetSize + 2);
372        }
373    } else if (mMode == MODE_RTSP) {
374        for (;;) {
375            size_t length;
376
377            if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
378                if (mInBuffer.size() < 4) {
379                    break;
380                }
381
382                length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
383
384                if (mInBuffer.size() < 4 + length) {
385                    break;
386                }
387
388                sp<AMessage> notify = mNotify->dup();
389                notify->setInt32("sessionID", mSessionID);
390                notify->setInt32("reason", kWhatBinaryData);
391                notify->setInt32("channel", mInBuffer.c_str()[1]);
392
393                sp<ABuffer> data = new ABuffer(length);
394                memcpy(data->data(), mInBuffer.c_str() + 4, length);
395
396                int64_t nowUs = ALooper::GetNowUs();
397                data->meta()->setInt64("arrivalTimeUs", nowUs);
398
399                notify->setBuffer("data", data);
400                notify->post();
401
402                mInBuffer.erase(0, 4 + length);
403                continue;
404            }
405
406            sp<ParsedMessage> msg =
407                ParsedMessage::Parse(
408                        mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
409
410            if (msg == NULL) {
411                break;
412            }
413
414            sp<AMessage> notify = mNotify->dup();
415            notify->setInt32("sessionID", mSessionID);
416            notify->setInt32("reason", kWhatData);
417            notify->setObject("data", msg);
418            notify->post();
419
420#if 1
421            // XXX The (old) dongle sends the wrong content length header on a
422            // SET_PARAMETER request that signals a "wfd_idr_request".
423            // (17 instead of 19).
424            const char *content = msg->getContent();
425            if (content
426                    && !memcmp(content, "wfd_idr_request\r\n", 17)
427                    && length >= 19
428                    && mInBuffer.c_str()[length] == '\r'
429                    && mInBuffer.c_str()[length + 1] == '\n') {
430                length += 2;
431            }
432#endif
433
434            mInBuffer.erase(0, length);
435
436            if (err != OK) {
437                break;
438            }
439        }
440    } else {
441        CHECK_EQ(mMode, MODE_WEBSOCKET);
442
443        const uint8_t *data = (const uint8_t *)mInBuffer.c_str();
444        // hexdump(data, mInBuffer.size());
445
446        while (mInBuffer.size() >= 2) {
447            size_t offset = 2;
448
449            uint64_t payloadLen = data[1] & 0x7f;
450            if (payloadLen == 126) {
451                if (offset + 2 > mInBuffer.size()) {
452                    break;
453                }
454
455                payloadLen = U16_AT(&data[offset]);
456                offset += 2;
457            } else if (payloadLen == 127) {
458                if (offset + 8 > mInBuffer.size()) {
459                    break;
460                }
461
462                payloadLen = U64_AT(&data[offset]);
463                offset += 8;
464            }
465
466            uint32_t mask = 0;
467            if (data[1] & 0x80) {
468                // MASK==1
469                if (offset + 4 > mInBuffer.size()) {
470                    break;
471                }
472
473                mask = U32_AT(&data[offset]);
474                offset += 4;
475            }
476
477            if (payloadLen > mInBuffer.size() || offset > mInBuffer.size() - payloadLen) {
478                break;
479            }
480
481            // We have the full message.
482
483            sp<ABuffer> packet = new ABuffer(payloadLen);
484            memcpy(packet->data(), &data[offset], payloadLen);
485
486            if (mask != 0) {
487                for (size_t i = 0; i < payloadLen; ++i) {
488                    packet->data()[i] =
489                        data[offset + i]
490                            ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
491                }
492            }
493
494            sp<AMessage> notify = mNotify->dup();
495            notify->setInt32("sessionID", mSessionID);
496            notify->setInt32("reason", kWhatWebSocketMessage);
497            notify->setBuffer("data", packet);
498            notify->setInt32("headerByte", data[0]);
499            notify->post();
500
501            mInBuffer.erase(0, offset + payloadLen);
502        }
503    }
504
505    if (err != OK) {
506        notifyError(false /* send */, err, "Recv failed.");
507        mSawReceiveFailure = true;
508    }
509
510    return err;
511}
512
513void ANetworkSession::Session::dumpFragmentStats(const Fragment & /* frag */) {
514#if 0
515    int64_t nowUs = ALooper::GetNowUs();
516    int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;
517
518    static const int64_t kMinDelayMs = 0;
519    static const int64_t kMaxDelayMs = 300;
520
521    const char *kPattern = "########################################";
522    size_t kPatternSize = strlen(kPattern);
523
524    int n = (kPatternSize * (delayMs - kMinDelayMs))
525                / (kMaxDelayMs - kMinDelayMs);
526
527    if (n < 0) {
528        n = 0;
529    } else if ((size_t)n > kPatternSize) {
530        n = kPatternSize;
531    }
532
533    ALOGI("[%lld]: (%4lld ms) %s\n",
534          frag.mTimeUs / 1000,
535          delayMs,
536          kPattern + kPatternSize - n);
537#endif
538}
539
540status_t ANetworkSession::Session::writeMore() {
541    if (mState == DATAGRAM) {
542        CHECK(!mOutFragments.empty());
543
544        status_t err;
545        do {
546            const Fragment &frag = *mOutFragments.begin();
547            const sp<ABuffer> &datagram = frag.mBuffer;
548
549            int n;
550            do {
551                n = send(mSocket, datagram->data(), datagram->size(), 0);
552            } while (n < 0 && errno == EINTR);
553
554            err = OK;
555
556            if (n > 0) {
557                if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
558                    dumpFragmentStats(frag);
559                }
560
561                mOutFragments.erase(mOutFragments.begin());
562            } else if (n < 0) {
563                err = -errno;
564            } else if (n == 0) {
565                err = -ECONNRESET;
566            }
567        } while (err == OK && !mOutFragments.empty());
568
569        if (err == -EAGAIN) {
570            if (!mOutFragments.empty()) {
571                ALOGI("%zu datagrams remain queued.", mOutFragments.size());
572            }
573            err = OK;
574        }
575
576        if (err != OK) {
577            if (!mUDPRetries) {
578                notifyError(true /* send */, err, "Send datagram failed.");
579                mSawSendFailure = true;
580            } else {
581                mUDPRetries--;
582                ALOGE("Send datagram failed, %d/%d retries left",
583                        mUDPRetries, kMaxUDPRetries);
584                err = OK;
585            }
586        } else {
587            mUDPRetries = kMaxUDPRetries;
588        }
589
590        return err;
591    }
592
593    if (mState == CONNECTING) {
594        int err;
595        socklen_t optionLen = sizeof(err);
596        CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
597        CHECK_EQ(optionLen, (socklen_t)sizeof(err));
598
599        if (err != 0) {
600            notifyError(kWhatError, -err, "Connection failed");
601            mSawSendFailure = true;
602
603            return -err;
604        }
605
606        mState = CONNECTED;
607        notify(kWhatConnected);
608
609        return OK;
610    }
611
612    CHECK_EQ(mState, CONNECTED);
613    CHECK(!mOutFragments.empty());
614
615    ssize_t n = -1;
616    while (!mOutFragments.empty()) {
617        const Fragment &frag = *mOutFragments.begin();
618
619        do {
620            n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
621        } while (n < 0 && errno == EINTR);
622
623        if (n <= 0) {
624            break;
625        }
626
627        frag.mBuffer->setRange(
628                frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
629
630        if (frag.mBuffer->size() > 0) {
631            break;
632        }
633
634        if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
635            dumpFragmentStats(frag);
636        }
637
638        mOutFragments.erase(mOutFragments.begin());
639    }
640
641    status_t err = OK;
642
643    if (n < 0) {
644        err = -errno;
645    } else if (n == 0) {
646        err = -ECONNRESET;
647    }
648
649    if (err != OK) {
650        notifyError(true /* send */, err, "Send failed.");
651        mSawSendFailure = true;
652    }
653
654#if 0
655    int numBytesQueued;
656    int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
657    if (res == 0 && numBytesQueued > 50 * 1024) {
658        if (numBytesQueued > 409600) {
659            ALOGW("!!! numBytesQueued = %d", numBytesQueued);
660        }
661
662        int64_t nowUs = ALooper::GetNowUs();
663
664        if (mLastStallReportUs < 0ll
665                || nowUs > mLastStallReportUs + 100000ll) {
666            sp<AMessage> msg = mNotify->dup();
667            msg->setInt32("sessionID", mSessionID);
668            msg->setInt32("reason", kWhatNetworkStall);
669            msg->setSize("numBytesQueued", numBytesQueued);
670            msg->post();
671
672            mLastStallReportUs = nowUs;
673        }
674    }
675#endif
676
677    return err;
678}
679
680status_t ANetworkSession::Session::sendRequest(
681        const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
682    CHECK(mState == CONNECTED || mState == DATAGRAM);
683
684    if (size < 0) {
685        size = strlen((const char *)data);
686    }
687
688    if (size == 0) {
689        return OK;
690    }
691
692    sp<ABuffer> buffer;
693
694    if (mState == CONNECTED && mMode == MODE_DATAGRAM) {
695        CHECK_LE(size, 65535);
696
697        buffer = new ABuffer(size + 2);
698        buffer->data()[0] = size >> 8;
699        buffer->data()[1] = size & 0xff;
700        memcpy(buffer->data() + 2, data, size);
701    } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) {
702        static const bool kUseMask = false;  // Chromium doesn't like it.
703
704        size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0);
705        if (size > 65535) {
706            numHeaderBytes += 8;
707        } else if (size > 125) {
708            numHeaderBytes += 2;
709        }
710
711        buffer = new ABuffer(numHeaderBytes + size);
712        buffer->data()[0] = 0x81;  // FIN==1 | opcode=1 (text)
713        buffer->data()[1] = kUseMask ? 0x80 : 0x00;
714
715        if (size > 65535) {
716            buffer->data()[1] |= 127;
717            buffer->data()[2] = 0x00;
718            buffer->data()[3] = 0x00;
719            buffer->data()[4] = 0x00;
720            buffer->data()[5] = 0x00;
721            buffer->data()[6] = (size >> 24) & 0xff;
722            buffer->data()[7] = (size >> 16) & 0xff;
723            buffer->data()[8] = (size >> 8) & 0xff;
724            buffer->data()[9] = size & 0xff;
725        } else if (size > 125) {
726            buffer->data()[1] |= 126;
727            buffer->data()[2] = (size >> 8) & 0xff;
728            buffer->data()[3] = size & 0xff;
729        } else {
730            buffer->data()[1] |= size;
731        }
732
733        if (kUseMask) {
734            uint32_t mask = rand();
735
736            buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff;
737            buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff;
738            buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff;
739            buffer->data()[numHeaderBytes - 1] = mask & 0xff;
740
741            for (size_t i = 0; i < (size_t)size; ++i) {
742                buffer->data()[numHeaderBytes + i] =
743                    ((const uint8_t *)data)[i]
744                        ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
745            }
746        } else {
747            memcpy(buffer->data() + numHeaderBytes, data, size);
748        }
749    } else {
750        buffer = new ABuffer(size);
751        memcpy(buffer->data(), data, size);
752    }
753
754    Fragment frag;
755
756    frag.mFlags = 0;
757    if (timeValid) {
758        frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
759        frag.mTimeUs = timeUs;
760    }
761
762    frag.mBuffer = buffer;
763
764    mOutFragments.push_back(frag);
765
766    return OK;
767}
768
769void ANetworkSession::Session::notifyError(
770        bool send, status_t err, const char *detail) {
771    sp<AMessage> msg = mNotify->dup();
772    msg->setInt32("sessionID", mSessionID);
773    msg->setInt32("reason", kWhatError);
774    msg->setInt32("send", send);
775    msg->setInt32("err", err);
776    msg->setString("detail", detail);
777    msg->post();
778}
779
780void ANetworkSession::Session::notify(NotificationReason reason) {
781    sp<AMessage> msg = mNotify->dup();
782    msg->setInt32("sessionID", mSessionID);
783    msg->setInt32("reason", reason);
784    msg->post();
785}
786
787////////////////////////////////////////////////////////////////////////////////
788
789ANetworkSession::ANetworkSession()
790    : mNextSessionID(1) {
791    mPipeFd[0] = mPipeFd[1] = -1;
792}
793
794ANetworkSession::~ANetworkSession() {
795    stop();
796}
797
798status_t ANetworkSession::start() {
799    if (mThread != NULL) {
800        return INVALID_OPERATION;
801    }
802
803    int res = pipe(mPipeFd);
804    if (res != 0) {
805        mPipeFd[0] = mPipeFd[1] = -1;
806        return -errno;
807    }
808
809    mThread = new NetworkThread(this);
810
811    status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
812
813    if (err != OK) {
814        mThread.clear();
815
816        close(mPipeFd[0]);
817        close(mPipeFd[1]);
818        mPipeFd[0] = mPipeFd[1] = -1;
819
820        return err;
821    }
822
823    return OK;
824}
825
826status_t ANetworkSession::stop() {
827    if (mThread == NULL) {
828        return INVALID_OPERATION;
829    }
830
831    mThread->requestExit();
832    interrupt();
833    mThread->requestExitAndWait();
834
835    mThread.clear();
836
837    close(mPipeFd[0]);
838    close(mPipeFd[1]);
839    mPipeFd[0] = mPipeFd[1] = -1;
840
841    return OK;
842}
843
844status_t ANetworkSession::createRTSPClient(
845        const char *host, unsigned port, const sp<AMessage> &notify,
846        int32_t *sessionID) {
847    return createClientOrServer(
848            kModeCreateRTSPClient,
849            NULL /* addr */,
850            0 /* port */,
851            host,
852            port,
853            notify,
854            sessionID);
855}
856
857status_t ANetworkSession::createRTSPServer(
858        const struct in_addr &addr, unsigned port,
859        const sp<AMessage> &notify, int32_t *sessionID) {
860    return createClientOrServer(
861            kModeCreateRTSPServer,
862            &addr,
863            port,
864            NULL /* remoteHost */,
865            0 /* remotePort */,
866            notify,
867            sessionID);
868}
869
870status_t ANetworkSession::createUDPSession(
871        unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
872    return createUDPSession(localPort, NULL, 0, notify, sessionID);
873}
874
875status_t ANetworkSession::createUDPSession(
876        unsigned localPort,
877        const char *remoteHost,
878        unsigned remotePort,
879        const sp<AMessage> &notify,
880        int32_t *sessionID) {
881    return createClientOrServer(
882            kModeCreateUDPSession,
883            NULL /* addr */,
884            localPort,
885            remoteHost,
886            remotePort,
887            notify,
888            sessionID);
889}
890
891status_t ANetworkSession::createTCPDatagramSession(
892        const struct in_addr &addr, unsigned port,
893        const sp<AMessage> &notify, int32_t *sessionID) {
894    return createClientOrServer(
895            kModeCreateTCPDatagramSessionPassive,
896            &addr,
897            port,
898            NULL /* remoteHost */,
899            0 /* remotePort */,
900            notify,
901            sessionID);
902}
903
904status_t ANetworkSession::createTCPDatagramSession(
905        unsigned localPort,
906        const char *remoteHost,
907        unsigned remotePort,
908        const sp<AMessage> &notify,
909        int32_t *sessionID) {
910    return createClientOrServer(
911            kModeCreateTCPDatagramSessionActive,
912            NULL /* addr */,
913            localPort,
914            remoteHost,
915            remotePort,
916            notify,
917            sessionID);
918}
919
920status_t ANetworkSession::destroySession(int32_t sessionID) {
921    Mutex::Autolock autoLock(mLock);
922
923    ssize_t index = mSessions.indexOfKey(sessionID);
924
925    if (index < 0) {
926        return -ENOENT;
927    }
928
929    mSessions.removeItemsAt(index);
930
931    interrupt();
932
933    return OK;
934}
935
936// static
937status_t ANetworkSession::MakeSocketNonBlocking(int s) {
938    int flags = fcntl(s, F_GETFL, 0);
939    if (flags < 0) {
940        flags = 0;
941    }
942
943    int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
944    if (res < 0) {
945        return -errno;
946    }
947
948    return OK;
949}
950
951status_t ANetworkSession::createClientOrServer(
952        Mode mode,
953        const struct in_addr *localAddr,
954        unsigned port,
955        const char *remoteHost,
956        unsigned remotePort,
957        const sp<AMessage> &notify,
958        int32_t *sessionID) {
959    Mutex::Autolock autoLock(mLock);
960
961    *sessionID = 0;
962    status_t err = OK;
963    int s, res;
964    sp<Session> session;
965
966    s = socket(
967            AF_INET,
968            (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
969            0);
970
971    if (s < 0) {
972        err = -errno;
973        goto bail;
974    }
975
976    if (mode == kModeCreateRTSPServer
977            || mode == kModeCreateTCPDatagramSessionPassive) {
978        const int yes = 1;
979        res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
980
981        if (res < 0) {
982            err = -errno;
983            goto bail2;
984        }
985    }
986
987    if (mode == kModeCreateUDPSession) {
988        int size = 256 * 1024;
989
990        res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
991
992        if (res < 0) {
993            err = -errno;
994            goto bail2;
995        }
996
997        res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
998
999        if (res < 0) {
1000            err = -errno;
1001            goto bail2;
1002        }
1003    } else if (mode == kModeCreateTCPDatagramSessionActive) {
1004        int flag = 1;
1005        res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
1006
1007        if (res < 0) {
1008            err = -errno;
1009            goto bail2;
1010        }
1011
1012        int tos = 224;  // VOICE
1013        res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos));
1014
1015        if (res < 0) {
1016            err = -errno;
1017            goto bail2;
1018        }
1019    }
1020
1021    err = MakeSocketNonBlocking(s);
1022
1023    if (err != OK) {
1024        goto bail2;
1025    }
1026
1027    struct sockaddr_in addr;
1028    memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
1029    addr.sin_family = AF_INET;
1030
1031    if (mode == kModeCreateRTSPClient
1032            || mode == kModeCreateTCPDatagramSessionActive) {
1033        struct hostent *ent= gethostbyname(remoteHost);
1034        if (ent == NULL) {
1035            err = -h_errno;
1036            goto bail2;
1037        }
1038
1039        addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1040        addr.sin_port = htons(remotePort);
1041    } else if (localAddr != NULL) {
1042        addr.sin_addr = *localAddr;
1043        addr.sin_port = htons(port);
1044    } else {
1045        addr.sin_addr.s_addr = htonl(INADDR_ANY);
1046        addr.sin_port = htons(port);
1047    }
1048
1049    if (mode == kModeCreateRTSPClient
1050            || mode == kModeCreateTCPDatagramSessionActive) {
1051        in_addr_t x = ntohl(addr.sin_addr.s_addr);
1052        ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
1053              s,
1054              (x >> 24),
1055              (x >> 16) & 0xff,
1056              (x >> 8) & 0xff,
1057              x & 0xff,
1058              ntohs(addr.sin_port));
1059
1060        res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
1061
1062        CHECK_LT(res, 0);
1063        if (errno == EINPROGRESS) {
1064            res = 0;
1065        }
1066    } else {
1067        res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
1068
1069        if (res == 0) {
1070            if (mode == kModeCreateRTSPServer
1071                    || mode == kModeCreateTCPDatagramSessionPassive) {
1072                res = listen(s, 4);
1073            } else {
1074                CHECK_EQ(mode, kModeCreateUDPSession);
1075
1076                if (remoteHost != NULL) {
1077                    struct sockaddr_in remoteAddr;
1078                    memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
1079                    remoteAddr.sin_family = AF_INET;
1080                    remoteAddr.sin_port = htons(remotePort);
1081
1082                    struct hostent *ent= gethostbyname(remoteHost);
1083                    if (ent == NULL) {
1084                        err = -h_errno;
1085                        goto bail2;
1086                    }
1087
1088                    remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1089
1090                    res = connect(
1091                            s,
1092                            (const struct sockaddr *)&remoteAddr,
1093                            sizeof(remoteAddr));
1094                }
1095            }
1096        }
1097    }
1098
1099    if (res < 0) {
1100        err = -errno;
1101        goto bail2;
1102    }
1103
1104    Session::State state;
1105    switch (mode) {
1106        case kModeCreateRTSPClient:
1107            state = Session::CONNECTING;
1108            break;
1109
1110        case kModeCreateTCPDatagramSessionActive:
1111            state = Session::CONNECTING;
1112            break;
1113
1114        case kModeCreateTCPDatagramSessionPassive:
1115            state = Session::LISTENING_TCP_DGRAMS;
1116            break;
1117
1118        case kModeCreateRTSPServer:
1119            state = Session::LISTENING_RTSP;
1120            break;
1121
1122        default:
1123            CHECK_EQ(mode, kModeCreateUDPSession);
1124            state = Session::DATAGRAM;
1125            break;
1126    }
1127
1128    session = new Session(
1129            mNextSessionID++,
1130            state,
1131            s,
1132            notify);
1133
1134    if (mode == kModeCreateTCPDatagramSessionActive) {
1135        session->setMode(Session::MODE_DATAGRAM);
1136    } else if (mode == kModeCreateRTSPClient) {
1137        session->setMode(Session::MODE_RTSP);
1138    }
1139
1140    mSessions.add(session->sessionID(), session);
1141
1142    interrupt();
1143
1144    *sessionID = session->sessionID();
1145
1146    goto bail;
1147
1148bail2:
1149    close(s);
1150    s = -1;
1151
1152bail:
1153    return err;
1154}
1155
1156status_t ANetworkSession::connectUDPSession(
1157        int32_t sessionID, const char *remoteHost, unsigned remotePort) {
1158    Mutex::Autolock autoLock(mLock);
1159
1160    ssize_t index = mSessions.indexOfKey(sessionID);
1161
1162    if (index < 0) {
1163        return -ENOENT;
1164    }
1165
1166    const sp<Session> session = mSessions.valueAt(index);
1167    int s = session->socket();
1168
1169    struct sockaddr_in remoteAddr;
1170    memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
1171    remoteAddr.sin_family = AF_INET;
1172    remoteAddr.sin_port = htons(remotePort);
1173
1174    status_t err = OK;
1175    struct hostent *ent = gethostbyname(remoteHost);
1176    if (ent == NULL) {
1177        err = -h_errno;
1178    } else {
1179        remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1180
1181        int res = connect(
1182                s,
1183                (const struct sockaddr *)&remoteAddr,
1184                sizeof(remoteAddr));
1185
1186        if (res < 0) {
1187            err = -errno;
1188        }
1189    }
1190
1191    return err;
1192}
1193
1194status_t ANetworkSession::sendRequest(
1195        int32_t sessionID, const void *data, ssize_t size,
1196        bool timeValid, int64_t timeUs) {
1197    Mutex::Autolock autoLock(mLock);
1198
1199    ssize_t index = mSessions.indexOfKey(sessionID);
1200
1201    if (index < 0) {
1202        return -ENOENT;
1203    }
1204
1205    const sp<Session> session = mSessions.valueAt(index);
1206
1207    status_t err = session->sendRequest(data, size, timeValid, timeUs);
1208
1209    interrupt();
1210
1211    return err;
1212}
1213
1214status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) {
1215    Mutex::Autolock autoLock(mLock);
1216
1217    ssize_t index = mSessions.indexOfKey(sessionID);
1218
1219    if (index < 0) {
1220        return -ENOENT;
1221    }
1222
1223    const sp<Session> session = mSessions.valueAt(index);
1224    return session->switchToWebSocketMode();
1225}
1226
1227void ANetworkSession::interrupt() {
1228    static const char dummy = 0;
1229
1230    ssize_t n;
1231    do {
1232        n = write(mPipeFd[1], &dummy, 1);
1233    } while (n < 0 && errno == EINTR);
1234
1235    if (n < 0) {
1236        ALOGW("Error writing to pipe (%s)", strerror(errno));
1237    }
1238}
1239
1240void ANetworkSession::threadLoop() {
1241    fd_set rs, ws;
1242    FD_ZERO(&rs);
1243    FD_ZERO(&ws);
1244
1245    FD_SET(mPipeFd[0], &rs);
1246    int maxFd = mPipeFd[0];
1247
1248    {
1249        Mutex::Autolock autoLock(mLock);
1250
1251        for (size_t i = 0; i < mSessions.size(); ++i) {
1252            const sp<Session> &session = mSessions.valueAt(i);
1253
1254            int s = session->socket();
1255
1256            if (s < 0) {
1257                continue;
1258            }
1259
1260            if (session->wantsToRead()) {
1261                FD_SET(s, &rs);
1262                if (s > maxFd) {
1263                    maxFd = s;
1264                }
1265            }
1266
1267            if (session->wantsToWrite()) {
1268                FD_SET(s, &ws);
1269                if (s > maxFd) {
1270                    maxFd = s;
1271                }
1272            }
1273        }
1274    }
1275
1276    int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
1277
1278    if (res == 0) {
1279        return;
1280    }
1281
1282    if (res < 0) {
1283        if (errno == EINTR) {
1284            return;
1285        }
1286
1287        ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
1288        return;
1289    }
1290
1291    if (FD_ISSET(mPipeFd[0], &rs)) {
1292        char c;
1293        ssize_t n;
1294        do {
1295            n = read(mPipeFd[0], &c, 1);
1296        } while (n < 0 && errno == EINTR);
1297
1298        if (n < 0) {
1299            ALOGW("Error reading from pipe (%s)", strerror(errno));
1300        }
1301
1302        --res;
1303    }
1304
1305    {
1306        Mutex::Autolock autoLock(mLock);
1307
1308        List<sp<Session> > sessionsToAdd;
1309
1310        for (size_t i = mSessions.size(); res > 0 && i > 0;) {
1311            i--;
1312            const sp<Session> &session = mSessions.valueAt(i);
1313
1314            int s = session->socket();
1315
1316            if (s < 0) {
1317                continue;
1318            }
1319
1320            if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
1321                --res;
1322            }
1323
1324            if (FD_ISSET(s, &rs)) {
1325                if (session->isRTSPServer() || session->isTCPDatagramServer()) {
1326                    struct sockaddr_in remoteAddr;
1327                    socklen_t remoteAddrLen = sizeof(remoteAddr);
1328
1329                    int clientSocket = accept(
1330                            s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
1331
1332                    if (clientSocket >= 0) {
1333                        status_t err = MakeSocketNonBlocking(clientSocket);
1334
1335                        if (err != OK) {
1336                            ALOGE("Unable to make client socket non blocking, "
1337                                  "failed w/ error %d (%s)",
1338                                  err, strerror(-err));
1339
1340                            close(clientSocket);
1341                            clientSocket = -1;
1342                        } else {
1343                            in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
1344
1345                            ALOGI("incoming connection from %d.%d.%d.%d:%d "
1346                                  "(socket %d)",
1347                                  (addr >> 24),
1348                                  (addr >> 16) & 0xff,
1349                                  (addr >> 8) & 0xff,
1350                                  addr & 0xff,
1351                                  ntohs(remoteAddr.sin_port),
1352                                  clientSocket);
1353
1354                            sp<Session> clientSession =
1355                                new Session(
1356                                        mNextSessionID++,
1357                                        Session::CONNECTED,
1358                                        clientSocket,
1359                                        session->getNotificationMessage());
1360
1361                            clientSession->setMode(
1362                                    session->isRTSPServer()
1363                                        ? Session::MODE_RTSP
1364                                        : Session::MODE_DATAGRAM);
1365
1366                            sessionsToAdd.push_back(clientSession);
1367                        }
1368                    } else {
1369                        ALOGE("accept returned error %d (%s)",
1370                              errno, strerror(errno));
1371                    }
1372                } else {
1373                    status_t err = session->readMore();
1374                    if (err != OK) {
1375                        ALOGE("readMore on socket %d failed w/ error %d (%s)",
1376                              s, err, strerror(-err));
1377                    }
1378                }
1379            }
1380
1381            if (FD_ISSET(s, &ws)) {
1382                status_t err = session->writeMore();
1383                if (err != OK) {
1384                    ALOGE("writeMore on socket %d failed w/ error %d (%s)",
1385                          s, err, strerror(-err));
1386                }
1387            }
1388        }
1389
1390        while (!sessionsToAdd.empty()) {
1391            sp<Session> session = *sessionsToAdd.begin();
1392            sessionsToAdd.erase(sessionsToAdd.begin());
1393
1394            mSessions.add(session->sessionID(), session);
1395
1396            ALOGI("added clientSession %d", session->sessionID());
1397        }
1398    }
1399}
1400
1401}  // namespace android
1402