1/*
2 * Copyright 2012, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NetworkSession"
19#include <utils/Log.h>
20
21#include "ANetworkSession.h"
22#include "ParsedMessage.h"
23
24#include <arpa/inet.h>
25#include <fcntl.h>
26#include <linux/tcp.h>
27#include <net/if.h>
28#include <netdb.h>
29#include <netinet/in.h>
30#include <sys/ioctl.h>
31#include <sys/socket.h>
32
33#include <media/stagefright/foundation/ABuffer.h>
34#include <media/stagefright/foundation/ADebug.h>
35#include <media/stagefright/foundation/AMessage.h>
36#include <media/stagefright/foundation/hexdump.h>
37
38namespace android {
39
40static uint16_t U16_AT(const uint8_t *ptr) {
41    return ptr[0] << 8 | ptr[1];
42}
43
44static uint32_t U32_AT(const uint8_t *ptr) {
45    return ptr[0] << 24 | ptr[1] << 16 | ptr[2] << 8 | ptr[3];
46}
47
48static uint64_t U64_AT(const uint8_t *ptr) {
49    return ((uint64_t)U32_AT(ptr)) << 32 | U32_AT(ptr + 4);
50}
51
52static const size_t kMaxUDPSize = 1500;
53static const int32_t kMaxUDPRetries = 200;
54
55struct ANetworkSession::NetworkThread : public Thread {
56    NetworkThread(ANetworkSession *session);
57
58protected:
59    virtual ~NetworkThread();
60
61private:
62    ANetworkSession *mSession;
63
64    virtual bool threadLoop();
65
66    DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
67};
68
69struct ANetworkSession::Session : public RefBase {
70    enum Mode {
71        MODE_RTSP,
72        MODE_DATAGRAM,
73        MODE_WEBSOCKET,
74    };
75
76    enum State {
77        CONNECTING,
78        CONNECTED,
79        LISTENING_RTSP,
80        LISTENING_TCP_DGRAMS,
81        DATAGRAM,
82    };
83
84    Session(int32_t sessionID,
85            State state,
86            int s,
87            const sp<AMessage> &notify);
88
89    int32_t sessionID() const;
90    int socket() const;
91    sp<AMessage> getNotificationMessage() const;
92
93    bool isRTSPServer() const;
94    bool isTCPDatagramServer() const;
95
96    bool wantsToRead();
97    bool wantsToWrite();
98
99    status_t readMore();
100    status_t writeMore();
101
102    status_t sendRequest(
103            const void *data, ssize_t size, bool timeValid, int64_t timeUs);
104
105    void setMode(Mode mode);
106
107    status_t switchToWebSocketMode();
108
109protected:
110    virtual ~Session();
111
112private:
113    enum {
114        FRAGMENT_FLAG_TIME_VALID = 1,
115    };
116    struct Fragment {
117        uint32_t mFlags;
118        int64_t mTimeUs;
119        sp<ABuffer> mBuffer;
120    };
121
122    int32_t mSessionID;
123    State mState;
124    Mode mMode;
125    int mSocket;
126    sp<AMessage> mNotify;
127    bool mSawReceiveFailure, mSawSendFailure;
128    int32_t mUDPRetries;
129
130    List<Fragment> mOutFragments;
131
132    AString mInBuffer;
133
134    int64_t mLastStallReportUs;
135
136    void notifyError(bool send, status_t err, const char *detail);
137    void notify(NotificationReason reason);
138
139    void dumpFragmentStats(const Fragment &frag);
140
141    DISALLOW_EVIL_CONSTRUCTORS(Session);
142};
143////////////////////////////////////////////////////////////////////////////////
144
145ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
146    : mSession(session) {
147}
148
149ANetworkSession::NetworkThread::~NetworkThread() {
150}
151
152bool ANetworkSession::NetworkThread::threadLoop() {
153    mSession->threadLoop();
154
155    return true;
156}
157
158////////////////////////////////////////////////////////////////////////////////
159
160ANetworkSession::Session::Session(
161        int32_t sessionID,
162        State state,
163        int s,
164        const sp<AMessage> &notify)
165    : mSessionID(sessionID),
166      mState(state),
167      mMode(MODE_DATAGRAM),
168      mSocket(s),
169      mNotify(notify),
170      mSawReceiveFailure(false),
171      mSawSendFailure(false),
172      mUDPRetries(kMaxUDPRetries),
173      mLastStallReportUs(-1ll) {
174    if (mState == CONNECTED) {
175        struct sockaddr_in localAddr;
176        socklen_t localAddrLen = sizeof(localAddr);
177
178        int res = getsockname(
179                mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
180        CHECK_GE(res, 0);
181
182        struct sockaddr_in remoteAddr;
183        socklen_t remoteAddrLen = sizeof(remoteAddr);
184
185        res = getpeername(
186                mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
187        CHECK_GE(res, 0);
188
189        in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
190        AString localAddrString = AStringPrintf(
191                "%d.%d.%d.%d",
192                (addr >> 24),
193                (addr >> 16) & 0xff,
194                (addr >> 8) & 0xff,
195                addr & 0xff);
196
197        addr = ntohl(remoteAddr.sin_addr.s_addr);
198        AString remoteAddrString = AStringPrintf(
199                "%d.%d.%d.%d",
200                (addr >> 24),
201                (addr >> 16) & 0xff,
202                (addr >> 8) & 0xff,
203                addr & 0xff);
204
205        sp<AMessage> msg = mNotify->dup();
206        msg->setInt32("sessionID", mSessionID);
207        msg->setInt32("reason", kWhatClientConnected);
208        msg->setString("server-ip", localAddrString.c_str());
209        msg->setInt32("server-port", ntohs(localAddr.sin_port));
210        msg->setString("client-ip", remoteAddrString.c_str());
211        msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
212        msg->post();
213    }
214}
215
216ANetworkSession::Session::~Session() {
217    ALOGV("Session %d gone", mSessionID);
218
219    close(mSocket);
220    mSocket = -1;
221}
222
223int32_t ANetworkSession::Session::sessionID() const {
224    return mSessionID;
225}
226
227int ANetworkSession::Session::socket() const {
228    return mSocket;
229}
230
231void ANetworkSession::Session::setMode(Mode mode) {
232    mMode = mode;
233}
234
235status_t ANetworkSession::Session::switchToWebSocketMode() {
236    if (mState != CONNECTED || mMode != MODE_RTSP) {
237        return INVALID_OPERATION;
238    }
239
240    mMode = MODE_WEBSOCKET;
241
242    return OK;
243}
244
245sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
246    return mNotify;
247}
248
249bool ANetworkSession::Session::isRTSPServer() const {
250    return mState == LISTENING_RTSP;
251}
252
253bool ANetworkSession::Session::isTCPDatagramServer() const {
254    return mState == LISTENING_TCP_DGRAMS;
255}
256
257bool ANetworkSession::Session::wantsToRead() {
258    return !mSawReceiveFailure && mState != CONNECTING;
259}
260
261bool ANetworkSession::Session::wantsToWrite() {
262    return !mSawSendFailure
263        && (mState == CONNECTING
264            || (mState == CONNECTED && !mOutFragments.empty())
265            || (mState == DATAGRAM && !mOutFragments.empty()));
266}
267
268status_t ANetworkSession::Session::readMore() {
269    if (mState == DATAGRAM) {
270        CHECK_EQ(mMode, MODE_DATAGRAM);
271
272        status_t err;
273        do {
274            sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
275
276            struct sockaddr_in remoteAddr;
277            socklen_t remoteAddrLen = sizeof(remoteAddr);
278
279            ssize_t n;
280            do {
281                n = recvfrom(
282                        mSocket, buf->data(), buf->capacity(), 0,
283                        (struct sockaddr *)&remoteAddr, &remoteAddrLen);
284            } while (n < 0 && errno == EINTR);
285
286            err = OK;
287            if (n < 0) {
288                err = -errno;
289            } else if (n == 0) {
290                err = -ECONNRESET;
291            } else {
292                buf->setRange(0, n);
293
294                int64_t nowUs = ALooper::GetNowUs();
295                buf->meta()->setInt64("arrivalTimeUs", nowUs);
296
297                sp<AMessage> notify = mNotify->dup();
298                notify->setInt32("sessionID", mSessionID);
299                notify->setInt32("reason", kWhatDatagram);
300
301                uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
302                notify->setString(
303                        "fromAddr",
304                        AStringPrintf(
305                            "%u.%u.%u.%u",
306                            ip >> 24,
307                            (ip >> 16) & 0xff,
308                            (ip >> 8) & 0xff,
309                            ip & 0xff).c_str());
310
311                notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
312
313                notify->setBuffer("data", buf);
314                notify->post();
315            }
316        } while (err == OK);
317
318        if (err == -EAGAIN) {
319            err = OK;
320        }
321
322        if (err != OK) {
323            if (!mUDPRetries) {
324                notifyError(false /* send */, err, "Recvfrom failed.");
325                mSawReceiveFailure = true;
326            } else {
327                mUDPRetries--;
328                ALOGE("Recvfrom failed, %d/%d retries left",
329                        mUDPRetries, kMaxUDPRetries);
330                err = OK;
331            }
332        } else {
333            mUDPRetries = kMaxUDPRetries;
334        }
335
336        return err;
337    }
338
339    char tmp[512];
340    ssize_t n;
341    do {
342        n = recv(mSocket, tmp, sizeof(tmp), 0);
343    } while (n < 0 && errno == EINTR);
344
345    status_t err = OK;
346
347    if (n > 0) {
348        mInBuffer.append(tmp, n);
349
350#if 0
351        ALOGI("in:");
352        hexdump(tmp, n);
353#endif
354    } else if (n < 0) {
355        err = -errno;
356    } else {
357        err = -ECONNRESET;
358    }
359
360    if (mMode == MODE_DATAGRAM) {
361        // TCP stream carrying 16-bit length-prefixed datagrams.
362
363        while (mInBuffer.size() >= 2) {
364            size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
365
366            if (mInBuffer.size() < packetSize + 2) {
367                break;
368            }
369
370            sp<ABuffer> packet = new ABuffer(packetSize);
371            memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
372
373            int64_t nowUs = ALooper::GetNowUs();
374            packet->meta()->setInt64("arrivalTimeUs", nowUs);
375
376            sp<AMessage> notify = mNotify->dup();
377            notify->setInt32("sessionID", mSessionID);
378            notify->setInt32("reason", kWhatDatagram);
379            notify->setBuffer("data", packet);
380            notify->post();
381
382            mInBuffer.erase(0, packetSize + 2);
383        }
384    } else if (mMode == MODE_RTSP) {
385        for (;;) {
386            size_t length;
387
388            if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
389                if (mInBuffer.size() < 4) {
390                    break;
391                }
392
393                length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
394
395                if (mInBuffer.size() < 4 + length) {
396                    break;
397                }
398
399                sp<AMessage> notify = mNotify->dup();
400                notify->setInt32("sessionID", mSessionID);
401                notify->setInt32("reason", kWhatBinaryData);
402                notify->setInt32("channel", mInBuffer.c_str()[1]);
403
404                sp<ABuffer> data = new ABuffer(length);
405                memcpy(data->data(), mInBuffer.c_str() + 4, length);
406
407                int64_t nowUs = ALooper::GetNowUs();
408                data->meta()->setInt64("arrivalTimeUs", nowUs);
409
410                notify->setBuffer("data", data);
411                notify->post();
412
413                mInBuffer.erase(0, 4 + length);
414                continue;
415            }
416
417            sp<ParsedMessage> msg =
418                ParsedMessage::Parse(
419                        mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
420
421            if (msg == NULL) {
422                break;
423            }
424
425            sp<AMessage> notify = mNotify->dup();
426            notify->setInt32("sessionID", mSessionID);
427            notify->setInt32("reason", kWhatData);
428            notify->setObject("data", msg);
429            notify->post();
430
431#if 1
432            // XXX The (old) dongle sends the wrong content length header on a
433            // SET_PARAMETER request that signals a "wfd_idr_request".
434            // (17 instead of 19).
435            const char *content = msg->getContent();
436            if (content
437                    && !memcmp(content, "wfd_idr_request\r\n", 17)
438                    && length >= 19
439                    && mInBuffer.c_str()[length] == '\r'
440                    && mInBuffer.c_str()[length + 1] == '\n') {
441                length += 2;
442            }
443#endif
444
445            mInBuffer.erase(0, length);
446
447            if (err != OK) {
448                break;
449            }
450        }
451    } else {
452        CHECK_EQ(mMode, MODE_WEBSOCKET);
453
454        const uint8_t *data = (const uint8_t *)mInBuffer.c_str();
455        // hexdump(data, mInBuffer.size());
456
457        while (mInBuffer.size() >= 2) {
458            size_t offset = 2;
459
460            uint64_t payloadLen = data[1] & 0x7f;
461            if (payloadLen == 126) {
462                if (offset + 2 > mInBuffer.size()) {
463                    break;
464                }
465
466                payloadLen = U16_AT(&data[offset]);
467                offset += 2;
468            } else if (payloadLen == 127) {
469                if (offset + 8 > mInBuffer.size()) {
470                    break;
471                }
472
473                payloadLen = U64_AT(&data[offset]);
474                offset += 8;
475            }
476
477            uint32_t mask = 0;
478            if (data[1] & 0x80) {
479                // MASK==1
480                if (offset + 4 > mInBuffer.size()) {
481                    break;
482                }
483
484                mask = U32_AT(&data[offset]);
485                offset += 4;
486            }
487
488            if (payloadLen > mInBuffer.size() || offset > mInBuffer.size() - payloadLen) {
489                break;
490            }
491
492            // We have the full message.
493
494            sp<ABuffer> packet = new ABuffer(payloadLen);
495            memcpy(packet->data(), &data[offset], payloadLen);
496
497            if (mask != 0) {
498                for (size_t i = 0; i < payloadLen; ++i) {
499                    packet->data()[i] =
500                        data[offset + i]
501                            ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
502                }
503            }
504
505            sp<AMessage> notify = mNotify->dup();
506            notify->setInt32("sessionID", mSessionID);
507            notify->setInt32("reason", kWhatWebSocketMessage);
508            notify->setBuffer("data", packet);
509            notify->setInt32("headerByte", data[0]);
510            notify->post();
511
512            mInBuffer.erase(0, offset + payloadLen);
513        }
514    }
515
516    if (err != OK) {
517        notifyError(false /* send */, err, "Recv failed.");
518        mSawReceiveFailure = true;
519    }
520
521    return err;
522}
523
524void ANetworkSession::Session::dumpFragmentStats(const Fragment & /* frag */) {
525#if 0
526    int64_t nowUs = ALooper::GetNowUs();
527    int64_t delayMs = (nowUs - frag.mTimeUs) / 1000ll;
528
529    static const int64_t kMinDelayMs = 0;
530    static const int64_t kMaxDelayMs = 300;
531
532    const char *kPattern = "########################################";
533    size_t kPatternSize = strlen(kPattern);
534
535    int n = (kPatternSize * (delayMs - kMinDelayMs))
536                / (kMaxDelayMs - kMinDelayMs);
537
538    if (n < 0) {
539        n = 0;
540    } else if ((size_t)n > kPatternSize) {
541        n = kPatternSize;
542    }
543
544    ALOGI("[%lld]: (%4lld ms) %s\n",
545          frag.mTimeUs / 1000,
546          delayMs,
547          kPattern + kPatternSize - n);
548#endif
549}
550
551status_t ANetworkSession::Session::writeMore() {
552    if (mState == DATAGRAM) {
553        CHECK(!mOutFragments.empty());
554
555        status_t err;
556        do {
557            const Fragment &frag = *mOutFragments.begin();
558            const sp<ABuffer> &datagram = frag.mBuffer;
559
560            int n;
561            do {
562                n = send(mSocket, datagram->data(), datagram->size(), 0);
563            } while (n < 0 && errno == EINTR);
564
565            err = OK;
566
567            if (n > 0) {
568                if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
569                    dumpFragmentStats(frag);
570                }
571
572                mOutFragments.erase(mOutFragments.begin());
573            } else if (n < 0) {
574                err = -errno;
575            } else if (n == 0) {
576                err = -ECONNRESET;
577            }
578        } while (err == OK && !mOutFragments.empty());
579
580        if (err == -EAGAIN) {
581            if (!mOutFragments.empty()) {
582                ALOGI("%zu datagrams remain queued.", mOutFragments.size());
583            }
584            err = OK;
585        }
586
587        if (err != OK) {
588            if (!mUDPRetries) {
589                notifyError(true /* send */, err, "Send datagram failed.");
590                mSawSendFailure = true;
591            } else {
592                mUDPRetries--;
593                ALOGE("Send datagram failed, %d/%d retries left",
594                        mUDPRetries, kMaxUDPRetries);
595                err = OK;
596            }
597        } else {
598            mUDPRetries = kMaxUDPRetries;
599        }
600
601        return err;
602    }
603
604    if (mState == CONNECTING) {
605        int err;
606        socklen_t optionLen = sizeof(err);
607        CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
608        CHECK_EQ(optionLen, (socklen_t)sizeof(err));
609
610        if (err != 0) {
611            notifyError(kWhatError, -err, "Connection failed");
612            mSawSendFailure = true;
613
614            return -err;
615        }
616
617        mState = CONNECTED;
618        notify(kWhatConnected);
619
620        return OK;
621    }
622
623    CHECK_EQ(mState, CONNECTED);
624    CHECK(!mOutFragments.empty());
625
626    ssize_t n = -1;
627    while (!mOutFragments.empty()) {
628        const Fragment &frag = *mOutFragments.begin();
629
630        do {
631            n = send(mSocket, frag.mBuffer->data(), frag.mBuffer->size(), 0);
632        } while (n < 0 && errno == EINTR);
633
634        if (n <= 0) {
635            break;
636        }
637
638        frag.mBuffer->setRange(
639                frag.mBuffer->offset() + n, frag.mBuffer->size() - n);
640
641        if (frag.mBuffer->size() > 0) {
642            break;
643        }
644
645        if (frag.mFlags & FRAGMENT_FLAG_TIME_VALID) {
646            dumpFragmentStats(frag);
647        }
648
649        mOutFragments.erase(mOutFragments.begin());
650    }
651
652    status_t err = OK;
653
654    if (n < 0) {
655        err = -errno;
656    } else if (n == 0) {
657        err = -ECONNRESET;
658    }
659
660    if (err != OK) {
661        notifyError(true /* send */, err, "Send failed.");
662        mSawSendFailure = true;
663    }
664
665#if 0
666    int numBytesQueued;
667    int res = ioctl(mSocket, SIOCOUTQ, &numBytesQueued);
668    if (res == 0 && numBytesQueued > 50 * 1024) {
669        if (numBytesQueued > 409600) {
670            ALOGW("!!! numBytesQueued = %d", numBytesQueued);
671        }
672
673        int64_t nowUs = ALooper::GetNowUs();
674
675        if (mLastStallReportUs < 0ll
676                || nowUs > mLastStallReportUs + 100000ll) {
677            sp<AMessage> msg = mNotify->dup();
678            msg->setInt32("sessionID", mSessionID);
679            msg->setInt32("reason", kWhatNetworkStall);
680            msg->setSize("numBytesQueued", numBytesQueued);
681            msg->post();
682
683            mLastStallReportUs = nowUs;
684        }
685    }
686#endif
687
688    return err;
689}
690
691status_t ANetworkSession::Session::sendRequest(
692        const void *data, ssize_t size, bool timeValid, int64_t timeUs) {
693    CHECK(mState == CONNECTED || mState == DATAGRAM);
694
695    if (size < 0) {
696        size = strlen((const char *)data);
697    }
698
699    if (size == 0) {
700        return OK;
701    }
702
703    sp<ABuffer> buffer;
704
705    if (mState == CONNECTED && mMode == MODE_DATAGRAM) {
706        CHECK_LE(size, 65535);
707
708        buffer = new ABuffer(size + 2);
709        buffer->data()[0] = size >> 8;
710        buffer->data()[1] = size & 0xff;
711        memcpy(buffer->data() + 2, data, size);
712    } else if (mState == CONNECTED && mMode == MODE_WEBSOCKET) {
713        static const bool kUseMask = false;  // Chromium doesn't like it.
714
715        size_t numHeaderBytes = 2 + (kUseMask ? 4 : 0);
716        if (size > 65535) {
717            numHeaderBytes += 8;
718        } else if (size > 125) {
719            numHeaderBytes += 2;
720        }
721
722        buffer = new ABuffer(numHeaderBytes + size);
723        buffer->data()[0] = 0x81;  // FIN==1 | opcode=1 (text)
724        buffer->data()[1] = kUseMask ? 0x80 : 0x00;
725
726        if (size > 65535) {
727            buffer->data()[1] |= 127;
728            buffer->data()[2] = 0x00;
729            buffer->data()[3] = 0x00;
730            buffer->data()[4] = 0x00;
731            buffer->data()[5] = 0x00;
732            buffer->data()[6] = (size >> 24) & 0xff;
733            buffer->data()[7] = (size >> 16) & 0xff;
734            buffer->data()[8] = (size >> 8) & 0xff;
735            buffer->data()[9] = size & 0xff;
736        } else if (size > 125) {
737            buffer->data()[1] |= 126;
738            buffer->data()[2] = (size >> 8) & 0xff;
739            buffer->data()[3] = size & 0xff;
740        } else {
741            buffer->data()[1] |= size;
742        }
743
744        if (kUseMask) {
745            uint32_t mask = rand();
746
747            buffer->data()[numHeaderBytes - 4] = (mask >> 24) & 0xff;
748            buffer->data()[numHeaderBytes - 3] = (mask >> 16) & 0xff;
749            buffer->data()[numHeaderBytes - 2] = (mask >> 8) & 0xff;
750            buffer->data()[numHeaderBytes - 1] = mask & 0xff;
751
752            for (size_t i = 0; i < (size_t)size; ++i) {
753                buffer->data()[numHeaderBytes + i] =
754                    ((const uint8_t *)data)[i]
755                        ^ ((mask >> (8 * (3 - (i % 4)))) & 0xff);
756            }
757        } else {
758            memcpy(buffer->data() + numHeaderBytes, data, size);
759        }
760    } else {
761        buffer = new ABuffer(size);
762        memcpy(buffer->data(), data, size);
763    }
764
765    Fragment frag;
766
767    frag.mFlags = 0;
768    if (timeValid) {
769        frag.mFlags = FRAGMENT_FLAG_TIME_VALID;
770        frag.mTimeUs = timeUs;
771    }
772
773    frag.mBuffer = buffer;
774
775    mOutFragments.push_back(frag);
776
777    return OK;
778}
779
780void ANetworkSession::Session::notifyError(
781        bool send, status_t err, const char *detail) {
782    sp<AMessage> msg = mNotify->dup();
783    msg->setInt32("sessionID", mSessionID);
784    msg->setInt32("reason", kWhatError);
785    msg->setInt32("send", send);
786    msg->setInt32("err", err);
787    msg->setString("detail", detail);
788    msg->post();
789}
790
791void ANetworkSession::Session::notify(NotificationReason reason) {
792    sp<AMessage> msg = mNotify->dup();
793    msg->setInt32("sessionID", mSessionID);
794    msg->setInt32("reason", reason);
795    msg->post();
796}
797
798////////////////////////////////////////////////////////////////////////////////
799
800ANetworkSession::ANetworkSession()
801    : mNextSessionID(1) {
802    mPipeFd[0] = mPipeFd[1] = -1;
803}
804
805ANetworkSession::~ANetworkSession() {
806    stop();
807}
808
809status_t ANetworkSession::start() {
810    if (mThread != NULL) {
811        return INVALID_OPERATION;
812    }
813
814    int res = pipe(mPipeFd);
815    if (res != 0) {
816        mPipeFd[0] = mPipeFd[1] = -1;
817        return -errno;
818    }
819
820    mThread = new NetworkThread(this);
821
822    status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
823
824    if (err != OK) {
825        mThread.clear();
826
827        close(mPipeFd[0]);
828        close(mPipeFd[1]);
829        mPipeFd[0] = mPipeFd[1] = -1;
830
831        return err;
832    }
833
834    return OK;
835}
836
837status_t ANetworkSession::stop() {
838    if (mThread == NULL) {
839        return INVALID_OPERATION;
840    }
841
842    mThread->requestExit();
843    interrupt();
844    mThread->requestExitAndWait();
845
846    mThread.clear();
847
848    close(mPipeFd[0]);
849    close(mPipeFd[1]);
850    mPipeFd[0] = mPipeFd[1] = -1;
851
852    return OK;
853}
854
855status_t ANetworkSession::createRTSPClient(
856        const char *host, unsigned port, const sp<AMessage> &notify,
857        int32_t *sessionID) {
858    return createClientOrServer(
859            kModeCreateRTSPClient,
860            NULL /* addr */,
861            0 /* port */,
862            host,
863            port,
864            notify,
865            sessionID);
866}
867
868status_t ANetworkSession::createRTSPServer(
869        const struct in_addr &addr, unsigned port,
870        const sp<AMessage> &notify, int32_t *sessionID) {
871    return createClientOrServer(
872            kModeCreateRTSPServer,
873            &addr,
874            port,
875            NULL /* remoteHost */,
876            0 /* remotePort */,
877            notify,
878            sessionID);
879}
880
881status_t ANetworkSession::createUDPSession(
882        unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
883    return createUDPSession(localPort, NULL, 0, notify, sessionID);
884}
885
886status_t ANetworkSession::createUDPSession(
887        unsigned localPort,
888        const char *remoteHost,
889        unsigned remotePort,
890        const sp<AMessage> &notify,
891        int32_t *sessionID) {
892    return createClientOrServer(
893            kModeCreateUDPSession,
894            NULL /* addr */,
895            localPort,
896            remoteHost,
897            remotePort,
898            notify,
899            sessionID);
900}
901
902status_t ANetworkSession::createTCPDatagramSession(
903        const struct in_addr &addr, unsigned port,
904        const sp<AMessage> &notify, int32_t *sessionID) {
905    return createClientOrServer(
906            kModeCreateTCPDatagramSessionPassive,
907            &addr,
908            port,
909            NULL /* remoteHost */,
910            0 /* remotePort */,
911            notify,
912            sessionID);
913}
914
915status_t ANetworkSession::createTCPDatagramSession(
916        unsigned localPort,
917        const char *remoteHost,
918        unsigned remotePort,
919        const sp<AMessage> &notify,
920        int32_t *sessionID) {
921    return createClientOrServer(
922            kModeCreateTCPDatagramSessionActive,
923            NULL /* addr */,
924            localPort,
925            remoteHost,
926            remotePort,
927            notify,
928            sessionID);
929}
930
931status_t ANetworkSession::destroySession(int32_t sessionID) {
932    Mutex::Autolock autoLock(mLock);
933
934    ssize_t index = mSessions.indexOfKey(sessionID);
935
936    if (index < 0) {
937        return -ENOENT;
938    }
939
940    mSessions.removeItemsAt(index);
941
942    interrupt();
943
944    return OK;
945}
946
947// static
948status_t ANetworkSession::MakeSocketNonBlocking(int s) {
949    int flags = fcntl(s, F_GETFL, 0);
950    if (flags < 0) {
951        flags = 0;
952    }
953
954    int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
955    if (res < 0) {
956        return -errno;
957    }
958
959    return OK;
960}
961
962status_t ANetworkSession::createClientOrServer(
963        Mode mode,
964        const struct in_addr *localAddr,
965        unsigned port,
966        const char *remoteHost,
967        unsigned remotePort,
968        const sp<AMessage> &notify,
969        int32_t *sessionID) {
970    Mutex::Autolock autoLock(mLock);
971
972    *sessionID = 0;
973    status_t err = OK;
974    int s, res;
975    sp<Session> session;
976
977    s = socket(
978            AF_INET,
979            (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
980            0);
981
982    if (s < 0) {
983        err = -errno;
984        goto bail;
985    }
986
987    if (mode == kModeCreateRTSPServer
988            || mode == kModeCreateTCPDatagramSessionPassive) {
989        const int yes = 1;
990        res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
991
992        if (res < 0) {
993            err = -errno;
994            goto bail2;
995        }
996    }
997
998    if (mode == kModeCreateUDPSession) {
999        int size = 256 * 1024;
1000
1001        res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
1002
1003        if (res < 0) {
1004            err = -errno;
1005            goto bail2;
1006        }
1007
1008        res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
1009
1010        if (res < 0) {
1011            err = -errno;
1012            goto bail2;
1013        }
1014    } else if (mode == kModeCreateTCPDatagramSessionActive) {
1015        int flag = 1;
1016        res = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
1017
1018        if (res < 0) {
1019            err = -errno;
1020            goto bail2;
1021        }
1022
1023        int tos = 224;  // VOICE
1024        res = setsockopt(s, IPPROTO_IP, IP_TOS, &tos, sizeof(tos));
1025
1026        if (res < 0) {
1027            err = -errno;
1028            goto bail2;
1029        }
1030    }
1031
1032    err = MakeSocketNonBlocking(s);
1033
1034    if (err != OK) {
1035        goto bail2;
1036    }
1037
1038    struct sockaddr_in addr;
1039    memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
1040    addr.sin_family = AF_INET;
1041
1042    if (mode == kModeCreateRTSPClient
1043            || mode == kModeCreateTCPDatagramSessionActive) {
1044        struct hostent *ent= gethostbyname(remoteHost);
1045        if (ent == NULL) {
1046            err = -h_errno;
1047            goto bail2;
1048        }
1049
1050        addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1051        addr.sin_port = htons(remotePort);
1052    } else if (localAddr != NULL) {
1053        addr.sin_addr = *localAddr;
1054        addr.sin_port = htons(port);
1055    } else {
1056        addr.sin_addr.s_addr = htonl(INADDR_ANY);
1057        addr.sin_port = htons(port);
1058    }
1059
1060    if (mode == kModeCreateRTSPClient
1061            || mode == kModeCreateTCPDatagramSessionActive) {
1062        in_addr_t x = ntohl(addr.sin_addr.s_addr);
1063        ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
1064              s,
1065              (x >> 24),
1066              (x >> 16) & 0xff,
1067              (x >> 8) & 0xff,
1068              x & 0xff,
1069              ntohs(addr.sin_port));
1070
1071        res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
1072
1073        CHECK_LT(res, 0);
1074        if (errno == EINPROGRESS) {
1075            res = 0;
1076        }
1077    } else {
1078        res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
1079
1080        if (res == 0) {
1081            if (mode == kModeCreateRTSPServer
1082                    || mode == kModeCreateTCPDatagramSessionPassive) {
1083                res = listen(s, 4);
1084            } else {
1085                CHECK_EQ(mode, kModeCreateUDPSession);
1086
1087                if (remoteHost != NULL) {
1088                    struct sockaddr_in remoteAddr;
1089                    memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
1090                    remoteAddr.sin_family = AF_INET;
1091                    remoteAddr.sin_port = htons(remotePort);
1092
1093                    struct hostent *ent= gethostbyname(remoteHost);
1094                    if (ent == NULL) {
1095                        err = -h_errno;
1096                        goto bail2;
1097                    }
1098
1099                    remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1100
1101                    res = connect(
1102                            s,
1103                            (const struct sockaddr *)&remoteAddr,
1104                            sizeof(remoteAddr));
1105                }
1106            }
1107        }
1108    }
1109
1110    if (res < 0) {
1111        err = -errno;
1112        goto bail2;
1113    }
1114
1115    Session::State state;
1116    switch (mode) {
1117        case kModeCreateRTSPClient:
1118            state = Session::CONNECTING;
1119            break;
1120
1121        case kModeCreateTCPDatagramSessionActive:
1122            state = Session::CONNECTING;
1123            break;
1124
1125        case kModeCreateTCPDatagramSessionPassive:
1126            state = Session::LISTENING_TCP_DGRAMS;
1127            break;
1128
1129        case kModeCreateRTSPServer:
1130            state = Session::LISTENING_RTSP;
1131            break;
1132
1133        default:
1134            CHECK_EQ(mode, kModeCreateUDPSession);
1135            state = Session::DATAGRAM;
1136            break;
1137    }
1138
1139    session = new Session(
1140            mNextSessionID++,
1141            state,
1142            s,
1143            notify);
1144
1145    if (mode == kModeCreateTCPDatagramSessionActive) {
1146        session->setMode(Session::MODE_DATAGRAM);
1147    } else if (mode == kModeCreateRTSPClient) {
1148        session->setMode(Session::MODE_RTSP);
1149    }
1150
1151    mSessions.add(session->sessionID(), session);
1152
1153    interrupt();
1154
1155    *sessionID = session->sessionID();
1156
1157    goto bail;
1158
1159bail2:
1160    close(s);
1161    s = -1;
1162
1163bail:
1164    return err;
1165}
1166
1167status_t ANetworkSession::connectUDPSession(
1168        int32_t sessionID, const char *remoteHost, unsigned remotePort) {
1169    Mutex::Autolock autoLock(mLock);
1170
1171    ssize_t index = mSessions.indexOfKey(sessionID);
1172
1173    if (index < 0) {
1174        return -ENOENT;
1175    }
1176
1177    const sp<Session> session = mSessions.valueAt(index);
1178    int s = session->socket();
1179
1180    struct sockaddr_in remoteAddr;
1181    memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
1182    remoteAddr.sin_family = AF_INET;
1183    remoteAddr.sin_port = htons(remotePort);
1184
1185    status_t err = OK;
1186    struct hostent *ent = gethostbyname(remoteHost);
1187    if (ent == NULL) {
1188        err = -h_errno;
1189    } else {
1190        remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
1191
1192        int res = connect(
1193                s,
1194                (const struct sockaddr *)&remoteAddr,
1195                sizeof(remoteAddr));
1196
1197        if (res < 0) {
1198            err = -errno;
1199        }
1200    }
1201
1202    return err;
1203}
1204
1205status_t ANetworkSession::sendRequest(
1206        int32_t sessionID, const void *data, ssize_t size,
1207        bool timeValid, int64_t timeUs) {
1208    Mutex::Autolock autoLock(mLock);
1209
1210    ssize_t index = mSessions.indexOfKey(sessionID);
1211
1212    if (index < 0) {
1213        return -ENOENT;
1214    }
1215
1216    const sp<Session> session = mSessions.valueAt(index);
1217
1218    status_t err = session->sendRequest(data, size, timeValid, timeUs);
1219
1220    interrupt();
1221
1222    return err;
1223}
1224
1225status_t ANetworkSession::switchToWebSocketMode(int32_t sessionID) {
1226    Mutex::Autolock autoLock(mLock);
1227
1228    ssize_t index = mSessions.indexOfKey(sessionID);
1229
1230    if (index < 0) {
1231        return -ENOENT;
1232    }
1233
1234    const sp<Session> session = mSessions.valueAt(index);
1235    return session->switchToWebSocketMode();
1236}
1237
1238void ANetworkSession::interrupt() {
1239    static const char dummy = 0;
1240
1241    ssize_t n;
1242    do {
1243        n = write(mPipeFd[1], &dummy, 1);
1244    } while (n < 0 && errno == EINTR);
1245
1246    if (n < 0) {
1247        ALOGW("Error writing to pipe (%s)", strerror(errno));
1248    }
1249}
1250
1251void ANetworkSession::threadLoop() {
1252    fd_set rs, ws;
1253    FD_ZERO(&rs);
1254    FD_ZERO(&ws);
1255
1256    FD_SET(mPipeFd[0], &rs);
1257    int maxFd = mPipeFd[0];
1258
1259    {
1260        Mutex::Autolock autoLock(mLock);
1261
1262        for (size_t i = 0; i < mSessions.size(); ++i) {
1263            const sp<Session> &session = mSessions.valueAt(i);
1264
1265            int s = session->socket();
1266
1267            if (s < 0) {
1268                continue;
1269            }
1270
1271            if (session->wantsToRead()) {
1272                FD_SET(s, &rs);
1273                if (s > maxFd) {
1274                    maxFd = s;
1275                }
1276            }
1277
1278            if (session->wantsToWrite()) {
1279                FD_SET(s, &ws);
1280                if (s > maxFd) {
1281                    maxFd = s;
1282                }
1283            }
1284        }
1285    }
1286
1287    int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
1288
1289    if (res == 0) {
1290        return;
1291    }
1292
1293    if (res < 0) {
1294        if (errno == EINTR) {
1295            return;
1296        }
1297
1298        ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
1299        return;
1300    }
1301
1302    if (FD_ISSET(mPipeFd[0], &rs)) {
1303        char c;
1304        ssize_t n;
1305        do {
1306            n = read(mPipeFd[0], &c, 1);
1307        } while (n < 0 && errno == EINTR);
1308
1309        if (n < 0) {
1310            ALOGW("Error reading from pipe (%s)", strerror(errno));
1311        }
1312
1313        --res;
1314    }
1315
1316    {
1317        Mutex::Autolock autoLock(mLock);
1318
1319        List<sp<Session> > sessionsToAdd;
1320
1321        for (size_t i = mSessions.size(); res > 0 && i > 0;) {
1322            i--;
1323            const sp<Session> &session = mSessions.valueAt(i);
1324
1325            int s = session->socket();
1326
1327            if (s < 0) {
1328                continue;
1329            }
1330
1331            if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
1332                --res;
1333            }
1334
1335            if (FD_ISSET(s, &rs)) {
1336                if (session->isRTSPServer() || session->isTCPDatagramServer()) {
1337                    struct sockaddr_in remoteAddr;
1338                    socklen_t remoteAddrLen = sizeof(remoteAddr);
1339
1340                    int clientSocket = accept(
1341                            s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
1342
1343                    if (clientSocket >= 0) {
1344                        status_t err = MakeSocketNonBlocking(clientSocket);
1345
1346                        if (err != OK) {
1347                            ALOGE("Unable to make client socket non blocking, "
1348                                  "failed w/ error %d (%s)",
1349                                  err, strerror(-err));
1350
1351                            close(clientSocket);
1352                            clientSocket = -1;
1353                        } else {
1354                            in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
1355
1356                            ALOGI("incoming connection from %d.%d.%d.%d:%d "
1357                                  "(socket %d)",
1358                                  (addr >> 24),
1359                                  (addr >> 16) & 0xff,
1360                                  (addr >> 8) & 0xff,
1361                                  addr & 0xff,
1362                                  ntohs(remoteAddr.sin_port),
1363                                  clientSocket);
1364
1365                            sp<Session> clientSession =
1366                                new Session(
1367                                        mNextSessionID++,
1368                                        Session::CONNECTED,
1369                                        clientSocket,
1370                                        session->getNotificationMessage());
1371
1372                            clientSession->setMode(
1373                                    session->isRTSPServer()
1374                                        ? Session::MODE_RTSP
1375                                        : Session::MODE_DATAGRAM);
1376
1377                            sessionsToAdd.push_back(clientSession);
1378                        }
1379                    } else {
1380                        ALOGE("accept returned error %d (%s)",
1381                              errno, strerror(errno));
1382                    }
1383                } else {
1384                    status_t err = session->readMore();
1385                    if (err != OK) {
1386                        ALOGE("readMore on socket %d failed w/ error %d (%s)",
1387                              s, err, strerror(-err));
1388                    }
1389                }
1390            }
1391
1392            if (FD_ISSET(s, &ws)) {
1393                status_t err = session->writeMore();
1394                if (err != OK) {
1395                    ALOGE("writeMore on socket %d failed w/ error %d (%s)",
1396                          s, err, strerror(-err));
1397                }
1398            }
1399        }
1400
1401        while (!sessionsToAdd.empty()) {
1402            sp<Session> session = *sessionsToAdd.begin();
1403            sessionsToAdd.erase(sessionsToAdd.begin());
1404
1405            mSessions.add(session->sessionID(), session);
1406
1407            ALOGI("added clientSession %d", session->sessionID());
1408        }
1409    }
1410}
1411
1412}  // namespace android
1413