/*
 * block queue tracing application
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
 *
 * Rewrite to have a single thread per CPU (managing all devices on that CPU)
 *	Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */

#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>
#include <sched.h>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
#include <pthread.h>
#include <locale.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/vfs.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/sendfile.h>

#include "btt/list.h"
#include "blktrace.h"

/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE		(512 * 1024)
#define BUF_NR			(4)
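
/*
 * With the defaults above, each device buffers roughly
 * BUF_NR * BUF_SIZE = 4 * 512 KiB = 2 MiB of trace data per CPU.
 * They correspond to the -b (sub buffer size, in KiB) and -n (number
 * of sub buffers) command line options handled in handle_args().
 */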

#define FILE_VBUF_SIZE		(128 * 1024)

#define DEBUGFS_TYPE		(0x64626720)
#define TRACE_NET_PORT		(8462)

enum {
	Net_none = 0,
	Net_server,
	Net_client,
};

enum thread_status {
	Th_running,
	Th_leaving,
	Th_error
};

/*
 * Generic stats collected: nevents can be _roughly_ estimated from
 * data_read (ignoring the variable-size PDU data carried by each trace).
 *
 * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
 */
struct pdc_stats {
	unsigned long long data_read;
	unsigned long long nevents;
};

struct devpath {
	struct list_head head;
	char *path;			/* path to device special file */
	char *buts_name;		/* name returned from bt kernel code */
	struct pdc_stats *stats;
	int fd, idx, ncpus;
	unsigned long long drops;

	/*
	 * For piped output only:
	 *
	 * Each tracer will have a tracer_devpath_head that it will add new
	 * data onto. Its list is protected by tracer_devpath_head.mutex,
	 * and the tracer will signal the processing thread using the
	 * dp_cond, dp_mutex & dp_entries variables defined below.
	 */
	struct tracer_devpath_head *heads;

	/*
	 * For network server mode only:
	 */
	struct cl_host *ch;
	u32 cl_id;
	time_t cl_connect_time;
	struct io_info *ios;
};

/*
 * For piped output to stdout we will have each tracer thread (one per dev)
 * tack buffers read from the relay queues onto a per-device list.
 *
 * The main thread will then collect trace buffers from each of these
 * lists in turn.
 *
 * We will use a mutex to guard each trace_buf list. The tracers
 * can then signal the main thread using <dp_cond,dp_mutex> and
 * dp_entries. (When a tracer adds an entry while dp_entries is 0, it
 * signals; while dp_entries is 0, the main thread waits for that
 * condition to be signalled.)
 *
 * adb: It may be better just to have a large buffer per tracer per dev,
 * and then use it as a ring-buffer. This would certainly cut down a lot
 * of malloc/free thrashing, at the cost of more memory movements (potentially).
 */
struct trace_buf {
	struct list_head head;
	struct devpath *dpp;
	void *buf;
	int cpu, len;
};

struct tracer_devpath_head {
	pthread_mutex_t mutex;
	struct list_head head;
	struct trace_buf *prev;
};

/*
 * Used to handle the mmap() interfaces for output file (containing traces)
 */
struct mmap_info {
	void *fs_buf;
	unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
	unsigned long buf_size, buf_nr;
	int pagesize;
};

/*
 * Each thread doing work on a (client) side of blktrace will have one
 * of these. The ios array contains input/output information, pfds holds
 * poll() data. The volatile fields provide flags to/from the main
 * executing thread.
 */
struct tracer {
	struct list_head head;
	struct io_info *ios;
	struct pollfd *pfds;
	pthread_t thread;
	int cpu, nios;
	volatile int status, is_done;
};

/*
 * Networking stuff follows. We include a magic number so the receiver
 * knows whether byte-order conversion is needed.
 *
 * The len field is overloaded:
 *	0 - Indicates an "open" - allowing the server to set up for a dev/cpu
 *	1 - Indicates a "close" - shut down the connection in an orderly way
 *	2 - Indicates an acknowledgement from the server (see ack_open_close)
 *
 * The cpu field is overloaded on close: it will contain the number of drops.
 */
struct blktrace_net_hdr {
	u32 magic;		/* same as trace magic */
	char buts_name[32];	/* trace name */
	u32 cpu;		/* for which cpu */
	u32 max_cpus;
	u32 len;		/* length of following trace data */
	u32 cl_id;		/* id for set of client per-cpu connections */
	u32 buf_size;		/* client buf_size for this trace  */
	u32 buf_nr;		/* client buf_nr for this trace  */
	u32 page_size;		/* client page_size for this trace  */
};
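
/*
 * Sketch of the resulting exchange, as implemented by
 * net_send_open_close(), ack_open_close(), flush_subbuf_net() and
 * net_sendfile_data() below (client on the left, server on the right):
 *
 *	hdr { len = 0 }			-->	set up for dev/cpu
 *					<--	hdr { len = 2 } (ack)
 *	hdr { len = N } + N data bytes	-->	repeated while tracing
 *	hdr { len = 1, cpu = drops }	-->	orderly close
 *					<--	hdr { len = 2 } (ack)
 */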

/*
 * Each host encountered has one of these. The head is used to link this
 * on to the network server's ch_list. Connections associated with this
 * host are linked on conn_list, and any devices traced on that host
 * are linked on the devpaths list.
 */
struct cl_host {
	struct list_head head;
	struct list_head conn_list;
	struct list_head devpaths;
	struct net_server_s *ns;
	char *hostname;
	struct in_addr cl_in_addr;
	int connects, ndevs, cl_opens;
};

/*
 * Each connection (client to server socket ('fd')) has one of these. A
 * back reference to the host ('ch'), and list heads (for the host
 * list, and the network server conn_list) are also included.
 */
struct cl_conn {
	struct list_head ch_head, ns_head;
	struct cl_host *ch;
	int fd, ncpus;
	time_t connect_time;
};

/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * as scratch as new connections are established.
 */
struct net_server_s {
	struct list_head conn_list;
	struct list_head ch_list;
	struct pollfd *pfds;
	int listen_fd, connects, nchs;
	struct sockaddr_in addr;
};

/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are used for the (optional) output file.
 */
struct io_info {
	struct devpath *dpp;
	FILE *ofp;
	char *obuf;
	struct cl_conn *nc;	/* Server network connection */

	/*
	 * mmap controlled output files
	 */
	struct mmap_info mmap_info;

	/*
	 * Client network fields
	 */
	unsigned int ready;
	unsigned long long data_queued;

	/*
	 * Input/output file descriptors & names
	 */
	int ifd, ofd;
	char ifn[MAXPATHLEN + 64];
	char ofn[MAXPATHLEN + 64];
};

static char blktrace_version[] = "2.0.0";

/*
 * Linkage to blktrace helper routines (trace conversions)
 */
int data_is_native = -1;

static int ndevs;
static int ncpus;
static int pagesize;
static int act_mask = ~0U;
static int kill_running_trace;
static int stop_watch;
static int piped_output;

static char *debugfs_path = "/sys/kernel/debug";
static char *output_name;
static char *output_dir;

static unsigned long buf_size = BUF_SIZE;
static unsigned long buf_nr = BUF_NR;

static FILE *pfp;

static LIST_HEAD(devpaths);
static LIST_HEAD(tracers);

static volatile int done;

/*
 * tracer threads add entries, the main thread takes them off and processes
 * them. These protect the dp_entries variable.
 */
static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;

/*
 * These synchronize master / thread interactions.
 */
static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int nthreads_running;
static volatile int nthreads_leaving;
static volatile int nthreads_error;
static volatile int tracers_run;

/*
 * network cmd line params
 */
static struct sockaddr_in hostname_addr;
static char hostname[MAXHOSTNAMELEN];
static int net_port = TRACE_NET_PORT;
static int net_use_sendfile = 1;
static int net_mode;
static int *cl_fds;

static int (*handle_pfds)(struct tracer *, int, int);
static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);
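
/*
 * handle_pfds is pointed at handle_pfds_file(), handle_pfds_netclient()
 * or handle_pfds_entries() in handle_args(), based upon the output mode;
 * handle_list selects between handle_list_net() and handle_list_file(),
 * both defined below.
 */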

#define S_OPTS	"d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
static struct option l_opts[] = {
	{
		.name = "dev",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'd'
	},
	{
		.name = "input-devs",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'I'
	},
	{
		.name = "act-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'a'
	},
	{
		.name = "set-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'A'
	},
	{
		.name = "relay",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'r'
	},
	{
		.name = "output",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'o'
	},
	{
		.name = "kill",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'k'
	},
	{
		.name = "stopwatch",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'w'
	},
	{
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'v'
	},
	{
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'V'
	},
	{
		.name = "buffer-size",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'b'
	},
	{
		.name = "num-sub-buffers",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'n'
	},
	{
		.name = "output-dir",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'D'
	},
	{
		.name = "listen",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'l'
	},
	{
		.name = "host",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'h'
	},
	{
		.name = "port",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'p'
	},
	{
		.name = "no-sendfile",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 's'
	},
	{
		.name = NULL,
	}
};

static char usage_str[] = \
	"-d <dev> [ -r debugfs path ] [ -o <output> ] [-k ] [ -w time ]\n" \
	"[ -a action ] [ -A action mask ] [ -I <devs file> ] [ -v ]\n\n" \
	"\t-d Use specified device. May also be given last after options\n" \
	"\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
	"\t-o File(s) to send output to\n" \
	"\t-D Directory to prepend to output file names\n" \
	"\t-k Kill a running trace\n" \
	"\t-w Stop after defined time, in seconds\n" \
	"\t-a Only trace specified actions. See documentation\n" \
	"\t-A Give trace mask as a single value. See documentation\n" \
	"\t-b Sub buffer size in KiB\n" \
	"\t-n Number of sub buffers\n" \
	"\t-l Run in network listen mode (blktrace server)\n" \
	"\t-h Run in network client mode, connecting to the given host\n" \
	"\t-p Network port to use (default 8462)\n" \
	"\t-s Make the network client NOT use sendfile() to transfer data\n" \
	"\t-I Add devices found in <devs file>\n" \
	"\t-V Print program version info\n\n";

static void clear_events(struct pollfd *pfd)
{
	pfd->events = 0;
	pfd->revents = 0;
}

static inline int net_client_use_sendfile(void)
{
	return net_mode == Net_client && net_use_sendfile;
}

static inline int net_client_use_send(void)
{
	return net_mode == Net_client && !net_use_sendfile;
}

static inline int use_tracer_devpaths(void)
{
	return piped_output || net_client_use_send();
}

static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
	return a.s_addr == b.s_addr;
}

static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
{
	dpp->stats[cpu].data_read += data_read;
}

static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
{
	dpp->stats[cpu].nevents += nevents;
}

static void show_usage(char *prog)
{
	fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str);
}

/*
 * Create a timespec 'delta_msec' milliseconds into the future
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
	struct timeval now;

	gettimeofday(&now, NULL);
	tsp->tv_sec = now.tv_sec;
	tsp->tv_nsec = 1000L * now.tv_usec;

	tsp->tv_nsec += (delta_msec * 1000000L);
	if (tsp->tv_nsec >= 1000000000L) {
		long secs = tsp->tv_nsec / 1000000000L;

		tsp->tv_sec += secs;
		tsp->tv_nsec -= (secs * 1000000000L);
	}
}

/*
 * Wait with a timeout, so that a missed signal cannot block us forever
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
	struct timespec ts;

	make_timespec(&ts, 50);
	pthread_cond_timedwait(cond, mutex, &ts);
}

static void unblock_tracers(void)
{
	pthread_mutex_lock(&mt_mutex);
	tracers_run = 1;
	pthread_cond_broadcast(&mt_cond);
	pthread_mutex_unlock(&mt_mutex);
}

static void tracer_wait_unblock(struct tracer *tp)
{
	pthread_mutex_lock(&mt_mutex);
	while (!tp->is_done && !tracers_run)
		pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}

static void tracer_signal_ready(struct tracer *tp,
				enum thread_status th_status,
				int status)
{
	pthread_mutex_lock(&mt_mutex);
	tp->status = status;

	if (th_status == Th_running)
		nthreads_running++;
	else if (th_status == Th_error)
		nthreads_error++;
	else
		nthreads_leaving++;

	pthread_cond_signal(&mt_cond);
	pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_ready(int ncpus_started)
{
	pthread_mutex_lock(&mt_mutex);
	while ((nthreads_running + nthreads_error) < ncpus_started)
		t_pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_leaving(void)
{
	pthread_mutex_lock(&mt_mutex);
	while (nthreads_leaving < nthreads_running)
		t_pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}

static void init_mmap_info(struct mmap_info *mip)
{
	mip->buf_size = buf_size;
	mip->buf_nr = buf_nr;
	mip->pagesize = pagesize;
}

static void net_close_connection(int *fd)
{
	shutdown(*fd, SHUT_RDWR);
	close(*fd);
	*fd = -1;
}

static void dpp_free(struct devpath *dpp)
{
	if (dpp->stats)
		free(dpp->stats);
	if (dpp->ios)
		free(dpp->ios);
	if (dpp->path)
		free(dpp->path);
	if (dpp->buts_name)
		free(dpp->buts_name);
	free(dpp);
}

static int lock_on_cpu(int cpu)
{
#ifndef _ANDROID_
	cpu_set_t cpu_mask;

	CPU_ZERO(&cpu_mask);
	CPU_SET(cpu, &cpu_mask);
	if (sched_setaffinity(0, sizeof(cpu_mask), &cpu_mask) < 0)
		return errno;
#endif

	return 0;
}

#ifndef _ANDROID_
static int increase_limit(int resource, rlim_t increase)
{
	struct rlimit rlim;
	int save_errno = errno;

	if (!getrlimit(resource, &rlim)) {
		rlim.rlim_cur += increase;
		if (rlim.rlim_cur >= rlim.rlim_max)
			rlim.rlim_max = rlim.rlim_cur + increase;

		if (!setrlimit(resource, &rlim))
			return 1;
	}

	errno = save_errno;
	return 0;
}
#endif

static int handle_open_failure(void)
{
	if (errno == ENFILE || errno == EMFILE)
#ifndef _ANDROID_
		return increase_limit(RLIMIT_NOFILE, 16);
#else
		return -ENOSYS;
#endif
	return 0;
}

static int handle_mem_failure(size_t length)
{
	if (errno == ENFILE)
		return handle_open_failure();
	else if (errno == ENOMEM)
#ifndef _ANDROID_
		return increase_limit(RLIMIT_MEMLOCK, 2 * length);
#else
		return -ENOSYS;
#endif
	return 0;
}

static FILE *my_fopen(const char *path, const char *mode)
{
	FILE *fp;

	do {
		fp = fopen(path, mode);
	} while (fp == NULL && handle_open_failure());

	return fp;
}

static int my_open(const char *path, int flags)
{
	int fd;

	do {
		fd = open(path, flags);
	} while (fd < 0 && handle_open_failure());

	return fd;
}

static int my_socket(int domain, int type, int protocol)
{
	int fd;

	do {
		fd = socket(domain, type, protocol);
	} while (fd < 0 && handle_open_failure());

	return fd;
}

static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
	int fd;

	do {
		fd = accept(sockfd, addr, addrlen);
	} while (fd < 0 && handle_open_failure());

	return fd;
}

static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
		     off_t offset)
{
	void *new;

	do {
		new = mmap(addr, length, prot, flags, fd, offset);
	} while (new == MAP_FAILED && handle_mem_failure(length));

	return new;
}

static int my_mlock(const void *addr, size_t len)
{
	int ret;

	do {
		ret = mlock(addr, len);
	} while (ret < 0 && handle_mem_failure(len));

	return ret;
}

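/*
 * setup_mmap() below maintains a sliding, page-aligned mmap() window
 * over the output file. A worked example with the defaults (512 KiB
 * sub-buffers, buf_nr 4): when fewer than maxlen bytes remain in the
 * current window, the window is unmapped and a new one of
 * max(16, buf_nr) * buf_size bytes (8 MiB here), less the in-page
 * offset, is mapped starting at the page containing the current end
 * of data (fs_size); fs_off is the offset of that end within the
 * new window.
 */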
static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
{
	if (mip->fs_off + maxlen > mip->fs_buf_len) {
		unsigned long nr = max(16, mip->buf_nr);

		if (mip->fs_buf) {
			munlock(mip->fs_buf, mip->fs_buf_len);
			munmap(mip->fs_buf, mip->fs_buf_len);
			mip->fs_buf = NULL;
		}

		mip->fs_off = mip->fs_size & (mip->pagesize - 1);
		mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
		mip->fs_max_size += mip->fs_buf_len;

		if (ftruncate(fd, mip->fs_max_size) < 0) {
			perror("setup_mmap: ftruncate");
			return 1;
		}

		mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
				      MAP_SHARED, fd,
				      mip->fs_size - mip->fs_off);
		if (mip->fs_buf == MAP_FAILED) {
			perror("setup_mmap: mmap");
			return 1;
		}
		my_mlock(mip->fs_buf, mip->fs_buf_len);
	}

	return 0;
}

static int __stop_trace(int fd)
{
	/*
	 * Should be stopped, don't complain if it isn't
	 */
	ioctl(fd, BLKTRACESTOP);
	return ioctl(fd, BLKTRACETEARDOWN);
}

static int write_data(char *buf, int len)
{
	int ret;

rewrite:
	ret = fwrite(buf, len, 1, pfp);
	if (ferror(pfp) || ret != 1) {
		if (errno == EINTR) {
			clearerr(pfp);
			goto rewrite;
		}

		if (!piped_output || (errno != EPIPE && errno != EBADF)) {
			fprintf(stderr, "write(%d) failed: %d/%s\n",
				len, errno, strerror(errno));
		}
		goto err;
	}

	fflush(pfp);
	return 0;

err:
	clearerr(pfp);
	return 1;
}

/*
 * Returns the number of bytes read (successfully)
 */
static int __net_recv_data(int fd, void *buf, unsigned int len)
{
	unsigned int bytes_left = len;

	while (bytes_left && !done) {
		int ret = recv(fd, buf, bytes_left, MSG_WAITALL);

		if (ret == 0)
			break;
		else if (ret < 0) {
			if (errno == EAGAIN) {
				usleep(50);
				continue;
			}
			perror("server: net_recv_data: recv failed");
			break;
		} else {
			buf += ret;
			bytes_left -= ret;
		}
	}

	return len - bytes_left;
}

static int net_recv_data(int fd, void *buf, unsigned int len)
{
	return __net_recv_data(fd, buf, len);
}

/*
 * Returns number of bytes written
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
	int ret;
	unsigned int bytes_left = buf_len;

	while (bytes_left) {
		ret = send(fd, buf, bytes_left, 0);
		if (ret < 0) {
			perror("send");
			break;
		}

		buf += ret;
		bytes_left -= ret;
	}

	return buf_len - bytes_left;
}

static int net_send_header(int fd, int cpu, char *buts_name, int len)
{
	struct blktrace_net_hdr hdr;

	memset(&hdr, 0, sizeof(hdr));

	hdr.magic = BLK_IO_TRACE_MAGIC;
	strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
	hdr.buts_name[sizeof(hdr.buts_name) - 1] = '\0';
	hdr.cpu = cpu;
	hdr.max_cpus = ncpus;
	hdr.len = len;
	hdr.cl_id = getpid();
	hdr.buf_size = buf_size;
	hdr.buf_nr = buf_nr;
	hdr.page_size = pagesize;

	return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
}

static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
{
	struct blktrace_net_hdr ret_hdr;

	net_send_header(fd, cpu, buts_name, len);
	net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
}

static void net_send_open(int fd, int cpu, char *buts_name)
{
	net_send_open_close(fd, cpu, buts_name, 0);
}

static void net_send_close(int fd, char *buts_name, int drops)
{
	/*
	 * Overload CPU w/ number of drops
	 *
	 * XXX: Need to clear/set done around call - done=1 (which
	 * is true here) stops reads from happening... :-(
	 */
	done = 0;
	net_send_open_close(fd, drops, buts_name, 1);
	done = 1;
}

static void ack_open_close(int fd, char *buts_name)
{
	net_send_header(fd, 0, buts_name, 2);
}

static void net_send_drops(int fd)
{
	struct list_head *p;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		net_send_close(fd, dpp->buts_name, dpp->drops);
	}
}

/*
 * Returns:
 *	 0: "EOF"
 *	 1: OK
 *	-1: Error
 */
static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
{
	int bytes_read;
	int fl = fcntl(nc->fd, F_GETFL);

	fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
	bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
	fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);

	if (bytes_read == sizeof(*bnh))
		return 1;
	else if (bytes_read == 0)
		return 0;
	else
		return -1;
}

static int net_setup_addr(void)
{
	struct sockaddr_in *addr = &hostname_addr;

	memset(addr, 0, sizeof(*addr));
	addr->sin_family = AF_INET;
	addr->sin_port = htons(net_port);

	if (inet_aton(hostname, &addr->sin_addr) == 0) {
		struct hostent *hent;
retry:
		hent = gethostbyname(hostname);
		if (!hent) {
			if (h_errno == TRY_AGAIN) {
				usleep(100);
				goto retry;
			} else if (h_errno == NO_RECOVERY) {
				fprintf(stderr, "gethostbyname(%s): "
					"non-recoverable error encountered\n",
					hostname);
			} else {
				/*
				 * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
				 */
				fprintf(stderr, "Host %s not found\n",
					hostname);
			}
			return 1;
		}

		memcpy(&addr->sin_addr, hent->h_addr, 4);
		strncpy(hostname, hent->h_name, sizeof(hostname));
		hostname[sizeof(hostname) - 1] = '\0';
	}

	return 0;
}

static int net_setup_client(void)
{
	int fd;
	struct sockaddr_in *addr = &hostname_addr;

	fd = my_socket(AF_INET, SOCK_STREAM, 0);
	if (fd < 0) {
		perror("client: socket");
		return -1;
	}

	if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
		if (errno == ECONNREFUSED)
			fprintf(stderr,
				"\nclient: Connection to %s refused, "
				"perhaps the server is not started?\n\n",
				hostname);
		else
			perror("client: connect");

		close(fd);
		return -1;
	}

	return fd;
}

static int open_client_connections(void)
{
	int cpu;

	cl_fds = calloc(ncpus, sizeof(*cl_fds));
	for (cpu = 0; cpu < ncpus; cpu++) {
		cl_fds[cpu] = net_setup_client();
		if (cl_fds[cpu] < 0)
			goto err;
	}
	return 0;

err:
	/*
	 * Close only the connections that were actually established
	 */
	while (cpu-- > 0)
		close(cl_fds[cpu]);
	free(cl_fds);
	return 1;
}

static void close_client_connections(void)
{
	if (cl_fds) {
		int cpu, *fdp;

		for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
			if (*fdp >= 0) {
				net_send_drops(*fdp);
				net_close_connection(fdp);
			}
		}
		free(cl_fds);
	}
}

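/*
 * BLKTRACESETUP hands the kernel our buffer geometry and action mask;
 * on success the kernel fills in buts.name (e.g. "sda" - the example
 * name is illustrative), which names the per-device debugfs directory
 * that the per-CPU trace files and drop counts live in.
 */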
static void setup_buts(void)
{
	struct list_head *p;

	__list_for_each(p, &devpaths) {
		struct blk_user_trace_setup buts;
		struct devpath *dpp = list_entry(p, struct devpath, head);

		memset(&buts, 0, sizeof(buts));
		buts.buf_size = buf_size;
		buts.buf_nr = buf_nr;
		buts.act_mask = act_mask;
		if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
			dpp->ncpus = ncpus;
			dpp->buts_name = strdup(buts.name);
			if (dpp->stats)
				free(dpp->stats);
			dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
		} else
			fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
				dpp->path, errno, strerror(errno));
	}
}

static void start_buts(void)
{
	struct list_head *p;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
			fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
				dpp->path, errno, strerror(errno));
		}
	}
}

static int get_drops(struct devpath *dpp)
{
	int fd, drops = 0;
	ssize_t n;
	char fn[MAXPATHLEN + 64], tmp[256];

	snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
		 dpp->buts_name);

	fd = my_open(fn, O_RDONLY);
	if (fd < 0) {
		/*
		 * This may be ok: the kernel may not support
		 * dropped counts.
		 */
		if (errno != ENOENT)
			fprintf(stderr, "Could not open %s: %d/%s\n",
				fn, errno, strerror(errno));
		return 0;
	} else if ((n = read(fd, tmp, sizeof(tmp) - 1)) < 0) {
		fprintf(stderr, "Could not read %s: %d/%s\n",
			fn, errno, strerror(errno));
	} else {
		tmp[n] = '\0';	/* read() does not NUL-terminate */
		drops = atoi(tmp);
	}
	close(fd);

	return drops;
}

static void get_all_drops(void)
{
	struct list_head *p;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		dpp->drops = get_drops(dpp);
	}
}

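/*
 * A trace_buf and its data area come from a single allocation: buf
 * points just past the struct (tbp + 1). That is what allows
 * tb_combine() below to realloc the pair as one block.
 */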
static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
{
	struct trace_buf *tbp;

	tbp = malloc(sizeof(*tbp) + bufsize);
	INIT_LIST_HEAD(&tbp->head);
	tbp->len = 0;
	tbp->buf = (void *)(tbp + 1);
	tbp->cpu = cpu;
	tbp->dpp = NULL;	/* Will be set when tbp is added */

	return tbp;
}

static void free_tracer_heads(struct devpath *dpp)
{
	int cpu;
	struct tracer_devpath_head *hd;

	for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
		if (hd->prev)
			free(hd->prev);

		pthread_mutex_destroy(&hd->mutex);
	}
	free(dpp->heads);
}

static int setup_tracer_devpaths(void)
{
	struct list_head *p;

	if (net_client_use_send())
		if (open_client_connections())
			return 1;

	__list_for_each(p, &devpaths) {
		int cpu;
		struct tracer_devpath_head *hd;
		struct devpath *dpp = list_entry(p, struct devpath, head);

		dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
		for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
			INIT_LIST_HEAD(&hd->head);
			pthread_mutex_init(&hd->mutex, NULL);
			hd->prev = NULL;
		}
	}

	return 0;
}

static inline void add_trace_buf(struct devpath *dpp, int cpu,
						struct trace_buf **tbpp)
{
	struct trace_buf *tbp = *tbpp;
	struct tracer_devpath_head *hd = &dpp->heads[cpu];

	tbp->dpp = dpp;

	pthread_mutex_lock(&hd->mutex);
	list_add_tail(&tbp->head, &hd->head);
	pthread_mutex_unlock(&hd->mutex);

	*tbpp = alloc_trace_buf(cpu, buf_size);
}

static inline void incr_entries(int entries_handled)
{
	pthread_mutex_lock(&dp_mutex);
	if (dp_entries == 0)
		pthread_cond_signal(&dp_cond);
	dp_entries += entries_handled;
	pthread_mutex_unlock(&dp_mutex);
}

static void decr_entries(int handled)
{
	pthread_mutex_lock(&dp_mutex);
	dp_entries -= handled;
	pthread_mutex_unlock(&dp_mutex);
}

static int wait_empty_entries(void)
{
	pthread_mutex_lock(&dp_mutex);
	while (!done && dp_entries == 0)
		t_pthread_cond_wait(&dp_cond, &dp_mutex);
	pthread_mutex_unlock(&dp_mutex);

	return !done;
}

static int add_devpath(char *path)
{
	int fd;
	struct devpath *dpp;

	/*
	 * Verify device is valid before going too far
	 */
	fd = my_open(path, O_RDONLY | O_NONBLOCK);
	if (fd < 0) {
		fprintf(stderr, "Invalid path %s specified: %d/%s\n",
			path, errno, strerror(errno));
		return 1;
	}

	dpp = malloc(sizeof(*dpp));
	memset(dpp, 0, sizeof(*dpp));
	dpp->path = strdup(path);
	dpp->fd = fd;
	dpp->idx = ndevs++;
	list_add_tail(&dpp->head, &devpaths);

	return 0;
}

static void rel_devpaths(void)
{
	struct list_head *p, *q;

	list_for_each_safe(p, q, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		list_del(&dpp->head);
		__stop_trace(dpp->fd);
		close(dpp->fd);

		if (dpp->heads)
			free_tracer_heads(dpp);

		dpp_free(dpp);
		ndevs--;
	}
}

static int flush_subbuf_net(struct trace_buf *tbp)
{
	int fd = cl_fds[tbp->cpu];
	struct devpath *dpp = tbp->dpp;

	if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
		return 1;
	else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
		return 1;

	return 0;
}

static int
handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
		struct list_head *list)
{
	struct trace_buf *tbp;
	struct list_head *p, *q;
	int entries_handled = 0;

	list_for_each_safe(p, q, list) {
		tbp = list_entry(p, struct trace_buf, head);

		list_del(&tbp->head);
		entries_handled++;

		if (cl_fds[tbp->cpu] >= 0) {
			if (flush_subbuf_net(tbp)) {
				close(cl_fds[tbp->cpu]);
				cl_fds[tbp->cpu] = -1;
			}
		}

		free(tbp);
	}

	return entries_handled;
}

/*
 * Tack 'tbp's buf onto the tail of 'prev's buf
 */
static struct trace_buf *tb_combine(struct trace_buf *prev,
				    struct trace_buf *tbp)
{
	unsigned long tot_len;

	tot_len = prev->len + tbp->len;
	if (tot_len > buf_size) {
		/*
		 * 'prev' is not on any list (it was taken off when it
		 * was stashed as the leftover buffer), and its data
		 * area sits directly after the struct. We can
		 * therefore realloc the whole allocation and re-point
		 * buf past the (possibly moved) struct; the other
		 * fields are "static".
		 */
		prev = realloc(prev, sizeof(*prev) + tot_len);
		prev->buf = (void *)(prev + 1);
	}

	memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
	prev->len = tot_len;

	free(tbp);
	return prev;
}

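/*
 * Drain one CPU's list for piped output: whole traces (header plus
 * pdu_len payload bytes) are written out in one go; a trailing partial
 * trace is kept in hd->prev and stitched onto the front of the next
 * buffer via tb_combine().
 */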
static int handle_list_file(struct tracer_devpath_head *hd,
			    struct list_head *list)
{
	int off, t_len, nevents;
	struct blk_io_trace *t;
	struct list_head *p, *q;
	int entries_handled = 0;
	struct trace_buf *tbp, *prev;

	prev = hd->prev;
	list_for_each_safe(p, q, list) {
		tbp = list_entry(p, struct trace_buf, head);
		list_del(&tbp->head);
		entries_handled++;

		/*
		 * If there was some leftover before, tack this new
		 * entry onto the tail of the previous one.
		 */
		if (prev)
			tbp = tb_combine(prev, tbp);

		/*
		 * See how many whole traces there are - send them
		 * all out in one go.
		 */
		off = 0;
		nevents = 0;
		while (off + (int)sizeof(*t) <= tbp->len) {
			t = (struct blk_io_trace *)(tbp->buf + off);
			t_len = sizeof(*t) + t->pdu_len;
			if (off + t_len > tbp->len)
				break;

			off += t_len;
			nevents++;
		}
		if (nevents)
			pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

		/*
		 * Write any full set of traces, any remaining data is kept
		 * for the next pass.
		 */
		if (off) {
			if (write_data(tbp->buf, off) || off == tbp->len) {
				free(tbp);
				prev = NULL;
			} else {
				/*
				 * Move valid data to beginning of buffer
				 */
				tbp->len -= off;
				memmove(tbp->buf, tbp->buf + off, tbp->len);
				prev = tbp;
			}
		} else
			prev = tbp;
	}
	hd->prev = prev;

	return entries_handled;
}

static void __process_trace_bufs(void)
{
	int cpu;
	struct list_head *p;
	struct list_head list;
	int handled = 0;

	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);
		struct tracer_devpath_head *hd = dpp->heads;

		for (cpu = 0; cpu < ncpus; cpu++, hd++) {
			pthread_mutex_lock(&hd->mutex);
			if (list_empty(&hd->head)) {
				pthread_mutex_unlock(&hd->mutex);
				continue;
			}

			list_replace_init(&hd->head, &list);
			pthread_mutex_unlock(&hd->mutex);

			handled += handle_list(hd, &list);
		}
	}

	if (handled)
		decr_entries(handled);
}

static void process_trace_bufs(void)
{
	while (wait_empty_entries())
		__process_trace_bufs();
}

static void clean_trace_bufs(void)
{
	/*
	 * No mutex needed here: we're only reading from the lists,
	 * tracers are done
	 */
	while (dp_entries)
		__process_trace_bufs();
}

static inline void read_err(int cpu, char *ifn)
{
	if (errno != EAGAIN)
		fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
			cpu, ifn, errno, strerror(errno));
}

static int net_sendfile(struct io_info *iop)
{
	int ret;

	ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
	if (ret < 0) {
		perror("sendfile");
		return 1;
	} else if (ret < (int)iop->ready) {
		fprintf(stderr, "short sendfile send (%d of %d)\n",
			ret, iop->ready);
		return 1;
	}

	return 0;
}

static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
{
	struct devpath *dpp = iop->dpp;

	if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
		return 1;
	return net_sendfile(iop);
}

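/*
 * Output files are named <dir>/<base>.blktrace.<cpu>, where <base> is
 * the -o argument if given, else the kernel-supplied buts name. In
 * network server mode a "<hostname>-<date>-<time>/" sub-directory is
 * inserted first, keyed off the client's connect time.
 */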
static int fill_ofname(struct io_info *iop, int cpu)
{
	int len;
	struct stat sb;
	char *dst = iop->ofn;

	if (output_dir)
		len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
	else
		len = snprintf(iop->ofn, sizeof(iop->ofn), "./");

	if (net_mode == Net_server) {
		struct cl_conn *nc = iop->nc;

		len += sprintf(dst + len, "%s-", nc->ch->hostname);
		len += strftime(dst + len, 64, "%F-%T/",
				gmtime(&iop->dpp->cl_connect_time));
	}

	if (stat(iop->ofn, &sb) < 0) {
		if (errno != ENOENT) {
			fprintf(stderr,
				"Destination dir %s stat failed: %d/%s\n",
				iop->ofn, errno, strerror(errno));
			return 1;
		}
		/*
		 * There is no synchronization between multiple threads
		 * trying to create the directory at once.  It's harmless
		 * to let them try, so just detect the problem and move on.
		 */
		if (mkdir(iop->ofn, 0755) < 0 && errno != EEXIST) {
			fprintf(stderr,
				"Destination dir %s can't be made: %d/%s\n",
				iop->ofn, errno, strerror(errno));
			return 1;
		}
	}

	if (output_name)
		snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
			 "%s.blktrace.%d", output_name, cpu);
	else
		snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
			 "%s.blktrace.%d", iop->dpp->buts_name, cpu);

	return 0;
}

static int set_vbuf(struct io_info *iop, int mode, size_t size)
{
	iop->obuf = malloc(size);
	if (setvbuf(iop->ofp, iop->obuf, mode, size) < 0) {
		fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
			iop->dpp->path, (int)size, errno,
			strerror(errno));
		free(iop->obuf);
		return 1;
	}

	return 0;
}

static int iop_open(struct io_info *iop, int cpu)
{
	iop->ofd = -1;
	if (fill_ofname(iop, cpu))
		return 1;

	iop->ofp = my_fopen(iop->ofn, "w+");
	if (iop->ofp == NULL) {
		fprintf(stderr, "Open output file %s failed: %d/%s\n",
			iop->ofn, errno, strerror(errno));
		return 1;
	}

	if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
		fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
			iop->ofn, errno, strerror(errno));
		fclose(iop->ofp);
		return 1;
	}

	iop->ofd = fileno(iop->ofp);
	return 0;
}

static void close_iop(struct io_info *iop)
{
	struct mmap_info *mip = &iop->mmap_info;

	if (mip->fs_buf)
		munmap(mip->fs_buf, mip->fs_buf_len);

	if (!piped_output) {
		if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
			fprintf(stderr,
				"Ignoring err: ftruncate(%s): %d/%s\n",
				iop->ofn, errno, strerror(errno));
		}
	}

	if (iop->ofp)
		fclose(iop->ofp);
	if (iop->obuf)
		free(iop->obuf);
}

static void close_ios(struct tracer *tp)
{
	while (tp->nios > 0) {
		struct io_info *iop = &tp->ios[--tp->nios];

		iop->dpp->drops = get_drops(iop->dpp);
		if (iop->ifd >= 0)
			close(iop->ifd);

		if (iop->ofp)
			close_iop(iop);
		else if (iop->ofd >= 0) {
			struct devpath *dpp = iop->dpp;

			net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
			net_close_connection(&iop->ofd);
		}
	}

	free(tp->ios);
	free(tp->pfds);
}

static int open_ios(struct tracer *tp)
{
	struct pollfd *pfd;
	struct io_info *iop;
	struct list_head *p;

	tp->ios = calloc(ndevs, sizeof(struct io_info));
	tp->pfds = calloc(ndevs, sizeof(struct pollfd));

	tp->nios = 0;
	iop = tp->ios;
	pfd = tp->pfds;
	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		iop->dpp = dpp;
		iop->ofd = -1;
		snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
			debugfs_path, dpp->buts_name, tp->cpu);

		iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
		if (iop->ifd < 0) {
			fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
				tp->cpu, iop->ifn, errno, strerror(errno));
			return 1;
		}

		init_mmap_info(&iop->mmap_info);

		pfd->fd = iop->ifd;
		pfd->events = POLLIN;

		if (piped_output)
			;
		else if (net_client_use_sendfile()) {
			iop->ofd = net_setup_client();
			if (iop->ofd < 0)
				goto err;
			net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
		} else if (net_mode == Net_none) {
			if (iop_open(iop, tp->cpu))
				goto err;
		} else {
			/*
			 * This ensures that the server knows about all
			 * connections & devices before _any_ closes
			 */
			net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
		}

		pfd++;
		iop++;
		tp->nios++;
	}

	return 0;

err:
	close(iop->ifd);	/* tp->nios _not_ bumped */
	close_ios(tp);
	return 1;
}

static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
{
	struct mmap_info *mip;
	int i, ret, nentries = 0;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			mip = &iop->mmap_info;

			/*
			 * setup_mmap() returns non-zero on failure
			 */
			ret = setup_mmap(iop->ofd, buf_size, mip);
			if (ret) {
				pfd->events = 0;
				break;
			}

			ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
				   buf_size);
			if (ret > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, ret);
				mip->fs_size += ret;
				mip->fs_off += ret;
				nentries++;
			} else if (ret == 0) {
				/*
				 * A zero-length read after we have been
				 * told to stop means there is no more
				 * data: quit trying reads on this fd.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			nevs--;
		}
	}

	return nentries;
}

static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
{
	struct stat sb;
	int i, nentries = 0;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	for (i = 0; i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			if (fstat(iop->ifd, &sb) < 0) {
				perror(iop->ifn);
				pfd->events = 0;
			} else if (sb.st_size > (off_t)iop->data_queued) {
				iop->ready = sb.st_size - iop->data_queued;
				iop->data_queued = sb.st_size;

				if (!net_sendfile_data(tp, iop)) {
					pdc_dr_update(iop->dpp, tp->cpu,
						      iop->ready);
					nentries++;
				} else
					clear_events(pfd);
			}
			if (--nevs == 0)
				break;
		}
	}

	if (nentries)
		incr_entries(nentries);

	return nentries;
}

static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
{
	int i, nentries = 0;
	struct trace_buf *tbp;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	tbp = alloc_trace_buf(tp->cpu, buf_size);
	for (i = 0; i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			tbp->len = read(iop->ifd, tbp->buf, buf_size);
			if (tbp->len > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
				add_trace_buf(iop->dpp, tp->cpu, &tbp);
				nentries++;
			} else if (tbp->len == 0) {
				/*
				 * A zero-length read after we have been
				 * told to stop means there is no more
				 * data: quit trying reads on this fd.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			if (!piped_output && --nevs == 0)
				break;
		}
	}
	free(tbp);

	if (nentries)
		incr_entries(nentries);

	return nentries;
}

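/*
 * Per-CPU tracer thread body: pin to the CPU, open the relay trace
 * files for every device, tell the main thread we are ready, then
 * wait for the collective unblock. After that, poll and drain until
 * told to stop, then pull remaining data with forced reads.
 */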
1779static void *thread_main(void *arg)
1780{
1781	int ret, ndone, to_val;
1782	struct tracer *tp = arg;
1783
1784	ret = lock_on_cpu(tp->cpu);
1785	if (ret)
1786		goto err;
1787
1788	ret = open_ios(tp);
1789	if (ret)
1790		goto err;
1791
1792	if (piped_output)
1793		to_val = 50;		/* Frequent partial handles */
1794	else
1795		to_val = 500;		/* 1/2 second intervals */
1796
1797
1798	tracer_signal_ready(tp, Th_running, 0);
1799	tracer_wait_unblock(tp);
1800
1801	while (!tp->is_done) {
1802		ndone = poll(tp->pfds, ndevs, to_val);
1803		if (ndone || piped_output)
1804			(void)handle_pfds(tp, ndone, piped_output);
1805		else if (ndone < 0 && errno != EINTR)
1806			fprintf(stderr, "Thread %d poll failed: %d/%s\n",
1807				tp->cpu, errno, strerror(errno));
1808	}
1809
1810	/*
1811	 * Trace is stopped, pull data until we get a short read
1812	 */
1813	while (handle_pfds(tp, ndevs, 1) > 0)
1814		;
1815
1816	close_ios(tp);
1817	tracer_signal_ready(tp, Th_leaving, 0);
1818	return NULL;
1819
1820err:
1821	tracer_signal_ready(tp, Th_error, ret);
1822	return NULL;
1823}
1824
1825static int start_tracer(int cpu)
1826{
1827	struct tracer *tp;
1828
1829	tp = malloc(sizeof(*tp));
1830	memset(tp, 0, sizeof(*tp));
1831
1832	INIT_LIST_HEAD(&tp->head);
1833	tp->status = 0;
1834	tp->cpu = cpu;
1835
1836	if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
1837		fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
1838			cpu, errno, strerror(errno));
1839		free(tp);
1840		return 1;
1841	}
1842
1843	list_add_tail(&tp->head, &tracers);
1844	return 0;
1845}
1846
1847static void start_tracers(void)
1848{
1849	int cpu;
1850	struct list_head *p;
1851
1852	for (cpu = 0; cpu < ncpus; cpu++)
1853		if (start_tracer(cpu))
1854			break;
1855
1856	wait_tracers_ready(cpu);
1857
1858	__list_for_each(p, &tracers) {
1859		struct tracer *tp = list_entry(p, struct tracer, head);
1860		if (tp->status)
1861			fprintf(stderr,
1862				"FAILED to start thread on CPU %d: %d/%s\n",
1863				tp->cpu, tp->status, strerror(tp->status));
1864	}
1865}
1866
1867static void stop_tracers(void)
1868{
1869	struct list_head *p;
1870
1871	/*
1872	 * Stop the tracing - makes the tracer threads clean up quicker.
1873	 */
1874	__list_for_each(p, &devpaths) {
1875		struct devpath *dpp = list_entry(p, struct devpath, head);
1876		(void)ioctl(dpp->fd, BLKTRACESTOP);
1877	}
1878
1879	/*
1880	 * Tell each tracer to quit
1881	 */
1882	__list_for_each(p, &tracers) {
1883		struct tracer *tp = list_entry(p, struct tracer, head);
1884		tp->is_done = 1;
1885	}
1886}
1887
1888static void del_tracers(void)
1889{
1890	struct list_head *p, *q;
1891
1892	list_for_each_safe(p, q, &tracers) {
1893		struct tracer *tp = list_entry(p, struct tracer, head);
1894
1895		list_del(&tp->head);
1896		free(tp);
1897	}
1898}
1899
1900static void wait_tracers(void)
1901{
1902	struct list_head *p;
1903
1904	if (use_tracer_devpaths())
1905		process_trace_bufs();
1906
1907	wait_tracers_leaving();
1908
1909	__list_for_each(p, &tracers) {
1910		int ret;
1911		struct tracer *tp = list_entry(p, struct tracer, head);
1912
1913		ret = pthread_join(tp->thread, NULL);
1914		if (ret)
1915			fprintf(stderr, "Thread join %d failed %d\n",
1916				tp->cpu, ret);
1917	}
1918
1919	if (use_tracer_devpaths())
1920		clean_trace_bufs();
1921
1922	get_all_drops();
1923}
1924
1925static void exit_tracing(void)
1926{
1927	signal(SIGINT, SIG_IGN);
1928	signal(SIGHUP, SIG_IGN);
1929	signal(SIGTERM, SIG_IGN);
1930	signal(SIGALRM, SIG_IGN);
1931
1932	stop_tracers();
1933	wait_tracers();
1934	del_tracers();
1935	rel_devpaths();
1936}
1937
1938static void handle_sigint(__attribute__((__unused__)) int sig)
1939{
1940	done = 1;
1941	stop_tracers();
1942}
1943
1944static void show_stats(struct list_head *devpaths)
1945{
1946	FILE *ofp;
1947	struct list_head *p;
1948	unsigned long long nevents, data_read;
1949	unsigned long long total_drops = 0;
1950	unsigned long long total_events = 0;
1951
1952	if (piped_output)
1953		ofp = my_fopen("/dev/null", "w");
1954	else
1955		ofp = stdout;
1956
1957	__list_for_each(p, devpaths) {
1958		int cpu;
1959		struct pdc_stats *sp;
1960		struct devpath *dpp = list_entry(p, struct devpath, head);
1961
1962		if (net_mode == Net_server)
1963			printf("server: end of run for %s:%s\n",
1964				dpp->ch->hostname, dpp->buts_name);
1965
1966		data_read = 0;
1967		nevents = 0;
1968
1969		fprintf(ofp, "=== %s ===\n", dpp->buts_name);
1970		for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
1971			/*
1972			 * Estimate events if not known...
1973			 */
1974			if (sp->nevents == 0) {
1975				sp->nevents = sp->data_read /
1976						sizeof(struct blk_io_trace);
1977			}
1978
1979			fprintf(ofp,
1980				"  CPU%3d: %20llu events, %8llu KiB data\n",
1981				cpu, sp->nevents, (sp->data_read + 1023) >> 10);
1982
1983			data_read += sp->data_read;
1984			nevents += sp->nevents;
1985		}
1986
1987		fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
1988			     " %8llu KiB data\n", nevents,
1989			     dpp->drops, (data_read + 1024) >> 10);
1990
1991		total_drops += dpp->drops;
1992		total_events += (nevents + dpp->drops);
1993	}
1994
1995	fflush(ofp);
1996	if (piped_output)
1997		fclose(ofp);
1998
1999	if (total_drops) {
2000		double drops_ratio = 1.0;
2001
2002		if (total_events)
2003			drops_ratio = (double)total_drops/(double)total_events;
2004
2005		fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
2006				"Consider using a larger buffer size (-b) "
2007				"and/or more buffers (-n)\n",
2008			total_drops, 100.0 * drops_ratio);
2009	}
2010}
2011
2012static int handle_args(int argc, char *argv[])
2013{
2014	int c, i;
2015	struct statfs st;
2016	int act_mask_tmp = 0;
2017
2018	while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2019		switch (c) {
2020		case 'a':
2021			i = find_mask_map(optarg);
2022			if (i < 0) {
2023				fprintf(stderr, "Invalid action mask %s\n",
2024					optarg);
2025				return 1;
2026			}
2027			act_mask_tmp |= i;
2028			break;
2029
2030		case 'A':
2031			if ((sscanf(optarg, "%x", &i) != 1) ||
2032							!valid_act_opt(i)) {
2033				fprintf(stderr,
2034					"Invalid set action mask %s/0x%x\n",
2035					optarg, i);
2036				return 1;
2037			}
2038			act_mask_tmp = i;
2039			break;
2040
2041		case 'd':
2042			if (add_devpath(optarg) != 0)
2043				return 1;
2044			break;
2045
2046		case 'I': {
2047			char dev_line[256];
2048			FILE *ifp = my_fopen(optarg, "r");
2049
2050			if (!ifp) {
2051				fprintf(stderr,
2052					"Invalid file for devices %s\n",
2053					optarg);
2054				return 1;
2055			}
2056
2057			while (fscanf(ifp, "%s\n", dev_line) == 1)
2058				if (add_devpath(dev_line) != 0)
2059					return 1;
2060			break;
2061		}
2062
2063		case 'r':
2064			debugfs_path = optarg;
2065			break;
2066
2067		case 'o':
2068			output_name = optarg;
2069			break;
2070		case 'k':
2071			kill_running_trace = 1;
2072			break;
2073		case 'w':
2074			stop_watch = atoi(optarg);
2075			if (stop_watch <= 0) {
2076				fprintf(stderr,
2077					"Invalid stopwatch value (%d secs)\n",
2078					stop_watch);
2079				return 1;
2080			}
2081			break;
2082		case 'V':
2083		case 'v':
2084			printf("%s version %s\n", argv[0], blktrace_version);
2085			exit(0);
2086			/*NOTREACHED*/
2087		case 'b':
2088			buf_size = strtoul(optarg, NULL, 10);
2089			if (buf_size <= 0 || buf_size > 16*1024) {
2090				fprintf(stderr, "Invalid buffer size (%lu)\n",
2091					buf_size);
2092				return 1;
2093			}
2094			buf_size <<= 10;
2095			break;
2096		case 'n':
2097			buf_nr = strtoul(optarg, NULL, 10);
2098			if (buf_nr <= 0) {
2099				fprintf(stderr,
2100					"Invalid buffer nr (%lu)\n", buf_nr);
2101				return 1;
2102			}
2103			break;
2104		case 'D':
2105			output_dir = optarg;
2106			break;
2107		case 'h':
2108			net_mode = Net_client;
2109			strcpy(hostname, optarg);
2110			break;
2111		case 'l':
2112			net_mode = Net_server;
2113			break;
2114		case 'p':
2115			net_port = atoi(optarg);
2116			break;
2117		case 's':
2118			net_use_sendfile = 0;
2119			break;
2120		default:
2121			show_usage(argv[0]);
2122			exit(1);
2123			/*NOTREACHED*/
2124		}
2125	}
2126
2127	while (optind < argc)
2128		if (add_devpath(argv[optind++]) != 0)
2129			return 1;
2130
2131	if (net_mode != Net_server && ndevs == 0) {
2132		show_usage(argv[0]);
2133		return 1;
2134	}
2135
2136	if (statfs(debugfs_path, &st) < 0 || st.f_type != (long)DEBUGFS_TYPE) {
2137		fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2138			debugfs_path, errno, strerror(errno));
2139		return 1;
2140	}
2141
2142	if (act_mask_tmp != 0)
2143		act_mask = act_mask_tmp;
2144
2145	if (net_mode == Net_client && net_setup_addr())
2146		return 1;
2147
2148	/*
2149	 * Set up for appropriate PFD handler based upon output name.
2150	 */
2151	if (net_client_use_sendfile())
2152		handle_pfds = handle_pfds_netclient;
2153	else if (net_client_use_send())
2154		handle_pfds = handle_pfds_entries;
2155	else if (output_name && (strcmp(output_name, "-") == 0)) {
2156		piped_output = 1;
2157		handle_pfds = handle_pfds_entries;
2158		pfp = stdout;
2159		setvbuf(pfp, NULL, _IONBF, 0);
2160	} else
2161		handle_pfds = handle_pfds_file;
2162	return 0;
2163}
2164
2165static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2166			      int fd)
2167{
2168	struct cl_conn *nc;
2169
2170	nc = malloc(sizeof(*nc));
2171	memset(nc, 0, sizeof(*nc));
2172
2173	time(&nc->connect_time);
2174	nc->ch = ch;
2175	nc->fd = fd;
2176	nc->ncpus = -1;
2177
2178	list_add_tail(&nc->ch_head, &ch->conn_list);
2179	ch->connects++;
2180
2181	list_add_tail(&nc->ns_head, &ns->conn_list);
2182	ns->connects++;
2183	ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2184}
2185
2186static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2187			      struct cl_conn *nc)
2188{
2189	net_close_connection(&nc->fd);
2190
2191	list_del(&nc->ch_head);
2192	ch->connects--;
2193
2194	list_del(&nc->ns_head);
2195	ns->connects--;
2196	ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2197
2198	free(nc);
2199}
2200
2201static struct cl_host *net_find_client_host(struct net_server_s *ns,
2202					    struct in_addr cl_in_addr)
2203{
2204	struct list_head *p;
2205
2206	__list_for_each(p, &ns->ch_list) {
2207		struct cl_host *ch = list_entry(p, struct cl_host, head);
2208
2209		if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2210			return ch;
2211	}
2212
2213	return NULL;
2214}
2215
2216static struct cl_host *net_add_client_host(struct net_server_s *ns,
2217					   struct sockaddr_in *addr)
2218{
2219	struct cl_host *ch;
2220
2221	ch = malloc(sizeof(*ch));
2222	memset(ch, 0, sizeof(*ch));
2223
2224	ch->ns = ns;
2225	ch->cl_in_addr = addr->sin_addr;
2226	list_add_tail(&ch->head, &ns->ch_list);
2227	ns->nchs++;
2228
2229	ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2230	printf("server: connection from %s\n", ch->hostname);
2231
2232	INIT_LIST_HEAD(&ch->conn_list);
2233	INIT_LIST_HEAD(&ch->devpaths);
2234
2235	return ch;
2236}
2237
2238static void device_done(struct devpath *dpp, int ncpus)
2239{
2240	int cpu;
2241	struct io_info *iop;
2242
2243	for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2244		close_iop(iop);
2245
2246	list_del(&dpp->head);
2247	dpp_free(dpp);
2248}
2249
2250static void net_ch_remove(struct cl_host *ch, int ncpus)
2251{
2252	struct list_head *p, *q;
2253	struct net_server_s *ns = ch->ns;
2254
2255	list_for_each_safe(p, q, &ch->devpaths) {
2256		struct devpath *dpp = list_entry(p, struct devpath, head);
2257		device_done(dpp, ncpus);
2258	}
2259
2260	list_for_each_safe(p, q, &ch->conn_list) {
2261		struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2262
2263		ch_rem_connection(ns, ch, nc);
2264	}
2265
2266	list_del(&ch->head);
2267	ns->nchs--;
2268
2269	if (ch->hostname)
2270		free(ch->hostname);
2271	free(ch);
2272}
2273
2274static void net_add_connection(struct net_server_s *ns)
2275{
2276	int fd;
2277	struct cl_host *ch;
2278	socklen_t socklen = sizeof(ns->addr);
2279
2280	fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2281	if (fd < 0) {
2282		/*
2283		 * This is OK: we just won't accept this connection,
2284		 * nothing fatal.
2285		 */
2286		perror("accept");
2287	} else {
2288		ch = net_find_client_host(ns, ns->addr.sin_addr);
2289		if (!ch)
2290			ch = net_add_client_host(ns, &ns->addr);
2291
2292		ch_add_connection(ns, ch, fd);
2293	}
2294}
2295
static struct devpath *nc_add_dpp(struct cl_conn *nc,
				  struct blktrace_net_hdr *bnh,
				  time_t connect_time)
{
	int cpu;
	struct io_info *iop;
	struct devpath *dpp;

	dpp = malloc(sizeof(*dpp));
	memset(dpp, 0, sizeof(*dpp));

	dpp->buts_name = strdup(bnh->buts_name);
	dpp->path = strdup(bnh->buts_name);
	dpp->fd = -1;
	dpp->ch = nc->ch;
	dpp->cl_id = bnh->cl_id;
	dpp->cl_connect_time = connect_time;
	dpp->ncpus = nc->ncpus;
	dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
	memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));

	list_add_tail(&dpp->head, &nc->ch->devpaths);
	nc->ch->ndevs++;

	dpp->ios = calloc(nc->ncpus, sizeof(*iop));
	memset(dpp->ios, 0, nc->ncpus * sizeof(*iop));

	for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
		iop->dpp = dpp;
		iop->nc = nc;
		init_mmap_info(&iop->mmap_info);

		if (iop_open(iop, cpu))
			goto err;
	}

	return dpp;

err:
	/*
	 * Need to unravel what's been done...
	 */
	while (cpu >= 0)
		close_iop(&dpp->ios[cpu--]);
	dpp_free(dpp);

	return NULL;
}

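/*
 * Find the devpath named in the header, adding it if it is new. A
 * matching cl_id on an existing device supplies the connect time, so
 * all devices from one client invocation share it.
 */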
static struct devpath *nc_find_dpp(struct cl_conn *nc,
				   struct blktrace_net_hdr *bnh)
{
	struct list_head *p;
	time_t connect_time = nc->connect_time;

	__list_for_each(p, &nc->ch->devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);

		if (!strcmp(dpp->buts_name, bnh->buts_name))
			return dpp;

		if (dpp->cl_id == bnh->cl_id)
			connect_time = dpp->cl_connect_time;
	}

	return nc_add_dpp(nc, bnh, connect_time);
}

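/*
 * Receive one chunk of trace data into the mmap'ed region of the
 * per-CPU output file, growing the mapping as needed.
 */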
static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
				 struct blktrace_net_hdr *bnh)
{
	int ret;
	struct io_info *iop = &dpp->ios[bnh->cpu];
	struct mmap_info *mip = &iop->mmap_info;

	if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info)) {
		fprintf(stderr, "ncd(%s:%d): mmap failed\n",
			nc->ch->hostname, nc->fd);
		exit(1);
	}

	ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
	if (ret > 0) {
		pdc_dr_update(dpp, bnh->cpu, ret);
		mip->fs_size += ret;
		mip->fs_off += ret;
	} else if (ret < 0)
		exit(1);
}

/*
 * Returns 1 if we closed a host - invalidates other polling information
 * that may be present.
 */
static int net_client_data(struct cl_conn *nc)
{
	int ret;
	struct devpath *dpp;
	struct blktrace_net_hdr bnh;

	ret = net_get_header(nc, &bnh);
	if (ret == 0)
		return 0;

	if (ret < 0) {
		fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
		exit(1);
	}

	if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
		fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
		exit(1);
	}

	if (!data_is_native) {
		bnh.magic = be32_to_cpu(bnh.magic);
		bnh.cpu = be32_to_cpu(bnh.cpu);
		bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
		bnh.len = be32_to_cpu(bnh.len);
		bnh.cl_id = be32_to_cpu(bnh.cl_id);
		bnh.buf_size = be32_to_cpu(bnh.buf_size);
		bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
		bnh.page_size = be32_to_cpu(bnh.page_size);
	}

	if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
		fprintf(stderr, "ncd(%s:%d): bad data magic\n",
			nc->ch->hostname, nc->fd);
		exit(1);
	}

	if (nc->ncpus == -1)
		nc->ncpus = bnh.max_cpus;

	/*
	 * len == 0 means the other end is sending us a new connection/dpp
	 * len == 1 means that the other end signalled end-of-run
	 */
	dpp = nc_find_dpp(nc, &bnh);
	if (bnh.len == 0) {
		/*
		 * Just adding in the dpp above is enough
		 */
		ack_open_close(nc->fd, dpp->buts_name);
		nc->ch->cl_opens++;
	} else if (bnh.len == 1) {
		/*
		 * overload cpu count with dropped events
		 */
		dpp->drops = bnh.cpu;

		ack_open_close(nc->fd, dpp->buts_name);
		if (--nc->ch->cl_opens == 0) {
			show_stats(&nc->ch->devpaths);
			net_ch_remove(nc->ch, nc->ncpus);
			return 1;
		}
	} else
		net_client_read_data(nc, dpp, &bnh);

	return 0;
}

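/*
 * Service every connection whose pollfd entry shows pending data. The
 * pfds entries past slot 0 parallel the connection list, so bail out
 * if a host was closed - the array is stale at that point.
 */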
static void handle_client_data(struct net_server_s *ns, int events)
{
	struct cl_conn *nc;
	struct pollfd *pfd;
	struct list_head *p, *q;

	pfd = &ns->pfds[1];
	list_for_each_safe(p, q, &ns->conn_list) {
		if (pfd->revents & POLLIN) {
			nc = list_entry(p, struct cl_conn, ns_head);

			if (net_client_data(nc) || --events == 0)
				break;
		}
		pfd++;
	}
}

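/*
 * (Re)build the pollfd array: slot 0 is the listen fd, followed by
 * one slot per live connection in list order.
 */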
static void net_setup_pfds(struct net_server_s *ns)
{
	struct pollfd *pfd;
	struct list_head *p;

	ns->pfds[0].fd = ns->listen_fd;
	ns->pfds[0].events = POLLIN;

	pfd = &ns->pfds[1];
	__list_for_each(p, &ns->conn_list) {
		struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);

		pfd->fd = nc->fd;
		pfd->events = POLLIN;
		pfd++;
	}
}

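/*
 * Main server loop: poll the listen fd plus all client connections,
 * accepting new connections and dispatching incoming data until we
 * are signalled to stop.
 */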
static int net_server_handle_connections(struct net_server_s *ns)
{
	int events;

	printf("server: waiting for connections...\n");

	while (!done) {
		net_setup_pfds(ns);
		events = poll(ns->pfds, ns->connects + 1, -1);
		if (events < 0) {
			if (errno != EINTR) {
				perror("FATAL: poll error");
				return 1;
			}
		} else if (events > 0) {
			if (ns->pfds[0].revents & POLLIN) {
				net_add_connection(ns);
				events--;
			}

			if (events)
				handle_client_data(ns, events);
		}
	}

	return 0;
}

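/*
 * Bring up the listening socket - the pollfd array starts with room
 * for just the listen fd and is grown per connection - then enter the
 * server loop.
 */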
static int net_server(void)
{
	int fd, opt;
	int ret = 1;
	struct net_server_s net_server;
	struct net_server_s *ns = &net_server;

	memset(ns, 0, sizeof(*ns));
	INIT_LIST_HEAD(&ns->ch_list);
	INIT_LIST_HEAD(&ns->conn_list);
	ns->pfds = malloc(sizeof(struct pollfd));

	fd = my_socket(AF_INET, SOCK_STREAM, 0);
	if (fd < 0) {
		perror("server: socket");
		goto out;
	}

	opt = 1;
	if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
		perror("setsockopt");
		goto out;
	}

	memset(&ns->addr, 0, sizeof(ns->addr));
	ns->addr.sin_family = AF_INET;
	ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
	ns->addr.sin_port = htons(net_port);

	if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
		perror("bind");
		goto out;
	}

	if (listen(fd, 1) < 0) {
		perror("listen");
		goto out;
	}

	/*
	 * The actual server looping is done here:
	 */
	ns->listen_fd = fd;
	ret = net_server_handle_connections(ns);

	/*
	 * Clean up and return...
	 */
out:
	free(ns->pfds);
	return ret;
}

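/*
 * Normal local/client tracing: set up the kernel trace buffers, hook
 * up the appropriate list handler when output is piped or sent over
 * the network, then start the per-CPU tracers and wait for them.
 */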
static int run_tracers(void)
{
	atexit(exit_tracing);
	if (net_mode == Net_client)
		printf("blktrace: connecting to %s\n", hostname);

	setup_buts();

	if (use_tracer_devpaths()) {
		if (setup_tracer_devpaths())
			return 1;

		if (piped_output)
			handle_list = handle_list_file;
		else if (net_client_use_send())
			handle_list = handle_list_net;
	}

	start_tracers();
	if (nthreads_running == ncpus) {
		unblock_tracers();
		start_buts();
		if (net_mode == Net_client)
			printf("blktrace: connected!\n");
		if (stop_watch)
			alarm(stop_watch);
	} else
		stop_tracers();

	wait_tracers();
	if (nthreads_running == ncpus)
		show_stats(&devpaths);
	if (net_client_use_send())
		close_client_connections();
	del_tracers();

	return 0;
}

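/*
 * Entry point: parse arguments, install signal handlers, then either
 * tear down a running trace (-k), run as a network server, or run the
 * tracers locally.
 */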
int main(int argc, char *argv[])
{
	int ret = 0;

	setlocale(LC_NUMERIC, "en_US");
	pagesize = getpagesize();
	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	if (ncpus < 0) {
		fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed %d/%s\n",
			errno, strerror(errno));
		ret = 1;
		goto out;
	} else if (handle_args(argc, argv)) {
		ret = 1;
		goto out;
	}

	signal(SIGINT, handle_sigint);
	signal(SIGHUP, handle_sigint);
	signal(SIGTERM, handle_sigint);
	signal(SIGALRM, handle_sigint);
	signal(SIGPIPE, SIG_IGN);

	if (kill_running_trace) {
		struct devpath *dpp;
		struct list_head *p;

		__list_for_each(p, &devpaths) {
			dpp = list_entry(p, struct devpath, head);
			if (__stop_trace(dpp->fd)) {
				fprintf(stderr,
					"BLKTRACETEARDOWN %s failed: %d/%s\n",
					dpp->path, errno, strerror(errno));
			}
		}
	} else if (net_mode == Net_server) {
		if (output_name) {
			fprintf(stderr, "-o ignored in server mode\n");
			output_name = NULL;
		}
		ret = net_server();
	} else
		ret = run_tracers();

out:
	if (pfp)
		fclose(pfp);
	rel_devpaths();
	return ret;
}