1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import re
8
9from collections import namedtuple
10
11from autotest_lib.client.bin import test, utils
12from autotest_lib.client.common_lib import error
13
# Immutable, hashable records so observed IPC objects can be stored in sets
# and compared against the expected sets with plain set difference.
#   ShmRecord: one shared memory segment (owning user, permission string,
#              and the tuple of executables found attached to it).
#   SemaphoreRecord: one semaphore array (owning user, permission string).
ShmRecord = namedtuple('ShmRecord', 'owner perms attached')
SemaphoreRecord = namedtuple('SemaphoreRecord', 'owner perms')
16
class security_SysVIPC(test.test):
    """Detect emergence of new attack surfaces in SysV IPC.

    The test snapshots the shared memory segments, semaphore arrays and
    message queues reported by 'ipcs' and compares them with the
    expectations recorded in expected_shm / expected_sem (no message
    queues are expected at all). Anything observed that is not expected
    fails the test; expected objects that are absent are only logged.
    """
    version = 1
    # Shared memory segments that are allowed to exist.
    expected_shm = set([ShmRecord(owner='cras', perms='640',
                                  attached=('/usr/bin/cras',))])
    # Semaphore arrays that are allowed to exist.
    expected_sem = set([SemaphoreRecord(owner='root', perms='600')])

    def dump_ipcs_to_results(self):
        """Writes a copy of the 'ipcs' output to the autotest results dir."""
        utils.system_output('ipcs > "%s/ipcs-output.txt"' % self.resultsdir)


    def find_attached(self, shmid):
        """Find programs attached to a given shared memory segment.

        Returns a sorted tuple of the full paths to each program
        identified (duplicates removed).

        Args:
          @param shmid: the id as shown in ipcs and related utilities.
        """
        # This finds /proc/*/exe entries where maps shows they have
        # attached to the specified shm segment. The pattern starts with
        # a space so that the id must match the whole inode column of the
        # maps line; without it, id "5" would also match inode "65".
        cmd = 'grep " %s */SYSV" /proc/*/maps | sed "s/maps.*/exe/g"' % shmid
        # Then we just need to readlink each of the links. Even though
        # we ultimately convert to a sorted tuple, we use a set to avoid
        # accumulating duplicates as we go along.
        exes = set()
        for link in utils.system_output(cmd).splitlines():
            try:
                exes.add(os.readlink(link))
            except OSError as e:
                # The process may have exited between the grep and the
                # readlink; that is not a reason to abort the whole test.
                logging.warning('Could not resolve %s: %s', link, e)
        return tuple(sorted(exes))


    def observe_shm(self):
        """Return a set of ShmRecords representing current system shm usage."""
        seen = set()
        # Keep only the rows of 'ipcs -m' that start with '0'; headers and
        # blank lines are dropped. grep exits non-zero when nothing
        # matches, hence ignore_status.
        cmd = 'ipcs -m | grep ^0'
        for line in utils.system_output(cmd, ignore_status=True).splitlines():
            # Whitespace-separated columns; index 1 is the shmid, 2 the
            # owner and 3 the permissions.
            fields = line.split()
            shmid = fields[1]
            owner = fields[2]
            perms = fields[3]
            attached = self.find_attached(shmid)
            seen.add(ShmRecord(owner=owner, perms=perms, attached=attached))
        return seen


    def observe_sems(self):
        """Return a set of SemaphoreRecords representing current usage."""
        seen = set()
        # Same row filtering as for shared memory, applied to 'ipcs -s'.
        cmd = 'ipcs -s | grep ^0'
        for line in utils.system_output(cmd, ignore_status=True).splitlines():
            fields = line.split()
            seen.add(SemaphoreRecord(owner=fields[2], perms=fields[3]))
        return seen


    def run_once(self):
        """Main entry point to run the security_SysVIPC autotest.

        Raises:
          error.TestFail: if any unexpected shared memory segment,
              semaphore array or message queue is found. Expected objects
              that are missing are logged but do not fail the test.
        """
        test_fail = False
        self.dump_ipcs_to_results()
        # Check Shared Memory.
        observed_shm = self.observe_shm()
        missing = self.expected_shm.difference(observed_shm)
        extra = observed_shm.difference(self.expected_shm)
        if missing:
            # Logged only: a missing segment is not new attack surface.
            logging.error('Expected shm(s) not found:')
            logging.error(missing)
        if extra:
            test_fail = True
            logging.error('Unexpected shm(s) found:')
            logging.error(extra)

        # Check Semaphores.
        observed_sem = self.observe_sems()
        missing = self.expected_sem.difference(observed_sem)
        extra = observed_sem.difference(self.expected_sem)
        if missing:
            # Logged only, as for shared memory above.
            logging.error('Expected semaphore(s) not found:')
            logging.error(missing)
        if extra:
            test_fail = True
            logging.error('Unexpected semaphore(s) found:')
            logging.error(extra)

        # Also check Message Queues. Since we currently expect
        # none, we can avoid over-engineering this check.
        queues = utils.system_output('ipcs -q | grep ^0', ignore_status=True)
        if queues:
            test_fail = True
            logging.error('Unexpected message queues found:')
            logging.error(queues)

        if test_fail:
            raise error.TestFail('SysV IPCs did not match expectations')
111