1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus, os
6from dbus.mainloop.glib import DBusGMainLoop
7
8from autotest_lib.client.bin import test, utils
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib.cros import policy, session_manager
11from autotest_lib.client.cros import cryptohome, ownership
12
13
14class login_MultiUserPolicy(test.test):
15    """Verifies that storing and retrieving user policy works with
16       multiple profiles signed-in.
17    """
18
19    version = 1
20
21    _user1 = 'user1@somewhere.com'
22    _user2 = 'user2@somewhere.com'
23
24    def setup(self):
25        os.chdir(self.srcdir)
26        utils.make('OUT_DIR=.')
27
28
29    def initialize(self):
30        super(login_MultiUserPolicy, self).initialize()
31        self._bus_loop = DBusGMainLoop(set_as_default=True)
32
33        # Clear the user's vault, to make sure the test starts without any
34        # policy or key lingering around. At this stage the session isn't
35        # started and there's no user signed in.
36        ownership.restart_ui_to_clear_ownership_files()
37        cryptohome_proxy = cryptohome.CryptohomeProxy(self._bus_loop)
38        cryptohome_proxy.ensure_clean_cryptohome_for(self._user1)
39        cryptohome_proxy.ensure_clean_cryptohome_for(self._user2)
40
41
42    def run_once(self):
43        sm = session_manager.connect(self._bus_loop)
44
45        # Start a session for the first user, and verify that no policy exists
46        # for that user yet.
47        if not sm.StartSession(self._user1, ''):
48            raise error.TestError('Could not start session')
49        policy_blob = sm.RetrievePolicyForUser(self._user1, byte_arrays=True)
50        if policy_blob:
51            raise error.TestError('session_manager already has user policy!')
52
53        # Now store a policy. This is building a device policy protobuf, but
54        # that's fine as far as the session_manager is concerned; it's the
55        # outer PolicyFetchResponse that contains the public_key.
56        public_key = ownership.known_pubkey()
57        private_key = ownership.known_privkey()
58        policy_data = policy.build_policy_data(self.srcdir)
59        policy_response = policy.generate_policy(self.srcdir,
60                                                 private_key,
61                                                 public_key,
62                                                 policy_data)
63        try:
64          result = sm.StorePolicyForUser(self._user1,
65                                         dbus.ByteArray(policy_response))
66          if not result:
67              raise error.TestFail('Failed to store user policy')
68        except dbus.exceptions.DBusException, e:
69          raise error.TestFail('Call to StorePolicyForUser failed', e)
70
71        # Storing policy for the second user fails before his session starts.
72        try:
73          result = sm.StorePolicyForUser(self._user2,
74                                         dbus.ByteArray(policy_response))
75          raise error.TestFail('Storing policy should fail before the session '
76                               'is started')
77        except dbus.exceptions.DBusException, e:
78          pass
79
80        # Now start the second user's session, and verify that he has no
81        # policy stored yet.
82        if not sm.StartSession(self._user2, ''):
83            raise error.TestError('Could not start second session')
84        policy_blob = sm.RetrievePolicyForUser(self._user2, byte_arrays=True)
85        if policy_blob:
86            raise error.TestError('session_manager already has user policy!')
87
88        # Storing works now.
89        try:
90          result = sm.StorePolicyForUser(self._user2,
91                                         dbus.ByteArray(policy_response))
92          if not result:
93              raise error.TestFail('Failed to store user policy')
94        except dbus.exceptions.DBusException, e:
95          raise error.TestFail('Call to StorePolicyForUser failed', e)
96
97        # Verify that retrieving policy works too.
98        policy_blob = sm.RetrievePolicyForUser(self._user2, byte_arrays=True)
99        if not policy_blob:
100            raise error.TestError('Failed to retrieve stored policy')
101