1/*
2 * Copyright 2012, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "NetworkSession"
19#include <utils/Log.h>
20
21#include "ANetworkSession.h"
22#include "ParsedMessage.h"
23
24#include <arpa/inet.h>
25#include <fcntl.h>
26#include <net/if.h>
27#include <netdb.h>
28#include <netinet/in.h>
29#include <sys/socket.h>
30
31#include <media/stagefright/foundation/ABuffer.h>
32#include <media/stagefright/foundation/ADebug.h>
33#include <media/stagefright/foundation/AMessage.h>
34#include <media/stagefright/foundation/hexdump.h>
35#include <media/stagefright/Utils.h>
36
37namespace android {
38
// Capacity, in bytes, of the buffer allocated for each incoming UDP datagram
// in Session::readMore().
static const size_t kMaxUDPSize = 1500;
40
41struct ANetworkSession::NetworkThread : public Thread {
42    NetworkThread(ANetworkSession *session);
43
44protected:
45    virtual ~NetworkThread();
46
47private:
48    ANetworkSession *mSession;
49
50    virtual bool threadLoop();
51
52    DISALLOW_EVIL_CONSTRUCTORS(NetworkThread);
53};
54
55struct ANetworkSession::Session : public RefBase {
56    enum State {
57        CONNECTING,
58        CONNECTED,
59        LISTENING_RTSP,
60        LISTENING_TCP_DGRAMS,
61        DATAGRAM,
62    };
63
64    Session(int32_t sessionID,
65            State state,
66            int s,
67            const sp<AMessage> &notify);
68
69    int32_t sessionID() const;
70    int socket() const;
71    sp<AMessage> getNotificationMessage() const;
72
73    bool isRTSPServer() const;
74    bool isTCPDatagramServer() const;
75
76    bool wantsToRead();
77    bool wantsToWrite();
78
79    status_t readMore();
80    status_t writeMore();
81
82    status_t sendRequest(const void *data, ssize_t size);
83
84    void setIsRTSPConnection(bool yesno);
85
86protected:
87    virtual ~Session();
88
89private:
90    int32_t mSessionID;
91    State mState;
92    bool mIsRTSPConnection;
93    int mSocket;
94    sp<AMessage> mNotify;
95    bool mSawReceiveFailure, mSawSendFailure;
96
97    // for TCP / stream data
98    AString mOutBuffer;
99
100    // for UDP / datagrams
101    List<sp<ABuffer> > mOutDatagrams;
102
103    AString mInBuffer;
104
105    void notifyError(bool send, status_t err, const char *detail);
106    void notify(NotificationReason reason);
107
108    DISALLOW_EVIL_CONSTRUCTORS(Session);
109};
110////////////////////////////////////////////////////////////////////////////////
111
112ANetworkSession::NetworkThread::NetworkThread(ANetworkSession *session)
113    : mSession(session) {
114}
115
116ANetworkSession::NetworkThread::~NetworkThread() {
117}
118
119bool ANetworkSession::NetworkThread::threadLoop() {
120    mSession->threadLoop();
121
122    return true;
123}
124
125////////////////////////////////////////////////////////////////////////////////
126
127ANetworkSession::Session::Session(
128        int32_t sessionID,
129        State state,
130        int s,
131        const sp<AMessage> &notify)
132    : mSessionID(sessionID),
133      mState(state),
134      mIsRTSPConnection(false),
135      mSocket(s),
136      mNotify(notify),
137      mSawReceiveFailure(false),
138      mSawSendFailure(false) {
139    if (mState == CONNECTED) {
140        struct sockaddr_in localAddr;
141        socklen_t localAddrLen = sizeof(localAddr);
142
143        int res = getsockname(
144                mSocket, (struct sockaddr *)&localAddr, &localAddrLen);
145        CHECK_GE(res, 0);
146
147        struct sockaddr_in remoteAddr;
148        socklen_t remoteAddrLen = sizeof(remoteAddr);
149
150        res = getpeername(
151                mSocket, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
152        CHECK_GE(res, 0);
153
154        in_addr_t addr = ntohl(localAddr.sin_addr.s_addr);
155        AString localAddrString = StringPrintf(
156                "%d.%d.%d.%d",
157                (addr >> 24),
158                (addr >> 16) & 0xff,
159                (addr >> 8) & 0xff,
160                addr & 0xff);
161
162        addr = ntohl(remoteAddr.sin_addr.s_addr);
163        AString remoteAddrString = StringPrintf(
164                "%d.%d.%d.%d",
165                (addr >> 24),
166                (addr >> 16) & 0xff,
167                (addr >> 8) & 0xff,
168                addr & 0xff);
169
170        sp<AMessage> msg = mNotify->dup();
171        msg->setInt32("sessionID", mSessionID);
172        msg->setInt32("reason", kWhatClientConnected);
173        msg->setString("server-ip", localAddrString.c_str());
174        msg->setInt32("server-port", ntohs(localAddr.sin_port));
175        msg->setString("client-ip", remoteAddrString.c_str());
176        msg->setInt32("client-port", ntohs(remoteAddr.sin_port));
177        msg->post();
178    }
179}
180
181ANetworkSession::Session::~Session() {
182    ALOGV("Session %d gone", mSessionID);
183
184    close(mSocket);
185    mSocket = -1;
186}
187
188int32_t ANetworkSession::Session::sessionID() const {
189    return mSessionID;
190}
191
192int ANetworkSession::Session::socket() const {
193    return mSocket;
194}
195
196void ANetworkSession::Session::setIsRTSPConnection(bool yesno) {
197    mIsRTSPConnection = yesno;
198}
199
200sp<AMessage> ANetworkSession::Session::getNotificationMessage() const {
201    return mNotify;
202}
203
204bool ANetworkSession::Session::isRTSPServer() const {
205    return mState == LISTENING_RTSP;
206}
207
208bool ANetworkSession::Session::isTCPDatagramServer() const {
209    return mState == LISTENING_TCP_DGRAMS;
210}
211
212bool ANetworkSession::Session::wantsToRead() {
213    return !mSawReceiveFailure && mState != CONNECTING;
214}
215
216bool ANetworkSession::Session::wantsToWrite() {
217    return !mSawSendFailure
218        && (mState == CONNECTING
219            || (mState == CONNECTED && !mOutBuffer.empty())
220            || (mState == DATAGRAM && !mOutDatagrams.empty()));
221}
222
223status_t ANetworkSession::Session::readMore() {
224    if (mState == DATAGRAM) {
225        status_t err;
226        do {
227            sp<ABuffer> buf = new ABuffer(kMaxUDPSize);
228
229            struct sockaddr_in remoteAddr;
230            socklen_t remoteAddrLen = sizeof(remoteAddr);
231
232            ssize_t n;
233            do {
234                n = recvfrom(
235                        mSocket, buf->data(), buf->capacity(), 0,
236                        (struct sockaddr *)&remoteAddr, &remoteAddrLen);
237            } while (n < 0 && errno == EINTR);
238
239            err = OK;
240            if (n < 0) {
241                err = -errno;
242            } else if (n == 0) {
243                err = -ECONNRESET;
244            } else {
245                buf->setRange(0, n);
246
247                int64_t nowUs = ALooper::GetNowUs();
248                buf->meta()->setInt64("arrivalTimeUs", nowUs);
249
250                sp<AMessage> notify = mNotify->dup();
251                notify->setInt32("sessionID", mSessionID);
252                notify->setInt32("reason", kWhatDatagram);
253
254                uint32_t ip = ntohl(remoteAddr.sin_addr.s_addr);
255                notify->setString(
256                        "fromAddr",
257                        StringPrintf(
258                            "%u.%u.%u.%u",
259                            ip >> 24,
260                            (ip >> 16) & 0xff,
261                            (ip >> 8) & 0xff,
262                            ip & 0xff).c_str());
263
264                notify->setInt32("fromPort", ntohs(remoteAddr.sin_port));
265
266                notify->setBuffer("data", buf);
267                notify->post();
268            }
269        } while (err == OK);
270
271        if (err == -EAGAIN) {
272            err = OK;
273        }
274
275        if (err != OK) {
276            notifyError(false /* send */, err, "Recvfrom failed.");
277            mSawReceiveFailure = true;
278        }
279
280        return err;
281    }
282
283    char tmp[512];
284    ssize_t n;
285    do {
286        n = recv(mSocket, tmp, sizeof(tmp), 0);
287    } while (n < 0 && errno == EINTR);
288
289    status_t err = OK;
290
291    if (n > 0) {
292        mInBuffer.append(tmp, n);
293
294#if 0
295        ALOGI("in:");
296        hexdump(tmp, n);
297#endif
298    } else if (n < 0) {
299        err = -errno;
300    } else {
301        err = -ECONNRESET;
302    }
303
304    if (!mIsRTSPConnection) {
305        // TCP stream carrying 16-bit length-prefixed datagrams.
306
307        while (mInBuffer.size() >= 2) {
308            size_t packetSize = U16_AT((const uint8_t *)mInBuffer.c_str());
309
310            if (mInBuffer.size() < packetSize + 2) {
311                break;
312            }
313
314            sp<ABuffer> packet = new ABuffer(packetSize);
315            memcpy(packet->data(), mInBuffer.c_str() + 2, packetSize);
316
317            sp<AMessage> notify = mNotify->dup();
318            notify->setInt32("sessionID", mSessionID);
319            notify->setInt32("reason", kWhatDatagram);
320            notify->setBuffer("data", packet);
321            notify->post();
322
323            mInBuffer.erase(0, packetSize + 2);
324        }
325    } else {
326        for (;;) {
327            size_t length;
328
329            if (mInBuffer.size() > 0 && mInBuffer.c_str()[0] == '$') {
330                if (mInBuffer.size() < 4) {
331                    break;
332                }
333
334                length = U16_AT((const uint8_t *)mInBuffer.c_str() + 2);
335
336                if (mInBuffer.size() < 4 + length) {
337                    break;
338                }
339
340                sp<AMessage> notify = mNotify->dup();
341                notify->setInt32("sessionID", mSessionID);
342                notify->setInt32("reason", kWhatBinaryData);
343                notify->setInt32("channel", mInBuffer.c_str()[1]);
344
345                sp<ABuffer> data = new ABuffer(length);
346                memcpy(data->data(), mInBuffer.c_str() + 4, length);
347
348                int64_t nowUs = ALooper::GetNowUs();
349                data->meta()->setInt64("arrivalTimeUs", nowUs);
350
351                notify->setBuffer("data", data);
352                notify->post();
353
354                mInBuffer.erase(0, 4 + length);
355                continue;
356            }
357
358            sp<ParsedMessage> msg =
359                ParsedMessage::Parse(
360                        mInBuffer.c_str(), mInBuffer.size(), err != OK, &length);
361
362            if (msg == NULL) {
363                break;
364            }
365
366            sp<AMessage> notify = mNotify->dup();
367            notify->setInt32("sessionID", mSessionID);
368            notify->setInt32("reason", kWhatData);
369            notify->setObject("data", msg);
370            notify->post();
371
372#if 1
373            // XXX The (old) dongle sends the wrong content length header on a
374            // SET_PARAMETER request that signals a "wfd_idr_request".
375            // (17 instead of 19).
376            const char *content = msg->getContent();
377            if (content
378                    && !memcmp(content, "wfd_idr_request\r\n", 17)
379                    && length >= 19
380                    && mInBuffer.c_str()[length] == '\r'
381                    && mInBuffer.c_str()[length + 1] == '\n') {
382                length += 2;
383            }
384#endif
385
386            mInBuffer.erase(0, length);
387
388            if (err != OK) {
389                break;
390            }
391        }
392    }
393
394    if (err != OK) {
395        notifyError(false /* send */, err, "Recv failed.");
396        mSawReceiveFailure = true;
397    }
398
399    return err;
400}
401
402status_t ANetworkSession::Session::writeMore() {
403    if (mState == DATAGRAM) {
404        CHECK(!mOutDatagrams.empty());
405
406        status_t err;
407        do {
408            const sp<ABuffer> &datagram = *mOutDatagrams.begin();
409
410            int n;
411            do {
412                n = send(mSocket, datagram->data(), datagram->size(), 0);
413            } while (n < 0 && errno == EINTR);
414
415            err = OK;
416
417            if (n > 0) {
418                mOutDatagrams.erase(mOutDatagrams.begin());
419            } else if (n < 0) {
420                err = -errno;
421            } else if (n == 0) {
422                err = -ECONNRESET;
423            }
424        } while (err == OK && !mOutDatagrams.empty());
425
426        if (err == -EAGAIN) {
427            err = OK;
428        }
429
430        if (err != OK) {
431            notifyError(true /* send */, err, "Send datagram failed.");
432            mSawSendFailure = true;
433        }
434
435        return err;
436    }
437
438    if (mState == CONNECTING) {
439        int err;
440        socklen_t optionLen = sizeof(err);
441        CHECK_EQ(getsockopt(mSocket, SOL_SOCKET, SO_ERROR, &err, &optionLen), 0);
442        CHECK_EQ(optionLen, (socklen_t)sizeof(err));
443
444        if (err != 0) {
445            notifyError(kWhatError, -err, "Connection failed");
446            mSawSendFailure = true;
447
448            return -err;
449        }
450
451        mState = CONNECTED;
452        notify(kWhatConnected);
453
454        return OK;
455    }
456
457    CHECK_EQ(mState, CONNECTED);
458    CHECK(!mOutBuffer.empty());
459
460    ssize_t n;
461    do {
462        n = send(mSocket, mOutBuffer.c_str(), mOutBuffer.size(), 0);
463    } while (n < 0 && errno == EINTR);
464
465    status_t err = OK;
466
467    if (n > 0) {
468#if 0
469        ALOGI("out:");
470        hexdump(mOutBuffer.c_str(), n);
471#endif
472
473        mOutBuffer.erase(0, n);
474    } else if (n < 0) {
475        err = -errno;
476    } else if (n == 0) {
477        err = -ECONNRESET;
478    }
479
480    if (err != OK) {
481        notifyError(true /* send */, err, "Send failed.");
482        mSawSendFailure = true;
483    }
484
485    return err;
486}
487
488status_t ANetworkSession::Session::sendRequest(const void *data, ssize_t size) {
489    CHECK(mState == CONNECTED || mState == DATAGRAM);
490
491    if (mState == DATAGRAM) {
492        CHECK_GE(size, 0);
493
494        sp<ABuffer> datagram = new ABuffer(size);
495        memcpy(datagram->data(), data, size);
496
497        mOutDatagrams.push_back(datagram);
498        return OK;
499    }
500
501    if (mState == CONNECTED && !mIsRTSPConnection) {
502        CHECK_LE(size, 65535);
503
504        uint8_t prefix[2];
505        prefix[0] = size >> 8;
506        prefix[1] = size & 0xff;
507
508        mOutBuffer.append((const char *)prefix, sizeof(prefix));
509    }
510
511    mOutBuffer.append(
512            (const char *)data,
513            (size >= 0) ? size : strlen((const char *)data));
514
515    return OK;
516}
517
518void ANetworkSession::Session::notifyError(
519        bool send, status_t err, const char *detail) {
520    sp<AMessage> msg = mNotify->dup();
521    msg->setInt32("sessionID", mSessionID);
522    msg->setInt32("reason", kWhatError);
523    msg->setInt32("send", send);
524    msg->setInt32("err", err);
525    msg->setString("detail", detail);
526    msg->post();
527}
528
529void ANetworkSession::Session::notify(NotificationReason reason) {
530    sp<AMessage> msg = mNotify->dup();
531    msg->setInt32("sessionID", mSessionID);
532    msg->setInt32("reason", reason);
533    msg->post();
534}
535
536////////////////////////////////////////////////////////////////////////////////
537
538ANetworkSession::ANetworkSession()
539    : mNextSessionID(1) {
540    mPipeFd[0] = mPipeFd[1] = -1;
541}
542
543ANetworkSession::~ANetworkSession() {
544    stop();
545}
546
547status_t ANetworkSession::start() {
548    if (mThread != NULL) {
549        return INVALID_OPERATION;
550    }
551
552    int res = pipe(mPipeFd);
553    if (res != 0) {
554        mPipeFd[0] = mPipeFd[1] = -1;
555        return -errno;
556    }
557
558    mThread = new NetworkThread(this);
559
560    status_t err = mThread->run("ANetworkSession", ANDROID_PRIORITY_AUDIO);
561
562    if (err != OK) {
563        mThread.clear();
564
565        close(mPipeFd[0]);
566        close(mPipeFd[1]);
567        mPipeFd[0] = mPipeFd[1] = -1;
568
569        return err;
570    }
571
572    return OK;
573}
574
575status_t ANetworkSession::stop() {
576    if (mThread == NULL) {
577        return INVALID_OPERATION;
578    }
579
580    mThread->requestExit();
581    interrupt();
582    mThread->requestExitAndWait();
583
584    mThread.clear();
585
586    close(mPipeFd[0]);
587    close(mPipeFd[1]);
588    mPipeFd[0] = mPipeFd[1] = -1;
589
590    return OK;
591}
592
593status_t ANetworkSession::createRTSPClient(
594        const char *host, unsigned port, const sp<AMessage> &notify,
595        int32_t *sessionID) {
596    return createClientOrServer(
597            kModeCreateRTSPClient,
598            NULL /* addr */,
599            0 /* port */,
600            host,
601            port,
602            notify,
603            sessionID);
604}
605
606status_t ANetworkSession::createRTSPServer(
607        const struct in_addr &addr, unsigned port,
608        const sp<AMessage> &notify, int32_t *sessionID) {
609    return createClientOrServer(
610            kModeCreateRTSPServer,
611            &addr,
612            port,
613            NULL /* remoteHost */,
614            0 /* remotePort */,
615            notify,
616            sessionID);
617}
618
619status_t ANetworkSession::createUDPSession(
620        unsigned localPort, const sp<AMessage> &notify, int32_t *sessionID) {
621    return createUDPSession(localPort, NULL, 0, notify, sessionID);
622}
623
624status_t ANetworkSession::createUDPSession(
625        unsigned localPort,
626        const char *remoteHost,
627        unsigned remotePort,
628        const sp<AMessage> &notify,
629        int32_t *sessionID) {
630    return createClientOrServer(
631            kModeCreateUDPSession,
632            NULL /* addr */,
633            localPort,
634            remoteHost,
635            remotePort,
636            notify,
637            sessionID);
638}
639
640status_t ANetworkSession::createTCPDatagramSession(
641        const struct in_addr &addr, unsigned port,
642        const sp<AMessage> &notify, int32_t *sessionID) {
643    return createClientOrServer(
644            kModeCreateTCPDatagramSessionPassive,
645            &addr,
646            port,
647            NULL /* remoteHost */,
648            0 /* remotePort */,
649            notify,
650            sessionID);
651}
652
653status_t ANetworkSession::createTCPDatagramSession(
654        unsigned localPort,
655        const char *remoteHost,
656        unsigned remotePort,
657        const sp<AMessage> &notify,
658        int32_t *sessionID) {
659    return createClientOrServer(
660            kModeCreateTCPDatagramSessionActive,
661            NULL /* addr */,
662            localPort,
663            remoteHost,
664            remotePort,
665            notify,
666            sessionID);
667}
668
669status_t ANetworkSession::destroySession(int32_t sessionID) {
670    Mutex::Autolock autoLock(mLock);
671
672    ssize_t index = mSessions.indexOfKey(sessionID);
673
674    if (index < 0) {
675        return -ENOENT;
676    }
677
678    mSessions.removeItemsAt(index);
679
680    interrupt();
681
682    return OK;
683}
684
685// static
686status_t ANetworkSession::MakeSocketNonBlocking(int s) {
687    int flags = fcntl(s, F_GETFL, 0);
688    if (flags < 0) {
689        flags = 0;
690    }
691
692    int res = fcntl(s, F_SETFL, flags | O_NONBLOCK);
693    if (res < 0) {
694        return -errno;
695    }
696
697    return OK;
698}
699
700status_t ANetworkSession::createClientOrServer(
701        Mode mode,
702        const struct in_addr *localAddr,
703        unsigned port,
704        const char *remoteHost,
705        unsigned remotePort,
706        const sp<AMessage> &notify,
707        int32_t *sessionID) {
708    Mutex::Autolock autoLock(mLock);
709
710    *sessionID = 0;
711    status_t err = OK;
712    int s, res;
713    sp<Session> session;
714
715    s = socket(
716            AF_INET,
717            (mode == kModeCreateUDPSession) ? SOCK_DGRAM : SOCK_STREAM,
718            0);
719
720    if (s < 0) {
721        err = -errno;
722        goto bail;
723    }
724
725    if (mode == kModeCreateRTSPServer
726            || mode == kModeCreateTCPDatagramSessionPassive) {
727        const int yes = 1;
728        res = setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
729
730        if (res < 0) {
731            err = -errno;
732            goto bail2;
733        }
734    }
735
736    if (mode == kModeCreateUDPSession) {
737        int size = 256 * 1024;
738
739        res = setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size));
740
741        if (res < 0) {
742            err = -errno;
743            goto bail2;
744        }
745
746        res = setsockopt(s, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size));
747
748        if (res < 0) {
749            err = -errno;
750            goto bail2;
751        }
752    }
753
754    err = MakeSocketNonBlocking(s);
755
756    if (err != OK) {
757        goto bail2;
758    }
759
760    struct sockaddr_in addr;
761    memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
762    addr.sin_family = AF_INET;
763
764    if (mode == kModeCreateRTSPClient
765            || mode == kModeCreateTCPDatagramSessionActive) {
766        struct hostent *ent= gethostbyname(remoteHost);
767        if (ent == NULL) {
768            err = -h_errno;
769            goto bail2;
770        }
771
772        addr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
773        addr.sin_port = htons(remotePort);
774    } else if (localAddr != NULL) {
775        addr.sin_addr = *localAddr;
776        addr.sin_port = htons(port);
777    } else {
778        addr.sin_addr.s_addr = htonl(INADDR_ANY);
779        addr.sin_port = htons(port);
780    }
781
782    if (mode == kModeCreateRTSPClient
783            || mode == kModeCreateTCPDatagramSessionActive) {
784        in_addr_t x = ntohl(addr.sin_addr.s_addr);
785        ALOGI("connecting socket %d to %d.%d.%d.%d:%d",
786              s,
787              (x >> 24),
788              (x >> 16) & 0xff,
789              (x >> 8) & 0xff,
790              x & 0xff,
791              ntohs(addr.sin_port));
792
793        res = connect(s, (const struct sockaddr *)&addr, sizeof(addr));
794
795        CHECK_LT(res, 0);
796        if (errno == EINPROGRESS) {
797            res = 0;
798        }
799    } else {
800        res = bind(s, (const struct sockaddr *)&addr, sizeof(addr));
801
802        if (res == 0) {
803            if (mode == kModeCreateRTSPServer
804                    || mode == kModeCreateTCPDatagramSessionPassive) {
805                res = listen(s, 4);
806            } else {
807                CHECK_EQ(mode, kModeCreateUDPSession);
808
809                if (remoteHost != NULL) {
810                    struct sockaddr_in remoteAddr;
811                    memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
812                    remoteAddr.sin_family = AF_INET;
813                    remoteAddr.sin_port = htons(remotePort);
814
815                    struct hostent *ent= gethostbyname(remoteHost);
816                    if (ent == NULL) {
817                        err = -h_errno;
818                        goto bail2;
819                    }
820
821                    remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
822
823                    res = connect(
824                            s,
825                            (const struct sockaddr *)&remoteAddr,
826                            sizeof(remoteAddr));
827                }
828            }
829        }
830    }
831
832    if (res < 0) {
833        err = -errno;
834        goto bail2;
835    }
836
837    Session::State state;
838    switch (mode) {
839        case kModeCreateRTSPClient:
840            state = Session::CONNECTING;
841            break;
842
843        case kModeCreateTCPDatagramSessionActive:
844            state = Session::CONNECTING;
845            break;
846
847        case kModeCreateTCPDatagramSessionPassive:
848            state = Session::LISTENING_TCP_DGRAMS;
849            break;
850
851        case kModeCreateRTSPServer:
852            state = Session::LISTENING_RTSP;
853            break;
854
855        default:
856            CHECK_EQ(mode, kModeCreateUDPSession);
857            state = Session::DATAGRAM;
858            break;
859    }
860
861    session = new Session(
862            mNextSessionID++,
863            state,
864            s,
865            notify);
866
867    if (mode == kModeCreateTCPDatagramSessionActive) {
868        session->setIsRTSPConnection(false);
869    } else if (mode == kModeCreateRTSPClient) {
870        session->setIsRTSPConnection(true);
871    }
872
873    mSessions.add(session->sessionID(), session);
874
875    interrupt();
876
877    *sessionID = session->sessionID();
878
879    goto bail;
880
881bail2:
882    close(s);
883    s = -1;
884
885bail:
886    return err;
887}
888
889status_t ANetworkSession::connectUDPSession(
890        int32_t sessionID, const char *remoteHost, unsigned remotePort) {
891    Mutex::Autolock autoLock(mLock);
892
893    ssize_t index = mSessions.indexOfKey(sessionID);
894
895    if (index < 0) {
896        return -ENOENT;
897    }
898
899    const sp<Session> session = mSessions.valueAt(index);
900    int s = session->socket();
901
902    struct sockaddr_in remoteAddr;
903    memset(remoteAddr.sin_zero, 0, sizeof(remoteAddr.sin_zero));
904    remoteAddr.sin_family = AF_INET;
905    remoteAddr.sin_port = htons(remotePort);
906
907    status_t err = OK;
908    struct hostent *ent = gethostbyname(remoteHost);
909    if (ent == NULL) {
910        err = -h_errno;
911    } else {
912        remoteAddr.sin_addr.s_addr = *(in_addr_t *)ent->h_addr;
913
914        int res = connect(
915                s,
916                (const struct sockaddr *)&remoteAddr,
917                sizeof(remoteAddr));
918
919        if (res < 0) {
920            err = -errno;
921        }
922    }
923
924    return err;
925}
926
927status_t ANetworkSession::sendRequest(
928        int32_t sessionID, const void *data, ssize_t size) {
929    Mutex::Autolock autoLock(mLock);
930
931    ssize_t index = mSessions.indexOfKey(sessionID);
932
933    if (index < 0) {
934        return -ENOENT;
935    }
936
937    const sp<Session> session = mSessions.valueAt(index);
938
939    status_t err = session->sendRequest(data, size);
940
941    interrupt();
942
943    return err;
944}
945
946void ANetworkSession::interrupt() {
947    static const char dummy = 0;
948
949    ssize_t n;
950    do {
951        n = write(mPipeFd[1], &dummy, 1);
952    } while (n < 0 && errno == EINTR);
953
954    if (n < 0) {
955        ALOGW("Error writing to pipe (%s)", strerror(errno));
956    }
957}
958
959void ANetworkSession::threadLoop() {
960    fd_set rs, ws;
961    FD_ZERO(&rs);
962    FD_ZERO(&ws);
963
964    FD_SET(mPipeFd[0], &rs);
965    int maxFd = mPipeFd[0];
966
967    {
968        Mutex::Autolock autoLock(mLock);
969
970        for (size_t i = 0; i < mSessions.size(); ++i) {
971            const sp<Session> &session = mSessions.valueAt(i);
972
973            int s = session->socket();
974
975            if (s < 0) {
976                continue;
977            }
978
979            if (session->wantsToRead()) {
980                FD_SET(s, &rs);
981                if (s > maxFd) {
982                    maxFd = s;
983                }
984            }
985
986            if (session->wantsToWrite()) {
987                FD_SET(s, &ws);
988                if (s > maxFd) {
989                    maxFd = s;
990                }
991            }
992        }
993    }
994
995    int res = select(maxFd + 1, &rs, &ws, NULL, NULL /* tv */);
996
997    if (res == 0) {
998        return;
999    }
1000
1001    if (res < 0) {
1002        if (errno == EINTR) {
1003            return;
1004        }
1005
1006        ALOGE("select failed w/ error %d (%s)", errno, strerror(errno));
1007        return;
1008    }
1009
1010    if (FD_ISSET(mPipeFd[0], &rs)) {
1011        char c;
1012        ssize_t n;
1013        do {
1014            n = read(mPipeFd[0], &c, 1);
1015        } while (n < 0 && errno == EINTR);
1016
1017        if (n < 0) {
1018            ALOGW("Error reading from pipe (%s)", strerror(errno));
1019        }
1020
1021        --res;
1022    }
1023
1024    {
1025        Mutex::Autolock autoLock(mLock);
1026
1027        List<sp<Session> > sessionsToAdd;
1028
1029        for (size_t i = mSessions.size(); res > 0 && i-- > 0;) {
1030            const sp<Session> &session = mSessions.valueAt(i);
1031
1032            int s = session->socket();
1033
1034            if (s < 0) {
1035                continue;
1036            }
1037
1038            if (FD_ISSET(s, &rs) || FD_ISSET(s, &ws)) {
1039                --res;
1040            }
1041
1042            if (FD_ISSET(s, &rs)) {
1043                if (session->isRTSPServer() || session->isTCPDatagramServer()) {
1044                    struct sockaddr_in remoteAddr;
1045                    socklen_t remoteAddrLen = sizeof(remoteAddr);
1046
1047                    int clientSocket = accept(
1048                            s, (struct sockaddr *)&remoteAddr, &remoteAddrLen);
1049
1050                    if (clientSocket >= 0) {
1051                        status_t err = MakeSocketNonBlocking(clientSocket);
1052
1053                        if (err != OK) {
1054                            ALOGE("Unable to make client socket non blocking, "
1055                                  "failed w/ error %d (%s)",
1056                                  err, strerror(-err));
1057
1058                            close(clientSocket);
1059                            clientSocket = -1;
1060                        } else {
1061                            in_addr_t addr = ntohl(remoteAddr.sin_addr.s_addr);
1062
1063                            ALOGI("incoming connection from %d.%d.%d.%d:%d "
1064                                  "(socket %d)",
1065                                  (addr >> 24),
1066                                  (addr >> 16) & 0xff,
1067                                  (addr >> 8) & 0xff,
1068                                  addr & 0xff,
1069                                  ntohs(remoteAddr.sin_port),
1070                                  clientSocket);
1071
1072                            sp<Session> clientSession =
1073                                // using socket sd as sessionID
1074                                new Session(
1075                                        mNextSessionID++,
1076                                        Session::CONNECTED,
1077                                        clientSocket,
1078                                        session->getNotificationMessage());
1079
1080                            clientSession->setIsRTSPConnection(
1081                                    session->isRTSPServer());
1082
1083                            sessionsToAdd.push_back(clientSession);
1084                        }
1085                    } else {
1086                        ALOGE("accept returned error %d (%s)",
1087                              errno, strerror(errno));
1088                    }
1089                } else {
1090                    status_t err = session->readMore();
1091                    if (err != OK) {
1092                        ALOGE("readMore on socket %d failed w/ error %d (%s)",
1093                              s, err, strerror(-err));
1094                    }
1095                }
1096            }
1097
1098            if (FD_ISSET(s, &ws)) {
1099                status_t err = session->writeMore();
1100                if (err != OK) {
1101                    ALOGE("writeMore on socket %d failed w/ error %d (%s)",
1102                          s, err, strerror(-err));
1103                }
1104            }
1105        }
1106
1107        while (!sessionsToAdd.empty()) {
1108            sp<Session> session = *sessionsToAdd.begin();
1109            sessionsToAdd.erase(sessionsToAdd.begin());
1110
1111            mSessions.add(session->sessionID(), session);
1112
1113            ALOGI("added clientSession %d", session->sessionID());
1114        }
1115    }
1116}
1117
1118}  // namespace android
1119
1120