fec_process.cpp revision c54a33db7505976a3530aa76ebd5602f12923c4d
1/* 2 * Copyright (C) 2015 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "fec_private.h" 18 19struct process_info { 20 int id; 21 fec_handle *f; 22 uint8_t *buf; 23 size_t count; 24 uint64_t offset; 25 read_func func; 26 ssize_t rc; 27 size_t errors; 28}; 29 30/* thread function */ 31static void * __process(void *cookie) 32{ 33 process_info *p = static_cast<process_info *>(cookie); 34 35 debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, 36 p->offset + p->count); 37 38 p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors); 39 return p; 40} 41 42/* launches a maximum number of threads to process a read */ 43ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset, 44 read_func func) 45{ 46 check(f); 47 check(buf) 48 check(func); 49 50 if (count == 0) { 51 return 0; 52 } 53 54 int threads = sysconf(_SC_NPROCESSORS_ONLN); 55 56 if (threads < WORK_MIN_THREADS) { 57 threads = WORK_MIN_THREADS; 58 } else if (threads > WORK_MAX_THREADS) { 59 threads = WORK_MAX_THREADS; 60 } 61 62 uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE; 63 size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE); 64 65 if ((size_t)threads > blocks) { 66 threads = (int)blocks; 67 } 68 69 size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE; 70 size_t left = count; 71 uint64_t pos = offset; 72 uint64_t end = start + count_per_thread; 73 74 debug("%d threads, %zu bytes per thread (total %zu)", threads, 75 count_per_thread, count); 76 77 std::vector<pthread_t> handles; 78 process_info info[threads]; 79 ssize_t rc = 0; 80 81 /* start threads to process queue */ 82 for (int i = 0; i < threads; ++i) { 83 check(left > 0); 84 85 info[i].id = i; 86 info[i].f = f; 87 info[i].buf = &buf[pos - offset]; 88 info[i].count = (size_t)(end - pos); 89 info[i].offset = pos; 90 info[i].func = func; 91 info[i].rc = -1; 92 info[i].errors = 0; 93 94 if (info[i].count > left) { 95 info[i].count = left; 96 } 97 98 pthread_t thread; 99 100 if (pthread_create(&thread, NULL, __process, &info[i]) != 0) { 101 error("failed to create thread: %s", strerror(errno)); 102 rc = -1; 103 } else { 104 handles.push_back(thread); 105 } 106 107 pos = end; 108 end += count_per_thread; 109 left -= info[i].count; 110 } 111 112 check(left == 0); 113 114 ssize_t nread = 0; 115 116 /* wait for all threads to complete */ 117 for (auto thread : handles) { 118 process_info *p = NULL; 119 120 if (pthread_join(thread, (void **)&p) != 0) { 121 error("failed to join thread: %s", strerror(errno)); 122 rc = -1; 123 } else if (!p || p->rc == -1) { 124 rc = -1; 125 } else { 126 nread += p->rc; 127 f->errors += p->errors; 128 } 129 } 130 131 if (rc == -1) { 132 errno = EIO; 133 return -1; 134 } 135 136 return nread; 137} 138