/*
 * libhdfs engine
 *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modifying data once a file has been
 * created.
 *
 * To work around that, the data set is split into many small files
 * (e.g. 256k each), and the engine selects a file based on the offset
 * generated by fio.
 *
 * Thus, random reads and writes can also be achieved with this logic.
 *
 * The job's filename must be given as "host,port" of the HDFS namenode.
 *
 * NOTE: set the environment variables FIO_HDFS_BS (size of each file in
 * bytes) and FIO_HDFS_FCOUNT (number of files) to appropriate values for
 * this engine to work properly.
 */
16
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include <assert.h>

#include "../fio.h"

#include "hdfs.h"
27
28struct hdfsio_data {
29	char host[256];
30	int port;
31	hdfsFS fs;
32	hdfsFile fp;
33	unsigned long fsbs;
34	unsigned long fscount;
35	unsigned long curr_file_id;
36	unsigned int numjobs;
37	unsigned int fid_correction;
38};
39
40static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
41{
42	/* make sure that hdfsConnect is invoked before executing this function */
43	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
44	hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
45	if (hd->fp) {
46		hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
47		hdfsCloseFile(hd->fs, hd->fp);
48	}
49	hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
50	if (hd->fp) {
51		hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
52		hdfsCloseFile(hd->fs, hd->fp);
53	}
54
55	return 0;
56}
57
58static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
59{
60	struct hdfsio_data *hd;
61	hdfsFileInfo *fi;
62	unsigned long f_id;
63	char fname[80];
64	int open_flags = 0;
65
66	hd = td->io_ops->data;
67
68	if (hd->curr_file_id == -1) {
69		/* see comment in fio_hdfsio_setup() function */
70		fio_hdfsio_setup_fs_params(hd);
71	}
72
73	/* find out file id based on the offset generated by fio */
74	f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
75
76	if (f_id == hd->curr_file_id) {
77		/* file is already open */
78		return 0;
79	}
80
81	if (hd->curr_file_id != -1) {
82		hdfsCloseFile(hd->fs, hd->fp);
83	}
84
85	if (io_u->ddir == DDIR_READ) {
86		open_flags = O_RDONLY;
87	} else if (io_u->ddir == DDIR_WRITE) {
88		open_flags = O_WRONLY;
89	} else {
90		log_err("hdfs: Invalid I/O Operation\n");
91	}
92
93	hd->curr_file_id = f_id;
94	do {
95		sprintf(fname, ".f%lu", f_id);
96		fi = hdfsGetPathInfo(hd->fs, fname);
97		if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
98			/* file has enough data to read OR file is opened in write mode */
99			hd->fp =
100			    hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
101					 hd->fsbs);
102			if (hd->fp) {
103				break;
104			}
105		}
106		/* file is empty, so try next file for reading */
107		f_id = (f_id + 1) % hd->fscount;
108	} while (1);
109
110	return 0;
111}
112
113static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
114{
115	if (ret != (int)io_u->xfer_buflen) {
116		if (ret >= 0) {
117			io_u->resid = io_u->xfer_buflen - ret;
118			io_u->error = 0;
119			return FIO_Q_COMPLETED;
120		} else
121			io_u->error = errno;
122	}
123
124	if (io_u->error)
125		td_verror(td, io_u->error, "xfer");
126
127	return FIO_Q_COMPLETED;
128}
129
130static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
131{
132	struct hdfsio_data *hd;
133	int ret = 0;
134
135	hd = td->io_ops->data;
136
137	if (io_u->ddir == DDIR_READ) {
138		ret =
139		    hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
140	} else if (io_u->ddir == DDIR_WRITE) {
141		ret =
142		    hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
143			      io_u->xfer_buflen);
144	} else {
145		log_err("hdfs: Invalid I/O Operation\n");
146	}
147
148	return fio_io_end(td, io_u, ret);
149}
150
151int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
152{
153	struct hdfsio_data *hd;
154
155	hd = td->io_ops->data;
156	hd->fs = hdfsConnect(hd->host, hd->port);
157	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
158	hd->fid_correction = (getpid() % hd->numjobs);
159
160	return 0;
161}
162
163int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
164{
165	struct hdfsio_data *hd;
166
167	hd = td->io_ops->data;
168	hdfsDisconnect(hd->fs);
169
170	return 0;
171}
172
173static int fio_hdfsio_setup(struct thread_data *td)
174{
175	struct hdfsio_data *hd;
176	struct fio_file *f;
177	static unsigned int numjobs = 1;	/* atleast one job has to be there! */
178	numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
179
180	if (!td->io_ops->data) {
181		hd = malloc(sizeof(*hd));;
182
183		memset(hd, 0, sizeof(*hd));
184		td->io_ops->data = hd;
185
186		/* separate host and port from filename */
187		*(strchr(td->o.filename, ',')) = ' ';
188		sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
189
190		/* read fbs and fcount and based on that set f->real_file_size */
191		f = td->files[0];
192#if 0
193		/* IMHO, this should be done here instead of fio_hdfsio_prep()
194		 * but somehow calling it here doesn't seem to work,
195		 * some problem with libhdfs that needs to be debugged */
196		hd->fs = hdfsConnect(hd->host, hd->port);
197		fio_hdfsio_setup_fs_params(hd);
198		hdfsDisconnect(hd->fs);
199#else
200		/* so, as an alternate, using environment variables */
201		if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
202			hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
203			hd->fsbs = atol(getenv("FIO_HDFS_BS"));
204		} else {
205			log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
206			return 1;
207		}
208#endif
209		f->real_file_size = hd->fscount * hd->fsbs;
210
211		td->o.nr_files = 1;
212		hd->curr_file_id = -1;
213		hd->numjobs = numjobs;
214		fio_file_set_size_known(f);
215	}
216
217	return 0;
218}
219
220static struct ioengine_ops ioengine_hdfs = {
221	.name = "libhdfs",
222	.version = FIO_IOOPS_VERSION,
223	.setup = fio_hdfsio_setup,
224	.prep = fio_hdfsio_prep,
225	.queue = fio_hdfsio_queue,
226	.open_file = fio_hdfsio_open_file,
227	.close_file = fio_hdfsio_close_file,
228	.flags = FIO_SYNCIO,
229};
230
231static void fio_init fio_hdfsio_register(void)
232{
233	register_ioengine(&ioengine_hdfs);
234}
235
236static void fio_exit fio_hdfsio_unregister(void)
237{
238	unregister_ioengine(&ioengine_hdfs);
239}
240