1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Encapsulates exchange protocol between the emulator, and an Android device
19 * that is connected to the host via USB. The communication is established over
20 * a TCP port forwarding, enabled by ADB.
21 */
22
23#include "qemu-common.h"
24#include "android/async-utils.h"
25#include "android/utils/debug.h"
26#include "android/async-socket-connector.h"
27#include "android/async-socket.h"
28#include "utils/panic.h"
29#include "iolooper.h"
30
31#define  E(...)    derror(__VA_ARGS__)
32#define  W(...)    dwarning(__VA_ARGS__)
33#define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
34#define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)
35
36#define TRACE_ON    0
37
38#if TRACE_ON
39#define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
40#else
41#define  T(...)
42#endif
43
44/********************************************************************************
45 *                  Asynchronous Socket internal API declarations
46 *******************************************************************************/
47
48/* Gets socket's address string. */
49static const char* _async_socket_string(AsyncSocket* as);
50
51/* Gets socket's looper. */
52static Looper* _async_socket_get_looper(AsyncSocket* as);
53
54/* Handler for the I/O time out.
55 * Param:
56 *  as - Asynchronous socket for the I/O.
57 *  asio - Desciptor for the timed out I/O.
58 */
59static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
60                                                AsyncSocketIO* asio);
61
62/********************************************************************************
63 *                  Asynchronous Socket Reader / Writer
64 *******************************************************************************/
65
66struct AsyncSocketIO {
67    /* Next I/O in the reader, or writer list. */
68    AsyncSocketIO*      next;
69    /* Asynchronous socket for this I/O. */
70    AsyncSocket*        as;
71    /* Timer used for time outs on this I/O. */
72    LoopTimer           timer[1];
73    /* An opaque pointer associated with this I/O. */
74    void*               io_opaque;
75    /* Buffer where to read / write data. */
76    uint8_t*            buffer;
77    /* Bytes to transfer through the socket for this I/O. */
78    uint32_t            to_transfer;
79    /* Bytes thransferred through the socket in this I/O. */
80    uint32_t            transferred;
81    /* I/O callback for this I/O. */
82    on_as_io_cb         on_io;
83    /* I/O type selector: 1 - read, 0 - write. */
84    int                 is_io_read;
85    /* State of the I/O. */
86    AsyncIOState        state;
87    /* Number of outstanding references to the I/O. */
88    int                 ref_count;
89    /* Deadline for this I/O */
90    Duration            deadline;
91};
92
93/*
94 * Recycling I/O instances.
95 * Since AsyncSocketIO instances are not that large, it makes sence to recycle
96 * them for faster allocation, rather than allocating and freeing them for each
97 * I/O on the socket.
98 */
99
100/* List of recycled I/O descriptors. */
101static AsyncSocketIO* _asio_recycled    = NULL;
102/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
103static int _recycled_asio_count         = 0;
104/* Maximum number of I/O descriptors that can be recycled. */
105static const int _max_recycled_asio_num = 32;
106
107/* Handler for an I/O time-out timer event.
108 * When this routine is invoked, it indicates that a time out has occurred on an
109 * I/O.
110 * Param:
111 *  opaque - AsyncSocketIO instance representing the timed out I/O.
112 */
113static void _on_async_socket_io_timed_out(void* opaque);
114
115/* Creates new I/O descriptor.
116 * Param:
117 *  as - Asynchronous socket for the I/O.
118 *  is_io_read - I/O type selector: 1 - read, 0 - write.
119 *  buffer, len - Reader / writer buffer address.
120 *  io_cb - Callback for this reader / writer.
121 *  io_opaque - An opaque pointer associated with the I/O.
122 *  deadline - Deadline to complete the I/O.
123 * Return:
124 *  Initialized AsyncSocketIO instance.
125 */
126static AsyncSocketIO*
127_async_socket_rw_new(AsyncSocket* as,
128                     int is_io_read,
129                     void* buffer,
130                     uint32_t len,
131                     on_as_io_cb io_cb,
132                     void* io_opaque,
133                     Duration deadline)
134{
135    /* Lookup in the recycler first. */
136    AsyncSocketIO* asio = _asio_recycled;
137    if (asio != NULL) {
138        /* Pull the descriptor from recycler. */
139        _asio_recycled = asio->next;
140        _recycled_asio_count--;
141    } else {
142        /* No recycled descriptors. Allocate new one. */
143        ANEW0(asio);
144    }
145
146    asio->next          = NULL;
147    asio->as            = as;
148    asio->is_io_read    = is_io_read;
149    asio->buffer        = (uint8_t*)buffer;
150    asio->to_transfer   = len;
151    asio->transferred   = 0;
152    asio->on_io         = io_cb;
153    asio->io_opaque     = io_opaque;
154    asio->state         = ASIO_STATE_QUEUED;
155    asio->ref_count     = 1;
156    asio->deadline      = deadline;
157    loopTimer_init(asio->timer, _async_socket_get_looper(as),
158                   _on_async_socket_io_timed_out, asio);
159    loopTimer_startAbsolute(asio->timer, deadline);
160
161    /* Reference socket that is holding this I/O. */
162    async_socket_reference(as);
163
164    T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
165      _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
166
167    return asio;
168}
169
170/* Destroys and frees I/O descriptor. */
171static void
172_async_socket_io_free(AsyncSocketIO* asio)
173{
174    AsyncSocket* const as = asio->as;
175
176    T("ASocket %s: %s I/O descriptor %p is destroyed.",
177      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
178
179    loopTimer_done(asio->timer);
180
181    /* Try to recycle it first, and free the memory if recycler is full. */
182    if (_recycled_asio_count < _max_recycled_asio_num) {
183        asio->next = _asio_recycled;
184        _asio_recycled = asio;
185        _recycled_asio_count++;
186    } else {
187        AFREE(asio);
188    }
189
190    /* Release socket that is holding this I/O. */
191    async_socket_release(as);
192}
193
194/* An I/O has been finished and its descriptor is about to be discarded. */
195static void
196_async_socket_io_finished(AsyncSocketIO* asio)
197{
198    /* Notify the client of the I/O that I/O is finished. */
199    asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
200}
201
202int
203async_socket_io_reference(AsyncSocketIO* asio)
204{
205    assert(asio->ref_count > 0);
206    asio->ref_count++;
207    return asio->ref_count;
208}
209
210int
211async_socket_io_release(AsyncSocketIO* asio)
212{
213    assert(asio->ref_count > 0);
214    asio->ref_count--;
215    if (asio->ref_count == 0) {
216        _async_socket_io_finished(asio);
217        /* Last reference has been dropped. Destroy this object. */
218        _async_socket_io_free(asio);
219        return 0;
220    }
221    return asio->ref_count;
222}
223
224/* Creates new asynchronous socket reader.
225 * Param:
226 *  as - Asynchronous socket for the reader.
227 *  buffer, len - Reader's buffer.
228 *  io_cb - Reader's callback.
229 *  reader_opaque - An opaque pointer associated with the reader.
230 *  deadline - Deadline to complete the operation.
231 * Return:
232 *  An initialized AsyncSocketIO intance.
233 */
234static AsyncSocketIO*
235_async_socket_reader_new(AsyncSocket* as,
236                         void* buffer,
237                         uint32_t len,
238                         on_as_io_cb io_cb,
239                         void* reader_opaque,
240                         Duration deadline)
241{
242    AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
243                                                     reader_opaque, deadline);
244    return asio;
245}
246
247/* Creates new asynchronous socket writer.
248 * Param:
249 *  as - Asynchronous socket for the writer.
250 *  buffer, len - Writer's buffer.
251 *  io_cb - Writer's callback.
252 *  writer_opaque - An opaque pointer associated with the writer.
253 *  deadline - Deadline to complete the operation.
254 * Return:
255 *  An initialized AsyncSocketIO intance.
256 */
257static AsyncSocketIO*
258_async_socket_writer_new(AsyncSocket* as,
259                         const void* buffer,
260                         uint32_t len,
261                         on_as_io_cb io_cb,
262                         void* writer_opaque,
263                         Duration deadline)
264{
265    AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
266                                                     io_cb, writer_opaque,
267                                                     deadline);
268    return asio;
269}
270
271/* I/O timed out. */
272static void
273_on_async_socket_io_timed_out(void* opaque)
274{
275    AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
276    AsyncSocket* const as = asio->as;
277
278    D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
279      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
280      asio->deadline, async_socket_deadline(as, 0));
281
282    /* Reference while in callback. */
283    async_socket_io_reference(asio);
284    _async_socket_io_timed_out(asio->as, asio);
285    async_socket_io_release(asio);
286}
287
288/********************************************************************************
289 *                 Public Asynchronous Socket I/O API
290 *******************************************************************************/
291
292AsyncSocket*
293async_socket_io_get_socket(const AsyncSocketIO* asio)
294{
295    async_socket_reference(asio->as);
296    return asio->as;
297}
298
299void
300async_socket_io_cancel_time_out(AsyncSocketIO* asio)
301{
302    loopTimer_stop(asio->timer);
303}
304
305void*
306async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
307{
308    return asio->io_opaque;
309}
310
311void*
312async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
313{
314    return async_socket_get_client_opaque(asio->as);
315}
316
317void*
318async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
319                                uint32_t* transferred,
320                                uint32_t* to_transfer)
321{
322    if (transferred != NULL) {
323        *transferred = asio->transferred;
324    }
325    if (to_transfer != NULL) {
326        *to_transfer = asio->to_transfer;
327    }
328    return asio->buffer;
329}
330
331void*
332async_socket_io_get_buffer(const AsyncSocketIO* asio)
333{
334    return asio->buffer;
335}
336
337uint32_t
338async_socket_io_get_transferred(const AsyncSocketIO* asio)
339{
340    return asio->transferred;
341}
342
343uint32_t
344async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
345{
346    return asio->to_transfer;
347}
348
349int
350async_socket_io_is_read(const AsyncSocketIO* asio)
351{
352    return asio->is_io_read;
353}
354
355/********************************************************************************
356 *                      Asynchronous Socket internals
357 *******************************************************************************/
358
359struct AsyncSocket {
360    /* TCP address for the socket. */
361    SockAddress         address;
362    /* Connection callback for this socket. */
363    on_as_connection_cb on_connection;
364    /* An opaque pointer associated with this socket by the client. */
365    void*               client_opaque;
366    /* I/O looper for asynchronous I/O on the socket. */
367    Looper*             looper;
368    /* I/O descriptor for asynchronous I/O on the socket. */
369    LoopIo              io[1];
370    /* Timer to use for reconnection attempts. */
371    LoopTimer           reconnect_timer[1];
372    /* Head of the list of the active readers. */
373    AsyncSocketIO*      readers_head;
374    /* Tail of the list of the active readers. */
375    AsyncSocketIO*      readers_tail;
376    /* Head of the list of the active writers. */
377    AsyncSocketIO*      writers_head;
378    /* Tail of the list of the active writers. */
379    AsyncSocketIO*      writers_tail;
380    /* Socket's file descriptor. */
381    int                 fd;
382    /* Timeout to use for reconnection attempts. */
383    int                 reconnect_to;
384    /* Number of outstanding references to the socket. */
385    int                 ref_count;
386    /* Flags whether (1) or not (0) socket owns the looper. */
387    int                 owns_looper;
388};
389
390static const char*
391_async_socket_string(AsyncSocket* as)
392{
393    return sock_address_to_string(&as->address);
394}
395
396static Looper*
397_async_socket_get_looper(AsyncSocket* as)
398{
399    return as->looper;
400}
401
402/* Pulls first reader out of the list.
403 * Param:
404 *  as - Initialized AsyncSocket instance.
405 * Return:
406 *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
407 *  Note that the caller is responsible for releasing the I/O object returned
408 *  from this routine.
409 */
410static AsyncSocketIO*
411_async_socket_pull_first_io(AsyncSocket* as,
412                            AsyncSocketIO** list_head,
413                            AsyncSocketIO** list_tail)
414{
415    AsyncSocketIO* const ret = *list_head;
416    if (ret != NULL) {
417        *list_head = ret->next;
418        ret->next = NULL;
419        if (*list_head == NULL) {
420            *list_tail = NULL;
421        }
422    }
423    return ret;
424}
425
426/* Pulls first reader out of the list.
427 * Param:
428 *  as - Initialized AsyncSocket instance.
429 * Return:
430 *  First reader pulled out of the list, or NULL if there are no readers in the
431 *  list.
432 *  Note that the caller is responsible for releasing the I/O object returned
433 *  from this routine.
434 */
435static AsyncSocketIO*
436_async_socket_pull_first_reader(AsyncSocket* as)
437{
438    return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
439}
440
441/* Pulls first writer out of the list.
442 * Param:
443 *  as - Initialized AsyncSocket instance.
444 * Return:
445 *  First writer pulled out of the list, or NULL if there are no writers in the
446 *  list.
447 *  Note that the caller is responsible for releasing the I/O object returned
448 *  from this routine.
449 */
450static AsyncSocketIO*
451_async_socket_pull_first_writer(AsyncSocket* as)
452{
453    return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
454}
455
456/* Removes an I/O descriptor from a list of active I/O.
457 * Param:
458 *  as - Initialized AsyncSocket instance.
459 *  list_head, list_tail - Pointers to the list head and tail.
460 *  io - I/O to remove.
461 * Return:
462 *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
463 */
464static int
465_async_socket_remove_io(AsyncSocket* as,
466                        AsyncSocketIO** list_head,
467                        AsyncSocketIO** list_tail,
468                        AsyncSocketIO* io)
469{
470    AsyncSocketIO* prev = NULL;
471
472    while (*list_head != NULL && io != *list_head) {
473        prev = *list_head;
474        list_head = &((*list_head)->next);
475    }
476    if (*list_head == NULL) {
477        D("%s: I/O %p is not found in the list for socket '%s'",
478          __FUNCTION__, io, _async_socket_string(as));
479        return 0;
480    }
481
482    *list_head = io->next;
483    if (prev != NULL) {
484        prev->next = io->next;
485    }
486    if (*list_tail == io) {
487        *list_tail = prev;
488    }
489
490    /* Release I/O adjusting reference added when I/O has been saved in the list. */
491    async_socket_io_release(io);
492
493    return 1;
494}
495
496/* Advances to the next I/O in the list.
497 * Param:
498 *  as - Initialized AsyncSocket instance.
499 *  list_head, list_tail - Pointers to the list head and tail.
500 */
501static void
502_async_socket_advance_io(AsyncSocket* as,
503                         AsyncSocketIO** list_head,
504                         AsyncSocketIO** list_tail)
505{
506    AsyncSocketIO* first_io = *list_head;
507    if (first_io != NULL) {
508        *list_head = first_io->next;
509        first_io->next = NULL;
510    }
511    if (*list_head == NULL) {
512        *list_tail = NULL;
513    }
514    if (first_io != NULL) {
515        /* Release I/O removed from the head of the list. */
516        async_socket_io_release(first_io);
517    }
518}
519
520/* Advances to the next reader in the list.
521 * Param:
522 *  as - Initialized AsyncSocket instance.
523 */
524static void
525_async_socket_advance_reader(AsyncSocket* as)
526{
527    _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
528}
529
530/* Advances to the next writer in the list.
531 * Param:
532 *  as - Initialized AsyncSocket instance.
533 */
534static void
535_async_socket_advance_writer(AsyncSocket* as)
536{
537    _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
538}
539
540/* Completes an I/O.
541 * Param:
542 *  as - Initialized AsyncSocket instance.
543 *  asio - I/O to complete.
544 * Return:
545 *  One of AsyncIOAction values.
546 */
547static AsyncIOAction
548_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
549{
550    T("ASocket %s: %s I/O %p is completed.",
551      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
552
553    /* Stop the timer. */
554    async_socket_io_cancel_time_out(asio);
555
556    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
557}
558
559/* Timeouts an I/O.
560 * Param:
561 *  as - Initialized AsyncSocket instance.
562 *  asio - An I/O that has timed out.
563 * Return:
564 *  One of AsyncIOAction values.
565 */
566static AsyncIOAction
567_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
568{
569    T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
570      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
571      asio->deadline, async_socket_deadline(as, 0));
572
573    /* Report to the client. */
574    const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
575                                             ASIO_STATE_TIMED_OUT);
576
577    /* Remove the I/O from a list of active I/O for actions other than retry. */
578    if (action != ASIO_ACTION_RETRY) {
579        if (asio->is_io_read) {
580            _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
581        } else {
582            _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
583        }
584    }
585
586    return action;
587}
588
589/* Cancels an I/O.
590 * Param:
591 *  as - Initialized AsyncSocket instance.
592 *  asio - An I/O to cancel.
593 * Return:
594 *  One of AsyncIOAction values.
595 */
596static AsyncIOAction
597_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
598{
599    T("ASocket %s: %s I/O %p is cancelled.",
600      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
601
602    /* Stop the timer. */
603    async_socket_io_cancel_time_out(asio);
604
605    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
606}
607
608/* Reports an I/O failure.
609 * Param:
610 *  as - Initialized AsyncSocket instance.
611 *  asio - An I/O that has failed. Can be NULL for general failures.
612 *  failure - Failure (errno) that has occurred.
613 * Return:
614 *  One of AsyncIOAction values.
615 */
616static AsyncIOAction
617_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
618{
619    T("ASocket %s: %s I/O %p has failed: %d -> %s",
620      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
621      failure, strerror(failure));
622
623    /* Stop the timer. */
624    async_socket_io_cancel_time_out(asio);
625
626    errno = failure;
627    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
628}
629
630/* Cancels all the active socket readers.
631 * Param:
632 *  as - Initialized AsyncSocket instance.
633 */
634static void
635_async_socket_cancel_readers(AsyncSocket* as)
636{
637    while (as->readers_head != NULL) {
638        AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
639        /* We ignore action returned from the cancellation callback, since we're
640         * in a disconnected state here. */
641        _async_socket_cancel_io(as, to_cancel);
642        async_socket_io_release(to_cancel);
643    }
644}
645
646/* Cancels all the active socket writers.
647 * Param:
648 *  as - Initialized AsyncSocket instance.
649 */
650static void
651_async_socket_cancel_writers(AsyncSocket* as)
652{
653    while (as->writers_head != NULL) {
654        AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
655        /* We ignore action returned from the cancellation callback, since we're
656         * in a disconnected state here. */
657        _async_socket_cancel_io(as, to_cancel);
658        async_socket_io_release(to_cancel);
659    }
660}
661
662/* Cancels all the I/O on the socket. */
663static void
664_async_socket_cancel_all_io(AsyncSocket* as)
665{
666    /* Stop the reconnection timer. */
667    loopTimer_stop(as->reconnect_timer);
668
669    /* Stop read / write on the socket. */
670    loopIo_dontWantWrite(as->io);
671    loopIo_dontWantRead(as->io);
672
673    /* Cancel active readers and writers. */
674    _async_socket_cancel_readers(as);
675    _async_socket_cancel_writers(as);
676}
677
678/* Closes socket handle used by the async socket.
679 * Param:
680 *  as - Initialized AsyncSocket instance.
681 */
682static void
683_async_socket_close_socket(AsyncSocket* as)
684{
685    if (as->fd >= 0) {
686        loopIo_done(as->io);
687        socket_close(as->fd);
688        T("ASocket %s: Socket handle %d is closed.",
689          _async_socket_string(as), as->fd);
690        as->fd = -1;
691    }
692}
693
694/* Destroys AsyncSocket instance.
695 * Param:
696 *  as - Initialized AsyncSocket instance.
697 */
698static void
699_async_socket_free(AsyncSocket* as)
700{
701    if (as != NULL) {
702        T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
703
704        /* Close socket. */
705        _async_socket_close_socket(as);
706
707        /* Free allocated resources. */
708        if (as->looper != NULL) {
709            loopTimer_done(as->reconnect_timer);
710            if (as->owns_looper) {
711                looper_free(as->looper);
712            }
713        }
714        sock_address_done(&as->address);
715        AFREE(as);
716    }
717}
718
719/* Starts reconnection attempts after connection has been lost.
720 * Param:
721 *  as - Initialized AsyncSocket instance.
722 *  to - Milliseconds to wait before reconnection attempt.
723 */
724static void
725_async_socket_reconnect(AsyncSocket* as, int to)
726{
727    T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
728
729    /* Make sure that no I/O is active, and socket is closed before we
730     * reconnect. */
731    _async_socket_cancel_all_io(as);
732
733    /* Set the timer for reconnection attempt. */
734    loopTimer_startRelative(as->reconnect_timer, to);
735}
736
737/********************************************************************************
738 *                      Asynchronous Socket callbacks
739 *******************************************************************************/
740
741/* A callback that is invoked when socket gets disconnected.
742 * Param:
743 *  as - Initialized AsyncSocket instance.
744 */
745static void
746_on_async_socket_disconnected(AsyncSocket* as)
747{
748    /* Save error to restore it for the client's callback. */
749    const int save_errno = errno;
750    AsyncIOAction action = ASIO_ACTION_ABORT;
751
752    D("ASocket %s: Disconnected.", _async_socket_string(as));
753
754    /* Cancel all the I/O on this socket. */
755    _async_socket_cancel_all_io(as);
756
757    /* Close the socket. */
758    _async_socket_close_socket(as);
759
760    /* Restore errno, and invoke client's callback. */
761    errno = save_errno;
762    action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
763
764    if (action == ASIO_ACTION_RETRY) {
765        /* Client requested reconnection. */
766        _async_socket_reconnect(as, as->reconnect_to);
767    }
768}
769
770/* A callback that is invoked on socket's I/O failure.
771 * Param:
772 *  as - Initialized AsyncSocket instance.
773 *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
774 */
775static AsyncIOAction
776_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
777{
778    D("ASocket %s: %s I/O failure: %d -> %s",
779      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
780      errno, strerror(errno));
781
782    /* Report the failure. */
783    return _async_socket_io_failure(as, asio, errno);
784}
785
786/* A callback that is invoked when there is data available to read.
787 * Param:
788 *  as - Initialized AsyncSocket instance.
789 * Return:
790 *  0 on success, or -1 on failure. Failure returned from this routine will
791 *  skip writes (if awailable) behind this read.
792 */
793static int
794_on_async_socket_recv(AsyncSocket* as)
795{
796    AsyncIOAction action;
797
798    /* Get current reader. */
799    AsyncSocketIO* const asr = as->readers_head;
800    if (asr == NULL) {
801        D("ASocket %s: No reader is available.", _async_socket_string(as));
802        loopIo_dontWantRead(as->io);
803        return 0;
804    }
805
806    /* Reference the reader while we're working with it in this callback. */
807    async_socket_io_reference(asr);
808
809    /* Bump I/O state, and inform the client that I/O is in progress. */
810    if (asr->state == ASIO_STATE_QUEUED) {
811        asr->state = ASIO_STATE_STARTED;
812    } else {
813        asr->state = ASIO_STATE_CONTINUES;
814    }
815    action = asr->on_io(asr->io_opaque, asr, asr->state);
816    if (action == ASIO_ACTION_ABORT) {
817        D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
818        /* Move on to the next reader. */
819        _async_socket_advance_reader(as);
820        /* Lets see if there are still active readers, and enable, or disable
821         * read I/O callback accordingly. */
822        if (as->readers_head != NULL) {
823            loopIo_wantRead(as->io);
824        } else {
825            loopIo_dontWantRead(as->io);
826        }
827        async_socket_io_release(asr);
828        return 0;
829    }
830
831    /* Read next chunk of data. */
832    int res = socket_recv(as->fd, asr->buffer + asr->transferred,
833                          asr->to_transfer - asr->transferred);
834    while (res < 0 && errno == EINTR) {
835        res = socket_recv(as->fd, asr->buffer + asr->transferred,
836                          asr->to_transfer - asr->transferred);
837    }
838
839    if (res == 0) {
840        /* Socket has been disconnected. */
841        errno = ECONNRESET;
842        _on_async_socket_disconnected(as);
843        async_socket_io_release(asr);
844        return -1;
845    }
846
847    if (res < 0) {
848        if (errno == EWOULDBLOCK || errno == EAGAIN) {
849            /* Yield to writes behind this read. */
850            loopIo_wantRead(as->io);
851            async_socket_io_release(asr);
852            return 0;
853        }
854
855        /* An I/O error. */
856        action = _on_async_socket_failure(as, asr);
857        if (action != ASIO_ACTION_RETRY) {
858            D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
859            /* Move on to the next reader. */
860            _async_socket_advance_reader(as);
861            /* Lets see if there are still active readers, and enable, or disable
862             * read I/O callback accordingly. */
863            if (as->readers_head != NULL) {
864                loopIo_wantRead(as->io);
865            } else {
866                loopIo_dontWantRead(as->io);
867            }
868        }
869        async_socket_io_release(asr);
870        return -1;
871    }
872
873    /* Update the reader's descriptor. */
874    asr->transferred += res;
875    if (asr->transferred == asr->to_transfer) {
876        /* This read is completed. Move on to the next reader. */
877        _async_socket_advance_reader(as);
878
879        /* Notify reader completion. */
880        _async_socket_complete_io(as, asr);
881    }
882
883    /* Lets see if there are still active readers, and enable, or disable read
884     * I/O callback accordingly. */
885    if (as->readers_head != NULL) {
886        loopIo_wantRead(as->io);
887    } else {
888        loopIo_dontWantRead(as->io);
889    }
890
891    async_socket_io_release(asr);
892
893    return 0;
894}
895
896/* A callback that is invoked when there is data available to write.
897 * Param:
898 *  as - Initialized AsyncSocket instance.
899 * Return:
900 *  0 on success, or -1 on failure. Failure returned from this routine will
901 *  skip reads (if awailable) behind this write.
902 */
903static int
904_on_async_socket_send(AsyncSocket* as)
905{
906    AsyncIOAction action;
907
908    /* Get current writer. */
909    AsyncSocketIO* const asw = as->writers_head;
910    if (asw == NULL) {
911        D("ASocket %s: No writer is available.", _async_socket_string(as));
912        loopIo_dontWantWrite(as->io);
913        return 0;
914    }
915
916    /* Reference the writer while we're working with it in this callback. */
917    async_socket_io_reference(asw);
918
919    /* Bump I/O state, and inform the client that I/O is in progress. */
920    if (asw->state == ASIO_STATE_QUEUED) {
921        asw->state = ASIO_STATE_STARTED;
922    } else {
923        asw->state = ASIO_STATE_CONTINUES;
924    }
925    action = asw->on_io(asw->io_opaque, asw, asw->state);
926    if (action == ASIO_ACTION_ABORT) {
927        D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
928        /* Move on to the next writer. */
929        _async_socket_advance_writer(as);
930        /* Lets see if there are still active writers, and enable, or disable
931         * write I/O callback accordingly. */
932        if (as->writers_head != NULL) {
933            loopIo_wantWrite(as->io);
934        } else {
935            loopIo_dontWantWrite(as->io);
936        }
937        async_socket_io_release(asw);
938        return 0;
939    }
940
941    /* Write next chunk of data. */
942    int res = socket_send(as->fd, asw->buffer + asw->transferred,
943                          asw->to_transfer - asw->transferred);
944    while (res < 0 && errno == EINTR) {
945        res = socket_send(as->fd, asw->buffer + asw->transferred,
946                          asw->to_transfer - asw->transferred);
947    }
948
949    if (res == 0) {
950        /* Socket has been disconnected. */
951        errno = ECONNRESET;
952        _on_async_socket_disconnected(as);
953        async_socket_io_release(asw);
954        return -1;
955    }
956
957    if (res < 0) {
958        if (errno == EWOULDBLOCK || errno == EAGAIN) {
959            /* Yield to reads behind this write. */
960            loopIo_wantWrite(as->io);
961            async_socket_io_release(asw);
962            return 0;
963        }
964
965        /* An I/O error. */
966        action = _on_async_socket_failure(as, asw);
967        if (action != ASIO_ACTION_RETRY) {
968            D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
969            /* Move on to the next writer. */
970            _async_socket_advance_writer(as);
971            /* Lets see if there are still active writers, and enable, or disable
972             * write I/O callback accordingly. */
973            if (as->writers_head != NULL) {
974                loopIo_wantWrite(as->io);
975            } else {
976                loopIo_dontWantWrite(as->io);
977            }
978        }
979        async_socket_io_release(asw);
980        return -1;
981    }
982
983    /* Update the writer descriptor. */
984    asw->transferred += res;
985    if (asw->transferred == asw->to_transfer) {
986        /* This write is completed. Move on to the next writer. */
987        _async_socket_advance_writer(as);
988
989        /* Notify writer completion. */
990        _async_socket_complete_io(as, asw);
991    }
992
993    /* Lets see if there are still active writers, and enable, or disable write
994     * I/O callback accordingly. */
995    if (as->writers_head != NULL) {
996        loopIo_wantWrite(as->io);
997    } else {
998        loopIo_dontWantWrite(as->io);
999    }
1000
1001    async_socket_io_release(asw);
1002
1003    return 0;
1004}
1005
1006/* A callback that is invoked when an I/O is available on socket.
1007 * Param:
1008 *  as - Initialized AsyncSocket instance.
1009 *  fd - Socket's file descriptor.
1010 *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1011 */
1012static void
1013_on_async_socket_io(void* opaque, int fd, unsigned events)
1014{
1015    AsyncSocket* const as = (AsyncSocket*)opaque;
1016
1017    /* Reference the socket while we're working with it in this callback. */
1018    async_socket_reference(as);
1019
1020    if ((events & LOOP_IO_READ) != 0) {
1021        if (_on_async_socket_recv(as) != 0) {
1022            async_socket_release(as);
1023            return;
1024        }
1025    }
1026
1027    if ((events & LOOP_IO_WRITE) != 0) {
1028        if (_on_async_socket_send(as) != 0) {
1029            async_socket_release(as);
1030            return;
1031        }
1032    }
1033
1034    async_socket_release(as);
1035}
1036
1037/* A callback that is invoked by asynchronous socket connector on connection
1038 *  events.
1039 * Param:
1040 *  opaque - Initialized AsyncSocket instance.
1041 *  connector - Connector that is used to connect this socket.
1042 *  event - Connection event.
1043 * Return:
1044 *  One of AsyncIOAction values.
1045 */
1046static AsyncIOAction
1047_on_connector_events(void* opaque,
1048                     AsyncSocketConnector* connector,
1049                     AsyncIOState event)
1050{
1051    AsyncIOAction action;
1052    AsyncSocket* const as = (AsyncSocket*)opaque;
1053
1054    /* Reference the socket while we're working with it in this callback. */
1055    async_socket_reference(as);
1056
1057    if (event == ASIO_STATE_SUCCEEDED) {
1058        /* Accept the connection. */
1059        as->fd = async_socket_connector_pull_fd(connector);
1060        loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
1061    }
1062
1063    /* Invoke client's callback. */
1064    action = as->on_connection(as->client_opaque, as, event);
1065    if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
1066        /* For whatever reason the client didn't want to keep this connection.
1067         * Close it. */
1068        D("ASocket %s: Connection is discarded by the client.",
1069          _async_socket_string(as));
1070        _async_socket_close_socket(as);
1071    }
1072
1073    if (action != ASIO_ACTION_RETRY) {
1074        async_socket_connector_release(connector);
1075    }
1076
1077    async_socket_release(as);
1078
1079    return action;
1080}
1081
1082/* Timer callback invoked to reconnect the lost connection.
1083 * Param:
1084 *  as - Initialized AsyncSocket instance.
1085 */
1086void
1087_on_async_socket_reconnect(void* opaque)
1088{
1089    AsyncSocket* as = (AsyncSocket*)opaque;
1090
1091    /* Reference the socket while we're working with it in this callback. */
1092    async_socket_reference(as);
1093    async_socket_connect(as, as->reconnect_to);
1094    async_socket_release(as);
1095}
1096
1097
1098/********************************************************************************
1099 *                  Android Device Socket public API
1100 *******************************************************************************/
1101
1102AsyncSocket*
1103async_socket_new(int port,
1104                 int reconnect_to,
1105                 on_as_connection_cb client_cb,
1106                 void* client_opaque,
1107                 Looper* looper)
1108{
1109    AsyncSocket* as;
1110
1111    if (client_cb == NULL) {
1112        E("Invalid client_cb parameter");
1113        return NULL;
1114    }
1115
1116    ANEW0(as);
1117
1118    as->fd = -1;
1119    as->client_opaque = client_opaque;
1120    as->on_connection = client_cb;
1121    as->readers_head = as->readers_tail = NULL;
1122    as->reconnect_to = reconnect_to;
1123    as->ref_count = 1;
1124    sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
1125    if (looper == NULL) {
1126        as->looper = looper_newCore();
1127        if (as->looper == NULL) {
1128            E("Unable to create I/O looper for async socket '%s'",
1129              _async_socket_string(as));
1130            client_cb(client_opaque, as, ASIO_STATE_FAILED);
1131            _async_socket_free(as);
1132            return NULL;
1133        }
1134        as->owns_looper = 1;
1135    } else {
1136        as->looper = looper;
1137        as->owns_looper = 0;
1138    }
1139
1140    loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1141
1142    T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1143
1144    return as;
1145}
1146
1147int
1148async_socket_reference(AsyncSocket* as)
1149{
1150    assert(as->ref_count > 0);
1151    as->ref_count++;
1152    return as->ref_count;
1153}
1154
1155int
1156async_socket_release(AsyncSocket* as)
1157{
1158    assert(as->ref_count > 0);
1159    as->ref_count--;
1160    if (as->ref_count == 0) {
1161        /* Last reference has been dropped. Destroy this object. */
1162        _async_socket_cancel_all_io(as);
1163        _async_socket_free(as);
1164        return 0;
1165    }
1166    return as->ref_count;
1167}
1168
1169void
1170async_socket_connect(AsyncSocket* as, int retry_to)
1171{
1172    T("ASocket %s: Handling connection request for %dms...",
1173      _async_socket_string(as), retry_to);
1174
1175    AsyncSocketConnector* const connector =
1176        async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1177                                   as, as->looper);
1178    if (connector != NULL) {
1179        async_socket_connector_connect(connector);
1180    } else {
1181        as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
1182    }
1183}
1184
1185void
1186async_socket_disconnect(AsyncSocket* as)
1187{
1188    T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1189
1190    if (as != NULL) {
1191        _async_socket_cancel_all_io(as);
1192        _async_socket_close_socket(as);
1193    }
1194}
1195
1196void
1197async_socket_reconnect(AsyncSocket* as, int retry_to)
1198{
1199    T("ASocket %s: Handling reconnection request for %dms...",
1200      _async_socket_string(as), retry_to);
1201
1202    _async_socket_cancel_all_io(as);
1203    _async_socket_close_socket(as);
1204    _async_socket_reconnect(as, retry_to);
1205}
1206
1207void
1208async_socket_read_abs(AsyncSocket* as,
1209                      void* buffer, uint32_t len,
1210                      on_as_io_cb reader_cb,
1211                      void* reader_opaque,
1212                      Duration deadline)
1213{
1214    T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1215      _async_socket_string(as), len, deadline);
1216
1217    AsyncSocketIO* const asr =
1218        _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1219                                 deadline);
1220    /* Add new reader to the list. Note that we use initial reference from I/O
1221     * 'new' routine as "in the list" reference counter. */
1222    if (as->readers_head == NULL) {
1223        as->readers_head = as->readers_tail = asr;
1224    } else {
1225        as->readers_tail->next = asr;
1226        as->readers_tail = asr;
1227    }
1228    loopIo_wantRead(as->io);
1229}
1230
1231void
1232async_socket_read_rel(AsyncSocket* as,
1233                      void* buffer, uint32_t len,
1234                      on_as_io_cb reader_cb,
1235                      void* reader_opaque,
1236                      int to)
1237{
1238    const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1239                                    DURATION_INFINITE;
1240    async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
1241}
1242
1243void
1244async_socket_write_abs(AsyncSocket* as,
1245                       const void* buffer, uint32_t len,
1246                       on_as_io_cb writer_cb,
1247                       void* writer_opaque,
1248                       Duration deadline)
1249{
1250    T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1251      _async_socket_string(as), len, deadline);
1252
1253    AsyncSocketIO* const asw =
1254        _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1255                                 deadline);
1256    /* Add new writer to the list. Note that we use initial reference from I/O
1257     * 'new' routine as "in the list" reference counter. */
1258    if (as->writers_head == NULL) {
1259        as->writers_head = as->writers_tail = asw;
1260    } else {
1261        as->writers_tail->next = asw;
1262        as->writers_tail = asw;
1263    }
1264    loopIo_wantWrite(as->io);
1265}
1266
1267void
1268async_socket_write_rel(AsyncSocket* as,
1269                       const void* buffer, uint32_t len,
1270                       on_as_io_cb writer_cb,
1271                       void* writer_opaque,
1272                       int to)
1273{
1274    const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1275                                    DURATION_INFINITE;
1276    async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1277}
1278
1279void*
1280async_socket_get_client_opaque(const AsyncSocket* as)
1281{
1282    return as->client_opaque;
1283}
1284
1285Duration
1286async_socket_deadline(AsyncSocket* as, int rel)
1287{
1288    return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1289                        DURATION_INFINITE;
1290}
1291
1292int
1293async_socket_get_port(const AsyncSocket* as)
1294{
1295    return sock_address_get_port(&as->address);
1296}
1297