fec_process.cpp revision c54a33db7505976a3530aa76ebd5602f12923c4d
1/*
2 * Copyright (C) 2015 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "fec_private.h"
18
19struct process_info {
20    int id;
21    fec_handle *f;
22    uint8_t *buf;
23    size_t count;
24    uint64_t offset;
25    read_func func;
26    ssize_t rc;
27    size_t errors;
28};
29
30/* thread function  */
31static void * __process(void *cookie)
32{
33    process_info *p = static_cast<process_info *>(cookie);
34
35    debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset,
36        p->offset + p->count);
37
38    p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
39    return p;
40}
41
42/* launches a maximum number of threads to process a read */
43ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
44        read_func func)
45{
46    check(f);
47    check(buf)
48    check(func);
49
50    if (count == 0) {
51        return 0;
52    }
53
54    int threads = sysconf(_SC_NPROCESSORS_ONLN);
55
56    if (threads < WORK_MIN_THREADS) {
57        threads = WORK_MIN_THREADS;
58    } else if (threads > WORK_MAX_THREADS) {
59        threads = WORK_MAX_THREADS;
60    }
61
62    uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
63    size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE);
64
65    if ((size_t)threads > blocks) {
66        threads = (int)blocks;
67    }
68
69    size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
70    size_t left = count;
71    uint64_t pos = offset;
72    uint64_t end = start + count_per_thread;
73
74    debug("%d threads, %zu bytes per thread (total %zu)", threads,
75        count_per_thread, count);
76
77    std::vector<pthread_t> handles;
78    process_info info[threads];
79    ssize_t rc = 0;
80
81    /* start threads to process queue */
82    for (int i = 0; i < threads; ++i) {
83        check(left > 0);
84
85        info[i].id = i;
86        info[i].f = f;
87        info[i].buf = &buf[pos - offset];
88        info[i].count = (size_t)(end - pos);
89        info[i].offset = pos;
90        info[i].func = func;
91        info[i].rc = -1;
92        info[i].errors = 0;
93
94        if (info[i].count > left) {
95            info[i].count = left;
96        }
97
98        pthread_t thread;
99
100        if (pthread_create(&thread, NULL, __process, &info[i]) != 0) {
101            error("failed to create thread: %s", strerror(errno));
102            rc = -1;
103        } else {
104            handles.push_back(thread);
105        }
106
107        pos = end;
108        end  += count_per_thread;
109        left -= info[i].count;
110    }
111
112    check(left == 0);
113
114    ssize_t nread = 0;
115
116    /* wait for all threads to complete */
117    for (auto thread : handles) {
118        process_info *p = NULL;
119
120        if (pthread_join(thread, (void **)&p) != 0) {
121            error("failed to join thread: %s", strerror(errno));
122            rc = -1;
123        } else if (!p || p->rc == -1) {
124            rc = -1;
125        } else {
126            nread += p->rc;
127            f->errors += p->errors;
128        }
129    }
130
131    if (rc == -1) {
132        errno = EIO;
133        return -1;
134    }
135
136    return nread;
137}
138