server.c revision 53bd8dbcbf692d1f622d6c9e62284121e710fdc3
1#include <stdio.h>
2#include <stdlib.h>
3#include <stdarg.h>
4#include <unistd.h>
5#include <limits.h>
6#include <errno.h>
7#include <fcntl.h>
8#include <sys/poll.h>
9#include <sys/types.h>
10#include <sys/wait.h>
11#include <sys/socket.h>
12#include <sys/stat.h>
13#include <sys/un.h>
14#include <netinet/in.h>
15#include <arpa/inet.h>
16#include <netdb.h>
17#include <syslog.h>
18#include <signal.h>
19#include <zlib.h>
20
21#include "fio.h"
22#include "server.h"
23#include "crc/crc16.h"
24#include "lib/ieee754.h"
25
26#include "fio_version.h"
27
28int fio_net_port = FIO_NET_PORT;
29
30int exit_backend = 0;
31
32static int server_fd = -1;
33static char *fio_server_arg;
34static char *bind_sock;
35static struct sockaddr_in saddr_in;
36static struct sockaddr_in6 saddr_in6;
37static int first_cmd_check;
38static int use_ipv6;
39
40static const char *fio_server_ops[FIO_NET_CMD_NR] = {
41	"",
42	"QUIT",
43	"EXIT",
44	"JOB",
45	"JOBLINE",
46	"TEXT",
47	"TS",
48	"GS",
49	"SEND_ETA",
50	"ETA",
51	"PROBE",
52	"START",
53	"STOP",
54	"DISK_UTIL",
55	"SERVER_START",
56	"ADD_JOB",
57	"CMD_RUN"
58};
59
60const char *fio_server_op(unsigned int op)
61{
62	static char buf[32];
63
64	if (op < FIO_NET_CMD_NR)
65		return fio_server_ops[op];
66
67	sprintf(buf, "UNKNOWN/%d", op);
68	return buf;
69}
70
/*
 * Sum up the byte lengths of the first 'count' entries of 'iov'.
 */
static ssize_t iov_total_len(const struct iovec *iov, int count)
{
	ssize_t total = 0;
	int i;

	for (i = 0; i < count; i++)
		total += iov[i].iov_len;

	return total;
}
82
83static int fio_sendv_data(int sk, struct iovec *iov, int count)
84{
85	ssize_t total_len = iov_total_len(iov, count);
86	ssize_t ret;
87
88	do {
89		ret = writev(sk, iov, count);
90		if (ret > 0) {
91			total_len -= ret;
92			if (!total_len)
93				break;
94
95			while (ret) {
96				if (ret >= iov->iov_len) {
97					ret -= iov->iov_len;
98					iov++;
99					continue;
100				}
101				iov->iov_base += ret;
102				iov->iov_len -= ret;
103				ret = 0;
104			}
105		} else if (!ret)
106			break;
107		else if (errno == EAGAIN || errno == EINTR)
108			continue;
109		else
110			break;
111	} while (!exit_backend);
112
113	if (!total_len)
114		return 0;
115
116	if (errno)
117		return -errno;
118
119	return 1;
120}
121
122int fio_send_data(int sk, const void *p, unsigned int len)
123{
124	struct iovec iov = { .iov_base = (void *) p, .iov_len = len };
125
126	assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
127
128	return fio_sendv_data(sk, &iov, 1);
129}
130
131int fio_recv_data(int sk, void *p, unsigned int len)
132{
133	do {
134		int ret = recv(sk, p, len, MSG_WAITALL);
135
136		if (ret > 0) {
137			len -= ret;
138			if (!len)
139				break;
140			p += ret;
141			continue;
142		} else if (!ret)
143			break;
144		else if (errno == EAGAIN || errno == EINTR)
145			continue;
146		else
147			break;
148	} while (!exit_backend);
149
150	if (!len)
151		return 0;
152
153	return -1;
154}
155
156static int verify_convert_cmd(struct fio_net_cmd *cmd)
157{
158	uint16_t crc;
159
160	cmd->cmd_crc16 = le16_to_cpu(cmd->cmd_crc16);
161	cmd->pdu_crc16 = le16_to_cpu(cmd->pdu_crc16);
162
163	crc = fio_crc16(cmd, FIO_NET_CMD_CRC_SZ);
164	if (crc != cmd->cmd_crc16) {
165		log_err("fio: server bad crc on command (got %x, wanted %x)\n",
166				cmd->cmd_crc16, crc);
167		return 1;
168	}
169
170	cmd->version	= le16_to_cpu(cmd->version);
171	cmd->opcode	= le16_to_cpu(cmd->opcode);
172	cmd->flags	= le32_to_cpu(cmd->flags);
173	cmd->tag	= le64_to_cpu(cmd->tag);
174	cmd->pdu_len	= le32_to_cpu(cmd->pdu_len);
175
176	switch (cmd->version) {
177	case FIO_SERVER_VER:
178		break;
179	default:
180		log_err("fio: bad server cmd version %d\n", cmd->version);
181		return 1;
182	}
183
184	if (cmd->pdu_len > FIO_SERVER_MAX_FRAGMENT_PDU) {
185		log_err("fio: command payload too large: %u\n", cmd->pdu_len);
186		return 1;
187	}
188
189	return 0;
190}
191
192/*
193 * Read (and defragment, if necessary) incoming commands
194 */
195struct fio_net_cmd *fio_net_recv_cmd(int sk)
196{
197	struct fio_net_cmd cmd, *cmdret = NULL;
198	size_t cmd_size = 0, pdu_offset = 0;
199	uint16_t crc;
200	int ret, first = 1;
201	void *pdu = NULL;
202
203	do {
204		ret = fio_recv_data(sk, &cmd, sizeof(cmd));
205		if (ret)
206			break;
207
208		/* We have a command, verify it and swap if need be */
209		ret = verify_convert_cmd(&cmd);
210		if (ret)
211			break;
212
213		if (first) {
214			/* if this is text, add room for \0 at the end */
215			cmd_size = sizeof(cmd) + cmd.pdu_len + 1;
216			assert(!cmdret);
217		} else
218			cmd_size += cmd.pdu_len;
219
220		cmdret = realloc(cmdret, cmd_size);
221
222		if (first)
223			memcpy(cmdret, &cmd, sizeof(cmd));
224		else if (cmdret->opcode != cmd.opcode) {
225			log_err("fio: fragment opcode mismatch (%d != %d)\n",
226					cmdret->opcode, cmd.opcode);
227			ret = 1;
228			break;
229		}
230
231		if (!cmd.pdu_len)
232			break;
233
234		/* There's payload, get it */
235		pdu = (void *) cmdret->payload + pdu_offset;
236		ret = fio_recv_data(sk, pdu, cmd.pdu_len);
237		if (ret)
238			break;
239
240		/* Verify payload crc */
241		crc = fio_crc16(pdu, cmd.pdu_len);
242		if (crc != cmd.pdu_crc16) {
243			log_err("fio: server bad crc on payload ");
244			log_err("(got %x, wanted %x)\n", cmd.pdu_crc16, crc);
245			ret = 1;
246			break;
247		}
248
249		pdu_offset += cmd.pdu_len;
250		if (!first)
251			cmdret->pdu_len += cmd.pdu_len;
252		first = 0;
253	} while (cmd.flags & FIO_NET_CMD_F_MORE);
254
255	if (ret) {
256		free(cmdret);
257		cmdret = NULL;
258	} else if (cmdret) {
259		/* zero-terminate text input */
260		if (cmdret->pdu_len) {
261			if (cmdret->opcode == FIO_NET_CMD_TEXT) {
262				struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmdret->payload;
263				char *buf = (char *) pdu->buf;
264
265				buf[pdu->buf_len ] = '\0';
266			} else if (cmdret->opcode == FIO_NET_CMD_JOB) {
267				struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmdret->payload;
268				char *buf = (char *) pdu->buf;
269				int len = le32_to_cpu(pdu->buf_len);
270
271				buf[len] = '\0';
272			}
273		}
274
275		/* frag flag is internal */
276		cmdret->flags &= ~FIO_NET_CMD_F_MORE;
277	}
278
279	return cmdret;
280}
281
282void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
283{
284	uint32_t pdu_len;
285
286	cmd->cmd_crc16 = __cpu_to_le16(fio_crc16(cmd, FIO_NET_CMD_CRC_SZ));
287
288	pdu_len = le32_to_cpu(cmd->pdu_len);
289	cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len));
290}
291
292void fio_net_cmd_crc(struct fio_net_cmd *cmd)
293{
294	fio_net_cmd_crc_pdu(cmd, cmd->payload);
295}
296
297int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
298		     uint64_t tag)
299{
300	struct fio_net_cmd *cmd = NULL;
301	size_t this_len, cur_len = 0;
302	int ret;
303
304	do {
305		this_len = size;
306		if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU)
307			this_len = FIO_SERVER_MAX_FRAGMENT_PDU;
308
309		if (!cmd || cur_len < sizeof(*cmd) + this_len) {
310			if (cmd)
311				free(cmd);
312
313			cur_len = sizeof(*cmd) + this_len;
314			cmd = malloc(cur_len);
315		}
316
317		fio_init_net_cmd(cmd, opcode, buf, this_len, tag);
318
319		if (this_len < size)
320			cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
321
322		fio_net_cmd_crc(cmd);
323
324		ret = fio_send_data(fd, cmd, sizeof(*cmd) + this_len);
325		size -= this_len;
326		buf += this_len;
327	} while (!ret && size);
328
329	if (cmd)
330		free(cmd);
331
332	return ret;
333}
334
335static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
336{
337	struct fio_net_cmd cmd;
338
339	fio_init_net_cmd(&cmd, opcode, NULL, 0, tag);
340	fio_net_cmd_crc(&cmd);
341
342	return fio_send_data(sk, &cmd, sizeof(cmd));
343}
344
345/*
346 * If 'list' is non-NULL, then allocate and store the sent command for
347 * later verification.
348 */
349int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
350			    struct flist_head *list)
351{
352	struct fio_net_int_cmd *cmd;
353	int ret;
354
355	if (!list)
356		return fio_net_send_simple_stack_cmd(sk, opcode, tag);
357
358	cmd = malloc(sizeof(*cmd));
359
360	fio_init_net_cmd(&cmd->cmd, opcode, NULL, 0, (uintptr_t) cmd);
361	fio_net_cmd_crc(&cmd->cmd);
362
363	INIT_FLIST_HEAD(&cmd->list);
364	gettimeofday(&cmd->tv, NULL);
365	cmd->saved_tag = tag;
366
367	ret = fio_send_data(sk, &cmd->cmd, sizeof(cmd->cmd));
368	if (ret) {
369		free(cmd);
370		return ret;
371	}
372
373	flist_add_tail(&cmd->list, list);
374	return 0;
375}
376
377static int fio_server_send_quit_cmd(void)
378{
379	dprint(FD_NET, "server: sending quit\n");
380	return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL);
381}
382
383static int handle_run_cmd(struct fio_net_cmd *cmd)
384{
385	struct cmd_end_pdu epdu;
386	int ret;
387
388	ret = fio_backend();
389
390	epdu.error = ret;
391	fio_net_send_cmd(server_fd, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0);
392
393	fio_server_send_quit_cmd();
394	reset_fio_state();
395	first_cmd_check = 0;
396	return ret;
397}
398
399static int handle_job_cmd(struct fio_net_cmd *cmd)
400{
401	struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload;
402	void *buf = pdu->buf;
403	struct cmd_start_pdu spdu;
404
405	pdu->buf_len = le32_to_cpu(pdu->buf_len);
406	pdu->client_type = le32_to_cpu(pdu->client_type);
407
408	if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
409		fio_server_send_quit_cmd();
410		return -1;
411	}
412
413	spdu.jobs = cpu_to_le32(thread_number);
414	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), 0);
415	return 0;
416}
417
418static int handle_jobline_cmd(struct fio_net_cmd *cmd)
419{
420	void *pdu = cmd->payload;
421	struct cmd_single_line_pdu *cslp;
422	struct cmd_line_pdu *clp;
423	unsigned long offset;
424	struct cmd_start_pdu spdu;
425	char **argv;
426	int i;
427
428	clp = pdu;
429	clp->lines = le16_to_cpu(clp->lines);
430	clp->client_type = le16_to_cpu(clp->client_type);
431	argv = malloc(clp->lines * sizeof(char *));
432	offset = sizeof(*clp);
433
434	dprint(FD_NET, "server: %d command line args\n", clp->lines);
435
436	for (i = 0; i < clp->lines; i++) {
437		cslp = pdu + offset;
438		argv[i] = (char *) cslp->text;
439
440		offset += sizeof(*cslp) + le16_to_cpu(cslp->len);
441		dprint(FD_NET, "server: %d: %s\n", i, argv[i]);
442	}
443
444	if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
445		fio_server_send_quit_cmd();
446		free(argv);
447		return -1;
448	}
449
450	free(argv);
451
452	spdu.jobs = cpu_to_le32(thread_number);
453	fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), 0);
454	return 0;
455}
456
457static int handle_probe_cmd(struct fio_net_cmd *cmd)
458{
459	struct cmd_probe_pdu probe;
460
461	dprint(FD_NET, "server: sending probe reply\n");
462
463	memset(&probe, 0, sizeof(probe));
464	gethostname((char *) probe.hostname, sizeof(probe.hostname));
465#ifdef FIO_BIG_ENDIAN
466	probe.bigendian = 1;
467#endif
468	probe.fio_major = FIO_MAJOR;
469	probe.fio_minor = FIO_MINOR;
470	probe.fio_patch = FIO_PATCH;
471
472	probe.os	= FIO_OS;
473	probe.arch	= FIO_ARCH;
474
475	probe.bpp	= sizeof(void *);
476
477	return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), cmd->tag);
478}
479
480static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
481{
482	struct jobs_eta *je;
483	size_t size;
484	int i;
485
486	if (!thread_number)
487		return 0;
488
489	size = sizeof(*je) + thread_number * sizeof(char) + 1;
490	je = malloc(size);
491	memset(je, 0, size);
492
493	if (!calc_thread_status(je, 1)) {
494		free(je);
495		return 0;
496	}
497
498	dprint(FD_NET, "server sending status\n");
499
500	je->nr_running		= cpu_to_le32(je->nr_running);
501	je->nr_ramp		= cpu_to_le32(je->nr_ramp);
502	je->nr_pending		= cpu_to_le32(je->nr_pending);
503	je->files_open		= cpu_to_le32(je->files_open);
504
505	for (i = 0; i < 2; i++) {
506		je->m_rate[i]	= cpu_to_le32(je->m_rate[i]);
507		je->t_rate[i]	= cpu_to_le32(je->t_rate[i]);
508		je->m_iops[i]	= cpu_to_le32(je->m_iops[i]);
509		je->t_iops[i]	= cpu_to_le32(je->t_iops[i]);
510		je->rate[i]	= cpu_to_le32(je->rate[i]);
511		je->iops[i]	= cpu_to_le32(je->iops[i]);
512	}
513
514	je->elapsed_sec		= cpu_to_le64(je->elapsed_sec);
515	je->eta_sec		= cpu_to_le64(je->eta_sec);
516	je->nr_threads		= cpu_to_le32(je->nr_threads);
517
518	fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, cmd->tag);
519	free(je);
520	return 0;
521}
522
523static int handle_command(struct fio_net_cmd *cmd)
524{
525	int ret;
526
527	dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%lx\n",
528			fio_server_op(cmd->opcode), cmd->pdu_len, cmd->tag);
529
530	switch (cmd->opcode) {
531	case FIO_NET_CMD_QUIT:
532		fio_terminate_threads(TERMINATE_ALL);
533		return -1;
534	case FIO_NET_CMD_EXIT:
535		exit_backend = 1;
536		return -1;
537	case FIO_NET_CMD_JOB:
538		ret = handle_job_cmd(cmd);
539		break;
540	case FIO_NET_CMD_JOBLINE:
541		ret = handle_jobline_cmd(cmd);
542		break;
543	case FIO_NET_CMD_PROBE:
544		ret = handle_probe_cmd(cmd);
545		break;
546	case FIO_NET_CMD_SEND_ETA:
547		ret = handle_send_eta_cmd(cmd);
548		break;
549	case FIO_NET_CMD_RUN:
550		ret = handle_run_cmd(cmd);
551		break;
552	default:
553		log_err("fio: unknown opcode: %s\n",fio_server_op(cmd->opcode));
554		ret = 1;
555	}
556
557	return ret;
558}
559
560static int handle_connection(int sk, int block)
561{
562	struct fio_net_cmd *cmd = NULL;
563	int ret = 0;
564
565	/* read forever */
566	while (!exit_backend) {
567		struct pollfd pfd = {
568			.fd	= sk,
569			.events	= POLLIN,
570		};
571
572		ret = 0;
573		do {
574			ret = poll(&pfd, 1, 100);
575			if (ret < 0) {
576				if (errno == EINTR)
577					break;
578				log_err("fio: poll: %s\n", strerror(errno));
579				break;
580			} else if (!ret) {
581				if (!block)
582					return 0;
583				continue;
584			}
585
586			if (pfd.revents & POLLIN)
587				break;
588			if (pfd.revents & (POLLERR|POLLHUP)) {
589				ret = 1;
590				break;
591			}
592		} while (!exit_backend);
593
594		if (ret < 0)
595			break;
596
597		cmd = fio_net_recv_cmd(sk);
598		if (!cmd) {
599			ret = -1;
600			break;
601		}
602
603		ret = handle_command(cmd);
604		if (ret)
605			break;
606
607		free(cmd);
608		cmd = NULL;
609	}
610
611	if (cmd)
612		free(cmd);
613
614	return ret;
615}
616
617void fio_server_idle_loop(void)
618{
619	if (!first_cmd_check) {
620		fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
621		first_cmd_check = 1;
622	}
623	if (server_fd != -1)
624		handle_connection(server_fd, 0);
625}
626
627static int accept_loop(int listen_sk)
628{
629	struct sockaddr_in addr;
630	fio_socklen_t len = sizeof(addr);
631	struct pollfd pfd;
632	int ret, sk, flags, exitval = 0;
633
634	dprint(FD_NET, "server enter accept loop\n");
635
636	flags = fcntl(listen_sk, F_GETFL);
637	flags |= O_NONBLOCK;
638	fcntl(listen_sk, F_SETFL, flags);
639again:
640	pfd.fd = listen_sk;
641	pfd.events = POLLIN;
642	do {
643		ret = poll(&pfd, 1, 100);
644		if (ret < 0) {
645			if (errno == EINTR)
646				break;
647			log_err("fio: poll: %s\n", strerror(errno));
648			goto out;
649		} else if (!ret)
650			continue;
651
652		if (pfd.revents & POLLIN)
653			break;
654	} while (!exit_backend);
655
656	if (exit_backend)
657		goto out;
658
659	sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
660	if (sk < 0) {
661		log_err("fio: accept: %s\n", strerror(errno));
662		return -1;
663	}
664
665	dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
666
667	server_fd = sk;
668
669	exitval = handle_connection(sk, 1);
670
671	server_fd = -1;
672	close(sk);
673
674	if (!exit_backend)
675		goto again;
676
677out:
678	return exitval;
679}
680
681int fio_server_text_output(int level, const char *buf, size_t len)
682{
683	struct cmd_text_pdu *pdu;
684	unsigned int tlen;
685	struct timeval tv;
686
687	if (server_fd == -1)
688		return log_local_buf(buf, len);
689
690	tlen = sizeof(*pdu) + len;
691	pdu = malloc(tlen);
692
693	pdu->level	= __cpu_to_le32(level);
694	pdu->buf_len	= __cpu_to_le32(len);
695
696	gettimeofday(&tv, NULL);
697	pdu->log_sec	= __cpu_to_le64(tv.tv_sec);
698	pdu->log_usec	= __cpu_to_le64(tv.tv_usec);
699
700	memcpy(pdu->buf, buf, len);
701
702	fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, 0);
703	free(pdu);
704	return len;
705}
706
707static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
708{
709	dst->max_val	= cpu_to_le64(src->max_val);
710	dst->min_val	= cpu_to_le64(src->min_val);
711	dst->samples	= cpu_to_le64(src->samples);
712
713	/*
714	 * Encode to IEEE 754 for network transfer
715	 */
716	dst->mean.u.i	= __cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
717	dst->S.u.i	= __cpu_to_le64(fio_double_to_uint64(src->S.u.f));
718}
719
720static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
721{
722	int i;
723
724	for (i = 0; i < 2; i++) {
725		dst->max_run[i]		= cpu_to_le64(src->max_run[i]);
726		dst->min_run[i]		= cpu_to_le64(src->min_run[i]);
727		dst->max_bw[i]		= cpu_to_le64(src->max_bw[i]);
728		dst->min_bw[i]		= cpu_to_le64(src->min_bw[i]);
729		dst->io_kb[i]		= cpu_to_le64(src->io_kb[i]);
730		dst->agg[i]		= cpu_to_le64(src->agg[i]);
731	}
732
733	dst->kb_base	= cpu_to_le32(src->kb_base);
734	dst->groupid	= cpu_to_le32(src->groupid);
735}
736
737/*
738 * Send a CMD_TS, which packs struct thread_stat and group_run_stats
739 * into a single payload.
740 */
741void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
742{
743	struct cmd_ts_pdu p;
744	int i, j;
745
746	dprint(FD_NET, "server sending end stats\n");
747
748	memset(&p, 0, sizeof(p));
749
750	strcpy(p.ts.name, ts->name);
751	strcpy(p.ts.verror, ts->verror);
752	strcpy(p.ts.description, ts->description);
753
754	p.ts.error	= cpu_to_le32(ts->error);
755	p.ts.groupid	= cpu_to_le32(ts->groupid);
756	p.ts.pid	= cpu_to_le32(ts->pid);
757	p.ts.members	= cpu_to_le32(ts->members);
758
759	for (i = 0; i < 2; i++) {
760		convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
761		convert_io_stat(&p.ts.slat_stat[i], &ts->slat_stat[i]);
762		convert_io_stat(&p.ts.lat_stat[i], &ts->lat_stat[i]);
763		convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]);
764	}
765
766	p.ts.usr_time		= cpu_to_le64(ts->usr_time);
767	p.ts.sys_time		= cpu_to_le64(ts->sys_time);
768	p.ts.ctx		= cpu_to_le64(ts->ctx);
769	p.ts.minf		= cpu_to_le64(ts->minf);
770	p.ts.majf		= cpu_to_le64(ts->majf);
771	p.ts.clat_percentiles	= cpu_to_le64(ts->clat_percentiles);
772
773	for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
774		fio_fp64_t *src = &ts->percentile_list[i];
775		fio_fp64_t *dst = &p.ts.percentile_list[i];
776
777		dst->u.i = __cpu_to_le64(fio_double_to_uint64(src->u.f));
778	}
779
780	for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
781		p.ts.io_u_map[i]	= cpu_to_le32(ts->io_u_map[i]);
782		p.ts.io_u_submit[i]	= cpu_to_le32(ts->io_u_submit[i]);
783		p.ts.io_u_complete[i]	= cpu_to_le32(ts->io_u_complete[i]);
784	}
785
786	for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
787		p.ts.io_u_lat_u[i]	= cpu_to_le32(ts->io_u_lat_u[i]);
788		p.ts.io_u_lat_m[i]	= cpu_to_le32(ts->io_u_lat_m[i]);
789	}
790
791	for (i = 0; i < 2; i++)
792		for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
793			p.ts.io_u_plat[i][j] = cpu_to_le32(ts->io_u_plat[i][j]);
794
795	for (i = 0; i < 3; i++) {
796		p.ts.total_io_u[i]	= cpu_to_le64(ts->total_io_u[i]);
797		p.ts.short_io_u[i]	= cpu_to_le64(ts->short_io_u[i]);
798	}
799
800	p.ts.total_submit	= cpu_to_le64(ts->total_submit);
801	p.ts.total_complete	= cpu_to_le64(ts->total_complete);
802
803	for (i = 0; i < 2; i++) {
804		p.ts.io_bytes[i]	= cpu_to_le64(ts->io_bytes[i]);
805		p.ts.runtime[i]		= cpu_to_le64(ts->runtime[i]);
806	}
807
808	p.ts.total_run_time	= cpu_to_le64(ts->total_run_time);
809	p.ts.continue_on_error	= cpu_to_le16(ts->continue_on_error);
810	p.ts.total_err_count	= cpu_to_le64(ts->total_err_count);
811	p.ts.first_error	= cpu_to_le32(ts->first_error);
812	p.ts.kb_base		= cpu_to_le32(ts->kb_base);
813
814	convert_gs(&p.rs, rs);
815
816	fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), 0);
817}
818
819void fio_server_send_gs(struct group_run_stats *rs)
820{
821	struct group_run_stats gs;
822
823	dprint(FD_NET, "server sending group run stats\n");
824
825	convert_gs(&gs, rs);
826	fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), 0);
827}
828
829static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
830{
831	int i;
832
833	for (i = 0; i < 2; i++) {
834		dst->ios[i]	= cpu_to_le32(src->ios[i]);
835		dst->merges[i]	= cpu_to_le32(src->merges[i]);
836		dst->sectors[i]	= cpu_to_le64(src->sectors[i]);
837		dst->ticks[i]	= cpu_to_le32(src->ticks[i]);
838	}
839
840	dst->io_ticks		= cpu_to_le32(src->io_ticks);
841	dst->time_in_queue	= cpu_to_le32(src->time_in_queue);
842	dst->slavecount		= cpu_to_le32(src->slavecount);
843	dst->max_util.u.i	= __cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
844}
845
846static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src)
847{
848	int i;
849
850	strcpy((char *) dst->name, (char *) src->name);
851
852	for (i = 0; i < 2; i++) {
853		dst->ios[i]	= cpu_to_le32(src->ios[i]);
854		dst->merges[i]	= cpu_to_le32(src->merges[i]);
855		dst->sectors[i]	= cpu_to_le64(src->sectors[i]);
856		dst->ticks[i]	= cpu_to_le32(src->ticks[i]);
857	}
858
859	dst->io_ticks		= cpu_to_le32(src->io_ticks);
860	dst->time_in_queue	= cpu_to_le32(src->time_in_queue);
861	dst->msec		= cpu_to_le64(src->msec);
862}
863
864void fio_server_send_du(void)
865{
866	struct disk_util *du;
867	struct flist_head *entry;
868	struct cmd_du_pdu pdu;
869
870	dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list));
871
872	memset(&pdu, 0, sizeof(pdu));
873
874	flist_for_each(entry, &disk_list) {
875		du = flist_entry(entry, struct disk_util, list);
876
877		convert_dus(&pdu.dus, &du->dus);
878		convert_agg(&pdu.agg, &du->agg);
879
880		fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), 0);
881	}
882}
883
884/*
885 * Send a command with a separate PDU, not inlined in the command
886 */
887static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
888				off_t size, uint64_t tag, uint32_t flags)
889{
890	struct fio_net_cmd cmd;
891	struct iovec iov[2];
892
893	iov[0].iov_base = &cmd;
894	iov[0].iov_len = sizeof(cmd);
895	iov[1].iov_base = (void *) buf;
896	iov[1].iov_len = size;
897
898	__fio_init_net_cmd(&cmd, opcode, size, tag);
899	cmd.flags = __cpu_to_le32(flags);
900	fio_net_cmd_crc_pdu(&cmd, buf);
901
902	return fio_sendv_data(server_fd, iov, 2);
903}
904
905int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
906{
907	struct cmd_iolog_pdu pdu;
908	z_stream stream;
909	void *out_pdu;
910	int i, ret = 0;
911
912	pdu.nr_samples = __cpu_to_le32(log->nr_samples);
913	pdu.log_type = cpu_to_le32(log->log_type);
914	strcpy((char *) pdu.name, name);
915
916	for (i = 0; i < log->nr_samples; i++) {
917		struct io_sample *s = &log->log[i];
918
919		s->time	= cpu_to_le64(s->time);
920		s->val	= cpu_to_le64(s->val);
921		s->ddir	= cpu_to_le32(s->ddir);
922		s->bs	= cpu_to_le32(s->bs);
923	}
924
925	/*
926	 * Dirty - since the log is potentially huge, compress it into
927	 * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving
928	 * side defragment it.
929	 */
930	out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
931
932	stream.zalloc = Z_NULL;
933	stream.zfree = Z_NULL;
934	stream.opaque = Z_NULL;
935
936	if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) {
937		ret = 1;
938		goto err;
939	}
940
941	/*
942	 * Send header first, it's not compressed.
943	 */
944	ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG, &pdu,
945					sizeof(pdu), 0, FIO_NET_CMD_F_MORE);
946	if (ret)
947		goto err_zlib;
948
949	stream.next_in = (void *) log->log;
950	stream.avail_in = log->nr_samples * sizeof(struct io_sample);
951
952	do {
953		unsigned int this_len, flags = 0;
954		int ret;
955
956		stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
957		stream.next_out = out_pdu;
958		assert(deflate(&stream, Z_FINISH) == Z_OK);
959
960		this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
961
962		if (stream.avail_in)
963			flags = FIO_NET_CMD_F_MORE;
964
965		ret = fio_send_cmd_ext_pdu(server_fd, FIO_NET_CMD_IOLOG,
966					   out_pdu, this_len, 0, flags);
967		if (ret)
968			goto err_zlib;
969	} while (stream.avail_in);
970
971err_zlib:
972	deflateEnd(&stream);
973err:
974	free(out_pdu);
975	return ret;
976}
977
978void fio_server_send_add_job(struct thread_options *o, const char *ioengine)
979{
980	struct cmd_add_job_pdu pdu;
981
982	memset(&pdu, 0, sizeof(pdu));
983	convert_thread_options_to_net(&pdu.top, o);
984
985	fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
986}
987
988static int fio_init_server_ip(void)
989{
990	struct sockaddr *addr;
991	fio_socklen_t socklen;
992	int sk, opt;
993
994	if (use_ipv6)
995		sk = socket(AF_INET6, SOCK_STREAM, 0);
996	else
997		sk = socket(AF_INET, SOCK_STREAM, 0);
998
999	if (sk < 0) {
1000		log_err("fio: socket: %s\n", strerror(errno));
1001		return -1;
1002	}
1003
1004	opt = 1;
1005	if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
1006		log_err("fio: setsockopt: %s\n", strerror(errno));
1007		close(sk);
1008		return -1;
1009	}
1010#ifdef SO_REUSEPORT
1011	if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
1012		log_err("fio: setsockopt: %s\n", strerror(errno));
1013		close(sk);
1014		return -1;
1015	}
1016#endif
1017
1018	if (use_ipv6) {
1019		addr = (struct sockaddr *) &saddr_in6;
1020		socklen = sizeof(saddr_in6);
1021		saddr_in6.sin6_family = AF_INET6;
1022	} else {
1023		addr = (struct sockaddr *) &saddr_in;
1024		socklen = sizeof(saddr_in);
1025		saddr_in.sin_family = AF_INET;
1026	}
1027
1028	if (bind(sk, addr, socklen) < 0) {
1029		log_err("fio: bind: %s\n", strerror(errno));
1030		close(sk);
1031		return -1;
1032	}
1033
1034	return sk;
1035}
1036
1037static int fio_init_server_sock(void)
1038{
1039	struct sockaddr_un addr;
1040	fio_socklen_t len;
1041	mode_t mode;
1042	int sk;
1043
1044	sk = socket(AF_UNIX, SOCK_STREAM, 0);
1045	if (sk < 0) {
1046		log_err("fio: socket: %s\n", strerror(errno));
1047		return -1;
1048	}
1049
1050	mode = umask(000);
1051
1052	memset(&addr, 0, sizeof(addr));
1053	addr.sun_family = AF_UNIX;
1054	strcpy(addr.sun_path, bind_sock);
1055	unlink(bind_sock);
1056
1057	len = sizeof(addr.sun_family) + strlen(bind_sock) + 1;
1058
1059	if (bind(sk, (struct sockaddr *) &addr, len) < 0) {
1060		log_err("fio: bind: %s\n", strerror(errno));
1061		close(sk);
1062		return -1;
1063	}
1064
1065	umask(mode);
1066	return sk;
1067}
1068
1069static int fio_init_server_connection(void)
1070{
1071	char bind_str[128];
1072	int sk;
1073
1074	dprint(FD_NET, "starting server\n");
1075
1076	if (!bind_sock)
1077		sk = fio_init_server_ip();
1078	else
1079		sk = fio_init_server_sock();
1080
1081	if (sk < 0)
1082		return sk;
1083
1084	if (!bind_sock) {
1085		char *p, port[16];
1086		const void *src;
1087		int af;
1088
1089		if (use_ipv6) {
1090			af = AF_INET6;
1091			src = &saddr_in6.sin6_addr;
1092		} else {
1093			af = AF_INET;
1094			src = &saddr_in.sin_addr;
1095		}
1096
1097		p = (char *) inet_ntop(af, src, bind_str, sizeof(bind_str));
1098
1099		sprintf(port, ",%u", fio_net_port);
1100		if (p)
1101			strcat(p, port);
1102		else
1103			strcpy(bind_str, port);
1104	} else
1105		strcpy(bind_str, bind_sock);
1106
1107	log_info("fio: server listening on %s\n", bind_str);
1108
1109	if (listen(sk, 0) < 0) {
1110		log_err("fio: listen: %s\n", strerror(errno));
1111		return -1;
1112	}
1113
1114	return sk;
1115}
1116
/*
 * Turn 'host' into an address. It is first tried as a numeric address of
 * the family selected by *ipv6 and then resolved as a host name. If IPv6
 * was requested but the lookup yields a different address type, *ipv6 is
 * cleared and the IPv4 result is used instead.
 *
 * The address is stored in 'inp6' (IPv6) or 'inp' (IPv4).
 *
 * Returns 0 on success, 1 if the name cannot be resolved or resolves to
 * an address type that cannot be used.
 */
int fio_server_parse_host(const char *host, int *ipv6, struct in_addr *inp,
			  struct in6_addr *inp6)

{
	struct hostent *hent;
	int ret;

	if (*ipv6)
		ret = inet_pton(AF_INET6, host, inp6);
	else
		ret = inet_pton(AF_INET, host, inp);

	/* it was a numeric address, nothing to resolve */
	if (ret == 1)
		return 0;

	hent = gethostbyname(host);
	if (!hent) {
		log_err("fio: failed to resolve <%s>\n", host);
		return 1;
	}

	if (*ipv6) {
		if (hent->h_addrtype != AF_INET6) {
			log_info("fio: falling back to IPv4\n");
			*ipv6 = 0;
		} else
			memcpy(inp6, hent->h_addr_list[0], sizeof(*inp6));
	}
	if (!*ipv6) {
		if (hent->h_addrtype != AF_INET) {
			log_err("fio: lookup type mismatch\n");
			return 1;
		}
		memcpy(inp, hent->h_addr_list[0], sizeof(*inp));
	}

	return 0;
}
1156
1157/*
1158 * Parse a host/ip/port string. Reads from 'str'.
1159 *
1160 * Outputs:
1161 *
1162 * For IPv4:
1163 *	*ptr is the host, *port is the port, inp is the destination.
1164 * For IPv6:
1165 *	*ptr is the host, *port is the port, inp6 is the dest, and *ipv6 is 1.
1166 * For local domain sockets:
1167 *	*ptr is the filename, *is_sock is 1.
1168 */
1169int fio_server_parse_string(const char *str, char **ptr, int *is_sock,
1170			    int *port, struct in_addr *inp,
1171			    struct in6_addr *inp6, int *ipv6)
1172{
1173	const char *host = str;
1174	char *portp;
1175	int lport = 0;
1176
1177	*ptr = NULL;
1178	*is_sock = 0;
1179	*port = fio_net_port;
1180	*ipv6 = 0;
1181
1182	if (!strncmp(str, "sock:", 5)) {
1183		*ptr = strdup(str + 5);
1184		*is_sock = 1;
1185
1186		return 0;
1187	}
1188
1189	/*
1190	 * Is it ip:<ip or host>:port
1191	 */
1192	if (!strncmp(host, "ip:", 3))
1193		host += 3;
1194	else if (!strncmp(host, "ip4:", 4))
1195		host += 4;
1196	else if (!strncmp(host, "ip6:", 4)) {
1197		host += 4;
1198		*ipv6 = 1;
1199	} else if (host[0] == ':') {
1200		/* String is :port */
1201		host++;
1202		lport = atoi(host);
1203		if (!lport || lport > 65535) {
1204			log_err("fio: bad server port %u\n", port);
1205			return 1;
1206		}
1207		/* no hostname given, we are done */
1208		*port = lport;
1209		return 0;
1210	}
1211
1212	/*
1213	 * If no port seen yet, check if there's a last ':' at the end
1214	 */
1215	if (!lport) {
1216		portp = strchr(host, ',');
1217		if (portp) {
1218			*portp = '\0';
1219			portp++;
1220			lport = atoi(portp);
1221			if (!lport || lport > 65535) {
1222				log_err("fio: bad server port %u\n", port);
1223				return 1;
1224			}
1225		}
1226	}
1227
1228	if (lport)
1229		*port = lport;
1230
1231	if (!strlen(host))
1232		return 0;
1233
1234	*ptr = strdup(host);
1235
1236	if (fio_server_parse_host(*ptr, ipv6, inp, inp6)) {
1237		free(*ptr);
1238		*ptr = NULL;
1239		return 1;
1240	}
1241
1242	if (*port == 0)
1243		*port = fio_net_port;
1244
1245	return 0;
1246}
1247
1248/*
1249 * Server arg should be one of:
1250 *
1251 * sock:/path/to/socket
1252 *   ip:1.2.3.4
1253 *      1.2.3.4
1254 *
1255 * Where sock uses unix domain sockets, and ip binds the server to
1256 * a specific interface. If no arguments are given to the server, it
1257 * uses IP and binds to 0.0.0.0.
1258 *
1259 */
1260static int fio_handle_server_arg(void)
1261{
1262	int port = fio_net_port;
1263	int is_sock, ret = 0;
1264
1265	saddr_in.sin_addr.s_addr = htonl(INADDR_ANY);
1266
1267	if (!fio_server_arg)
1268		goto out;
1269
1270	ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock,
1271					&port, &saddr_in.sin_addr,
1272					&saddr_in6.sin6_addr, &use_ipv6);
1273
1274	if (!is_sock && bind_sock) {
1275		free(bind_sock);
1276		bind_sock = NULL;
1277	}
1278
1279out:
1280	fio_net_port = port;
1281	saddr_in.sin_port = htons(port);
1282	saddr_in6.sin6_port = htons(port);
1283	return ret;
1284}
1285
1286static int fio_server(void)
1287{
1288	int sk, ret;
1289
1290	dprint(FD_NET, "starting server\n");
1291
1292	if (fio_handle_server_arg())
1293		return -1;
1294
1295	sk = fio_init_server_connection();
1296	if (sk < 0)
1297		return -1;
1298
1299	ret = accept_loop(sk);
1300
1301	close(sk);
1302
1303	if (fio_server_arg) {
1304		free(fio_server_arg);
1305		fio_server_arg = NULL;
1306	}
1307	if (bind_sock)
1308		free(bind_sock);
1309
1310	return ret;
1311}
1312
1313void fio_server_got_signal(int signal)
1314{
1315	if (signal == SIGPIPE)
1316		server_fd = -1;
1317	else {
1318		log_info("\nfio: terminating on signal %d\n", signal);
1319		exit_backend = 1;
1320	}
1321}
1322
/*
 * Check whether 'pidfile' names a server that is still around.
 *
 * Returns 0 if the file cannot be opened, or if the process it names does
 * not exist. Returns 1 if the process could be signalled, if signalling
 * failed for any reason other than "no such process", or if the file is
 * empty or does not hold a usable pid (treated as alive, to be safe).
 */
static int check_existing_pidfile(const char *pidfile)
{
	char buf[16], *end;
	size_t nr;
	long pid;
	FILE *f;

	f = fopen(pidfile, "r");
	if (!f)
		return 0;

	/* never read more than fits, whatever the size of the file */
	nr = fread(buf, 1, sizeof(buf) - 1, f);
	fclose(f);
	if (!nr)
		return 1;
	buf[nr] = '\0';

	/*
	 * Only signal a real, positive pid: 0 and negative values would
	 * address process groups instead of a single process.
	 */
	errno = 0;
	pid = strtol(buf, &end, 10);
	if (end == buf || errno == ERANGE || pid <= 0 || pid != (pid_t) pid)
		return 1;

	if (kill((pid_t) pid, SIGCONT) < 0)
		return errno != ESRCH;

	return 1;
}
1349
/*
 * Write 'pid', followed by a newline, to 'pidfile', replacing previous
 * content. Returns 0 on success, 1 if the file cannot be opened.
 */
static int write_pid(pid_t pid, const char *pidfile)
{
	FILE *out = fopen(pidfile, "w");

	if (out) {
		fprintf(out, "%u\n", (unsigned int) pid);
		fclose(out);
		return 0;
	}

	log_err("fio: failed opening pid file %s\n", pidfile);
	return 1;
}
1364
1365/*
1366 * If pidfile is specified, background us.
1367 */
1368int fio_start_server(char *pidfile)
1369{
1370	pid_t pid;
1371	int ret;
1372
1373#if defined(WIN32)
1374	WSADATA wsd;
1375	WSAStartup(MAKEWORD(2,2), &wsd);
1376#endif
1377
1378	if (!pidfile)
1379		return fio_server();
1380
1381	if (check_existing_pidfile(pidfile)) {
1382		log_err("fio: pidfile %s exists and server appears alive\n",
1383								pidfile);
1384		return -1;
1385	}
1386
1387	pid = fork();
1388	if (pid < 0) {
1389		log_err("fio: failed server fork: %s", strerror(errno));
1390		free(pidfile);
1391		return -1;
1392	} else if (pid) {
1393		int ret = write_pid(pid, pidfile);
1394
1395		exit(ret);
1396	}
1397
1398	setsid();
1399	openlog("fio", LOG_NDELAY|LOG_NOWAIT|LOG_PID, LOG_USER);
1400	log_syslog = 1;
1401	close(STDIN_FILENO);
1402	close(STDOUT_FILENO);
1403	close(STDERR_FILENO);
1404	f_out = NULL;
1405	f_err = NULL;
1406
1407	ret = fio_server();
1408
1409	closelog();
1410	unlink(pidfile);
1411	free(pidfile);
1412	return ret;
1413}
1414
1415void fio_server_set_arg(const char *arg)
1416{
1417	fio_server_arg = strdup(arg);
1418}
1419