1/*
2 * Basic workqueue like code, that sets up a thread and allows async
3 * processing of some sort. Could be extended to allow for multiple
4 * worker threads. But right now fio associates one of this per IO
5 * thread, so should be enough to have just a single thread doing the
6 * work.
7 */
8#include <stdio.h>
9#include <stdlib.h>
10#include <stdarg.h>
11#include <unistd.h>
12#include <errno.h>
13#include <pthread.h>
14#include <string.h>
15
16#include "../smalloc.h"
17#include "../log.h"
18#include "tp.h"
19
20static void tp_flush_work(struct flist_head *list)
21{
22	struct tp_work *work;
23
24	while (!flist_empty(list)) {
25		int prio;
26
27		work = flist_entry(list->next, struct tp_work, list);
28		flist_del(&work->list);
29
30		prio = work->prio;
31		if (nice(prio) < 0)
32			log_err("fio: nice %s\n", strerror(errno));
33
34		work->fn(work);
35
36		if (nice(prio) < 0)
37			log_err("fio: nice %s\n", strerror(errno));
38	}
39}
40
41static void *tp_thread(void *data)
42{
43	struct tp_data *tdat = data;
44	struct flist_head work_list;
45
46	INIT_FLIST_HEAD(&work_list);
47
48	while (1) {
49		pthread_mutex_lock(&tdat->lock);
50
51		if (!tdat->thread_exit && flist_empty(&tdat->work))
52			pthread_cond_wait(&tdat->cv, &tdat->lock);
53
54		if (!flist_empty(&tdat->work))
55			flist_splice_tail_init(&tdat->work, &work_list);
56
57		pthread_mutex_unlock(&tdat->lock);
58
59		if (flist_empty(&work_list)) {
60			if (tdat->thread_exit)
61				break;
62			continue;
63		}
64
65		tp_flush_work(&work_list);
66	}
67
68	return NULL;
69}
70
71void tp_queue_work(struct tp_data *tdat, struct tp_work *work)
72{
73	work->done = 0;
74
75	pthread_mutex_lock(&tdat->lock);
76	flist_add_tail(&work->list, &tdat->work);
77	pthread_mutex_unlock(&tdat->lock);
78
79	pthread_cond_signal(&tdat->cv);
80}
81
82void tp_init(struct tp_data **tdatp)
83{
84	struct tp_data *tdat;
85	int ret;
86
87	if (*tdatp)
88		return;
89
90	*tdatp = tdat = smalloc(sizeof(*tdat));
91	pthread_mutex_init(&tdat->lock, NULL);
92	INIT_FLIST_HEAD(&tdat->work);
93	pthread_cond_init(&tdat->cv, NULL);
94	pthread_cond_init(&tdat->sleep_cv, NULL);
95
96	ret = pthread_create(&tdat->thread, NULL, tp_thread, tdat);
97	if (ret)
98		log_err("fio: failed to create tp thread\n");
99}
100
101void tp_exit(struct tp_data **tdatp)
102{
103	struct tp_data *tdat = *tdatp;
104	void *ret;
105
106	if (!tdat)
107		return;
108
109	pthread_mutex_lock(&tdat->lock);
110	tdat->thread_exit = 1;
111	pthread_mutex_unlock(&tdat->lock);
112
113	pthread_cond_signal(&tdat->cv);
114
115	pthread_join(tdat->thread, &ret);
116
117	sfree(tdat);
118	*tdatp = NULL;
119}
120