1/*
2 * Copyright 2012, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NetworkSession"
19#include <utils/Log.h>
20
21#include "ANetworkSession.h"
22#include "ParsedMessage.h"
23
24#include <arpa/inet.h>
25#include <fcntl.h>
26#include <net/if.h>
27#include <netdb.h>
28#include <netinet/in.h>
29#include <sys/socket.h>
30
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/foundation/hexdump.h>
35#include <media/stagefright/Utils.h>
36
37namespace android {
38
39static const size_t kMaxUDPSize = 1500;
40
41struct ANetworkSession::NetworkThread : public Thread {
42    NetworkThread(ANetworkSession *session);
43
44protected:
45    virtual ~NetworkThread();
46
47private:
48    ANetworkSession *mSession;
49
50    virtual bool threadLoop();
51
52    DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
53};
54
55struct ANetworkSession::Session : public RefBase {
56    enum State {
57        CONNECTING,
58        CONNECTED,
59        LISTENING_RTSP,
60        LISTENING_TCP_DGRAMS,
61        DATAGRAM,
62    };
63
64    Session(int32_t sessionID,
65            State state,
66            int s,
67            const sp<AMessage> &notify);
68
69    int32_t sessionID() const;
70    int socket() const;
71    sp<AMessage> getNotificationMessage() const;
72
73    bool isRTSPServer() const;
74    bool isTCPDatagramServer() const;
75
76    bool wantsToRead();
77    bool wantsToWrite();
78
79    status_t readMore();
80    status_t writeMore();
81
82    status_t sendRequest(const void *data, ssize_t size);
83
84    void setIsRTSPConnection(bool yesno);
85
86protected:
87    virtual ~Session();
88
89private:
90    int32_t mSessionID;
91    State mState;
92    bool mIsRTSPConnection;
93    int mSocket;
94    sp<AMessage> mNotify;
95    bool mSawReceiveFailure, mSawSendFailure;
96
97    // for TCP / stream data
98    AString mOutBuffer;
99
100    // for UDP / datagrams
101    List<sp<ABuffer> > mOutDatagrams;
102
103    AString mInBuffer;
104
105    void notifyError(bool send, status_t err, const char *detail);
106    void notify(NotificationReason reason);
107
108    DISALLOW_EVIL_CONSTRUCTORS(Session);
109};
110////////////////////////////////////////////////////////////////////////////////
111
112ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
113    : mSession(session) {
114}
115
116ANetworkSession::NetworkThread::~NetworkThread() {
117}
118
119bool ANetworkSession::NetworkThread::threadLoop() {
120    mSession->threadLoop();
121
122    return true;
123}
124
125////////////////////////////////////////////////////////////////////////////////
126
127ANetworkSession::Session::Session(
128        int32_t sessionID,
129        State state,
130        int s,
131        const sp<AMessage> &notify)
132    : mSessionID(sessionID),
133      mState(state),
134      mIsRTSPConnection(false),
135      mSocket(s),
136      mNotify(notify),
137      mSawReceiveFailure(false),
138      mSawSendFailure(false) {
139    if (mState == CONNECTED) {
140        struct sockaddr_in localAddr;
141        socklen_t localAddrLen = sizeof(localAddr);
142
143        int res = getsockname(
144                mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
145        CHECK_GE(res, 0);
146
147        struct sockaddr_in remoteAddr;
148        socklen_t remoteAddrLen = sizeof(remoteAddr);
149
150        res = getpeername(
151                mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
152        CHECK_GE(res, 0);
153
154        in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
155        AString localAddrString = StringPrintf(
156                "%d.%d.%d.%d",
157                (addr >> 24),
158                (addr >> 16) & 0xff,
159                (addr >> 8) & 0xff,
160                addr & 0xff);
161
162        addr = ntohl(remoteAddr.sin_addr.s_addr);
163        AString remoteAddrString = StringPrintf(
164                "%d.%d.%d.%d",
165                (addr >> 24),
166                (addr >> 16) & 0xff,
167                (addr >> 8) & 0xff,
168                addr & 0xff);
169
170        sp<AMessage> msg = mNotify->dup();
171        msg->setInt32("sessionID", mSessionID);
172        msg->setInt32("reason", kWhatClientConnected);
173        msg->setString("server-ip", localAddrString.c_str());
174        msg->setInt32("server-port", ntohs(localAddr.sin_port));
175        msg->setString("client-ip", remoteAddrString.c_str());
176        msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
177        msg->post();
178    }
179}
180
181ANetworkSession::Session::~Session() {
182    ALOGV("Session %d gone", mSessionID);
183
184    close(mSocket);
185    mSocket = -1;
186}
187
188int32_t ANetworkSession::Session::sessionID() const {
189    return mSessionID;
190}
191
192int ANetworkSession::Session::socket() const {
193    return mSocket;
194}
195
196void ANetworkSession::Session::setIsRTSPConnection(bool yesno) {
197    mIsRTSPConnection = yesno;
198}
199
200sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
201    return mNotify;
202}
203
204bool ANetworkSession::Session::isRTSPServer() const {
205    return mState == LISTENING_RTSP;
206}
207
208bool ANetworkSession::Session::isTCPDatagramServer() const {
209    return mState == LISTENING_TCP_DGRAMS;
210}
211
212bool ANetworkSession::Session::wantsToRead() {
213    return !mSawReceiveFailure && mState != CONNECTING;
214}
215
216bool ANetworkSession::Session::wantsToWrite() {
217    return !mSawSendFailure
218        && (mState == CONNECTING
219            || (mState == CONNECTED && !mOutBuffer.empty())
220            || (mState == DATAGRAM && !mOutDatagrams.empty()));
221}
222
223status_t ANetworkSession::Session::readMore() {
224    if (mState == DATAGRAM) {
225        status_t err;
226        do {
227            sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
228
229            struct sockaddr_in remoteAddr;
230            socklen_t remoteAddrLen = sizeof(remoteAddr);
231
232            ssize_t n;
233            do {
234                n = recvfrom(
235                        mSocket, buf->data(), buf->capacity(), 0,
236                        (struct sockaddr *)&remoteAddr, &remoteAddrLen);
237            } while (n < 0 && errno == EINTR);
238
239            err = OK;
240            if (n < 0) {
241                err = -errno;
242            } else if (n == 0) {
243                err = -ECONNRESET;
244            } else {
245                buf->setRange(0, n);
246
247                int64_t nowUs = ALooper::GetNowUs();
248                buf->meta()->setInt64("arrivalTimeUs", nowUs);
249
250                sp<AMessage> notify = mNotify->dup();
251                notify->setInt32("sessionID", mSessionID);
252                notify->setInt32("reason", kWhatDatagram);
253
254                uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
255                notify->setString(
256                        "fromAddr",
257                        StringPrintf(
258                            "%u.%u.%u.%u",
259                            ip >> 24,
260                            (ip >> 16) & 0xff,
261                            (ip >> 8) & 0xff,
262                            ip & 0xff).c_str());
263
264                notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
265
266                notify->setBuffer("data", buf);
267                notify->post();
268            }
269        } while (err == OK);
270
271        if (err == -EAGAIN) {
272            err = OK;
273        }
274
275        if (err != OK) {
276            notifyError(false /* send */, err, "Recvfrom failed.");
277            mSawReceiveFailure = true;
278        }
279
280        return err;
281    }
282
283    char tmp[512];
284    ssize_t n;
285    do {
286        n = recv(mSocket, tmp, sizeof(tmp), 0);
287    } while (n < 0 && errno == EINTR);
288
289    status_t err = OK;
290
291    if (n > 0) {
292        mInBuffer.append(tmp, n);
293
294#if 0
295        ALOGI("in:");
296        hexdump(tmp, n);
297#endif
298    } else if (n < 0) {
299        err = -errno;
300    } else {
301        err = -ECONNRESET;
302    }
303
304    if (!mIsRTSPConnection) {
305        // TCP stream carrying 16-bit length-prefixed datagrams.
306
307        while (mInBuffer.size() >= 2) {
308            size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
309
310            if (mInBuffer.size() < packetSize + 2) {
311                break;
312            }
313
314            sp<ABuffer> packet = new ABuffer(packetSize);
315            memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
316
317            sp<AMessage> notify = mNotify->dup();
318            notify->setInt32("sessionID", mSessionID);
319            notify->setInt32("reason", kWhatDatagram);
320            notify->setBuffer("data", packet);
321            notify->post();
322
323            mInBuffer.erase(0, packetSize + 2);
324        }
325    } else {
326        for (;;) {
327            size_t length;
328
329            if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
330                if (mInBuffer.size() < 4) {
331                    break;
332                }
333
334                length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
335
336                if (mInBuffer.size() < 4 + length) {
337                    break;
338                }
339
340                sp<AMessage> notify = mNotify->dup();
341                notify->setInt32("sessionID", mSessionID);
342                notify->setInt32("reason", kWhatBinaryData);
343                notify->setInt32("channel", mInBuffer.c_str()[1]);
344
345                sp<ABuffer> data = new ABuffer(length);
346                memcpy(data->data(), mInBuffer.c_str() + 4, length);
347
348                int64_t nowUs = ALooper::GetNowUs();
349                data->meta()->setInt64("arrivalTimeUs", nowUs);
350
351                notify->setBuffer("data", data);
352                notify->post();
353
354                mInBuffer.erase(0, 4 + length);
355                continue;
356            }
357
358            sp<ParsedMessage> msg =
359                ParsedMessage::Parse(
360                        mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
361
362            if (msg == NULL) {
363                break;
364            }
365
366            sp<AMessage> notify = mNotify->dup();
367            notify->setInt32("sessionID", mSessionID);
368            notify->setInt32("reason", kWhatData);
369            notify->setObject("data", msg);
370            notify->post();
371
372#if 1
373            // XXX The (old) dongle sends the wrong content length header on a
374            // SET_PARAMETER request that signals a "wfd_idr_request".
375            // (17 instead of 19).
376            const char *content = msg->getContent();
377            if (content
378                    && !memcmp(content, "wfd_idr_request\r\n", 17)
379                    && length >= 19
380                    && mInBuffer.c_str()[length] == '\r'
381                    && mInBuffer.c_str()[length + 1] == '\n') {
382                length += 2;
383            }
384#endif
385
386            mInBuffer.erase(0, length);
387
388            if (err != OK) {
389                break;
390            }
391        }
392    }
393
394    if (err != OK) {
395        notifyError(false /* send */, err, "Recv failed.");
396        mSawReceiveFailure = true;
397    }
398
399    return err;
400}
401
402status_t ANetworkSession::Session::writeMore() {
403    if (mState == DATAGRAM) {
404        CHECK(!mOutDatagrams.empty());
405
406        status_t err;
407        do {
408            const sp<ABuffer> &datagram = *mOutDatagrams.begin();
409
410            uint8_t *data = datagram->data();
411            if (data[0] == 0x80 && (data[1] & 0x7f) == 33) {
412                int64_t nowUs = ALooper::GetNowUs();
413
414                uint32_t prevRtpTime = U32_AT(&data[4]);
415
416                // 90kHz time scale
417                uint32_t rtpTime = (nowUs * 9ll) / 100ll;
418                int32_t diffTime = (int32_t)rtpTime - (int32_t)prevRtpTime;
419
420                ALOGV("correcting rtpTime by %.0f ms", diffTime / 90.0);
421
422                data[4] = rtpTime >> 24;
423                data[5] = (rtpTime >> 16) & 0xff;
424                data[6] = (rtpTime >> 8) & 0xff;
425                data[7] = rtpTime & 0xff;
426            }
427
428            int n;
429            do {
430                n = send(mSocket, datagram->data(), datagram->size(), 0);
431            } while (n < 0 && errno == EINTR);
432
433            err = OK;
434
435            if (n > 0) {
436                mOutDatagrams.erase(mOutDatagrams.begin());
437            } else if (n < 0) {
438                err = -errno;
439            } else if (n == 0) {
440                err = -ECONNRESET;
441            }
442        } while (err == OK && !mOutDatagrams.empty());
443
444        if (err == -EAGAIN) {
445            if (!mOutDatagrams.empty()) {
446                ALOGI("%d datagrams remain queued.", mOutDatagrams.size());
447            }
448            err = OK;
449        }
450
451        if (err != OK) {
452            notifyError(true /* send */, err, "Send datagram failed.");
453            mSawSendFailure = true;
454        }
455
456        return err;
457    }
458
459    if (mState == CONNECTING) {
460        int err;
461        socklen_t optionLen = sizeof(err);
462        CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
463        CHECK_EQ(optionLen, (socklen_t)sizeof(err));
464
465        if (err != 0) {
466            notifyError(kWhatError, -err, "Connection failed");
467            mSawSendFailure = true;
468
469            return -err;
470        }
471
472        mState = CONNECTED;
473        notify(kWhatConnected);
474
475        return OK;
476    }
477
478    CHECK_EQ(mState, CONNECTED);
479    CHECK(!mOutBuffer.empty());
480
481    ssize_t n;
482    do {
483        n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0);
484    } while (n < 0 && errno == EINTR);
485
486    status_t err = OK;
487
488    if (n > 0) {
489#if 0
490        ALOGI("out:");
491        hexdump(mOutBuffer.c_str(), n);
492#endif
493
494        mOutBuffer.erase(0, n);
495    } else if (n < 0) {
496        err = -errno;
497    } else if (n == 0) {
498        err = -ECONNRESET;
499    }
500
501    if (err != OK) {
502        notifyError(true /* send */, err, "Send failed.");
503        mSawSendFailure = true;
504    }
505
506    return err;
507}
508
509status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) {
510    CHECK(mState == CONNECTED || mState == DATAGRAM);
511
512    if (mState == DATAGRAM) {
513        CHECK_GE(size, 0);
514
515        sp<ABuffer> datagram = new ABuffer(size);
516        memcpy(datagram->data(), data, size);
517
518        mOutDatagrams.push_back(datagram);
519        return OK;
520    }
521
522    if (mState == CONNECTED && !mIsRTSPConnection) {
523        CHECK_LE(size, 65535);
524
525        uint8_t prefix[2];
526        prefix[0] = size >> 8;
527        prefix[1] = size & 0xff;
528
529        mOutBuffer.append((const char *)prefix, sizeof(prefix));
530    }
531
532    mOutBuffer.append(
533            (const char *)data,
534            (size >= 0) ? size : strlen((const char *)data));
535
536    return OK;
537}
538
539void ANetworkSession::Session::notifyError(
540        bool send, status_t err, const char *detail) {
541    sp<AMessage> msg = mNotify->dup();
542    msg->setInt32("sessionID", mSessionID);
543    msg->setInt32("reason", kWhatError);
544    msg->setInt32("send", send);
545    msg->setInt32("err", err);
546    msg->setString("detail", detail);
547    msg->post();
548}
549
550void ANetworkSession::Session::notify(NotificationReason reason) {
551    sp<AMessage> msg = mNotify->dup();
552    msg->setInt32("sessionID", mSessionID);
553    msg->setInt32("reason", reason);
554    msg->post();
555}
556
557////////////////////////////////////////////////////////////////////////////////
558
559ANetworkSession::ANetworkSession()
560    : mNextSessionID(1) {
561    mPipeFd[0] = mPipeFd[1] = -1;
562}
563
564ANetworkSession::~ANetworkSession() {
565    stop();
566}
567
568status_t ANetworkSession::start() {
569    if (mThread != NULL) {
570        return INVALID_OPERATION;
571    }
572
573    int res = pipe(mPipeFd);
574    if (res != 0) {
575        mPipeFd[0] = mPipeFd[1] = -1;
576        return -errno;
577    }
578
579    mThread = new NetworkThread(this);
580
581    status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
582
583    if (err != OK) {
584        mThread.clear();
585
586        close(mPipeFd[0]);
587        close(mPipeFd[1]);
588        mPipeFd[0] = mPipeFd[1] = -1;
589
590        return err;
591    }
592
593    return OK;
594}
595
596status_t ANetworkSession::stop() {
597    if (mThread == NULL) {
598        return INVALID_OPERATION;
599    }
600
601    mThread->requestExit();
602    interrupt();
603    mThread->requestExitAndWait();
604
605    mThread.clear();
606
607    close(mPipeFd[0]);
608    close(mPipeFd[1]);
609    mPipeFd[0] = mPipeFd[1] = -1;
610
611    return OK;
612}
613
614status_t ANetworkSession::createRTSPClient(
615        const char *host, unsigned port, const sp<AMessage> &notify,
616        int32_t *sessionID) {
617    return createClientOrServer(
618            kModeCreateRTSPClient,
619            NULL /* addr */,
620            0 /* port */,
621            host,
622            port,
623            notify,
624            sessionID);
625}
626
627status_t ANetworkSession::createRTSPServer(
628        const struct in_addr &addr, unsigned port,
629        const sp<AMessage> &notify, int32_t *sessionID) {
630    return createClientOrServer(
631            kModeCreateRTSPServer,
632            &addr,
633            port,
634            NULL /* remoteHost */,
635            0 /* remotePort */,
636            notify,
637            sessionID);
638}
639
640status_t ANetworkSession::createUDPSession(
641        unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
642    return createUDPSession(localPort, NULL, 0, notify, sessionID);
643}
644
645status_t ANetworkSession::createUDPSession(
646        unsigned localPort,
647        const char *remoteHost,
648        unsigned remotePort,
649        const sp<AMessage> &notify,
650        int32_t *sessionID) {
651    return createClientOrServer(
652            kModeCreateUDPSession,
653            NULL /* addr */,
654            localPort,
655            remoteHost,
656            remotePort,
657            notify,
658            sessionID);
659}
660
661status_t ANetworkSession::createTCPDatagramSession(
662        const struct in_addr &addr, unsigned port,
663        const sp<AMessage> &notify, int32_t *sessionID) {
664    return createClientOrServer(
665            kModeCreateTCPDatagramSessionPassive,
666            &addr,
667            port,
668            NULL /* remoteHost */,
669            0 /* remotePort */,
670            notify,
671            sessionID);
672}
673
674status_t ANetworkSession::createTCPDatagramSession(
675        unsigned localPort,
676        const char *remoteHost,
677        unsigned remotePort,
678        const sp<AMessage> &notify,
679        int32_t *sessionID) {
680    return createClientOrServer(
681            kModeCreateTCPDatagramSessionActive,
682            NULL /* addr */,
683            localPort,
684            remoteHost,
685            remotePort,
686            notify,
687            sessionID);
688}
689
690status_t ANetworkSession::destroySession(int32_t sessionID) {
691    Mutex::Autolock autoLock(mLock);
692
693    ssize_t index = mSessions.indexOfKey(sessionID);
694
695    if (index < 0) {
696        return -ENOENT;
697    }
698
699    mSessions.removeItemsAt(index);
700
701    interrupt();
702
703    return OK;
704}
705
706// static
707status_t ANetworkSession::MakeSocketNonBlocking(int s) {
708    int flags = fcntl(s, F_GETFL, 0);
709    if (flags < 0) {
710        flags = 0;
711    }
712
713    int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
714    if (res < 0) {
715        return -errno;
716    }
717
718    return OK;
719}
720
721status_t ANetworkSession::createClientOrServer(
722        Mode mode,
723        const struct in_addr *localAddr,
724        unsigned port,
725        const char *remoteHost,
726        unsigned remotePort,
727        const sp<AMessage> &notify,
728        int32_t *sessionID) {
729    Mutex::Autolock autoLock(mLock);
730
731    *sessionID = 0;
732    status_t err = OK;
733    int s, res;
734    sp<Session> session;
735
736    s = socket(
737            AF_INET,
738            (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
739            0);
740
741    if (s < 0) {
742        err = -errno;
743        goto bail;
744    }
745
746    if (mode == kModeCreateRTSPServer
747            || mode == kModeCreateTCPDatagramSessionPassive) {
748        const int yes = 1;
749        res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
750
751        if (res < 0) {
752            err = -errno;
753            goto bail2;
754        }
755    }
756
757    if (mode == kModeCreateUDPSession) {
758        int size = 256 * 1024;
759
760        res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
761
762        if (res < 0) {
763            err = -errno;
764            goto bail2;
765        }
766
767        res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
768
769        if (res < 0) {
770            err = -errno;
771            goto bail2;
772        }
773    }
774
775    err = MakeSocketNonBlocking(s);
776
777    if (err != OK) {
778        goto bail2;
779    }
780
781    struct sockaddr_in addr;
782    memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
783    addr.sin_family = AF_INET;
784
785    if (mode == kModeCreateRTSPClient
786            || mode == kModeCreateTCPDatagramSessionActive) {
787        struct hostent *ent= gethostbyname(remoteHost);
788        if (ent == NULL) {
789            err = -h_errno;
790            goto bail2;
791        }
792
793        addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
794        addr.sin_port = htons(remotePort);
795    } else if (localAddr != NULL) {
796        addr.sin_addr = *localAddr;
797        addr.sin_port = htons(port);
798    } else {
799        addr.sin_addr.s_addr = htonl(INADDR_ANY);
800        addr.sin_port = htons(port);
801    }
802
803    if (mode == kModeCreateRTSPClient
804            || mode == kModeCreateTCPDatagramSessionActive) {
805        in_addr_t x = ntohl(addr.sin_addr.s_addr);
806        ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
807              s,
808              (x >> 24),
809              (x >> 16) & 0xff,
810              (x >> 8) & 0xff,
811              x & 0xff,
812              ntohs(addr.sin_port));
813
814        res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
815
816        CHECK_LT(res, 0);
817        if (errno == EINPROGRESS) {
818            res = 0;
819        }
820    } else {
821        res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
822
823        if (res == 0) {
824            if (mode == kModeCreateRTSPServer
825                    || mode == kModeCreateTCPDatagramSessionPassive) {
826                res = listen(s, 4);
827            } else {
828                CHECK_EQ(mode, kModeCreateUDPSession);
829
830                if (remoteHost != NULL) {
831                    struct sockaddr_in remoteAddr;
832                    memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
833                    remoteAddr.sin_family = AF_INET;
834                    remoteAddr.sin_port = htons(remotePort);
835
836                    struct hostent *ent= gethostbyname(remoteHost);
837                    if (ent == NULL) {
838                        err = -h_errno;
839                        goto bail2;
840                    }
841
842                    remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
843
844                    res = connect(
845                            s,
846                            (const struct sockaddr *)&remoteAddr,
847                            sizeof(remoteAddr));
848                }
849            }
850        }
851    }
852
853    if (res < 0) {
854        err = -errno;
855        goto bail2;
856    }
857
858    Session::State state;
859    switch (mode) {
860        case kModeCreateRTSPClient:
861            state = Session::CONNECTING;
862            break;
863
864        case kModeCreateTCPDatagramSessionActive:
865            state = Session::CONNECTING;
866            break;
867
868        case kModeCreateTCPDatagramSessionPassive:
869            state = Session::LISTENING_TCP_DGRAMS;
870            break;
871
872        case kModeCreateRTSPServer:
873            state = Session::LISTENING_RTSP;
874            break;
875
876        default:
877            CHECK_EQ(mode, kModeCreateUDPSession);
878            state = Session::DATAGRAM;
879            break;
880    }
881
882    session = new Session(
883            mNextSessionID++,
884            state,
885            s,
886            notify);
887
888    if (mode == kModeCreateTCPDatagramSessionActive) {
889        session->setIsRTSPConnection(false);
890    } else if (mode == kModeCreateRTSPClient) {
891        session->setIsRTSPConnection(true);
892    }
893
894    mSessions.add(session->sessionID(), session);
895
896    interrupt();
897
898    *sessionID = session->sessionID();
899
900    goto bail;
901
902bail2:
903    close(s);
904    s = -1;
905
906bail:
907    return err;
908}
909
910status_t ANetworkSession::connectUDPSession(
911        int32_t sessionID, const char *remoteHost, unsigned remotePort) {
912    Mutex::Autolock autoLock(mLock);
913
914    ssize_t index = mSessions.indexOfKey(sessionID);
915
916    if (index < 0) {
917        return -ENOENT;
918    }
919
920    const sp<Session> session = mSessions.valueAt(index);
921    int s = session->socket();
922
923    struct sockaddr_in remoteAddr;
924    memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
925    remoteAddr.sin_family = AF_INET;
926    remoteAddr.sin_port = htons(remotePort);
927
928    status_t err = OK;
929    struct hostent *ent = gethostbyname(remoteHost);
930    if (ent == NULL) {
931        err = -h_errno;
932    } else {
933        remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
934
935        int res = connect(
936                s,
937                (const struct sockaddr *)&remoteAddr,
938                sizeof(remoteAddr));
939
940        if (res < 0) {
941            err = -errno;
942        }
943    }
944
945    return err;
946}
947
948status_t ANetworkSession::sendRequest(
949        int32_t sessionID, const void *data, ssize_t size) {
950    Mutex::Autolock autoLock(mLock);
951
952    ssize_t index = mSessions.indexOfKey(sessionID);
953
954    if (index < 0) {
955        return -ENOENT;
956    }
957
958    const sp<Session> session = mSessions.valueAt(index);
959
960    status_t err = session->sendRequest(data, size);
961
962    interrupt();
963
964    return err;
965}
966
967void ANetworkSession::interrupt() {
968    static const char dummy = 0;
969
970    ssize_t n;
971    do {
972        n = write(mPipeFd[1], &dummy, 1);
973    } while (n < 0 && errno == EINTR);
974
975    if (n < 0) {
976        ALOGW("Error writing to pipe (%s)", strerror(errno));
977    }
978}
979
980void ANetworkSession::threadLoop() {
981    fd_set rs, ws;
982    FD_ZERO(&rs);
983    FD_ZERO(&ws);
984
985    FD_SET(mPipeFd[0], &rs);
986    int maxFd = mPipeFd[0];
987
988    {
989        Mutex::Autolock autoLock(mLock);
990
991        for (size_t i = 0; i < mSessions.size(); ++i) {
992            const sp<Session> &session = mSessions.valueAt(i);
993
994            int s = session->socket();
995
996            if (s < 0) {
997                continue;
998            }
999
1000            if (session->wantsToRead()) {
1001                FD_SET(s, &rs);
1002                if (s > maxFd) {
1003                    maxFd = s;
1004                }
1005            }
1006
1007            if (session->wantsToWrite()) {
1008                FD_SET(s, &ws);
1009                if (s > maxFd) {
1010                    maxFd = s;
1011                }
1012            }
1013        }
1014    }
1015
1016    int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
1017
1018    if (res == 0) {
1019        return;
1020    }
1021
1022    if (res < 0) {
1023        if (errno == EINTR) {
1024            return;
1025        }
1026
1027        ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
1028        return;
1029    }
1030
1031    if (FD_ISSET(mPipeFd[0], &rs)) {
1032        char c;
1033        ssize_t n;
1034        do {
1035            n = read(mPipeFd[0], &c, 1);
1036        } while (n < 0 && errno == EINTR);
1037
1038        if (n < 0) {
1039            ALOGW("Error reading from pipe (%s)", strerror(errno));
1040        }
1041
1042        --res;
1043    }
1044
1045    {
1046        Mutex::Autolock autoLock(mLock);
1047
1048        List<sp<Session> > sessionsToAdd;
1049
1050        for (size_t i = mSessions.size(); res > 0 && i-- > 0;) {
1051            const sp<Session> &session = mSessions.valueAt(i);
1052
1053            int s = session->socket();
1054
1055            if (s < 0) {
1056                continue;
1057            }
1058
1059            if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
1060                --res;
1061            }
1062
1063            if (FD_ISSET(s, &rs)) {
1064                if (session->isRTSPServer() || session->isTCPDatagramServer()) {
1065                    struct sockaddr_in remoteAddr;
1066                    socklen_t remoteAddrLen = sizeof(remoteAddr);
1067
1068                    int clientSocket = accept(
1069                            s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
1070
1071                    if (clientSocket >= 0) {
1072                        status_t err = MakeSocketNonBlocking(clientSocket);
1073
1074                        if (err != OK) {
1075                            ALOGE("Unable to make client socket non blocking, "
1076                                  "failed w/ error %d (%s)",
1077                                  err, strerror(-err));
1078
1079                            close(clientSocket);
1080                            clientSocket = -1;
1081                        } else {
1082                            in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
1083
1084                            ALOGI("incoming connection from %d.%d.%d.%d:%d "
1085                                  "(socket %d)",
1086                                  (addr >> 24),
1087                                  (addr >> 16) & 0xff,
1088                                  (addr >> 8) & 0xff,
1089                                  addr & 0xff,
1090                                  ntohs(remoteAddr.sin_port),
1091                                  clientSocket);
1092
1093                            sp<Session> clientSession =
1094                                // using socket sd as sessionID
1095                                new Session(
1096                                        mNextSessionID++,
1097                                        Session::CONNECTED,
1098                                        clientSocket,
1099                                        session->getNotificationMessage());
1100
1101                            clientSession->setIsRTSPConnection(
1102                                    session->isRTSPServer());
1103
1104                            sessionsToAdd.push_back(clientSession);
1105                        }
1106                    } else {
1107                        ALOGE("accept returned error %d (%s)",
1108                              errno, strerror(errno));
1109                    }
1110                } else {
1111                    status_t err = session->readMore();
1112                    if (err != OK) {
1113                        ALOGE("readMore on socket %d failed w/ error %d (%s)",
1114                              s, err, strerror(-err));
1115                    }
1116                }
1117            }
1118
1119            if (FD_ISSET(s, &ws)) {
1120                status_t err = session->writeMore();
1121                if (err != OK) {
1122                    ALOGE("writeMore on socket %d failed w/ error %d (%s)",
1123                          s, err, strerror(-err));
1124                }
1125            }
1126        }
1127
1128        while (!sessionsToAdd.empty()) {
1129            sp<Session> session = *sessionsToAdd.begin();
1130            sessionsToAdd.erase(sessionsToAdd.begin());
1131
1132            mSessions.add(session->sessionID(), session);
1133
1134            ALOGI("added clientSession %d", session->sessionID());
1135        }
1136    }
1137}
1138
1139}  // namespace android
1140
1141