net.c revision 67bf982340d95ca98098ea050b54b4c7adb116c0
1/*
2 * net engine
3 *
4 * IO engine that reads/writes to/from sockets.
5 *
6 */
7#include <stdio.h>
8#include <stdlib.h>
9#include <unistd.h>
10#include <signal.h>
11#include <errno.h>
12#include <assert.h>
13#include <netinet/in.h>
14#include <arpa/inet.h>
15#include <netdb.h>
16#include <sys/poll.h>
17#include <sys/types.h>
18#include <sys/stat.h>
19#include <sys/socket.h>
20#include <sys/un.h>
21
22#include "../fio.h"
23
24struct netio_data {
25	int listenfd;
26	int use_splice;
27	int pipes[2];
28	struct sockaddr_in addr;
29	struct sockaddr_un addr_un;
30};
31
32struct netio_options {
33	struct thread_data *td;
34	unsigned int port;
35	unsigned int proto;
36	unsigned int listen;
37	unsigned int pingpong;
38};
39
40struct udp_close_msg {
41	uint32_t magic;
42	uint32_t cmd;
43};
44
45enum {
46	FIO_LINK_CLOSE = 0x89,
47	FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b,
48	FIO_LINK_OPEN = 0x98,
49
50	FIO_TYPE_TCP	= 1,
51	FIO_TYPE_UDP	= 2,
52	FIO_TYPE_UNIX	= 3,
53};
54
55static int str_hostname_cb(void *data, const char *input);
56static struct fio_option options[] = {
57	{
58		.name	= "hostname",
59		.type	= FIO_OPT_STR_STORE,
60		.cb	= str_hostname_cb,
61		.help	= "Hostname for net IO engine",
62	},
63	{
64		.name	= "port",
65		.type	= FIO_OPT_INT,
66		.off1	= offsetof(struct netio_options, port),
67		.minval	= 1,
68		.maxval	= 65535,
69		.help	= "Port to use for TCP or UDP net connections",
70	},
71	{
72		.name	= "protocol",
73		.alias	= "proto",
74		.type	= FIO_OPT_STR,
75		.off1	= offsetof(struct netio_options, proto),
76		.help	= "Network protocol to use",
77		.def	= "tcp",
78		.posval = {
79			  { .ival = "tcp",
80			    .oval = FIO_TYPE_TCP,
81			    .help = "Transmission Control Protocol",
82			  },
83			  { .ival = "udp",
84			    .oval = FIO_TYPE_UDP,
85			    .help = "User Datagram Protocol",
86			  },
87			  { .ival = "unix",
88			    .oval = FIO_TYPE_UNIX,
89			    .help = "UNIX domain socket",
90			  },
91		},
92	},
93	{
94		.name	= "listen",
95		.type	= FIO_OPT_STR_SET,
96		.off1	= offsetof(struct netio_options, listen),
97		.help	= "Listen for incoming TCP connections",
98	},
99	{
100		.name	= "pingpong",
101		.type	= FIO_OPT_STR_SET,
102		.off1	= offsetof(struct netio_options, pingpong),
103		.help	= "Ping-pong IO requests",
104	},
105	{
106		.name	= NULL,
107	},
108};
109
110/*
111 * Return -1 for error and 'nr events' for a positive number
112 * of events
113 */
114static int poll_wait(struct thread_data *td, int fd, short events)
115{
116	struct pollfd pfd;
117	int ret;
118
119	while (!td->terminate) {
120		pfd.fd = fd;
121		pfd.events = events;
122		ret = poll(&pfd, 1, -1);
123		if (ret < 0) {
124			if (errno == EINTR)
125				break;
126
127			td_verror(td, errno, "poll");
128			return -1;
129		} else if (!ret)
130			continue;
131
132		break;
133	}
134
135	if (pfd.revents & events)
136		return 1;
137
138	return -1;
139}
140
141static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
142{
143	struct netio_options *o = td->eo;
144
145	/*
146	 * Make sure we don't see spurious reads to a receiver, and vice versa
147	 */
148	if (o->proto == FIO_TYPE_TCP)
149		return 0;
150
151	if ((o->listen && io_u->ddir == DDIR_WRITE) ||
152	    (!o->listen && io_u->ddir == DDIR_READ)) {
153		td_verror(td, EINVAL, "bad direction");
154		return 1;
155	}
156
157	return 0;
158}
159
160#ifdef CONFIG_LINUX_SPLICE
161static int splice_io_u(int fdin, int fdout, unsigned int len)
162{
163	int bytes = 0;
164
165	while (len) {
166		int ret = splice(fdin, NULL, fdout, NULL, len, 0);
167
168		if (ret < 0) {
169			if (!bytes)
170				bytes = ret;
171
172			break;
173		} else if (!ret)
174			break;
175
176		bytes += ret;
177		len -= ret;
178	}
179
180	return bytes;
181}
182
183/*
184 * Receive bytes from a socket and fill them into the internal pipe
185 */
186static int splice_in(struct thread_data *td, struct io_u *io_u)
187{
188	struct netio_data *nd = td->io_ops->data;
189
190	return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
191}
192
193/*
194 * Transmit 'len' bytes from the internal pipe
195 */
196static int splice_out(struct thread_data *td, struct io_u *io_u,
197		      unsigned int len)
198{
199	struct netio_data *nd = td->io_ops->data;
200
201	return splice_io_u(nd->pipes[0], io_u->file->fd, len);
202}
203
204static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
205{
206	struct iovec iov = {
207		.iov_base = io_u->xfer_buf,
208		.iov_len = len,
209	};
210	int bytes = 0;
211
212	while (iov.iov_len) {
213		int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
214
215		if (ret < 0) {
216			if (!bytes)
217				bytes = ret;
218			break;
219		} else if (!ret)
220			break;
221
222		iov.iov_len -= ret;
223		iov.iov_base += ret;
224		bytes += ret;
225	}
226
227	return bytes;
228
229}
230
231/*
232 * vmsplice() pipe to io_u buffer
233 */
234static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
235			     unsigned int len)
236{
237	struct netio_data *nd = td->io_ops->data;
238
239	return vmsplice_io_u(io_u, nd->pipes[0], len);
240}
241
242/*
243 * vmsplice() io_u to pipe
244 */
245static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
246{
247	struct netio_data *nd = td->io_ops->data;
248
249	return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
250}
251
252/*
253 * splice receive - transfer socket data into a pipe using splice, then map
254 * that pipe data into the io_u using vmsplice.
255 */
256static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
257{
258	int ret;
259
260	ret = splice_in(td, io_u);
261	if (ret > 0)
262		return vmsplice_io_u_out(td, io_u, ret);
263
264	return ret;
265}
266
267/*
268 * splice transmit - map data from the io_u into a pipe by using vmsplice,
269 * then transfer that pipe to a socket using splice.
270 */
271static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
272{
273	int ret;
274
275	ret = vmsplice_io_u_in(td, io_u);
276	if (ret > 0)
277		return splice_out(td, io_u, ret);
278
279	return ret;
280}
281#else
282static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
283{
284	errno = EOPNOTSUPP;
285	return -1;
286}
287
288static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
289{
290	errno = EOPNOTSUPP;
291	return -1;
292}
293#endif
294
295static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
296{
297	struct netio_data *nd = td->io_ops->data;
298	struct netio_options *o = td->eo;
299	int ret, flags = 0;
300
301	do {
302		if (o->proto == FIO_TYPE_UDP) {
303			struct sockaddr *to = (struct sockaddr *) &nd->addr;
304
305			ret = sendto(io_u->file->fd, io_u->xfer_buf,
306					io_u->xfer_buflen, flags, to,
307					sizeof(*to));
308		} else {
309			/*
310			 * if we are going to write more, set MSG_MORE
311			 */
312#ifdef MSG_MORE
313			if ((td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
314			    td->o.size) && !o->pingpong)
315				flags |= MSG_MORE;
316#endif
317			ret = send(io_u->file->fd, io_u->xfer_buf,
318					io_u->xfer_buflen, flags);
319		}
320		if (ret > 0)
321			break;
322
323		ret = poll_wait(td, io_u->file->fd, POLLOUT);
324		if (ret <= 0)
325			break;
326	} while (1);
327
328	return ret;
329}
330
331static int is_udp_close(struct io_u *io_u, int len)
332{
333	struct udp_close_msg *msg;
334
335	if (len != sizeof(struct udp_close_msg))
336		return 0;
337
338	msg = io_u->xfer_buf;
339	if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC)
340		return 0;
341	if (ntohl(msg->cmd) != FIO_LINK_CLOSE)
342		return 0;
343
344	return 1;
345}
346
347static int fio_netio_recv(struct thread_data *td, struct io_u *io_u)
348{
349	struct netio_data *nd = td->io_ops->data;
350	struct netio_options *o = td->eo;
351	int ret, flags = 0;
352
353	do {
354		if (o->proto == FIO_TYPE_UDP) {
355			socklen_t len = sizeof(nd->addr);
356			struct sockaddr *from = (struct sockaddr *) &nd->addr;
357
358			ret = recvfrom(io_u->file->fd, io_u->xfer_buf,
359					io_u->xfer_buflen, flags, from, &len);
360			if (is_udp_close(io_u, ret)) {
361				td->done = 1;
362				return 0;
363			}
364		} else {
365			ret = recv(io_u->file->fd, io_u->xfer_buf,
366					io_u->xfer_buflen, flags);
367		}
368		if (ret > 0)
369			break;
370		else if (!ret && (flags & MSG_WAITALL))
371			break;
372
373		ret = poll_wait(td, io_u->file->fd, POLLIN);
374		if (ret <= 0)
375			break;
376		flags |= MSG_WAITALL;
377	} while (1);
378
379	return ret;
380}
381
382static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u,
383			     enum fio_ddir ddir)
384{
385	struct netio_data *nd = td->io_ops->data;
386	struct netio_options *o = td->eo;
387	int ret;
388
389	if (ddir == DDIR_WRITE) {
390		if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
391		    o->proto == FIO_TYPE_UNIX)
392			ret = fio_netio_send(td, io_u);
393		else
394			ret = fio_netio_splice_out(td, io_u);
395	} else if (ddir == DDIR_READ) {
396		if (!nd->use_splice || o->proto == FIO_TYPE_UDP ||
397		    o->proto == FIO_TYPE_UNIX)
398			ret = fio_netio_recv(td, io_u);
399		else
400			ret = fio_netio_splice_in(td, io_u);
401	} else
402		ret = 0;	/* must be a SYNC */
403
404	if (ret != (int) io_u->xfer_buflen) {
405		if (ret >= 0) {
406			io_u->resid = io_u->xfer_buflen - ret;
407			io_u->error = 0;
408			return FIO_Q_COMPLETED;
409		} else {
410			int err = errno;
411
412			if (ddir == DDIR_WRITE && err == EMSGSIZE)
413				return FIO_Q_BUSY;
414
415			io_u->error = err;
416		}
417	}
418
419	if (io_u->error)
420		td_verror(td, io_u->error, "xfer");
421
422	return FIO_Q_COMPLETED;
423}
424
425static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
426{
427	struct netio_options *o = td->eo;
428	int ret;
429
430	fio_ro_check(td, io_u);
431
432	ret = __fio_netio_queue(td, io_u, io_u->ddir);
433	if (!o->pingpong || ret != FIO_Q_COMPLETED)
434		return ret;
435
436	/*
437	 * For ping-pong mode, receive or send reply as needed
438	 */
439	if (td_read(td) && io_u->ddir == DDIR_READ)
440		ret = __fio_netio_queue(td, io_u, DDIR_WRITE);
441	else if (td_write(td) && io_u->ddir == DDIR_WRITE)
442		ret = __fio_netio_queue(td, io_u, DDIR_READ);
443
444	return ret;
445}
446
447static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
448{
449	struct netio_data *nd = td->io_ops->data;
450	struct netio_options *o = td->eo;
451	int type, domain;
452
453	if (o->proto == FIO_TYPE_TCP) {
454		domain = AF_INET;
455		type = SOCK_STREAM;
456	} else if (o->proto == FIO_TYPE_UDP) {
457		domain = AF_INET;
458		type = SOCK_DGRAM;
459	} else if (o->proto == FIO_TYPE_UNIX) {
460		domain = AF_UNIX;
461		type = SOCK_STREAM;
462	} else {
463		log_err("fio: bad network type %d\n", o->proto);
464		f->fd = -1;
465		return 1;
466	}
467
468	f->fd = socket(domain, type, 0);
469	if (f->fd < 0) {
470		td_verror(td, errno, "socket");
471		return 1;
472	}
473
474	if (o->proto == FIO_TYPE_UDP)
475		return 0;
476	else if (o->proto == FIO_TYPE_TCP) {
477		socklen_t len = sizeof(nd->addr);
478
479		if (connect(f->fd, (struct sockaddr *) &nd->addr, len) < 0) {
480			td_verror(td, errno, "connect");
481			close(f->fd);
482			return 1;
483		}
484	} else {
485		struct sockaddr_un *addr = &nd->addr_un;
486		socklen_t len;
487
488		len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
489
490		if (connect(f->fd, (struct sockaddr *) addr, len) < 0) {
491			td_verror(td, errno, "connect");
492			close(f->fd);
493			return 1;
494		}
495	}
496
497	return 0;
498}
499
500static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
501{
502	struct netio_data *nd = td->io_ops->data;
503	struct netio_options *o = td->eo;
504	socklen_t socklen = sizeof(nd->addr);
505	int state;
506
507	if (o->proto == FIO_TYPE_UDP) {
508		f->fd = nd->listenfd;
509		return 0;
510	}
511
512	state = td->runstate;
513	td_set_runstate(td, TD_SETTING_UP);
514
515	log_info("fio: waiting for connection\n");
516
517	if (poll_wait(td, nd->listenfd, POLLIN) < 0)
518		goto err;
519
520	f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
521	if (f->fd < 0) {
522		td_verror(td, errno, "accept");
523		goto err;
524	}
525
526	reset_all_stats(td);
527	td_set_runstate(td, state);
528	return 0;
529err:
530	td_set_runstate(td, state);
531	return 1;
532}
533
534static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f)
535{
536	struct netio_data *nd = td->io_ops->data;
537	struct udp_close_msg msg;
538	struct sockaddr *to = (struct sockaddr *) &nd->addr;
539	int ret;
540
541	msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
542	msg.cmd = htonl(FIO_LINK_CLOSE);
543
544	ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
545			sizeof(nd->addr));
546	if (ret < 0)
547		td_verror(td, errno, "sendto udp link close");
548}
549
550static int fio_netio_close_file(struct thread_data *td, struct fio_file *f)
551{
552	struct netio_options *o = td->eo;
553
554	/*
555	 * If this is an UDP connection, notify the receiver that we are
556	 * closing down the link
557	 */
558	if (o->proto == FIO_TYPE_UDP)
559		fio_netio_udp_close(td, f);
560
561	return generic_close_file(td, f);
562}
563
564static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f)
565{
566	struct netio_data *nd = td->io_ops->data;
567	struct udp_close_msg msg;
568	struct sockaddr *to = (struct sockaddr *) &nd->addr;
569	socklen_t len = sizeof(nd->addr);
570	int ret;
571
572	ret = recvfrom(f->fd, &msg, sizeof(msg), MSG_WAITALL, to, &len);
573	if (ret < 0) {
574		td_verror(td, errno, "sendto udp link open");
575		return ret;
576	}
577
578	if (ntohl(msg.magic) != FIO_LINK_OPEN_CLOSE_MAGIC ||
579	    ntohl(msg.cmd) != FIO_LINK_OPEN) {
580		log_err("fio: bad udp open magic %x/%x\n", ntohl(msg.magic),
581								ntohl(msg.cmd));
582		return -1;
583	}
584
585	return 0;
586}
587
588static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f)
589{
590	struct netio_data *nd = td->io_ops->data;
591	struct udp_close_msg msg;
592	struct sockaddr *to = (struct sockaddr *) &nd->addr;
593	int ret;
594
595	msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC);
596	msg.cmd = htonl(FIO_LINK_OPEN);
597
598	ret = sendto(f->fd, &msg, sizeof(msg), MSG_WAITALL, to,
599			sizeof(nd->addr));
600	if (ret < 0) {
601		td_verror(td, errno, "sendto udp link open");
602		return ret;
603	}
604
605	return 0;
606}
607
608static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
609{
610	int ret;
611	struct netio_options *o = td->eo;
612
613	if (o->listen)
614		ret = fio_netio_accept(td, f);
615	else
616		ret = fio_netio_connect(td, f);
617
618	if (ret) {
619		f->fd = -1;
620		return ret;
621	}
622
623	if (o->proto == FIO_TYPE_UDP) {
624		if (td_write(td))
625			ret = fio_netio_udp_send_open(td, f);
626		else {
627			int state;
628
629			state = td->runstate;
630			td_set_runstate(td, TD_SETTING_UP);
631			ret = fio_netio_udp_recv_open(td, f);
632			td_set_runstate(td, state);
633		}
634	}
635
636	if (ret)
637		fio_netio_close_file(td, f);
638
639	return ret;
640}
641
642static int fio_netio_setup_connect_inet(struct thread_data *td,
643					const char *host, unsigned short port)
644{
645	struct netio_data *nd = td->io_ops->data;
646
647	if (!host) {
648		log_err("fio: connect with no host to connect to.\n");
649		if (td_read(td))
650			log_err("fio: did you forget to set 'listen'?\n");
651
652		td_verror(td, EINVAL, "no hostname= set");
653		return 1;
654	}
655
656	nd->addr.sin_family = AF_INET;
657	nd->addr.sin_port = htons(port);
658
659	if (inet_aton(host, &nd->addr.sin_addr) != 1) {
660		struct hostent *hent;
661
662		hent = gethostbyname(host);
663		if (!hent) {
664			td_verror(td, errno, "gethostbyname");
665			return 1;
666		}
667
668		memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
669	}
670
671	return 0;
672}
673
674static int fio_netio_setup_connect_unix(struct thread_data *td,
675					const char *path)
676{
677	struct netio_data *nd = td->io_ops->data;
678	struct sockaddr_un *soun = &nd->addr_un;
679
680	soun->sun_family = AF_UNIX;
681	strcpy(soun->sun_path, path);
682	return 0;
683}
684
685static int fio_netio_setup_connect(struct thread_data *td)
686{
687	struct netio_options *o = td->eo;
688
689	if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
690		return fio_netio_setup_connect_inet(td, td->o.filename,o->port);
691	else
692		return fio_netio_setup_connect_unix(td, td->o.filename);
693}
694
695static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path)
696{
697	struct netio_data *nd = td->io_ops->data;
698	struct sockaddr_un *addr = &nd->addr_un;
699	mode_t mode;
700	int len, fd;
701
702	fd = socket(AF_UNIX, SOCK_STREAM, 0);
703	if (fd < 0) {
704		log_err("fio: socket: %s\n", strerror(errno));
705		return -1;
706	}
707
708	mode = umask(000);
709
710	memset(addr, 0, sizeof(*addr));
711	addr->sun_family = AF_UNIX;
712	strcpy(addr->sun_path, path);
713	unlink(path);
714
715	len = sizeof(addr->sun_family) + strlen(path) + 1;
716
717	if (bind(fd, (struct sockaddr *) addr, len) < 0) {
718		log_err("fio: bind: %s\n", strerror(errno));
719		close(fd);
720		return -1;
721	}
722
723	umask(mode);
724	nd->listenfd = fd;
725	return 0;
726}
727
728static int fio_netio_setup_listen_inet(struct thread_data *td, short port)
729{
730	struct netio_data *nd = td->io_ops->data;
731	struct netio_options *o = td->eo;
732	int fd, opt, type;
733
734	if (o->proto == FIO_TYPE_TCP)
735		type = SOCK_STREAM;
736	else
737		type = SOCK_DGRAM;
738
739	fd = socket(AF_INET, type, 0);
740	if (fd < 0) {
741		td_verror(td, errno, "socket");
742		return 1;
743	}
744
745	opt = 1;
746	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
747		td_verror(td, errno, "setsockopt");
748		return 1;
749	}
750#ifdef SO_REUSEPORT
751	if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
752		td_verror(td, errno, "setsockopt");
753		return 1;
754	}
755#endif
756
757	nd->addr.sin_family = AF_INET;
758	nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
759	nd->addr.sin_port = htons(port);
760
761	if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
762		td_verror(td, errno, "bind");
763		return 1;
764	}
765
766	nd->listenfd = fd;
767	return 0;
768}
769
770static int fio_netio_setup_listen(struct thread_data *td)
771{
772	struct netio_data *nd = td->io_ops->data;
773	struct netio_options *o = td->eo;
774	int ret;
775
776	if (o->proto == FIO_TYPE_UDP || o->proto == FIO_TYPE_TCP)
777		ret = fio_netio_setup_listen_inet(td, o->port);
778	else
779		ret = fio_netio_setup_listen_unix(td, td->o.filename);
780
781	if (ret)
782		return ret;
783	if (o->proto == FIO_TYPE_UDP)
784		return 0;
785
786	if (listen(nd->listenfd, 10) < 0) {
787		td_verror(td, errno, "listen");
788		nd->listenfd = -1;
789		return 1;
790	}
791
792	return 0;
793}
794
795static int fio_netio_init(struct thread_data *td)
796{
797	struct netio_options *o = td->eo;
798	int ret;
799
800#ifdef WIN32
801	WSADATA wsd;
802	WSAStartup(MAKEWORD(2,2), &wsd);
803#endif
804
805	if (td_random(td)) {
806		log_err("fio: network IO can't be random\n");
807		return 1;
808	}
809
810	if (o->proto == FIO_TYPE_UNIX && o->port) {
811		log_err("fio: network IO port not valid with unix socket\n");
812		return 1;
813	} else if (o->proto != FIO_TYPE_UNIX && !o->port) {
814		log_err("fio: network IO requires port for tcp or udp\n");
815		return 1;
816	}
817
818	if (o->proto != FIO_TYPE_TCP) {
819		if (o->listen) {
820			log_err("fio: listen only valid for TCP proto IO\n");
821			return 1;
822		}
823		if (td_rw(td)) {
824			log_err("fio: datagram network connections must be"
825				   " read OR write\n");
826			return 1;
827		}
828		if (o->proto == FIO_TYPE_UNIX && !td->o.filename) {
829			log_err("fio: UNIX sockets need host/filename\n");
830			return 1;
831		}
832		o->listen = td_read(td);
833	}
834
835	if (o->proto != FIO_TYPE_UNIX && o->listen && td->o.filename) {
836		log_err("fio: hostname not valid for inbound network IO\n");
837		return 1;
838	}
839
840	if (o->listen)
841		ret = fio_netio_setup_listen(td);
842	else
843		ret = fio_netio_setup_connect(td);
844
845	return ret;
846}
847
848static void fio_netio_cleanup(struct thread_data *td)
849{
850	struct netio_data *nd = td->io_ops->data;
851
852	if (nd) {
853		if (nd->listenfd != -1)
854			close(nd->listenfd);
855		if (nd->pipes[0] != -1)
856			close(nd->pipes[0]);
857		if (nd->pipes[1] != -1)
858			close(nd->pipes[1]);
859
860		free(nd);
861	}
862}
863
864static int fio_netio_setup(struct thread_data *td)
865{
866	struct netio_data *nd;
867
868	if (!td->files_index) {
869		add_file(td, td->o.filename ?: "net");
870		td->o.nr_files = td->o.nr_files ?: 1;
871	}
872
873	if (!td->io_ops->data) {
874		nd = malloc(sizeof(*nd));;
875
876		memset(nd, 0, sizeof(*nd));
877		nd->listenfd = -1;
878		nd->pipes[0] = nd->pipes[1] = -1;
879		td->io_ops->data = nd;
880	}
881
882	return 0;
883}
884
885static void fio_netio_terminate(struct thread_data *td)
886{
887	kill(td->pid, SIGUSR2);
888}
889
890#ifdef CONFIG_LINUX_SPLICE
891static int fio_netio_setup_splice(struct thread_data *td)
892{
893	struct netio_data *nd;
894
895	fio_netio_setup(td);
896
897	nd = td->io_ops->data;
898	if (nd) {
899		if (pipe(nd->pipes) < 0)
900			return 1;
901
902		nd->use_splice = 1;
903		return 0;
904	}
905
906	return 1;
907}
908
909static struct ioengine_ops ioengine_splice = {
910	.name			= "netsplice",
911	.version		= FIO_IOOPS_VERSION,
912	.prep			= fio_netio_prep,
913	.queue			= fio_netio_queue,
914	.setup			= fio_netio_setup_splice,
915	.init			= fio_netio_init,
916	.cleanup		= fio_netio_cleanup,
917	.open_file		= fio_netio_open_file,
918	.close_file		= fio_netio_close_file,
919	.terminate		= fio_netio_terminate,
920	.options		= options,
921	.option_struct_size	= sizeof(struct netio_options),
922	.flags			= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
923				  FIO_PIPEIO,
924};
925#endif
926
927static struct ioengine_ops ioengine_rw = {
928	.name			= "net",
929	.version		= FIO_IOOPS_VERSION,
930	.prep			= fio_netio_prep,
931	.queue			= fio_netio_queue,
932	.setup			= fio_netio_setup,
933	.init			= fio_netio_init,
934	.cleanup		= fio_netio_cleanup,
935	.open_file		= fio_netio_open_file,
936	.close_file		= fio_netio_close_file,
937	.terminate		= fio_netio_terminate,
938	.options		= options,
939	.option_struct_size	= sizeof(struct netio_options),
940	.flags			= FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
941				  FIO_PIPEIO,
942};
943
944static int str_hostname_cb(void *data, const char *input)
945{
946	struct netio_options *o = data;
947
948	if (o->td->o.filename)
949		free(o->td->o.filename);
950	o->td->o.filename = strdup(input);
951	return 0;
952}
953
954static void fio_init fio_netio_register(void)
955{
956	register_ioengine(&ioengine_rw);
957#ifdef CONFIG_LINUX_SPLICE
958	register_ioengine(&ioengine_splice);
959#endif
960}
961
962static void fio_exit fio_netio_unregister(void)
963{
964	unregister_ioengine(&ioengine_rw);
965#ifdef CONFIG_LINUX_SPLICE
966	unregister_ioengine(&ioengine_splice);
967#endif
968}
969