1import os, time, logging
2from autotest_lib.client.bin import test, utils
3from autotest_lib.client.bin.net import net_utils
4from autotest_lib.client.common_lib import error
5
# Positions inside each per-iteration result list stored by client():
# the mpstat command is queued first and the netperf sessions follow it.
# postprocess() reads the mpstat output at MPSTAT_IX and treats everything
# from NETPERF_IX onwards as netperf results.
# NOTE(review): this relies on utils.run_parallel returning results in the
# same order as the commands it was given - confirm.
MPSTAT_IX = 0
NETPERF_IX = 1
8
9class netperf2(test.test):
10    version = 4
11
12    # ftp://ftp.netperf.org/netperf/netperf-2.4.5.tar.bz2
13    def setup(self, tarball = 'netperf-2.4.5.tar.bz2'):
14        self.job.require_gcc()
15        tarball = utils.unmap_url(self.bindir, tarball, self.tmpdir)
16        utils.extract_tarball_to_dir(tarball, self.srcdir)
17        os.chdir(self.srcdir)
18
19        utils.system('patch -p0 < ../wait_before_data.patch')
20        utils.configure()
21        utils.make()
22
23
24    def initialize(self):
25        self.server_prog = '%s&' % os.path.join(self.srcdir, 'src/netserver')
26        self.client_prog = '%s' % os.path.join(self.srcdir, 'src/netperf')
27        self.valid_tests = ['TCP_STREAM', 'TCP_MAERTS', 'TCP_RR', 'TCP_CRR',
28                            'TCP_SENDFILE', 'UDP_STREAM', 'UDP_RR']
29        self.results = []
30        self.actual_times = []
31        self.netif = ''
32        self.network = net_utils.network()
33        self.network_utils = net_utils.network_utils()
34
35
36    def run_once(self, server_ip, client_ip, role, test = 'TCP_STREAM',
37                 test_time = 15, stream_list = [1], test_specific_args = '',
38                 cpu_affinity = '', dev = '', bidi = False, wait_time = 5):
39        """
40        server_ip: IP address of host running netserver
41        client_ip: IP address of host running netperf client(s)
42        role: 'client' or 'server'
43        test: one of TCP_STREAM, TCP_MEARTS, TCP_RR, TCP_CRR, TCP_SENDFILE,
44            UDP_STREAM or UDP_RR
45        test_time: time to run the test for in seconds
46        stream_list: list of number of netperf streams to launch
47        test_specific_args: Optional test specific args.  For example to set
48            the request,response size for RR tests to 200,100, set it
49            to: '-- -r 200,100'.  Or, to set the send buffer size of STREAM
50            tests to 200, set it to: '-- -m 200'
51        cpu_affinity: netperf/netserver processes will get taskset to the
52            cpu_affinity.  cpu_affinity is specified as a bitmask in hex
53            without the leading 0x.  For example, to run on CPUs 0 & 5,
54            cpu_affinity needs to be '21'
55        dev: device on which to run traffic on.  For example, to run on
56            inteface eth1, set it to 'eth1'.
57        bidi: bi-directional traffic.  This is supported for TCP_STREAM
58            test only. The RR & CRR tests are bi-directional by nature.
59        wait_time: Time to wait after establishing data/control connections
60            but before sending data traffic.
61        """
62        if test not in self.valid_tests:
63            raise error.TestError('invalid test specified')
64        self.role = role
65        self.test = test
66        self.test_time = test_time
67        self.wait_time = wait_time
68        self.stream_list = stream_list
69        self.bidi = bidi
70
71        server_tag = server_ip + '#netperf-server'
72        client_tag = client_ip + '#netperf-client'
73        all = [server_tag, client_tag]
74
75        # If a specific device has been requested, configure it.
76        if dev:
77            timeout = 60
78            if role == 'server':
79                self.configure_interface(dev, server_ip)
80                self.ping(client_ip, timeout)
81            else:
82                self.configure_interface(dev, client_ip)
83                self.ping(server_ip, timeout)
84
85        for num_streams in stream_list:
86            if role == 'server':
87                self.server_start(cpu_affinity)
88                try:
89                    # Wait up to ten minutes for the client to reach this
90                    # point.
91                    self.job.barrier(server_tag, 'start_%d' % num_streams,
92                                     600).rendezvous(*all)
93                    # Wait up to test_time + 5 minutes for the test to
94                    # complete
95                    self.job.barrier(server_tag, 'stop_%d' % num_streams,
96                                     test_time+300).rendezvous(*all)
97                finally:
98                    self.server_stop()
99
100            elif role == 'client':
101                # Wait up to ten minutes for the server to start
102                self.job.barrier(client_tag, 'start_%d' % num_streams,
103                                 600).rendezvous(*all)
104                self.client(server_ip, test, test_time, num_streams,
105                            test_specific_args, cpu_affinity)
106                # Wait up to 5 minutes for the server to also reach this point
107                self.job.barrier(client_tag, 'stop_%d' % num_streams,
108                                 300).rendezvous(*all)
109            else:
110                raise error.TestError('invalid role specified')
111
112        self.restore_interface()
113
114
115    def configure_interface(self, dev, ip_addr):
116        self.netif = net_utils.netif(dev)
117        self.netif.up()
118        if self.netif.get_ipaddr() != ip_addr:
119            self.netif.set_ipaddr(ip_addr)
120
121
122    def restore_interface(self):
123        if self.netif:
124            self.netif.restore()
125
126
127    def server_start(self, cpu_affinity):
128        utils.system('killall netserver', ignore_status=True)
129        cmd = self.server_prog
130        if cpu_affinity:
131            cmd = 'taskset %s %s' % (cpu_affinity, cmd)
132
133        self.results.append(utils.system_output(cmd, retain_output=True))
134
135
136    def server_stop(self):
137        utils.system('killall netserver', ignore_status=True)
138
139
140    def client(self, server_ip, test, test_time, num_streams,
141               test_specific_args, cpu_affinity):
142        args = '-H %s -t %s -l %d' % (server_ip, test, test_time)
143
144        if self.wait_time:
145            args += ' -s %d ' % self.wait_time
146
147        # Append the test specific arguments.
148        if test_specific_args:
149            args += ' ' + test_specific_args
150
151        cmd = '%s %s' % (self.client_prog, args)
152
153        if cpu_affinity:
154            cmd = 'taskset %s %s' % (cpu_affinity, cmd)
155
156        try:
157            cmds = []
158
159            # Get 5 mpstat samples. Since tests with large number of streams
160            # take a long time to start up all the streams, we'll toss out the
161            # first and last sample when recording results
162            interval = max(1, test_time / 5)
163            cmds.append('sleep %d && %s -P ALL %s 5' %
164                        (self.wait_time, 'mpstat', interval))
165
166            # Add the netperf commands
167            for i in xrange(num_streams):
168                cmds.append(cmd)
169                if self.bidi and test == 'TCP_STREAM':
170                    cmds.append(cmd.replace('TCP_STREAM', 'TCP_MAERTS'))
171
172            t0 = time.time()
173            # Launch all commands in parallel
174            out = utils.run_parallel(cmds, timeout=test_time + 500,
175                                     ignore_status=True)
176            t1 = time.time()
177
178            self.results.append(out)
179            self.actual_times.append(t1 - t0 - self.wait_time)
180            # Log test output
181            logging.info(out)
182
183        except error.CmdError, e:
184            """ Catch errors due to timeout, but raise others
185            The actual error string is:
186              "Command did not complete within %d seconds"
187            called in function join_bg_job in the file common_lib/utils.py
188
189            Looking for 'within' is probably not the best way to do this but
190            works for now"""
191
192            if ('within' in e.additional_text
193                or 'non-zero' in e.additional_text):
194                logging.debug(e.additional_text)
195                self.results.append(None)
196                self.actual_times.append(1)
197            else:
198                raise
199
200
201    def postprocess(self):
202        if self.role == 'client':
203            # if profilers are enabled, the test gets runs twice
204            if (len(self.stream_list) != len(self.results) and
205               2*len(self.stream_list) != len(self.results)):
206                raise error.TestError('Mismatched number of results')
207
208            function = None
209            keys = None
210
211            # Each of the functions return tuples in which the keys define
212            # what that item in the tuple represents
213            if self.test in ['TCP_STREAM', 'TCP_MAERTS', 'TCP_SENDFILE']:
214                function = self.process_tcp_stream
215                keys = ('Throughput',)
216            elif self.test == 'UDP_STREAM':
217                function = self.process_udp_stream
218                keys = ('Throughput', 'Errors')
219            elif self.test in ['TCP_RR', 'TCP_CRR', 'UDP_RR']:
220                function = self.process_request_response
221                keys = ('Transfer_Rate',)
222            else:
223                raise error.TestError('Unhandled test')
224
225            for i, streams in enumerate(self.stream_list):
226                attr = {'stream_count':streams}
227                keyval = {}
228                temp_vals = []
229
230                # Short circuit to handle errors due to client timeouts
231                if not self.results[i]:
232                    self.write_iteration_keyval(attr, keyval)
233                    continue
234
235                # Collect output of netperf sessions
236                failed_streams_count = 0
237                for result in self.results[i][NETPERF_IX:]:
238                    if result.exit_status:
239                        failed_streams_count += 1
240                    else:
241                        temp_vals.append(function(result.stdout))
242
243                keyval['Failed_streams_count'] = failed_streams_count
244
245                # Process mpstat output
246                mpstat_out = self.results[i][MPSTAT_IX].stdout
247                cpu_stats = self.network_utils.process_mpstat(mpstat_out, 5)
248                keyval['CPU_C'] = 100 - cpu_stats['idle']
249                keyval['CPU_C_SYS'] = cpu_stats['sys']
250                keyval['CPU_C_HI'] = cpu_stats['irq']
251                keyval['CPU_C_SI'] = cpu_stats['soft']
252                keyval['INTRS_C'] = cpu_stats['intr/s']
253
254                actual_time = self.actual_times[i]
255                keyval['actual_time'] = actual_time
256                logging.info('actual_time: %f', actual_time)
257
258                # Compute the sum of elements returned from function which
259                # represent the string contained in keys
260                for j, key in enumerate(keys):
261                    vals = [x[j] for x in temp_vals]
262                    # scale result by the actual time taken
263                    keyval[key] = sum(vals)
264
265                # record 'Efficiency' as perf/CPU
266                if keyval['CPU_C'] != 0:
267                    keyval['Efficieny_C'] = keyval[keys[0]]/keyval['CPU_C']
268                else:
269                    keyval['Efficieny_C'] = keyval[keys[0]]
270
271                self.write_iteration_keyval(attr, keyval)
272
273
274    def process_tcp_stream(self, output):
275        """Parses the following (works for both TCP_STREAM, TCP_MAERTS and
276        TCP_SENDFILE) and returns a singleton containing throughput.
277
278        TCP STREAM TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET to foo.bar.com \
279        (10.10.10.3) port 0 AF_INET
280        Recv   Send    Send
281        Socket Socket  Message  Elapsed
282        Size   Size    Size     Time     Throughput
283        bytes  bytes   bytes    secs.    10^6bits/sec
284
285        87380  16384  16384    2.00      941.28
286        """
287
288        return float(output.splitlines()[6].split()[4]),
289
290
291    def process_udp_stream(self, output):
292        """Parses the following and returns a touple containing throughput
293        and the number of errors.
294
295        UDP UNIDIRECTIONAL SEND TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET \
296        to foo.bar.com (10.10.10.3) port 0 AF_INET
297        Socket  Message  Elapsed      Messages
298        Size    Size     Time         Okay Errors   Throughput
299        bytes   bytes    secs            #      #   10^6bits/sec
300
301        129024   65507   2.00         3673      0     961.87
302        131072           2.00         3673            961.87
303        """
304
305        line_tokens = output.splitlines()[5].split()
306        return float(line_tokens[5]), int(line_tokens[4])
307
308
309    def process_request_response(self, output):
310        """Parses the following which works for both rr (TCP and UDP) and crr
311        tests and returns a singleton containing transfer rate.
312
313        TCP REQUEST/RESPONSE TEST from 0.0.0.0 (0.0.0.0) port 0 AF_INET \
314        to foo.bar.com (10.10.10.3) port 0 AF_INET
315        Local /Remote
316        Socket Size   Request  Resp.   Elapsed  Trans.
317        Send   Recv   Size     Size    Time     Rate
318        bytes  Bytes  bytes    bytes   secs.    per sec
319
320        16384  87380  1        1       2.00     14118.53
321        16384  87380
322        """
323
324        return float(output.splitlines()[6].split()[5]),
325
326
327    def ping(self, ip, timeout):
328        curr_time = time.time()
329        end_time = curr_time + timeout
330        while curr_time < end_time:
331            if not os.system('ping -c 1 ' + ip):
332                # Ping succeeded
333                return
334            # Ping failed. Lets sleep a bit and try again.
335            time.sleep(5)
336            curr_time = time.time()
337
338        return
339