btif_sock_thread.c revision df3459935a4c12744a9a78812157890b60ccb77d
1/******************************************************************************
2 *
3 *  Copyright (C) 2009-2012 Broadcom Corporation
4 *
5 *  Licensed under the Apache License, Version 2.0 (the "License");
6 *  you may not use this file except in compliance with the License.
7 *  You may obtain a copy of the License at:
8 *
9 *  http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *  Unless required by applicable law or agreed to in writing, software
12 *  distributed under the License is distributed on an "AS IS" BASIS,
13 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  See the License for the specific language governing permissions and
15 *  limitations under the License.
16 *
17 ******************************************************************************/
18
19/************************************************************************************
20 *
21 *  Filename:      btif_sock_thread.c
22 *
 *  Description:   socket poll thread
24 *
25 *
26 ***********************************************************************************/
27
28#include <hardware/bluetooth.h>
29#include <hardware/bt_sock.h>
30
31//bta_jv_co_rfc_data
32#include <stdio.h>
33#include <stdlib.h>
34#include <errno.h>
35#include <string.h>
36#include <sys/types.h>
37#include <sys/socket.h>
38#include <sys/un.h>
39#include <time.h>
40#include <fcntl.h>
41#include <unistd.h>
42#include <signal.h>
43#include <pthread.h>
44#include <ctype.h>
45
46#include <sys/select.h>
47#include <sys/poll.h>
48#include <cutils/sockets.h>
49#include <alloca.h>
50
51#define LOG_TAG "BTIF_SOCK"
52#include "btif_common.h"
53#include "btif_util.h"
54
55#include "bd.h"
56
57#include "bta_api.h"
58#include "btif_sock.h"
59#include "btif_sock_thread.h"
60#include "btif_sock_util.h"
61
62#include <cutils/log.h>
/*
 * Soft assertion: when `s` evaluates to false, logs an error naming the
 * enclosing function, the failed expression and the line number. Nothing is
 * aborted; execution continues after the log.
 * The body is wrapped in do { } while(0) so that the macro is a single
 * statement and cannot capture an `else` that follows it.
 */
#define asrt(s) do { if(!(s)) APPL_TRACE_ERROR("## %s assert %s failed at line:%d ##",__FUNCTION__, #s, __LINE__); } while(0)
/*
 * Debug helper: logs the raw poll event mask in hex, followed by one line for
 * every POLL* bit that is set in it.
 * The argument is evaluated exactly once and kept in a local, so expressions
 * with side effects or low-precedence operators can be passed safely.
 */
#define print_events(events) do { \
    const unsigned int print_events_mask_ = (unsigned int)(events); \
    APPL_TRACE_DEBUG("print poll event:%x", print_events_mask_); \
    if (print_events_mask_ & POLLIN) { APPL_TRACE_DEBUG(  "   POLLIN "); } \
    if (print_events_mask_ & POLLPRI) { APPL_TRACE_DEBUG( "   POLLPRI "); } \
    if (print_events_mask_ & POLLOUT) { APPL_TRACE_DEBUG( "   POLLOUT "); } \
    if (print_events_mask_ & POLLERR) { APPL_TRACE_DEBUG( "   POLLERR "); } \
    if (print_events_mask_ & POLLHUP) { APPL_TRACE_DEBUG( "   POLLHUP "); } \
    if (print_events_mask_ & POLLNVAL) { APPL_TRACE_DEBUG("   POLLNVAL "); } \
    if (print_events_mask_ & POLLRDHUP) { APPL_TRACE_DEBUG("   POLLRDHUP"); } \
    } while(0)
74
/* Number of entries in ts[]: the maximum number of poll threads. */
#define MAX_THREAD 8
/* Number of poll slots per thread, including the one used by the command fd. */
#define MAX_POLL 64

/* Error/hang-up conditions; flags2pevents() always requests these. */
#define POLL_EXCEPTION_EVENTS (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)

/* Classification of a poll event mask. */
#define IS_READ(e)      ((e) & POLLIN)
#define IS_WRITE(e)     ((e) & POLLOUT)
#define IS_EXCEPTION(e) ((e) & POLL_EXCEPTION_EVENTS)

/* Command ids carried in sock_cmd_t.id; they are executed in the socket poll thread. */
#define CMD_WAKEUP       1
#define CMD_EXIT         2
#define CMD_ADD_FD       3
#define CMD_USER_PRIVATE 4
86
/* One monitored descriptor in a poll thread's slot table. */
typedef struct poll_slot {
    /* descriptor and its poll event masks; fd == -1 marks the slot as free */
    struct pollfd pfd;
    /* caller-supplied value handed back to the signaled callback */
    uint32_t user_id;
    /* caller-supplied socket type; 0 means "not set yet" for set_poll() */
    int type;
    /* SOCK_THREAD_FD_* bits that are still being monitored */
    int flags;
} poll_slot_t;
93typedef struct {
94    int cmd_fdr, cmd_fdw;
95    int poll_count;
96    poll_slot_t ps[MAX_POLL];
97    int psi[MAX_POLL]; //index of poll slot
98    volatile pthread_t thread_id;
99    btsock_signaled_cb callback;
100    btsock_cmd_cb cmd_callback;
101    int used;
102} thread_slot_t;
103static thread_slot_t ts[MAX_THREAD];
104
105
106
107static void *sock_poll_thread(void *arg);
108static inline void close_cmd_fd(int h);
109
110static inline void add_poll(int h, int fd, int type, int flags, uint32_t user_id);
111
112static pthread_mutex_t thread_slot_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
113
/*
 * Starts a joinable thread that runs start_routine(arg).
 *
 * thread_id receives the id of the new thread on success.
 * Returns 0 on success, otherwise the error number reported by
 * pthread_attr_init() or pthread_create().
 */
static inline int create_thread(void *(*start_routine)(void *), void * arg,
                                pthread_t * thread_id)
{
    pthread_attr_t thread_attr;
    int status = pthread_attr_init(&thread_attr);
    if(status != 0)
        return status;
    pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
    status = pthread_create(thread_id, &thread_attr, start_routine, arg);
    //the attribute object is only needed for the pthread_create() call itself
    pthread_attr_destroy(&thread_attr);
    return status;
}
/* Resets the poll state of thread slot h and creates its command socket pair. */
static void init_poll(int h);
123static int alloc_thread_slot()
124{
125    int i;
126    //revserd order to save guard uninitialized access to 0 index
127    for(i = MAX_THREAD - 1; i >=0; i--)
128    {
129        APPL_TRACE_DEBUG("ts[%d].used:%d", i, ts[i].used);
130        if(!ts[i].used)
131        {
132            ts[i].used = 1;
133            return i;
134        }
135    }
136    APPL_TRACE_ERROR("execeeded max thread count");
137    return -1;
138}
139static void free_thread_slot(int h)
140{
141    if(0 <= h && h < MAX_THREAD)
142    {
143        close_cmd_fd(h);
144        ts[h].used = 0;
145    }
146    else APPL_TRACE_ERROR("invalid thread handle:%d", h);
147}
148int btsock_thread_init()
149{
150    static int initialized;
151    APPL_TRACE_DEBUG("in initialized:%d", initialized);
152    if(!initialized)
153    {
154        initialized = 1;
155        int h;
156        for(h = 0; h < MAX_THREAD; h++)
157        {
158            ts[h].cmd_fdr = ts[h].cmd_fdw = -1;
159            ts[h].used = 0;
160            ts[h].thread_id = -1;
161            ts[h].poll_count = 0;
162            ts[h].callback = NULL;
163            ts[h].cmd_callback = NULL;
164        }
165    }
166    return TRUE;
167}
168int btsock_thread_create(btsock_signaled_cb callback, btsock_cmd_cb cmd_callback)
169{
170    asrt(callback || cmd_callback);
171    pthread_mutex_lock(&thread_slot_lock);
172    int h = alloc_thread_slot();
173    pthread_mutex_unlock(&thread_slot_lock);
174    APPL_TRACE_DEBUG("alloc_thread_slot ret:%d", h);
175    if(h >= 0)
176    {
177        init_poll(h);
178        pthread_t thread;
179        int status = create_thread(sock_poll_thread, (void*)(uintptr_t)h, &thread);
180        if (status)
181        {
182            APPL_TRACE_ERROR("create_thread failed: %s", strerror(status));
183            free_thread_slot(h);
184            return -1;
185        }
186
187        ts[h].thread_id = thread;
188        APPL_TRACE_DEBUG("h:%d, thread id:%d", h, ts[h].thread_id);
189        ts[h].callback = callback;
190        ts[h].cmd_callback = cmd_callback;
191    }
192    return h;
193}
194
195/* create dummy socket pair used to wake up select loop */
196static inline void init_cmd_fd(int h)
197{
198    asrt(ts[h].cmd_fdr == -1 && ts[h].cmd_fdw == -1);
199    if(socketpair(AF_UNIX, SOCK_STREAM, 0, &ts[h].cmd_fdr) < 0)
200    {
201        APPL_TRACE_ERROR("socketpair failed: %s", strerror(errno));
202        return;
203    }
204    APPL_TRACE_DEBUG("h:%d, cmd_fdr:%d, cmd_fdw:%d", h, ts[h].cmd_fdr, ts[h].cmd_fdw);
205    //add the cmd fd for read & write
206    add_poll(h, ts[h].cmd_fdr, 0, SOCK_THREAD_FD_RD, 0);
207}
208static inline void close_cmd_fd(int h)
209{
210    if(ts[h].cmd_fdr != -1)
211    {
212        close(ts[h].cmd_fdr);
213        ts[h].cmd_fdr = -1;
214    }
215    if(ts[h].cmd_fdw != -1)
216    {
217        close(ts[h].cmd_fdw);
218        ts[h].cmd_fdw = -1;
219    }
220}
/*
 * Fixed-size command header written to a thread's command socket.
 * The member order is relied upon by positional initializers in this file.
 */
typedef struct sock_cmd
{
    int id;           /* one of the CMD_* values */
    int fd;           /* descriptor to monitor (CMD_ADD_FD) */
    int type;         /* caller-defined type */
    int flags;        /* SOCK_THREAD_FD_* bits (CMD_ADD_FD) or payload size (CMD_USER_PRIVATE) */
    uint32_t user_id; /* caller-supplied value passed through to the callbacks */
} sock_cmd_t;
229int btsock_thread_add_fd(int h, int fd, int type, int flags, uint32_t user_id)
230{
231    if(h < 0 || h >= MAX_THREAD)
232    {
233        APPL_TRACE_ERROR("invalid bt thread handle:%d", h);
234        return FALSE;
235    }
236    if(ts[h].cmd_fdw == -1)
237    {
238        APPL_TRACE_ERROR("cmd socket is not created. socket thread may not initialized");
239        return FALSE;
240    }
241    if(flags & SOCK_THREAD_ADD_FD_SYNC)
242    {
243        //must executed in socket poll thread
244        if(ts[h].thread_id == pthread_self())
245        {
246            //cleanup one-time flags
247            flags &= ~SOCK_THREAD_ADD_FD_SYNC;
248            add_poll(h, fd, type, flags, user_id);
249            return TRUE;
250        }
251        APPL_TRACE_DEBUG("THREAD_ADD_FD_SYNC is not called in poll thread, fallback to async");
252    }
253    sock_cmd_t cmd = {CMD_ADD_FD, fd, type, flags, user_id};
254    APPL_TRACE_DEBUG("adding fd:%d, flags:0x%x", fd, flags);
255    return send(ts[h].cmd_fdw, &cmd, sizeof(cmd), 0) == sizeof(cmd);
256}
257int btsock_thread_post_cmd(int h, int type, const unsigned char* data, int size, uint32_t user_id)
258{
259    if(h < 0 || h >= MAX_THREAD)
260    {
261        APPL_TRACE_ERROR("invalid bt thread handle:%d", h);
262        return FALSE;
263    }
264    if(ts[h].cmd_fdw == -1)
265    {
266        APPL_TRACE_ERROR("cmd socket is not created. socket thread may not initialized");
267        return FALSE;
268    }
269    sock_cmd_t cmd = {CMD_USER_PRIVATE, 0, type, size, user_id};
270    APPL_TRACE_DEBUG("post cmd type:%d, size:%d, h:%d, ", type, size, h);
271    sock_cmd_t* cmd_send = &cmd;
272    int size_send = sizeof(cmd);
273    if(data && size)
274    {
275        size_send = sizeof(cmd) + size;
276        cmd_send = (sock_cmd_t*)alloca(size_send);
277        if(cmd_send)
278        {
279            *cmd_send = cmd;
280            memcpy(cmd_send + 1, data, size);
281        }
282        else
283        {
284            APPL_TRACE_ERROR("alloca failed at h:%d, cmd type:%d, size:%d", h, type, size_send);
285            return FALSE;
286        }
287    }
288    return send(ts[h].cmd_fdw, cmd_send, size_send, 0) == size_send;
289}
290int btsock_thread_wakeup(int h)
291{
292    if(h < 0 || h >= MAX_THREAD)
293    {
294        APPL_TRACE_ERROR("invalid bt thread handle:%d", h);
295        return FALSE;
296    }
297    if(ts[h].cmd_fdw == -1)
298    {
299        APPL_TRACE_ERROR("thread handle:%d, cmd socket is not created", h);
300        return FALSE;
301    }
302    sock_cmd_t cmd = {CMD_WAKEUP, 0, 0, 0, 0};
303    return send(ts[h].cmd_fdw, &cmd, sizeof(cmd), 0) == sizeof(cmd);
304}
305int btsock_thread_exit(int h)
306{
307    if(h < 0 || h >= MAX_THREAD)
308    {
309        APPL_TRACE_ERROR("invalid bt thread handle:%d", h);
310        return FALSE;
311    }
312    if(ts[h].cmd_fdw == -1)
313    {
314        APPL_TRACE_ERROR("cmd socket is not created");
315        return FALSE;
316    }
317    sock_cmd_t cmd = {CMD_EXIT, 0, 0, 0, 0};
318    if(send(ts[h].cmd_fdw, &cmd, sizeof(cmd), 0) == sizeof(cmd))
319    {
320        pthread_join(ts[h].thread_id, 0);
321        pthread_mutex_lock(&thread_slot_lock);
322        free_thread_slot(h);
323        pthread_mutex_unlock(&thread_slot_lock);
324        return TRUE;
325    }
326    return FALSE;
327}
328static void init_poll(int h)
329{
330    int i;
331    ts[h].poll_count = 0;
332    ts[h].thread_id = -1;
333    ts[h].callback = NULL;
334    ts[h].cmd_callback = NULL;
335    for(i = 0; i < MAX_POLL; i++)
336    {
337        ts[h].ps[i].pfd.fd = -1;
338        ts[h].psi[i] = -1;
339    }
340    init_cmd_fd(h);
341}
342static inline unsigned int flags2pevents(int flags)
343{
344    unsigned int pevents = 0;
345    if(flags & SOCK_THREAD_FD_WR)
346        pevents |= POLLOUT;
347    if(flags & SOCK_THREAD_FD_RD)
348        pevents |= POLLIN;
349    pevents |= POLL_EXCEPTION_EVENTS;
350    return pevents;
351}
352
353static inline void set_poll(poll_slot_t* ps, int fd, int type, int flags, uint32_t user_id)
354{
355    ps->pfd.fd = fd;
356    ps->user_id = user_id;
357    if(ps->type != 0 && ps->type != type)
358        APPL_TRACE_ERROR("poll socket type should not changed! type was:%d, type now:%d", ps->type, type);
359    ps->type = type;
360    ps->flags = flags;
361    ps->pfd.events = flags2pevents(flags);
362    ps->pfd.revents = 0;
363}
364static inline void add_poll(int h, int fd, int type, int flags, uint32_t user_id)
365{
366    asrt(fd != -1);
367    int i;
368    int empty = -1;
369    poll_slot_t* ps = ts[h].ps;
370
371    for(i = 0; i < MAX_POLL; i++)
372    {
373        if(ps[i].pfd.fd == fd)
374        {
375            asrt(ts[h].poll_count < MAX_POLL);
376
377            set_poll(&ps[i], fd, type, flags | ps[i].flags, user_id);
378            return;
379        }
380        else if(empty < 0 && ps[i].pfd.fd == -1)
381            empty = i;
382    }
383    if(empty >= 0)
384    {
385        asrt(ts[h].poll_count < MAX_POLL);
386        set_poll(&ps[empty], fd, type, flags, user_id);
387        ++ts[h].poll_count;
388        return;
389    }
390    APPL_TRACE_ERROR("exceeded max poll slot:%d!", MAX_POLL);
391}
392static inline void remove_poll(int h, poll_slot_t* ps, int flags)
393{
394    if(flags == ps->flags)
395    {
396        //all monitored events signaled. To remove it, just clear the slot
397        --ts[h].poll_count;
398        memset(ps, 0, sizeof(*ps));
399        ps->pfd.fd = -1;
400    }
401    else
402    {
403        //one read or one write monitor event signaled, removed the accordding bit
404        ps->flags &= ~flags;
405        //update the poll events mask
406        ps->pfd.events = flags2pevents(ps->flags);
407    }
408}
409static int process_cmd_sock(int h)
410{
411    sock_cmd_t cmd = {-1, 0, 0, 0, 0};
412    int fd = ts[h].cmd_fdr;
413    if(recv(fd, &cmd, sizeof(cmd), MSG_WAITALL) != sizeof(cmd))
414    {
415        APPL_TRACE_ERROR("recv cmd errno:%d", errno);
416        return FALSE;
417    }
418    APPL_TRACE_DEBUG("cmd.id:%d", cmd.id);
419    switch(cmd.id)
420    {
421        case CMD_ADD_FD:
422            add_poll(h, cmd.fd, cmd.type, cmd.flags, cmd.user_id);
423            break;
424        case CMD_WAKEUP:
425            break;
426        case CMD_USER_PRIVATE:
427            asrt(ts[h].cmd_callback);
428            if(ts[h].cmd_callback)
429                ts[h].cmd_callback(fd, cmd.type, cmd.flags, cmd.user_id);
430            break;
431        case CMD_EXIT:
432            return FALSE;
433        default:
434            APPL_TRACE_DEBUG("unknown cmd: %d", cmd.id);
435             break;
436    }
437    return TRUE;
438}
439static void process_data_sock(int h, struct pollfd *pfds, int count)
440{
441    asrt(count <= ts[h].poll_count);
442    int i;
443    for( i= 1; i < ts[h].poll_count; i++)
444    {
445        if(pfds[i].revents)
446        {
447            int ps_i = ts[h].psi[i];
448            asrt(pfds[i].fd == ts[h].ps[ps_i].pfd.fd);
449            uint32_t user_id = ts[h].ps[ps_i].user_id;
450            int type = ts[h].ps[ps_i].type;
451            int flags = 0;
452            print_events(pfds[i].revents);
453            if(IS_READ(pfds[i].revents))
454            {
455                flags |= SOCK_THREAD_FD_RD;
456            }
457            if(IS_WRITE(pfds[i].revents))
458            {
459                flags |= SOCK_THREAD_FD_WR;
460            }
461            if(IS_EXCEPTION(pfds[i].revents))
462            {
463                flags |= SOCK_THREAD_FD_EXCEPTION;
464                //remove the whole slot not flags
465                remove_poll(h, &ts[h].ps[ps_i], ts[h].ps[ps_i].flags);
466            }
467            else if(flags)
468                 remove_poll(h, &ts[h].ps[ps_i], flags); //remove the monitor flags that already processed
469            if(flags)
470                ts[h].callback(pfds[i].fd, type, flags, user_id);
471        }
472    }
473}
474
475static void prepare_poll_fds(int h, struct pollfd* pfds)
476{
477    int count = 0;
478    int ps_i = 0;
479    int pfd_i = 0;
480    asrt(ts[h].poll_count <= MAX_POLL);
481    memset(pfds, 0, sizeof(pfds[0])*ts[h].poll_count);
482    while(count < ts[h].poll_count)
483    {
484        if(ps_i >= MAX_POLL)
485        {
486            APPL_TRACE_ERROR("exceed max poll range, ps_i:%d, MAX_POLL:%d, count:%d, ts[h].poll_count:%d",
487                    ps_i, MAX_POLL, count, ts[h].poll_count);
488            return;
489        }
490        if(ts[h].ps[ps_i].pfd.fd >= 0)
491        {
492            pfds[pfd_i] =  ts[h].ps[ps_i].pfd;
493            ts[h].psi[pfd_i] = ps_i;
494            count++;
495            pfd_i++;
496        }
497        ps_i++;
498    }
499}
500static void *sock_poll_thread(void *arg)
501{
502    struct pollfd pfds[MAX_POLL];
503    memset(pfds, 0, sizeof(pfds));
504    int h = (intptr_t)arg;
505    for(;;)
506    {
507        prepare_poll_fds(h, pfds);
508        int ret = poll(pfds, ts[h].poll_count, -1);
509        if(ret == -1)
510        {
511            APPL_TRACE_ERROR("poll ret -1, exit the thread, errno:%d, err:%s", errno, strerror(errno));
512            break;
513        }
514        if(ret != 0)
515        {
516            int need_process_data_fd = TRUE;
517            if(pfds[0].revents) //cmd fd always is the first one
518            {
519                asrt(pfds[0].fd == ts[h].cmd_fdr);
520                if(!process_cmd_sock(h))
521                {
522                    APPL_TRACE_DEBUG("h:%d, process_cmd_sock return false, exit...", h);
523                    break;
524                }
525                if(ret == 1)
526                    need_process_data_fd = FALSE;
527                else ret--; //exclude the cmd fd
528            }
529            if(need_process_data_fd)
530                process_data_sock(h, pfds, ret);
531        }
532        else {APPL_TRACE_DEBUG("no data, select ret: %d", ret)};
533    }
534    ts[h].thread_id = -1;
535    APPL_TRACE_DEBUG("socket poll thread exiting, h:%d", h);
536    return 0;
537}
538
539