1/*
2 * Copyright (C) 2012 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17/*
18 * Encapsulates exchange protocol between the emulator, and an Android device
19 * that is connected to the host via USB. The communication is established over
20 * a TCP port forwarding, enabled by ADB.
21 */
22
23#include "android/utils/debug.h"
24#include "android/async-socket-connector.h"
25#include "android/async-socket.h"
26#include "utils/panic.h"
27#include "iolooper.h"
28
/* Logging shortcuts used throughout this file:
 *  E        - forwards to derror().
 *  W        - forwards to dwarning().
 *  D        - forwards to VERBOSE_PRINT() under the 'asyncsocket' tag.
 *  D_ACTIVE - forwards to VERBOSE_CHECK() for the 'asyncsocket' tag.
 */
#define  E(...)    derror(__VA_ARGS__)
#define  W(...)    dwarning(__VA_ARGS__)
#define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)

/* Set to a non-zero value to compile in the T() trace messages. */
#define TRACE_ON    0

#if TRACE_ON
#define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#else
/* Tracing is off: T() expands to nothing, so its arguments are never
 * evaluated. */
#define  T(...)
#endif
41
42/********************************************************************************
43 *                  Asynchronous Socket internal API declarations
44 *******************************************************************************/
45
/* Gets socket's address string (used in log messages). */
static const char* _async_socket_string(AsyncSocket* as);

/* Gets socket's looper. */
static Looper* _async_socket_get_looper(AsyncSocket* as);

/* Handler for the I/O time out.
 * Param:
 *  as - Asynchronous socket for the I/O.
 *  asio - Descriptor for the timed out I/O.
 * Return:
 *  The AsyncIOAction returned by the I/O callback for the time out.
 */
static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
                                                AsyncSocketIO* asio);
59
60/********************************************************************************
61 *                  Asynchronous Socket Reader / Writer
62 *******************************************************************************/
63
/* Descriptor for a single read or write request queued on an AsyncSocket.
 * Descriptors are reference counted, and are linked through 'next' both in
 * the socket's reader / writer lists and in the list of recycled descriptors.
 */
struct AsyncSocketIO {
    /* Next I/O in the reader, or writer list (or in the recycler list). */
    AsyncSocketIO*      next;
    /* Asynchronous socket for this I/O. */
    AsyncSocket*        as;
    /* Timer used for time outs on this I/O. */
    LoopTimer           timer[1];
    /* An opaque pointer associated with this I/O. */
    void*               io_opaque;
    /* Buffer where to read / write data. */
    uint8_t*            buffer;
    /* Bytes to transfer through the socket for this I/O. */
    uint32_t            to_transfer;
    /* Bytes transferred through the socket in this I/O so far. */
    uint32_t            transferred;
    /* I/O callback for this I/O. */
    on_as_io_cb         on_io;
    /* I/O type selector: 1 - read, 0 - write. */
    int                 is_io_read;
    /* State of the I/O. */
    AsyncIOState        state;
    /* Number of outstanding references to the I/O. */
    int                 ref_count;
    /* Deadline for this I/O (the value the time-out timer is started with). */
    Duration            deadline;
};
90
/*
 * Recycling I/O instances.
 * Since AsyncSocketIO instances are not that large, it makes sense to recycle
 * them for faster allocation, rather than allocating and freeing them for each
 * I/O on the socket.
 */
97
/* Head of the list of recycled I/O descriptors (linked through 'next'). */
static AsyncSocketIO* _asio_recycled    = NULL;
/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
static int _recycled_asio_count         = 0;
/* Maximum number of I/O descriptors that can be recycled. Descriptors released
 * while the recycler already holds this many are freed instead. */
static const int _max_recycled_asio_num = 32;
104
/* Handler for an I/O time-out timer event.
 * When this routine is invoked, it indicates that a time out has occurred on an
 * I/O. It is the callback registered with each descriptor's timer when the
 * descriptor is created.
 * Param:
 *  opaque - AsyncSocketIO instance representing the timed out I/O.
 */
static void _on_async_socket_io_timed_out(void* opaque);
112
113/* Creates new I/O descriptor.
114 * Param:
115 *  as - Asynchronous socket for the I/O.
116 *  is_io_read - I/O type selector: 1 - read, 0 - write.
117 *  buffer, len - Reader / writer buffer address.
118 *  io_cb - Callback for this reader / writer.
119 *  io_opaque - An opaque pointer associated with the I/O.
120 *  deadline - Deadline to complete the I/O.
121 * Return:
122 *  Initialized AsyncSocketIO instance.
123 */
124static AsyncSocketIO*
125_async_socket_rw_new(AsyncSocket* as,
126                     int is_io_read,
127                     void* buffer,
128                     uint32_t len,
129                     on_as_io_cb io_cb,
130                     void* io_opaque,
131                     Duration deadline)
132{
133    /* Lookup in the recycler first. */
134    AsyncSocketIO* asio = _asio_recycled;
135    if (asio != NULL) {
136        /* Pull the descriptor from recycler. */
137        _asio_recycled = asio->next;
138        _recycled_asio_count--;
139    } else {
140        /* No recycled descriptors. Allocate new one. */
141        ANEW0(asio);
142    }
143
144    asio->next          = NULL;
145    asio->as            = as;
146    asio->is_io_read    = is_io_read;
147    asio->buffer        = (uint8_t*)buffer;
148    asio->to_transfer   = len;
149    asio->transferred   = 0;
150    asio->on_io         = io_cb;
151    asio->io_opaque     = io_opaque;
152    asio->state         = ASIO_STATE_QUEUED;
153    asio->ref_count     = 1;
154    asio->deadline      = deadline;
155    loopTimer_init(asio->timer, _async_socket_get_looper(as),
156                   _on_async_socket_io_timed_out, asio);
157    loopTimer_startAbsolute(asio->timer, deadline);
158
159    /* Reference socket that is holding this I/O. */
160    async_socket_reference(as);
161
162    T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
163      _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);
164
165    return asio;
166}
167
168/* Destroys and frees I/O descriptor. */
169static void
170_async_socket_io_free(AsyncSocketIO* asio)
171{
172    AsyncSocket* const as = asio->as;
173
174    T("ASocket %s: %s I/O descriptor %p is destroyed.",
175      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
176
177    loopTimer_done(asio->timer);
178
179    /* Try to recycle it first, and free the memory if recycler is full. */
180    if (_recycled_asio_count < _max_recycled_asio_num) {
181        asio->next = _asio_recycled;
182        _asio_recycled = asio;
183        _recycled_asio_count++;
184    } else {
185        AFREE(asio);
186    }
187
188    /* Release socket that is holding this I/O. */
189    async_socket_release(as);
190}
191
192/* An I/O has been finished and its descriptor is about to be discarded. */
193static void
194_async_socket_io_finished(AsyncSocketIO* asio)
195{
196    /* Notify the client of the I/O that I/O is finished. */
197    asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
198}
199
200int
201async_socket_io_reference(AsyncSocketIO* asio)
202{
203    assert(asio->ref_count > 0);
204    asio->ref_count++;
205    return asio->ref_count;
206}
207
208int
209async_socket_io_release(AsyncSocketIO* asio)
210{
211    assert(asio->ref_count > 0);
212    asio->ref_count--;
213    if (asio->ref_count == 0) {
214        _async_socket_io_finished(asio);
215        /* Last reference has been dropped. Destroy this object. */
216        _async_socket_io_free(asio);
217        return 0;
218    }
219    return asio->ref_count;
220}
221
222/* Creates new asynchronous socket reader.
223 * Param:
224 *  as - Asynchronous socket for the reader.
225 *  buffer, len - Reader's buffer.
226 *  io_cb - Reader's callback.
227 *  reader_opaque - An opaque pointer associated with the reader.
228 *  deadline - Deadline to complete the operation.
229 * Return:
230 *  An initialized AsyncSocketIO intance.
231 */
232static AsyncSocketIO*
233_async_socket_reader_new(AsyncSocket* as,
234                         void* buffer,
235                         uint32_t len,
236                         on_as_io_cb io_cb,
237                         void* reader_opaque,
238                         Duration deadline)
239{
240    AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
241                                                     reader_opaque, deadline);
242    return asio;
243}
244
245/* Creates new asynchronous socket writer.
246 * Param:
247 *  as - Asynchronous socket for the writer.
248 *  buffer, len - Writer's buffer.
249 *  io_cb - Writer's callback.
250 *  writer_opaque - An opaque pointer associated with the writer.
251 *  deadline - Deadline to complete the operation.
252 * Return:
253 *  An initialized AsyncSocketIO intance.
254 */
255static AsyncSocketIO*
256_async_socket_writer_new(AsyncSocket* as,
257                         const void* buffer,
258                         uint32_t len,
259                         on_as_io_cb io_cb,
260                         void* writer_opaque,
261                         Duration deadline)
262{
263    AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
264                                                     io_cb, writer_opaque,
265                                                     deadline);
266    return asio;
267}
268
269/* I/O timed out. */
270static void
271_on_async_socket_io_timed_out(void* opaque)
272{
273    AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
274    AsyncSocket* const as = asio->as;
275
276    D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
277      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
278      asio->deadline, async_socket_deadline(as, 0));
279
280    /* Reference while in callback. */
281    async_socket_io_reference(asio);
282    _async_socket_io_timed_out(asio->as, asio);
283    async_socket_io_release(asio);
284}
285
286/********************************************************************************
287 *                 Public Asynchronous Socket I/O API
288 *******************************************************************************/
289
290AsyncSocket*
291async_socket_io_get_socket(const AsyncSocketIO* asio)
292{
293    async_socket_reference(asio->as);
294    return asio->as;
295}
296
297void
298async_socket_io_cancel_time_out(AsyncSocketIO* asio)
299{
300    loopTimer_stop(asio->timer);
301}
302
303void*
304async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
305{
306    return asio->io_opaque;
307}
308
309void*
310async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
311{
312    return async_socket_get_client_opaque(asio->as);
313}
314
315void*
316async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
317                                uint32_t* transferred,
318                                uint32_t* to_transfer)
319{
320    if (transferred != NULL) {
321        *transferred = asio->transferred;
322    }
323    if (to_transfer != NULL) {
324        *to_transfer = asio->to_transfer;
325    }
326    return asio->buffer;
327}
328
329void*
330async_socket_io_get_buffer(const AsyncSocketIO* asio)
331{
332    return asio->buffer;
333}
334
335uint32_t
336async_socket_io_get_transferred(const AsyncSocketIO* asio)
337{
338    return asio->transferred;
339}
340
341uint32_t
342async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
343{
344    return asio->to_transfer;
345}
346
347int
348async_socket_io_is_read(const AsyncSocketIO* asio)
349{
350    return asio->is_io_read;
351}
352
353/********************************************************************************
354 *                      Asynchronous Socket internals
355 *******************************************************************************/
356
/* Descriptor of an asynchronous socket: connection parameters, looper
 * objects used for asynchronous I/O and reconnection, and the queues of
 * pending read and write requests. Instances are reference counted. */
struct AsyncSocket {
    /* TCP address for the socket. */
    SockAddress         address;
    /* Connection callback for this socket. */
    on_as_connection_cb on_connection;
    /* An opaque pointer associated with this socket by the client. */
    void*               client_opaque;
    /* I/O looper for asynchronous I/O on the socket. */
    Looper*             looper;
    /* I/O descriptor for asynchronous I/O on the socket. */
    LoopIo              io[1];
    /* Timer to use for reconnection attempts. */
    LoopTimer           reconnect_timer[1];
    /* Head of the list of the active readers (the reader currently served). */
    AsyncSocketIO*      readers_head;
    /* Tail of the list of the active readers. */
    AsyncSocketIO*      readers_tail;
    /* Head of the list of the active writers (the writer currently served). */
    AsyncSocketIO*      writers_head;
    /* Tail of the list of the active writers. */
    AsyncSocketIO*      writers_tail;
    /* Socket's file descriptor. Set to -1 once the socket handle is closed. */
    int                 fd;
    /* Timeout to use for reconnection attempts. */
    int                 reconnect_to;
    /* Number of outstanding references to the socket. */
    int                 ref_count;
    /* Flags whether (1) or not (0) socket owns the looper. An owned looper is
     * freed together with the socket. */
    int                 owns_looper;
};
387
388static const char*
389_async_socket_string(AsyncSocket* as)
390{
391    return sock_address_to_string(&as->address);
392}
393
394static Looper*
395_async_socket_get_looper(AsyncSocket* as)
396{
397    return as->looper;
398}
399
400/* Pulls first reader out of the list.
401 * Param:
402 *  as - Initialized AsyncSocket instance.
403 * Return:
404 *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
405 *  Note that the caller is responsible for releasing the I/O object returned
406 *  from this routine.
407 */
408static AsyncSocketIO*
409_async_socket_pull_first_io(AsyncSocket* as,
410                            AsyncSocketIO** list_head,
411                            AsyncSocketIO** list_tail)
412{
413    AsyncSocketIO* const ret = *list_head;
414    if (ret != NULL) {
415        *list_head = ret->next;
416        ret->next = NULL;
417        if (*list_head == NULL) {
418            *list_tail = NULL;
419        }
420    }
421    return ret;
422}
423
424/* Pulls first reader out of the list.
425 * Param:
426 *  as - Initialized AsyncSocket instance.
427 * Return:
428 *  First reader pulled out of the list, or NULL if there are no readers in the
429 *  list.
430 *  Note that the caller is responsible for releasing the I/O object returned
431 *  from this routine.
432 */
433static AsyncSocketIO*
434_async_socket_pull_first_reader(AsyncSocket* as)
435{
436    return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
437}
438
439/* Pulls first writer out of the list.
440 * Param:
441 *  as - Initialized AsyncSocket instance.
442 * Return:
443 *  First writer pulled out of the list, or NULL if there are no writers in the
444 *  list.
445 *  Note that the caller is responsible for releasing the I/O object returned
446 *  from this routine.
447 */
448static AsyncSocketIO*
449_async_socket_pull_first_writer(AsyncSocket* as)
450{
451    return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
452}
453
454/* Removes an I/O descriptor from a list of active I/O.
455 * Param:
456 *  as - Initialized AsyncSocket instance.
457 *  list_head, list_tail - Pointers to the list head and tail.
458 *  io - I/O to remove.
459 * Return:
460 *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
461 */
462static int
463_async_socket_remove_io(AsyncSocket* as,
464                        AsyncSocketIO** list_head,
465                        AsyncSocketIO** list_tail,
466                        AsyncSocketIO* io)
467{
468    AsyncSocketIO* prev = NULL;
469
470    while (*list_head != NULL && io != *list_head) {
471        prev = *list_head;
472        list_head = &((*list_head)->next);
473    }
474    if (*list_head == NULL) {
475        D("%s: I/O %p is not found in the list for socket '%s'",
476          __FUNCTION__, io, _async_socket_string(as));
477        return 0;
478    }
479
480    *list_head = io->next;
481    if (prev != NULL) {
482        prev->next = io->next;
483    }
484    if (*list_tail == io) {
485        *list_tail = prev;
486    }
487
488    /* Release I/O adjusting reference added when I/O has been saved in the list. */
489    async_socket_io_release(io);
490
491    return 1;
492}
493
494/* Advances to the next I/O in the list.
495 * Param:
496 *  as - Initialized AsyncSocket instance.
497 *  list_head, list_tail - Pointers to the list head and tail.
498 */
499static void
500_async_socket_advance_io(AsyncSocket* as,
501                         AsyncSocketIO** list_head,
502                         AsyncSocketIO** list_tail)
503{
504    AsyncSocketIO* first_io = *list_head;
505    if (first_io != NULL) {
506        *list_head = first_io->next;
507        first_io->next = NULL;
508    }
509    if (*list_head == NULL) {
510        *list_tail = NULL;
511    }
512    if (first_io != NULL) {
513        /* Release I/O removed from the head of the list. */
514        async_socket_io_release(first_io);
515    }
516}
517
518/* Advances to the next reader in the list.
519 * Param:
520 *  as - Initialized AsyncSocket instance.
521 */
522static void
523_async_socket_advance_reader(AsyncSocket* as)
524{
525    _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
526}
527
528/* Advances to the next writer in the list.
529 * Param:
530 *  as - Initialized AsyncSocket instance.
531 */
532static void
533_async_socket_advance_writer(AsyncSocket* as)
534{
535    _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
536}
537
538/* Completes an I/O.
539 * Param:
540 *  as - Initialized AsyncSocket instance.
541 *  asio - I/O to complete.
542 * Return:
543 *  One of AsyncIOAction values.
544 */
545static AsyncIOAction
546_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
547{
548    T("ASocket %s: %s I/O %p is completed.",
549      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
550
551    /* Stop the timer. */
552    async_socket_io_cancel_time_out(asio);
553
554    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
555}
556
557/* Timeouts an I/O.
558 * Param:
559 *  as - Initialized AsyncSocket instance.
560 *  asio - An I/O that has timed out.
561 * Return:
562 *  One of AsyncIOAction values.
563 */
564static AsyncIOAction
565_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
566{
567    T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
568      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
569      asio->deadline, async_socket_deadline(as, 0));
570
571    /* Report to the client. */
572    const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
573                                             ASIO_STATE_TIMED_OUT);
574
575    /* Remove the I/O from a list of active I/O for actions other than retry. */
576    if (action != ASIO_ACTION_RETRY) {
577        if (asio->is_io_read) {
578            _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
579        } else {
580            _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
581        }
582    }
583
584    return action;
585}
586
587/* Cancels an I/O.
588 * Param:
589 *  as - Initialized AsyncSocket instance.
590 *  asio - An I/O to cancel.
591 * Return:
592 *  One of AsyncIOAction values.
593 */
594static AsyncIOAction
595_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
596{
597    T("ASocket %s: %s I/O %p is cancelled.",
598      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);
599
600    /* Stop the timer. */
601    async_socket_io_cancel_time_out(asio);
602
603    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
604}
605
606/* Reports an I/O failure.
607 * Param:
608 *  as - Initialized AsyncSocket instance.
609 *  asio - An I/O that has failed. Can be NULL for general failures.
610 *  failure - Failure (errno) that has occurred.
611 * Return:
612 *  One of AsyncIOAction values.
613 */
614static AsyncIOAction
615_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
616{
617    T("ASocket %s: %s I/O %p has failed: %d -> %s",
618      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
619      failure, strerror(failure));
620
621    /* Stop the timer. */
622    async_socket_io_cancel_time_out(asio);
623
624    errno = failure;
625    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
626}
627
628/* Cancels all the active socket readers.
629 * Param:
630 *  as - Initialized AsyncSocket instance.
631 */
632static void
633_async_socket_cancel_readers(AsyncSocket* as)
634{
635    while (as->readers_head != NULL) {
636        AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
637        /* We ignore action returned from the cancellation callback, since we're
638         * in a disconnected state here. */
639        _async_socket_cancel_io(as, to_cancel);
640        async_socket_io_release(to_cancel);
641    }
642}
643
644/* Cancels all the active socket writers.
645 * Param:
646 *  as - Initialized AsyncSocket instance.
647 */
648static void
649_async_socket_cancel_writers(AsyncSocket* as)
650{
651    while (as->writers_head != NULL) {
652        AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
653        /* We ignore action returned from the cancellation callback, since we're
654         * in a disconnected state here. */
655        _async_socket_cancel_io(as, to_cancel);
656        async_socket_io_release(to_cancel);
657    }
658}
659
660/* Cancels all the I/O on the socket. */
661static void
662_async_socket_cancel_all_io(AsyncSocket* as)
663{
664    /* Stop the reconnection timer. */
665    loopTimer_stop(as->reconnect_timer);
666
667    /* Stop read / write on the socket. */
668    loopIo_dontWantWrite(as->io);
669    loopIo_dontWantRead(as->io);
670
671    /* Cancel active readers and writers. */
672    _async_socket_cancel_readers(as);
673    _async_socket_cancel_writers(as);
674}
675
676/* Closes socket handle used by the async socket.
677 * Param:
678 *  as - Initialized AsyncSocket instance.
679 */
680static void
681_async_socket_close_socket(AsyncSocket* as)
682{
683    if (as->fd >= 0) {
684        T("ASocket %s: Socket handle %d is closed.",
685          _async_socket_string(as), as->fd);
686        loopIo_done(as->io);
687        socket_close(as->fd);
688        as->fd = -1;
689    }
690}
691
692/* Destroys AsyncSocket instance.
693 * Param:
694 *  as - Initialized AsyncSocket instance.
695 */
696static void
697_async_socket_free(AsyncSocket* as)
698{
699    if (as != NULL) {
700        T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));
701
702        /* Close socket. */
703        _async_socket_close_socket(as);
704
705        /* Free allocated resources. */
706        if (as->looper != NULL) {
707            loopTimer_done(as->reconnect_timer);
708            if (as->owns_looper) {
709                looper_free(as->looper);
710            }
711        }
712        sock_address_done(&as->address);
713        AFREE(as);
714    }
715}
716
717/* Starts reconnection attempts after connection has been lost.
718 * Param:
719 *  as - Initialized AsyncSocket instance.
720 *  to - Milliseconds to wait before reconnection attempt.
721 */
722static void
723_async_socket_reconnect(AsyncSocket* as, int to)
724{
725    T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);
726
727    /* Make sure that no I/O is active, and socket is closed before we
728     * reconnect. */
729    _async_socket_cancel_all_io(as);
730
731    /* Set the timer for reconnection attempt. */
732    loopTimer_startRelative(as->reconnect_timer, to);
733}
734
735/********************************************************************************
736 *                      Asynchronous Socket callbacks
737 *******************************************************************************/
738
739/* A callback that is invoked when socket gets disconnected.
740 * Param:
741 *  as - Initialized AsyncSocket instance.
742 */
743static void
744_on_async_socket_disconnected(AsyncSocket* as)
745{
746    /* Save error to restore it for the client's callback. */
747    const int save_errno = errno;
748    AsyncIOAction action = ASIO_ACTION_ABORT;
749
750    D("ASocket %s: Disconnected.", _async_socket_string(as));
751
752    /* Cancel all the I/O on this socket. */
753    _async_socket_cancel_all_io(as);
754
755    /* Close the socket. */
756    _async_socket_close_socket(as);
757
758    /* Restore errno, and invoke client's callback. */
759    errno = save_errno;
760    action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
761
762    if (action == ASIO_ACTION_RETRY) {
763        /* Client requested reconnection. */
764        _async_socket_reconnect(as, as->reconnect_to);
765    }
766}
767
768/* A callback that is invoked on socket's I/O failure.
769 * Param:
770 *  as - Initialized AsyncSocket instance.
771 *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
772 */
773static AsyncIOAction
774_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
775{
776    D("ASocket %s: %s I/O failure: %d -> %s",
777      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
778      errno, strerror(errno));
779
780    /* Report the failure. */
781    return _async_socket_io_failure(as, asio, errno);
782}
783
/* A callback that is invoked when there is data available to read.
 * Serves the reader at the head of the readers list: informs the client that
 * the I/O is starting / continuing, reads the next chunk of data into the
 * reader's buffer, and completes the reader once its buffer is full.
 * Param:
 *  as - Initialized AsyncSocket instance.
 * Return:
 *  0 on success, or -1 on failure. Failure returned from this routine will
 *  skip writes (if available) behind this read.
 */
static int
_on_async_socket_recv(AsyncSocket* as)
{
    AsyncIOAction action;

    /* Get current reader. */
    AsyncSocketIO* const asr = as->readers_head;
    if (asr == NULL) {
        /* Nobody is waiting for data: stop listening to read events. */
        D("ASocket %s: No reader is available.", _async_socket_string(as));
        loopIo_dontWantRead(as->io);
        return 0;
    }

    /* Reference the reader while we're working with it in this callback. The
     * list's own reference may get dropped below (when the reader is advanced
     * over, or cancelled), so this keeps 'asr' valid until the matching
     * release at each exit. */
    async_socket_io_reference(asr);

    /* Bump I/O state, and inform the client that I/O is in progress. */
    if (asr->state == ASIO_STATE_QUEUED) {
        /* First time this reader is served. */
        asr->state = ASIO_STATE_STARTED;
    } else {
        asr->state = ASIO_STATE_CONTINUES;
    }
    action = asr->on_io(asr->io_opaque, asr, asr->state);
    if (action == ASIO_ACTION_ABORT) {
        D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
        /* Move on to the next reader. */
        _async_socket_advance_reader(as);
        /* Lets see if there are still active readers, and enable, or disable
         * read I/O callback accordingly. */
        if (as->readers_head != NULL) {
            loopIo_wantRead(as->io);
        } else {
            loopIo_dontWantRead(as->io);
        }
        async_socket_io_release(asr);
        return 0;
    }

    /* Read next chunk of data, restarting the call if it gets interrupted. */
    int res = socket_recv(as->fd, asr->buffer + asr->transferred,
                          asr->to_transfer - asr->transferred);
    while (res < 0 && errno == EINTR) {
        res = socket_recv(as->fd, asr->buffer + asr->transferred,
                          asr->to_transfer - asr->transferred);
    }

    if (res == 0) {
        /* Socket has been disconnected. The disconnection handler cancels all
         * the queued I/O (this reader included), and closes the socket. */
        errno = ECONNRESET;
        _on_async_socket_disconnected(as);
        async_socket_io_release(asr);
        return -1;
    }

    if (res < 0) {
        if (errno == EWOULDBLOCK || errno == EAGAIN) {
            /* Yield to writes behind this read. The reader stays at the head
             * of the list, and will be served on the next read event. */
            loopIo_wantRead(as->io);
            async_socket_io_release(asr);
            return 0;
        }

        /* An I/O error. */
        action = _on_async_socket_failure(as, asr);
        if (action != ASIO_ACTION_RETRY) {
            D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
            /* Move on to the next reader. */
            _async_socket_advance_reader(as);
            /* Lets see if there are still active readers, and enable, or disable
             * read I/O callback accordingly. */
            if (as->readers_head != NULL) {
                loopIo_wantRead(as->io);
            } else {
                loopIo_dontWantRead(as->io);
            }
        }
        /* A failure is reported to the caller even when the client has asked
         * to retry (in which case the reader is left at the head of the
         * list). */
        async_socket_io_release(asr);
        return -1;
    }

    /* Update the reader's descriptor. */
    asr->transferred += res;
    if (asr->transferred == asr->to_transfer) {
        /* This read is completed. Move on to the next reader. */
        _async_socket_advance_reader(as);

        /* Notify reader completion. The action returned from the completion
         * callback is not used. */
        _async_socket_complete_io(as, asr);
    }

    /* Lets see if there are still active readers, and enable, or disable read
     * I/O callback accordingly. A partially served reader is still at the head
     * of the list at this point. */
    if (as->readers_head != NULL) {
        loopIo_wantRead(as->io);
    } else {
        loopIo_dontWantRead(as->io);
    }

    async_socket_io_release(asr);

    return 0;
}
893
/* A callback that is invoked when the socket is available for writing.
 * Serves the writer at the head of the writers list: informs the client that
 * the I/O is starting / continuing, writes the next chunk of data from the
 * writer's buffer, and completes the writer once all its data has been sent.
 * Param:
 *  as - Initialized AsyncSocket instance.
 * Return:
 *  0 on success, or -1 on failure. Failure returned from this routine will
 *  skip reads (if available) behind this write.
 */
static int
_on_async_socket_send(AsyncSocket* as)
{
    AsyncIOAction action;

    /* Get current writer. */
    AsyncSocketIO* const asw = as->writers_head;
    if (asw == NULL) {
        /* Nothing to send: stop listening to write events. */
        D("ASocket %s: No writer is available.", _async_socket_string(as));
        loopIo_dontWantWrite(as->io);
        return 0;
    }

    /* Reference the writer while we're working with it in this callback. The
     * list's own reference may get dropped below (when the writer is advanced
     * over, or cancelled), so this keeps 'asw' valid until the matching
     * release at each exit. */
    async_socket_io_reference(asw);

    /* Bump I/O state, and inform the client that I/O is in progress. */
    if (asw->state == ASIO_STATE_QUEUED) {
        /* First time this writer is served. */
        asw->state = ASIO_STATE_STARTED;
    } else {
        asw->state = ASIO_STATE_CONTINUES;
    }
    action = asw->on_io(asw->io_opaque, asw, asw->state);
    if (action == ASIO_ACTION_ABORT) {
        D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
        /* Move on to the next writer. */
        _async_socket_advance_writer(as);
        /* Lets see if there are still active writers, and enable, or disable
         * write I/O callback accordingly. */
        if (as->writers_head != NULL) {
            loopIo_wantWrite(as->io);
        } else {
            loopIo_dontWantWrite(as->io);
        }
        async_socket_io_release(asw);
        return 0;
    }

    /* Write next chunk of data, restarting the call if it gets interrupted. */
    int res = socket_send(as->fd, asw->buffer + asw->transferred,
                          asw->to_transfer - asw->transferred);
    while (res < 0 && errno == EINTR) {
        res = socket_send(as->fd, asw->buffer + asw->transferred,
                          asw->to_transfer - asw->transferred);
    }

    if (res == 0) {
        /* Socket has been disconnected. The disconnection handler cancels all
         * the queued I/O (this writer included), and closes the socket. */
        errno = ECONNRESET;
        _on_async_socket_disconnected(as);
        async_socket_io_release(asw);
        return -1;
    }

    if (res < 0) {
        if (errno == EWOULDBLOCK || errno == EAGAIN) {
            /* Yield to reads behind this write. The writer stays at the head
             * of the list, and will be served on the next write event. */
            loopIo_wantWrite(as->io);
            async_socket_io_release(asw);
            return 0;
        }

        /* An I/O error. */
        action = _on_async_socket_failure(as, asw);
        if (action != ASIO_ACTION_RETRY) {
            D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
            /* Move on to the next writer. */
            _async_socket_advance_writer(as);
            /* Lets see if there are still active writers, and enable, or disable
             * write I/O callback accordingly. */
            if (as->writers_head != NULL) {
                loopIo_wantWrite(as->io);
            } else {
                loopIo_dontWantWrite(as->io);
            }
        }
        /* A failure is reported to the caller even when the client has asked
         * to retry (in which case the writer is left at the head of the
         * list). */
        async_socket_io_release(asw);
        return -1;
    }

    /* Update the writer descriptor. */
    asw->transferred += res;
    if (asw->transferred == asw->to_transfer) {
        /* This write is completed. Move on to the next writer. */
        _async_socket_advance_writer(as);

        /* Notify writer completion. The action returned from the completion
         * callback is not used. */
        _async_socket_complete_io(as, asw);
    }

    /* Lets see if there are still active writers, and enable, or disable write
     * I/O callback accordingly. A partially served writer is still at the head
     * of the list at this point. */
    if (as->writers_head != NULL) {
        loopIo_wantWrite(as->io);
    } else {
        loopIo_dontWantWrite(as->io);
    }

    async_socket_io_release(asw);

    return 0;
}
1003
1004/* A callback that is invoked when an I/O is available on socket.
1005 * Param:
1006 *  as - Initialized AsyncSocket instance.
1007 *  fd - Socket's file descriptor.
1008 *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
1009 */
1010static void
1011_on_async_socket_io(void* opaque, int fd, unsigned events)
1012{
1013    AsyncSocket* const as = (AsyncSocket*)opaque;
1014
1015    /* Reference the socket while we're working with it in this callback. */
1016    async_socket_reference(as);
1017
1018    if ((events & LOOP_IO_READ) != 0) {
1019        if (_on_async_socket_recv(as) != 0) {
1020            async_socket_release(as);
1021            return;
1022        }
1023    }
1024
1025    if ((events & LOOP_IO_WRITE) != 0) {
1026        if (_on_async_socket_send(as) != 0) {
1027            async_socket_release(as);
1028            return;
1029        }
1030    }
1031
1032    async_socket_release(as);
1033}
1034
1035/* A callback that is invoked by asynchronous socket connector on connection
1036 *  events.
1037 * Param:
1038 *  opaque - Initialized AsyncSocket instance.
1039 *  connector - Connector that is used to connect this socket.
1040 *  event - Connection event.
1041 * Return:
1042 *  One of AsyncIOAction values.
1043 */
1044static AsyncIOAction
1045_on_connector_events(void* opaque,
1046                     AsyncSocketConnector* connector,
1047                     AsyncIOState event)
1048{
1049    AsyncIOAction action;
1050    AsyncSocket* const as = (AsyncSocket*)opaque;
1051
1052    /* Reference the socket while we're working with it in this callback. */
1053    async_socket_reference(as);
1054
1055    if (event == ASIO_STATE_SUCCEEDED) {
1056        /* Accept the connection. */
1057        as->fd = async_socket_connector_pull_fd(connector);
1058        loopIo_init(as->io, as->looper, as->fd, _on_async_socket_io, as);
1059    }
1060
1061    /* Invoke client's callback. */
1062    action = as->on_connection(as->client_opaque, as, event);
1063    if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
1064        /* For whatever reason the client didn't want to keep this connection.
1065         * Close it. */
1066        D("ASocket %s: Connection is discarded by the client.",
1067          _async_socket_string(as));
1068        _async_socket_close_socket(as);
1069    }
1070
1071    if (action != ASIO_ACTION_RETRY) {
1072        async_socket_connector_release(connector);
1073    }
1074
1075    async_socket_release(as);
1076
1077    return action;
1078}
1079
1080/* Timer callback invoked to reconnect the lost connection.
1081 * Param:
1082 *  as - Initialized AsyncSocket instance.
1083 */
1084void
1085_on_async_socket_reconnect(void* opaque)
1086{
1087    AsyncSocket* as = (AsyncSocket*)opaque;
1088
1089    /* Reference the socket while we're working with it in this callback. */
1090    async_socket_reference(as);
1091    async_socket_connect(as, as->reconnect_to);
1092    async_socket_release(as);
1093}
1094
1095
/********************************************************************************
 *                  Asynchronous Socket public API
 *******************************************************************************/
1099
1100AsyncSocket*
1101async_socket_new(int port,
1102                 int reconnect_to,
1103                 on_as_connection_cb client_cb,
1104                 void* client_opaque,
1105                 Looper* looper)
1106{
1107    AsyncSocket* as;
1108
1109    if (client_cb == NULL) {
1110        E("Invalid client_cb parameter");
1111        return NULL;
1112    }
1113
1114    ANEW0(as);
1115
1116    as->fd = -1;
1117    as->client_opaque = client_opaque;
1118    as->on_connection = client_cb;
1119    as->readers_head = as->readers_tail = NULL;
1120    as->reconnect_to = reconnect_to;
1121    as->ref_count = 1;
1122    sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
1123    if (looper == NULL) {
1124        as->looper = looper_newCore();
1125        if (as->looper == NULL) {
1126            E("Unable to create I/O looper for async socket '%s'",
1127              _async_socket_string(as));
1128            client_cb(client_opaque, as, ASIO_STATE_FAILED);
1129            _async_socket_free(as);
1130            return NULL;
1131        }
1132        as->owns_looper = 1;
1133    } else {
1134        as->looper = looper;
1135        as->owns_looper = 0;
1136    }
1137
1138    loopTimer_init(as->reconnect_timer, as->looper, _on_async_socket_reconnect, as);
1139
1140    T("ASocket %s: Descriptor is created.", _async_socket_string(as));
1141
1142    return as;
1143}
1144
1145int
1146async_socket_reference(AsyncSocket* as)
1147{
1148    assert(as->ref_count > 0);
1149    as->ref_count++;
1150    return as->ref_count;
1151}
1152
1153int
1154async_socket_release(AsyncSocket* as)
1155{
1156    assert(as->ref_count > 0);
1157    as->ref_count--;
1158    if (as->ref_count == 0) {
1159        /* Last reference has been dropped. Destroy this object. */
1160        _async_socket_cancel_all_io(as);
1161        _async_socket_free(as);
1162        return 0;
1163    }
1164    return as->ref_count;
1165}
1166
1167void
1168async_socket_connect(AsyncSocket* as, int retry_to)
1169{
1170    T("ASocket %s: Handling connection request for %dms...",
1171      _async_socket_string(as), retry_to);
1172
1173    AsyncSocketConnector* const connector =
1174        async_socket_connector_new(&as->address, retry_to, _on_connector_events,
1175                                   as, as->looper);
1176    if (connector != NULL) {
1177        async_socket_connector_connect(connector);
1178    } else {
1179        as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
1180    }
1181}
1182
1183void
1184async_socket_disconnect(AsyncSocket* as)
1185{
1186    T("ASocket %s: Handling disconnection request...", _async_socket_string(as));
1187
1188    if (as != NULL) {
1189        _async_socket_cancel_all_io(as);
1190        _async_socket_close_socket(as);
1191    }
1192}
1193
1194void
1195async_socket_reconnect(AsyncSocket* as, int retry_to)
1196{
1197    T("ASocket %s: Handling reconnection request for %dms...",
1198      _async_socket_string(as), retry_to);
1199
1200    _async_socket_cancel_all_io(as);
1201    _async_socket_close_socket(as);
1202    _async_socket_reconnect(as, retry_to);
1203}
1204
1205void
1206async_socket_read_abs(AsyncSocket* as,
1207                      void* buffer, uint32_t len,
1208                      on_as_io_cb reader_cb,
1209                      void* reader_opaque,
1210                      Duration deadline)
1211{
1212    T("ASocket %s: Handling read for %d bytes with deadline %lld...",
1213      _async_socket_string(as), len, deadline);
1214
1215    AsyncSocketIO* const asr =
1216        _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
1217                                 deadline);
1218    if (async_socket_is_connected(as)) {
1219        /* Add new reader to the list. Note that we use initial reference from I/O
1220         * 'new' routine as "in the list" reference counter. */
1221        if (as->readers_head == NULL) {
1222            as->readers_head = as->readers_tail = asr;
1223        } else {
1224            as->readers_tail->next = asr;
1225            as->readers_tail = asr;
1226        }
1227        loopIo_wantRead(as->io);
1228    } else {
1229        D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
1230        errno = ECONNRESET;
1231        reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
1232        async_socket_io_release(asr);
1233    }
1234}
1235
1236void
1237async_socket_read_rel(AsyncSocket* as,
1238                      void* buffer, uint32_t len,
1239                      on_as_io_cb reader_cb,
1240                      void* reader_opaque,
1241                      int to)
1242{
1243    const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1244                                    DURATION_INFINITE;
1245    async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
1246}
1247
1248void
1249async_socket_write_abs(AsyncSocket* as,
1250                       const void* buffer, uint32_t len,
1251                       on_as_io_cb writer_cb,
1252                       void* writer_opaque,
1253                       Duration deadline)
1254{
1255    T("ASocket %s: Handling write for %d bytes with deadline %lld...",
1256      _async_socket_string(as), len, deadline);
1257
1258    AsyncSocketIO* const asw =
1259        _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
1260                                 deadline);
1261    if (async_socket_is_connected(as)) {
1262        /* Add new writer to the list. Note that we use initial reference from I/O
1263         * 'new' routine as "in the list" reference counter. */
1264        if (as->writers_head == NULL) {
1265            as->writers_head = as->writers_tail = asw;
1266        } else {
1267            as->writers_tail->next = asw;
1268            as->writers_tail = asw;
1269        }
1270        loopIo_wantWrite(as->io);
1271    } else {
1272        D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
1273        errno = ECONNRESET;
1274        writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
1275        async_socket_io_release(asw);
1276    }
1277}
1278
1279void
1280async_socket_write_rel(AsyncSocket* as,
1281                       const void* buffer, uint32_t len,
1282                       on_as_io_cb writer_cb,
1283                       void* writer_opaque,
1284                       int to)
1285{
1286    const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
1287                                    DURATION_INFINITE;
1288    async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
1289}
1290
1291void*
1292async_socket_get_client_opaque(const AsyncSocket* as)
1293{
1294    return as->client_opaque;
1295}
1296
1297Duration
1298async_socket_deadline(AsyncSocket* as, int rel)
1299{
1300    return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
1301                        DURATION_INFINITE;
1302}
1303
1304int
1305async_socket_get_port(const AsyncSocket* as)
1306{
1307    return sock_address_get_port(&as->address);
1308}
1309
1310int
1311async_socket_is_connected(const AsyncSocket* as)
1312{
1313    return as->fd >= 0;
1314}
1315