1/* 2 * libhdfs engine 3 * 4 * this engine helps perform read/write operations on hdfs cluster using 5 * libhdfs. hdfs doesnot support modification of data once file is created. 6 * 7 * so to mimic that create many files of small size (e.g 256k), and this 8 * engine select a file based on the offset generated by fio. 9 * 10 * thus, random reads and writes can also be achieved with this logic. 11 * 12 * NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT 13 * to appropriate value to work this engine properly 14 * 15 */ 16 17#include <stdio.h> 18#include <stdlib.h> 19#include <unistd.h> 20#include <sys/uio.h> 21#include <errno.h> 22#include <assert.h> 23 24#include "../fio.h" 25 26#include "hdfs.h" 27 28struct hdfsio_data { 29 char host[256]; 30 int port; 31 hdfsFS fs; 32 hdfsFile fp; 33 unsigned long fsbs; 34 unsigned long fscount; 35 unsigned long curr_file_id; 36 unsigned int numjobs; 37 unsigned int fid_correction; 38}; 39 40static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd) 41{ 42 /* make sure that hdfsConnect is invoked before executing this function */ 43 hdfsSetWorkingDirectory(hd->fs, "/.perftest"); 44 hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0); 45 if (hd->fp) { 46 hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount)); 47 hdfsCloseFile(hd->fs, hd->fp); 48 } 49 hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0); 50 if (hd->fp) { 51 hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs)); 52 hdfsCloseFile(hd->fs, hd->fp); 53 } 54 55 return 0; 56} 57 58static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) 59{ 60 struct hdfsio_data *hd; 61 hdfsFileInfo *fi; 62 unsigned long f_id; 63 char fname[80]; 64 int open_flags = 0; 65 66 hd = td->io_ops->data; 67 68 if (hd->curr_file_id == -1) { 69 /* see comment in fio_hdfsio_setup() function */ 70 fio_hdfsio_setup_fs_params(hd); 71 } 72 73 /* find out file id based on the offset generated by fio */ 74 f_id = (io_u->offset / hd->fsbs) + hd->fid_correction; 75 76 if (f_id == hd->curr_file_id) { 77 /* file is already open */ 78 return 0; 79 } 80 81 if (hd->curr_file_id != -1) { 82 hdfsCloseFile(hd->fs, hd->fp); 83 } 84 85 if (io_u->ddir == DDIR_READ) { 86 open_flags = O_RDONLY; 87 } else if (io_u->ddir == DDIR_WRITE) { 88 open_flags = O_WRONLY; 89 } else { 90 log_err("hdfs: Invalid I/O Operation\n"); 91 } 92 93 hd->curr_file_id = f_id; 94 do { 95 sprintf(fname, ".f%lu", f_id); 96 fi = hdfsGetPathInfo(hd->fs, fname); 97 if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) { 98 /* file has enough data to read OR file is opened in write mode */ 99 hd->fp = 100 hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, 101 hd->fsbs); 102 if (hd->fp) { 103 break; 104 } 105 } 106 /* file is empty, so try next file for reading */ 107 f_id = (f_id + 1) % hd->fscount; 108 } while (1); 109 110 return 0; 111} 112 113static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret) 114{ 115 if (ret != (int)io_u->xfer_buflen) { 116 if (ret >= 0) { 117 io_u->resid = io_u->xfer_buflen - ret; 118 io_u->error = 0; 119 return FIO_Q_COMPLETED; 120 } else 121 io_u->error = errno; 122 } 123 124 if (io_u->error) 125 td_verror(td, io_u->error, "xfer"); 126 127 return FIO_Q_COMPLETED; 128} 129 130static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) 131{ 132 struct hdfsio_data *hd; 133 int ret = 0; 134 135 hd = td->io_ops->data; 136 137 if (io_u->ddir == DDIR_READ) { 138 ret = 139 hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); 140 } else if (io_u->ddir == DDIR_WRITE) { 141 ret = 142 hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, 143 io_u->xfer_buflen); 144 } else { 145 log_err("hdfs: Invalid I/O Operation\n"); 146 } 147 148 return fio_io_end(td, io_u, ret); 149} 150 151int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) 152{ 153 struct hdfsio_data *hd; 154 155 hd = td->io_ops->data; 156 hd->fs = hdfsConnect(hd->host, hd->port); 157 hdfsSetWorkingDirectory(hd->fs, "/.perftest"); 158 hd->fid_correction = (getpid() % hd->numjobs); 159 160 return 0; 161} 162 163int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) 164{ 165 struct hdfsio_data *hd; 166 167 hd = td->io_ops->data; 168 hdfsDisconnect(hd->fs); 169 170 return 0; 171} 172 173static int fio_hdfsio_setup(struct thread_data *td) 174{ 175 struct hdfsio_data *hd; 176 struct fio_file *f; 177 static unsigned int numjobs = 1; /* atleast one job has to be there! */ 178 numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs; 179 180 if (!td->io_ops->data) { 181 hd = malloc(sizeof(*hd));; 182 183 memset(hd, 0, sizeof(*hd)); 184 td->io_ops->data = hd; 185 186 /* separate host and port from filename */ 187 *(strchr(td->o.filename, ',')) = ' '; 188 sscanf(td->o.filename, "%s%d", hd->host, &(hd->port)); 189 190 /* read fbs and fcount and based on that set f->real_file_size */ 191 f = td->files[0]; 192#if 0 193 /* IMHO, this should be done here instead of fio_hdfsio_prep() 194 * but somehow calling it here doesn't seem to work, 195 * some problem with libhdfs that needs to be debugged */ 196 hd->fs = hdfsConnect(hd->host, hd->port); 197 fio_hdfsio_setup_fs_params(hd); 198 hdfsDisconnect(hd->fs); 199#else 200 /* so, as an alternate, using environment variables */ 201 if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) { 202 hd->fscount = atol(getenv("FIO_HDFS_FCOUNT")); 203 hd->fsbs = atol(getenv("FIO_HDFS_BS")); 204 } else { 205 log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n"); 206 return 1; 207 } 208#endif 209 f->real_file_size = hd->fscount * hd->fsbs; 210 211 td->o.nr_files = 1; 212 hd->curr_file_id = -1; 213 hd->numjobs = numjobs; 214 fio_file_set_size_known(f); 215 } 216 217 return 0; 218} 219 220static struct ioengine_ops ioengine_hdfs = { 221 .name = "libhdfs", 222 .version = FIO_IOOPS_VERSION, 223 .setup = fio_hdfsio_setup, 224 .prep = fio_hdfsio_prep, 225 .queue = fio_hdfsio_queue, 226 .open_file = fio_hdfsio_open_file, 227 .close_file = fio_hdfsio_close_file, 228 .flags = FIO_SYNCIO, 229}; 230 231static void fio_init fio_hdfsio_register(void) 232{ 233 register_ioengine(&ioengine_hdfs); 234} 235 236static void fio_exit fio_hdfsio_unregister(void) 237{ 238 unregister_ioengine(&ioengine_hdfs); 239} 240