1import webapp2
2from webapp2_extras import sessions
3
4from webapp2_extras import auth
5from webapp2_extras.appengine.auth import models
6
7from google.appengine.ext.ndb import model
8
9import test_base
10
11
class TestAuth(test_base.BaseTestCase):
    """Tests for ``webapp2_extras.auth`` using the App Engine auth models.

    Each test builds its own ``webapp2.WSGIApplication`` so that config
    tweaks made on one auth store (e.g. ``token_max_age``) are not visible
    to the other tests.
    """

    # NOTE: the test methods below are described with ``#`` comments rather
    # than docstrings on purpose: unittest's verbose output replaces the test
    # name with the first docstring line, which makes failures harder to map
    # back to a method.

    def setUp(self):
        super(TestAuth, self).setUp()
        self.register_model('User', models.User)
        self.register_model('UserToken', models.UserToken)
        self.register_model('Unique', models.Unique)

    # -- helpers -------------------------------------------------------------

    def _check_token(self, user_id, token, subject='auth'):
        # True when a UserToken entity exists for (user_id, subject, token).
        rv = models.UserToken.get(user=user_id, subject=subject, token=token)
        return rv is not None

    def _make_session_app(self):
        # Return a fresh app configured with a sessions secret key. A new
        # config dict is built on every call so apps never share state.
        return webapp2.WSGIApplication(config={
            'webapp2_extras.sessions': {
                'secret_key': 'foo',
            }
        })

    def _create_user(self, password_raw, auth_id='auth_id'):
        # Create a ``models.User`` and fail right here if creation did not
        # succeed, instead of failing later with an unrelated AttributeError
        # when the test touches ``user.key``.
        success, user = models.User.create_user(auth_id=auth_id,
                                                password_raw=password_raw)
        self.assertTrue(success, 'create_user(%r) failed: %r'
                        % (auth_id, user))
        return user

    # -- tests ---------------------------------------------------------------

    def test_get_user_by_session(self):
        # Round trip: log in with a password, persist the session cookie,
        # then restore the user from that cookie and exercise token renewal
        # and token expiration.
        app = self._make_session_app()
        req = webapp2.Request.blank('/')
        rsp = webapp2.Response()
        req.app = app
        s = auth.get_store(app=app)
        a = auth.Auth(request=req)
        session_store = sessions.get_store(request=req)

        # Setting empty session data leaves no '_user' entry in the session.
        a.set_session_data({})
        self.assertEqual(a.session.get('_user'), None)

        # An empty '_user' entry placed in the session by hand is not
        # reported as session data, and the entry is gone afterwards.
        a.session['_user'] = {}
        self.assertEqual(a.get_session_data(), None)
        self.assertEqual(a.session.get('_user'), None)

        # Create a user.
        user = self._create_user('password')
        user_id = user.key.id()

        # Nobody has logged in yet, so the session yields no user.
        rv = a.get_user_by_session()
        self.assertTrue(rv is None)

        # Login with password. User dict is returned.
        rv = a.get_user_by_password('auth_id', 'password')
        self.assertEqual(rv['user_id'], user_id)

        # Save sessions so the response carries the session cookie.
        session_store.save_sessions(rsp)

        # Replay the cookie on a brand new request/Auth pair.
        cookies = rsp.headers.get('Set-Cookie')
        req = webapp2.Request.blank('/', headers=[('Cookie', cookies)])
        req.app = app
        a = auth.Auth(request=req)

        # The user restored from the session is the one that logged in.
        rv = a.get_user_by_session()
        self.assertEqual(rv['user_id'], user_id)

        # If we call get_user_by_token() now, the very same dict object is
        # returned (identity, not just equality).
        rv2 = a.get_user_by_token(rv['user_id'], rv['token'])
        self.assertTrue(rv is rv2)

        # Drop the per-request cached user, fetch again, and check that the
        # token is unchanged.
        token = rv['token']
        a._user = None
        rv = a.get_user_by_session()
        self.assertEqual(rv['user_id'], user_id)
        self.assertEqual(rv['token'], token)

        # Now let's force token to be renewed and check that we have a new
        # one.
        s.config['token_new_age'] = -300
        a._user = None
        rv = a.get_user_by_session()
        self.assertEqual(rv['user_id'], user_id)
        self.assertNotEqual(rv['token'], token)

        # Now let's force token to be invalid.
        s.config['token_max_age'] = -300
        a._user = None
        rv = a.get_user_by_session()
        self.assertEqual(rv, None)

    def test_get_user_by_password(self):
        # Password login: checks the session cookie max_age for
        # remember=True/False and that a failed login unsets the user.
        app = self._make_session_app()
        req = webapp2.Request.blank('/')
        req.app = app
        a = auth.get_auth(request=req)
        session_store = sessions.get_store(request=req)

        user = self._create_user('password')
        user_id = user.key.id()

        # Lets test the cookie max_age when we use remember=True or False.
        rv = a.get_user_by_password('auth_id', 'password', remember=True)
        self.assertEqual(rv['user_id'], user_id)
        self.assertEqual(
            session_store.sessions['auth'].session_args['max_age'],
            86400 * 7 * 3)

        # Now remember=False.
        rv = a.get_user_by_password('auth_id', 'password')
        self.assertEqual(rv['user_id'], user_id)
        self.assertEqual(
            session_store.sessions['auth'].session_args['max_age'],
            None)

        # User was set so getting it from session will return the same one.
        rv = a.get_user_by_session()
        self.assertEqual(rv['user_id'], user_id)

        # Now try a failed password submission: user will be unset.
        rv = a.get_user_by_password('auth_id', 'password_2', silent=True)
        self.assertTrue(rv is None)

        # And getting by session will no longer work.
        rv = a.get_user_by_session()
        self.assertTrue(rv is None)

    def test_validate_password(self):
        # AuthStore.validate_password: returns the user dict on success and
        # raises distinct errors for a bad password vs. an unknown auth id.
        app = webapp2.WSGIApplication()
        s = auth.get_store(app=app)

        user = self._create_user('foo')

        u = s.validate_password('auth_id', 'foo')
        self.assertEqual(u, s.user_to_dict(user))
        self.assertRaises(auth.InvalidPasswordError,
                          s.validate_password, 'auth_id', 'bar')
        self.assertRaises(auth.InvalidAuthIdError,
                          s.validate_password, 'auth_id_2', 'foo')

    def test_validate_token(self):
        # AuthStore.validate_token: unknown, valid, expired and renewed
        # tokens, including whether the stored token survives each case.
        app = webapp2.WSGIApplication()
        s = auth.get_store(app=app)

        # Unknown user/token.
        rv = s.validate_token('auth_id', 'token')
        self.assertEqual(rv, (None, None))

        # Expired timestamp.
        rv = s.validate_token('auth_id', 'token', -300)
        self.assertEqual(rv, (None, None))

        m = models.User
        user = self._create_user('foo')
        user_id = user.key.id()

        token = m.create_auth_token(user_id)
        rv = s.validate_token(user_id, token)
        self.assertEqual(rv, (s.user_to_dict(user), token))
        # Token must still be there.
        self.assertTrue(self._check_token(user_id, token))

        # Expired timestamp.
        token = m.create_auth_token(user_id)
        rv = s.validate_token(user_id, token, -300)
        self.assertEqual(rv, (None, None))
        # Token must have been deleted.
        self.assertFalse(self._check_token(user_id, token))

        # Force expiration.
        token = m.create_auth_token(user_id)
        s.config['token_max_age'] = -300
        rv = s.validate_token(user_id, token)
        self.assertEqual(rv, (None, None))
        # Token must have been deleted.
        self.assertFalse(self._check_token(user_id, token))

        # Revert expiration, force renewal: the user is still valid but no
        # token is handed back.
        token = m.create_auth_token(user_id)
        s.config['token_max_age'] = 86400 * 7 * 3
        s.config['token_new_age'] = -300
        rv = s.validate_token(user_id, token)
        self.assertEqual(rv, (s.user_to_dict(user), None))
        # Token must have been deleted.
        self.assertFalse(self._check_token(user_id, token))

    def test_set_auth_store(self):
        # set_store() registers an explicitly built AuthStore on the app.
        app = webapp2.WSGIApplication()
        store = auth.AuthStore(app)

        self.assertEqual(len(app.registry), 0)
        auth.set_store(store, app=app)
        self.assertEqual(len(app.registry), 1)
        s = auth.get_store(app=app)
        self.assertTrue(isinstance(s, auth.AuthStore))

    def test_get_auth_store(self):
        # get_store() lazily creates and registers an AuthStore on the app.
        app = webapp2.WSGIApplication()
        self.assertEqual(len(app.registry), 0)
        s = auth.get_store(app=app)
        self.assertEqual(len(app.registry), 1)
        self.assertTrue(isinstance(s, auth.AuthStore))

    def test_set_auth(self):
        # set_auth() registers an explicitly built Auth on the request.
        app = webapp2.WSGIApplication()
        req = webapp2.Request.blank('/')
        req.app = app
        a = auth.Auth(req)

        self.assertEqual(len(req.registry), 0)
        auth.set_auth(a, request=req)
        self.assertEqual(len(req.registry), 1)
        a = auth.get_auth(request=req)
        self.assertTrue(isinstance(a, auth.Auth))

    def test_get_auth(self):
        # get_auth() lazily creates and registers an Auth on the request.
        app = webapp2.WSGIApplication()
        req = webapp2.Request.blank('/')
        req.app = app
        self.assertEqual(len(req.registry), 0)
        a = auth.get_auth(request=req)
        self.assertEqual(len(req.registry), 1)
        self.assertTrue(isinstance(a, auth.Auth))

    # NOTE(review): the test below is disabled by being wrapped in a string
    # literal, so it never runs. It relies on
    # AuthStore.set_password_validator / set_token_validator -- confirm
    # whether those hooks still exist, then either re-enable it or delete it.
    '''
    def test_set_callables(self):
        app = webapp2.WSGIApplication()
        req = webapp2.Request.blank('/')
        req.app = app
        s = auth.get_store(app=app)

        def validate_password(store, auth_id, password):
            self.assertTrue(store is s)
            self.assertEqual(auth_id, 'auth_id')
            self.assertEqual(password, 'password')
            return 'validate_password'

        def validate_token(store, auth_id, token, token_ts=None):
            self.assertTrue(store is s)
            self.assertEqual(auth_id, 'auth_id')
            self.assertEqual(token, 'token')
            self.assertEqual(token_ts, 'token_ts')
            return 'validate_token'

        s.set_password_validator(validate_password)
        rv = s.validate_password('auth_id', 'password')
        self.assertEqual(rv, 'validate_password')

        s.set_token_validator(validate_token)
        rv = s.validate_token('auth_id', 'token', 'token_ts')
        self.assertEqual(rv, 'validate_token')
    '''

    def test_extended_user(self):
        # A User subclass with extra properties can be plugged in through
        # the 'user_model' config key and is what the store hands back.
        class MyUser(models.User):
            newsletter = model.BooleanProperty()
            age = model.IntegerProperty()

        auth_id = 'own:username'
        success, info = MyUser.create_user(auth_id, newsletter=True, age=22)
        self.assertTrue(success)

        app = webapp2.WSGIApplication(config={
            'webapp2_extras.auth': {
                'user_model': MyUser,
            }
        })
        s = auth.get_store(app=app)
        user = s.user_model.get_by_auth_id(auth_id)
        self.assertEqual(info, user)
        self.assertEqual(user.age, 22)
        self.assertTrue(user.newsletter is True)
297
# Script entry point: when this module is executed directly (rather than
# imported by a test runner), hand control to the shared test harness.
if __name__ == '__main__':
    test_base.main()
300