1/* 2 * Rated submission helpers 3 * 4 * Copyright (C) 2015 Jens Axboe <axboe@kernel.dk> 5 * 6 */ 7#include "fio.h" 8#include "ioengines.h" 9#include "lib/getrusage.h" 10#include "rate-submit.h" 11 12static int io_workqueue_fn(struct submit_worker *sw, 13 struct workqueue_work *work) 14{ 15 struct io_u *io_u = container_of(work, struct io_u, work); 16 const enum fio_ddir ddir = io_u->ddir; 17 struct thread_data *td = sw->priv; 18 int ret; 19 20 dprint(FD_RATE, "io_u %p queued by %u\n", io_u, gettid()); 21 22 io_u_set(td, io_u, IO_U_F_NO_FILE_PUT); 23 24 td->cur_depth++; 25 26 do { 27 ret = td_io_queue(td, io_u); 28 if (ret != FIO_Q_BUSY) 29 break; 30 ret = io_u_queued_complete(td, 1); 31 if (ret > 0) 32 td->cur_depth -= ret; 33 io_u_clear(td, io_u, IO_U_F_FLIGHT); 34 } while (1); 35 36 dprint(FD_RATE, "io_u %p ret %d by %u\n", io_u, ret, gettid()); 37 38 io_queue_event(td, io_u, &ret, ddir, NULL, 0, NULL); 39 40 if (ret == FIO_Q_COMPLETED) 41 td->cur_depth--; 42 else if (ret == FIO_Q_QUEUED) { 43 unsigned int min_evts; 44 45 if (td->o.iodepth == 1) 46 min_evts = 1; 47 else 48 min_evts = 0; 49 50 ret = io_u_queued_complete(td, min_evts); 51 if (ret > 0) 52 td->cur_depth -= ret; 53 } else if (ret == FIO_Q_BUSY) { 54 ret = io_u_queued_complete(td, td->cur_depth); 55 if (ret > 0) 56 td->cur_depth -= ret; 57 } 58 59 return 0; 60} 61 62static bool io_workqueue_pre_sleep_flush_fn(struct submit_worker *sw) 63{ 64 struct thread_data *td = sw->priv; 65 66 if (td->io_u_queued || td->cur_depth || td->io_u_in_flight) 67 return true; 68 69 return false; 70} 71 72static void io_workqueue_pre_sleep_fn(struct submit_worker *sw) 73{ 74 struct thread_data *td = sw->priv; 75 int ret; 76 77 ret = io_u_quiesce(td); 78 if (ret > 0) 79 td->cur_depth -= ret; 80} 81 82static int io_workqueue_alloc_fn(struct submit_worker *sw) 83{ 84 struct thread_data *td; 85 86 td = calloc(1, sizeof(*td)); 87 sw->priv = td; 88 return 0; 89} 90 91static void io_workqueue_free_fn(struct submit_worker *sw) 92{ 93 free(sw->priv); 94 sw->priv = NULL; 95} 96 97static int io_workqueue_init_worker_fn(struct submit_worker *sw) 98{ 99 struct thread_data *parent = sw->wq->td; 100 struct thread_data *td = sw->priv; 101 102 memcpy(&td->o, &parent->o, sizeof(td->o)); 103 memcpy(&td->ts, &parent->ts, sizeof(td->ts)); 104 td->o.uid = td->o.gid = -1U; 105 dup_files(td, parent); 106 td->eo = parent->eo; 107 fio_options_mem_dupe(td); 108 109 if (ioengine_load(td)) 110 goto err; 111 112 td->pid = gettid(); 113 114 INIT_FLIST_HEAD(&td->io_log_list); 115 INIT_FLIST_HEAD(&td->io_hist_list); 116 INIT_FLIST_HEAD(&td->verify_list); 117 INIT_FLIST_HEAD(&td->trim_list); 118 INIT_FLIST_HEAD(&td->next_rand_list); 119 td->io_hist_tree = RB_ROOT; 120 121 td->o.iodepth = 1; 122 if (td_io_init(td)) 123 goto err_io_init; 124 125 set_epoch_time(td, td->o.log_unix_epoch); 126 fio_getrusage(&td->ru_start); 127 clear_io_state(td, 1); 128 129 td_set_runstate(td, TD_RUNNING); 130 td->flags |= TD_F_CHILD; 131 td->parent = parent; 132 return 0; 133 134err_io_init: 135 close_ioengine(td); 136err: 137 return 1; 138 139} 140 141static void io_workqueue_exit_worker_fn(struct submit_worker *sw, 142 unsigned int *sum_cnt) 143{ 144 struct thread_data *td = sw->priv; 145 146 (*sum_cnt)++; 147 sum_thread_stats(&sw->wq->td->ts, &td->ts, *sum_cnt == 1); 148 149 fio_options_free(td); 150 close_and_free_files(td); 151 if (td->io_ops) 152 close_ioengine(td); 153 td_set_runstate(td, TD_EXITED); 154} 155 156#ifdef CONFIG_SFAA 157static void sum_val(uint64_t *dst, uint64_t *src) 158{ 159 if (*src) { 160 __sync_fetch_and_add(dst, *src); 161 *src = 0; 162 } 163} 164#else 165static void sum_val(uint64_t *dst, uint64_t *src) 166{ 167 if (*src) { 168 *dst += *src; 169 *src = 0; 170 } 171} 172#endif 173 174static void pthread_double_unlock(pthread_mutex_t *lock1, 175 pthread_mutex_t *lock2) 176{ 177#ifndef CONFIG_SFAA 178 pthread_mutex_unlock(lock1); 179 pthread_mutex_unlock(lock2); 180#endif 181} 182 183static void pthread_double_lock(pthread_mutex_t *lock1, pthread_mutex_t *lock2) 184{ 185#ifndef CONFIG_SFAA 186 if (lock1 < lock2) { 187 pthread_mutex_lock(lock1); 188 pthread_mutex_lock(lock2); 189 } else { 190 pthread_mutex_lock(lock2); 191 pthread_mutex_lock(lock1); 192 } 193#endif 194} 195 196static void sum_ddir(struct thread_data *dst, struct thread_data *src, 197 enum fio_ddir ddir) 198{ 199 pthread_double_lock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock); 200 201 sum_val(&dst->io_bytes[ddir], &src->io_bytes[ddir]); 202 sum_val(&dst->io_blocks[ddir], &src->io_blocks[ddir]); 203 sum_val(&dst->this_io_blocks[ddir], &src->this_io_blocks[ddir]); 204 sum_val(&dst->this_io_bytes[ddir], &src->this_io_bytes[ddir]); 205 sum_val(&dst->bytes_done[ddir], &src->bytes_done[ddir]); 206 207 pthread_double_unlock(&dst->io_wq.stat_lock, &src->io_wq.stat_lock); 208} 209 210static void io_workqueue_update_acct_fn(struct submit_worker *sw) 211{ 212 struct thread_data *src = sw->priv; 213 struct thread_data *dst = sw->wq->td; 214 215 if (td_read(src)) 216 sum_ddir(dst, src, DDIR_READ); 217 if (td_write(src)) 218 sum_ddir(dst, src, DDIR_WRITE); 219 if (td_trim(src)) 220 sum_ddir(dst, src, DDIR_TRIM); 221 222} 223 224static struct workqueue_ops rated_wq_ops = { 225 .fn = io_workqueue_fn, 226 .pre_sleep_flush_fn = io_workqueue_pre_sleep_flush_fn, 227 .pre_sleep_fn = io_workqueue_pre_sleep_fn, 228 .update_acct_fn = io_workqueue_update_acct_fn, 229 .alloc_worker_fn = io_workqueue_alloc_fn, 230 .free_worker_fn = io_workqueue_free_fn, 231 .init_worker_fn = io_workqueue_init_worker_fn, 232 .exit_worker_fn = io_workqueue_exit_worker_fn, 233}; 234 235int rate_submit_init(struct thread_data *td, struct sk_out *sk_out) 236{ 237 if (td->o.io_submit_mode != IO_MODE_OFFLOAD) 238 return 0; 239 240 return workqueue_init(td, &td->io_wq, &rated_wq_ops, td->o.iodepth, sk_out); 241} 242 243void rate_submit_exit(struct thread_data *td) 244{ 245 if (td->o.io_submit_mode != IO_MODE_OFFLOAD) 246 return; 247 248 workqueue_exit(&td->io_wq); 249} 250