net.c revision 414c2a3e741bb7dd7147ce6843f529c7773cea38
1/*
2 * net engine
3 *
4 * IO engine that reads/writes to/from sockets.
5 *
6 */
7#include <stdio.h>
8#include <stdlib.h>
9#include <unistd.h>
10#include <errno.h>
11#include <assert.h>
12#include <netinet/in.h>
13#include <arpa/inet.h>
14#include <netdb.h>
15#include <sys/poll.h>
16#include <sys/types.h>
17#include <sys/socket.h>
18
19#include "../fio.h"
20
/*
 * Private per-job state of the "net" and "netsplice" engines; the engine
 * code reaches it through td->io_ops->data.
 */
struct netio_data {
	/* descriptors owned by the engine; -1 means "not open" */
	int listenfd;			/* reader's bound (TCP: listening) socket */
	int pipes[2];			/* staging pipe for splice()/vmsplice() */

	/* settings derived from the job in fio_netio_init()/setup */
	int send_to_net;		/* 1 on the writing side, 0 on the reading side */
	int net_protocol;		/* IPPROTO_TCP or IPPROTO_UDP */
	int use_splice;			/* set by the netsplice engine's setup hook */

	char host[64];			/* NOTE(review): not referenced in this file */
	struct sockaddr_in addr;	/* writer: peer to reach; reader: bind address,
					 * later overwritten by accept()/recvfrom() */
};
30
31static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
32{
33	struct netio_data *nd = td->io_ops->data;
34
35	/*
36	 * Make sure we don't see spurious reads to a receiver, and vice versa
37	 */
38	if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
39	    (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
40		td_verror(td, EINVAL, "bad direction");
41		return 1;
42	}
43
44	return 0;
45}
46
47#ifdef FIO_HAVE_SPLICE
/*
 * Move up to 'len' bytes from fdin to fdout with splice(), looping over
 * partial transfers. It stops early when splice() returns 0.
 *
 * Returns the number of bytes moved. If the very first splice() call
 * fails, its negative return value is passed through instead (errno is
 * left as splice() set it).
 */
static int splice_io_u(int fdin, int fdout, unsigned int len)
{
	int total = 0;

	while (len > 0) {
		int moved = splice(fdin, NULL, fdout, NULL, len, 0);

		if (moved == 0)
			break;
		if (moved < 0)
			return total ? total : moved;

		total += moved;
		len -= moved;
	}

	return total;
}
69
70/*
71 * Receive bytes from a socket and fill them into the internal pipe
72 */
73static int splice_in(struct thread_data *td, struct io_u *io_u)
74{
75	struct netio_data *nd = td->io_ops->data;
76
77	return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
78}
79
80/*
81 * Transmit 'len' bytes from the internal pipe
82 */
83static int splice_out(struct thread_data *td, struct io_u *io_u,
84		      unsigned int len)
85{
86	struct netio_data *nd = td->io_ops->data;
87
88	return splice_io_u(nd->pipes[0], io_u->file->fd, len);
89}
90
91static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
92{
93	struct iovec iov = {
94		.iov_base = io_u->xfer_buf,
95		.iov_len = len,
96	};
97	int bytes = 0;
98
99	while (iov.iov_len) {
100		int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
101
102		if (ret < 0) {
103			if (!bytes)
104				bytes = ret;
105			break;
106		} else if (!ret)
107			break;
108
109		iov.iov_len -= ret;
110		iov.iov_base += ret;
111		bytes += ret;
112	}
113
114	return bytes;
115
116}
117
118/*
119 * vmsplice() pipe to io_u buffer
120 */
121static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
122			     unsigned int len)
123{
124	struct netio_data *nd = td->io_ops->data;
125
126	return vmsplice_io_u(io_u, nd->pipes[0], len);
127}
128
129/*
130 * vmsplice() io_u to pipe
131 */
132static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
133{
134	struct netio_data *nd = td->io_ops->data;
135
136	return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
137}
138
/*
 * Splice-based receive. First move socket data into the staging pipe with
 * splice(), then copy exactly what arrived from the pipe into the io_u
 * with vmsplice().
 *
 * Returns the byte count from the vmsplice step, or the splice step's
 * result if that moved nothing or failed.
 */
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
	int got = splice_in(td, io_u);

	if (got <= 0)
		return got;

	return vmsplice_io_u_out(td, io_u, got);
}
153
/*
 * Splice-based transmit. First map the io_u's buffer into the staging pipe
 * with vmsplice(), then splice exactly that many bytes from the pipe to
 * the socket.
 *
 * Returns the byte count from the splice step, or the vmsplice step's
 * result if that queued nothing or failed.
 */
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
	int queued = vmsplice_io_u_in(td, io_u);

	return queued > 0 ? splice_out(td, io_u, queued) : queued;
}
168#else
/*
 * Fallback when built without splice support: the receive path is
 * unavailable, so it always fails with errno = EOPNOTSUPP.
 */
static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
{
	(void) td;
	(void) io_u;

	errno = EOPNOTSUPP;
	return -1;
}
174
/*
 * Fallback when built without splice support: the transmit path is
 * unavailable, so it always fails with errno = EOPNOTSUPP.
 */
static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
{
	(void) td;
	(void) io_u;

	errno = EOPNOTSUPP;
	return -1;
}
180#endif
181
182static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
183{
184	struct netio_data *nd = td->io_ops->data;
185	int flags = 0;
186
187	/*
188	 * if we are going to write more, set MSG_MORE
189	 */
190#ifdef MSG_MORE
191	if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
192		flags = MSG_MORE;
193#endif
194
195	if (nd->net_protocol == IPPROTO_UDP) {
196		return sendto(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
197				0, &nd->addr, sizeof(nd->addr));
198	} else {
199		return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
200				flags);
201	}
202}
203
204static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
205{
206	struct netio_data *nd = td->io_ops->data;
207	int flags = MSG_WAITALL;
208
209	if (nd->net_protocol == IPPROTO_UDP) {
210		socklen_t len = sizeof(nd->addr);
211
212		return recvfrom(io_u->file->fd, io_u->xfer_buf,
213				io_u->xfer_buflen, 0, &nd->addr, &len);
214	} else {
215		return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen,
216				flags);
217	}
218}
219
220static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
221{
222	struct netio_data *nd = td->io_ops->data;
223	int ret;
224
225	fio_ro_check(td, io_u);
226
227	if (io_u->ddir == DDIR_WRITE) {
228		if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
229			ret = fio_netio_send(td, io_u);
230		else
231			ret = fio_netio_splice_out(td, io_u);
232	} else if (io_u->ddir == DDIR_READ) {
233		if (!nd->use_splice || nd->net_protocol == IPPROTO_UDP)
234			ret = fio_netio_recv(td, io_u);
235		else
236			ret = fio_netio_splice_in(td, io_u);
237	} else
238		ret = 0;	/* must be a SYNC */
239
240	if (ret != (int) io_u->xfer_buflen) {
241		if (ret >= 0) {
242			io_u->resid = io_u->xfer_buflen - ret;
243			io_u->error = 0;
244			return FIO_Q_COMPLETED;
245		} else {
246			int err = errno;
247
248			if (io_u->ddir == DDIR_WRITE && err == EMSGSIZE)
249				return FIO_Q_BUSY;
250
251			io_u->error = err;
252		}
253	}
254
255	if (io_u->error)
256		td_verror(td, io_u->error, "xfer");
257
258	return FIO_Q_COMPLETED;
259}
260
261static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
262{
263	struct netio_data *nd = td->io_ops->data;
264	int type;
265
266	if (nd->net_protocol == IPPROTO_TCP)
267		type = SOCK_STREAM;
268	else
269		type = SOCK_DGRAM;
270
271	f->fd = socket(AF_INET, type, nd->net_protocol);
272	if (f->fd < 0) {
273		td_verror(td, errno, "socket");
274		return 1;
275	}
276
277	if (nd->net_protocol == IPPROTO_UDP)
278		return 0;
279
280	if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
281		td_verror(td, errno, "connect");
282		return 1;
283	}
284
285	return 0;
286}
287
288static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
289{
290	struct netio_data *nd = td->io_ops->data;
291	socklen_t socklen = sizeof(nd->addr);
292	struct pollfd pfd;
293	int ret;
294
295	if (nd->net_protocol == IPPROTO_UDP) {
296		f->fd = nd->listenfd;
297		return 0;
298	}
299
300	log_info("fio: waiting for connection\n");
301
302	/*
303	 * Accept loop. poll for incoming events, accept them. Repeat until we
304	 * have all connections.
305	 */
306	while (!td->terminate) {
307		pfd.fd = nd->listenfd;
308		pfd.events = POLLIN;
309
310		ret = poll(&pfd, 1, -1);
311		if (ret < 0) {
312			if (errno == EINTR)
313				continue;
314
315			td_verror(td, errno, "poll");
316			break;
317		} else if (!ret)
318			continue;
319
320		/*
321		 * should be impossible
322		 */
323		if (!(pfd.revents & POLLIN))
324			continue;
325
326		f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
327		if (f->fd < 0) {
328			td_verror(td, errno, "accept");
329			return 1;
330		}
331		break;
332	}
333
334	return 0;
335}
336
/*
 * open_file hook. A reading job waits for its peer via
 * fio_netio_accept(); a writing job reaches out via fio_netio_connect().
 * Returns whatever the chosen helper returns.
 */
static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
{
	return td_read(td) ? fio_netio_accept(td, f)
			   : fio_netio_connect(td, f);
}
344
345static int fio_netio_setup_connect(struct thread_data *td, const char *host,
346				   unsigned short port)
347{
348	struct netio_data *nd = td->io_ops->data;
349
350	nd->addr.sin_family = AF_INET;
351	nd->addr.sin_port = htons(port);
352
353	if (inet_aton(host, &nd->addr.sin_addr) != 1) {
354		struct hostent *hent;
355
356		hent = gethostbyname(host);
357		if (!hent) {
358			td_verror(td, errno, "gethostbyname");
359			return 1;
360		}
361
362		memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
363	}
364
365	return 0;
366}
367
368static int fio_netio_setup_listen(struct thread_data *td, short port)
369{
370	struct netio_data *nd = td->io_ops->data;
371	int fd, opt, type;
372
373	if (nd->net_protocol == IPPROTO_TCP)
374		type = SOCK_STREAM;
375	else
376		type = SOCK_DGRAM;
377
378	fd = socket(AF_INET, type, nd->net_protocol);
379	if (fd < 0) {
380		td_verror(td, errno, "socket");
381		return 1;
382	}
383
384	opt = 1;
385	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
386		td_verror(td, errno, "setsockopt");
387		return 1;
388	}
389#ifdef SO_REUSEPORT
390	if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
391		td_verror(td, errno, "setsockopt");
392		return 1;
393	}
394#endif
395
396	nd->addr.sin_family = AF_INET;
397	nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
398	nd->addr.sin_port = htons(port);
399
400	if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
401		td_verror(td, errno, "bind");
402		return 1;
403	}
404	if (nd->net_protocol == IPPROTO_TCP && listen(fd, 1) < 0) {
405		td_verror(td, errno, "listen");
406		return 1;
407	}
408
409	nd->listenfd = fd;
410	return 0;
411}
412
413static int fio_netio_init(struct thread_data *td)
414{
415	struct netio_data *nd = td->io_ops->data;
416	unsigned int port;
417	char host[64], buf[128];
418	char *sep, *portp, *modep;
419	int ret;
420
421	if (td_rw(td)) {
422		log_err("fio: network connections must be read OR write\n");
423		return 1;
424	}
425	if (td_random(td)) {
426		log_err("fio: network IO can't be random\n");
427		return 1;
428	}
429
430	strcpy(buf, td->o.filename);
431
432	sep = strchr(buf, '/');
433	if (!sep)
434		goto bad_host;
435
436	*sep = '\0';
437	sep++;
438	strcpy(host, buf);
439	if (!strlen(host))
440		goto bad_host;
441
442	modep = NULL;
443	portp = sep;
444	sep = strchr(portp, '/');
445	if (sep) {
446		*sep = '\0';
447		modep = sep + 1;
448	}
449
450	port = strtol(portp, NULL, 10);
451	if (!port || port > 65535)
452		goto bad_host;
453
454	if (modep) {
455		if (!strncmp("tcp", modep, strlen(modep)))
456			nd->net_protocol = IPPROTO_TCP;
457		else if (!strncmp("udp", modep, strlen(modep)))
458			nd->net_protocol = IPPROTO_UDP;
459		else
460			goto bad_host;
461	} else
462		nd->net_protocol = IPPROTO_TCP;
463
464	if (td_read(td)) {
465		nd->send_to_net = 0;
466		ret = fio_netio_setup_listen(td, port);
467	} else {
468		nd->send_to_net = 1;
469		ret = fio_netio_setup_connect(td, host, port);
470	}
471
472	return ret;
473bad_host:
474	log_err("fio: bad network host/port/protocol: %s\n", td->o.filename);
475	return 1;
476}
477
478static void fio_netio_cleanup(struct thread_data *td)
479{
480	struct netio_data *nd = td->io_ops->data;
481
482	if (nd) {
483		if (nd->listenfd != -1)
484			close(nd->listenfd);
485		if (nd->pipes[0] != -1)
486			close(nd->pipes[0]);
487		if (nd->pipes[1] != -1)
488			close(nd->pipes[1]);
489
490		free(nd);
491	}
492}
493
494static int fio_netio_setup(struct thread_data *td)
495{
496	struct netio_data *nd;
497
498	if (!td->io_ops->data) {
499		nd = malloc(sizeof(*nd));;
500
501		memset(nd, 0, sizeof(*nd));
502		nd->listenfd = -1;
503		nd->pipes[0] = nd->pipes[1] = -1;
504		td->io_ops->data = nd;
505	}
506
507	return 0;
508}
509
510#ifdef FIO_HAVE_SPLICE
511static int fio_netio_setup_splice(struct thread_data *td)
512{
513	struct netio_data *nd;
514
515	fio_netio_setup(td);
516
517	nd = td->io_ops->data;
518	if (nd) {
519		if (pipe(nd->pipes) < 0)
520			return 1;
521
522		nd->use_splice = 1;
523		return 0;
524	}
525
526	return 1;
527}
528
529static struct ioengine_ops ioengine_splice = {
530	.name		= "netsplice",
531	.version	= FIO_IOOPS_VERSION,
532	.prep		= fio_netio_prep,
533	.queue		= fio_netio_queue,
534	.setup		= fio_netio_setup_splice,
535	.init		= fio_netio_init,
536	.cleanup	= fio_netio_cleanup,
537	.open_file	= fio_netio_open_file,
538	.close_file	= generic_close_file,
539	.flags		= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
540			  FIO_SIGQUIT,
541};
542#endif
543
544static struct ioengine_ops ioengine_rw = {
545	.name		= "net",
546	.version	= FIO_IOOPS_VERSION,
547	.prep		= fio_netio_prep,
548	.queue		= fio_netio_queue,
549	.setup		= fio_netio_setup,
550	.init		= fio_netio_init,
551	.cleanup	= fio_netio_cleanup,
552	.open_file	= fio_netio_open_file,
553	.close_file	= generic_close_file,
554	.flags		= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
555			  FIO_SIGQUIT,
556};
557
558static void fio_init fio_netio_register(void)
559{
560	register_ioengine(&ioengine_rw);
561#ifdef FIO_HAVE_SPLICE
562	register_ioengine(&ioengine_splice);
563#endif
564}
565
566static void fio_exit fio_netio_unregister(void)
567{
568	unregister_ioengine(&ioengine_rw);
569#ifdef FIO_HAVE_SPLICE
570	unregister_ioengine(&ioengine_splice);
571#endif
572}
573