1#!/usr/bin/env python
2# Copyright (c) 2013 Amazon.com, Inc. or its affiliates.  All Rights Reserved
3#
4# Permission is hereby granted, free of charge, to any person obtaining a
5# copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish, dis-
8# tribute, sublicense, and/or sell copies of the Software, and to permit
9# persons to whom the Software is furnished to do so, subject to the fol-
10# lowing conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21# IN THE SOFTWARE.
22#
23import json
24from tests.unit import unittest
25from tests.unit import AWSMockServiceTestCase
26from mock import Mock
27
28from boto.sns.connection import SNSConnection
29
# Queue attributes for a queue that already carries a policy: the 'Policy'
# value is a compact JSON document holding one statement that allows
# SQS:GetQueueUrl.  It is spelled out one JSON member per fragment below.
QUEUE_POLICY = {
    u'Policy': (
        u'{'
        u'"Version":"2008-10-17",'
        u'"Id":"arn:aws:sqs:us-east-1:idnum:testqueuepolicy'
        u'/SQSDefaultPolicy",'
        u'"Statement":[{'
        u'"Sid":"sidnum",'
        u'"Effect":"Allow",'
        u'"Principal":{"AWS":"*"},'
        u'"Action":"SQS:GetQueueUrl",'
        u'"Resource":"arn:aws:sqs:us-east-1:idnum:testqueuepolicy"'
        u'}]'
        u'}'
    ),
}
37
38
class TestSNSConnection(AWSMockServiceTestCase):
    """Checks the request parameters produced by ``SNSConnection`` calls.

    Each test registers a 200 response through ``set_http_response``,
    invokes one connection method, and then verifies the parameters of the
    resulting request with ``assert_request_parameters``.
    """

    connection_class = SNSConnection

    def setUp(self):
        super(TestSNSConnection, self).setUp()

    def default_body(self):
        """Return an empty JSON object as bytes."""
        # NOTE(review): presumably the body of the mocked HTTP response when
        # a test does not supply one -- confirm in AWSMockServiceTestCase.
        return b"{}"

    # -- helpers -----------------------------------------------------------

    def _assert_params_ignoring_common(self, expected, also_ignore=()):
        """Call ``assert_request_parameters`` with the usual ignore list.

        'Version' and 'ContentType', followed by any names given in
        ``also_ignore``, are passed as ``ignore_params_values``.  A fresh
        list is built on every call.
        """
        ignored = ['Version', 'ContentType']
        ignored.extend(also_ignore)
        self.assert_request_parameters(expected,
                                       ignore_params_values=ignored)

    def _subscribe_mock_queue(self, queue_attributes):
        """Subscribe a mocked queue to 'topic_arn' and return its new policy.

        ``queue_attributes`` is the value the mock's ``get_attributes``
        returns.  Once the Subscribe request parameters have been checked,
        the second positional argument of the most recent ``set_attribute``
        call on the mock is parsed as JSON and returned.
        """
        self.set_http_response(status_code=200)

        sqs_queue = Mock()
        sqs_queue.get_attributes.return_value = queue_attributes
        sqs_queue.arn = 'arn:aws:sqs:us-east-1:idnum:queuename'

        self.service_connection.subscribe_sqs_queue('topic_arn', sqs_queue)
        expected = {
            'Action': 'Subscribe',
            'ContentType': 'JSON',
            'Endpoint': 'arn:aws:sqs:us-east-1:idnum:queuename',
            'Protocol': 'sqs',
            'TopicArn': 'topic_arn',
            'Version': '2010-03-31',
        }
        self.assert_request_parameters(expected, ignore_params_values=[])

        positional_args = sqs_queue.set_attribute.call_args[0]
        return json.loads(positional_args[1])

    # -- subscribe_sqs_queue -----------------------------------------------

    def test_sqs_with_existing_policy(self):
        updated = self._subscribe_mock_queue(QUEUE_POLICY)

        # The queue policy written back must keep the original version.
        self.assertEqual(updated['Version'], '2008-10-17')
        # The new SendMessage statement is expected after the existing one.
        statements = updated['Statement']
        self.assertEqual(len(statements), 2)
        self.assertEqual(statements[1]['Action'], 'SQS:SendMessage')

    def test_sqs_with_no_previous_policy(self):
        created = self._subscribe_mock_queue({})

        # Without a prior policy there should be exactly one statement.
        self.assertEqual(len(created['Statement']), 1)

    # -- publish -----------------------------------------------------------

    def test_publish_with_positional_args(self):
        self.set_http_response(status_code=200)

        self.service_connection.publish('topic', 'message', 'subject')

        self._assert_params_ignoring_common({
            'Action': 'Publish',
            'TopicArn': 'topic',
            'Subject': 'subject',
            'Message': 'message',
        })

    def test_publish_with_kwargs(self):
        self.set_http_response(status_code=200)

        self.service_connection.publish(
            topic='topic', message='message', subject='subject')

        self._assert_params_ignoring_common({
            'Action': 'Publish',
            'TopicArn': 'topic',
            'Subject': 'subject',
            'Message': 'message',
        })

    def test_publish_with_target_arn(self):
        self.set_http_response(status_code=200)

        self.service_connection.publish(
            target_arn='target_arn', message='message', subject='subject')

        self._assert_params_ignoring_common({
            'Action': 'Publish',
            'TargetArn': 'target_arn',
            'Subject': 'subject',
            'Message': 'message',
        })

    # -- platform applications and endpoints -------------------------------

    def test_create_platform_application(self):
        self.set_http_response(status_code=200)
        app_attributes = {
            'PlatformPrincipal': 'a ssl certificate',
            'PlatformCredential': 'a private key',
        }

        self.service_connection.create_platform_application(
            name='MyApp', platform='APNS', attributes=app_attributes)

        # The expected entries list PlatformCredential first, the reverse of
        # the order in which the attributes are written above.
        self._assert_params_ignoring_common({
            'Action': 'CreatePlatformApplication',
            'Name': 'MyApp',
            'Platform': 'APNS',
            'Attributes.entry.1.key': 'PlatformCredential',
            'Attributes.entry.1.value': 'a private key',
            'Attributes.entry.2.key': 'PlatformPrincipal',
            'Attributes.entry.2.value': 'a ssl certificate',
        })

    def test_set_platform_application_attributes(self):
        self.set_http_response(status_code=200)
        app_attributes = {
            'PlatformPrincipal': 'a ssl certificate',
            'PlatformCredential': 'a private key',
        }

        self.service_connection.set_platform_application_attributes(
            platform_application_arn='arn:myapp',
            attributes=app_attributes)

        self._assert_params_ignoring_common({
            'Action': 'SetPlatformApplicationAttributes',
            'PlatformApplicationArn': 'arn:myapp',
            'Attributes.entry.1.key': 'PlatformCredential',
            'Attributes.entry.1.value': 'a private key',
            'Attributes.entry.2.key': 'PlatformPrincipal',
            'Attributes.entry.2.value': 'a ssl certificate',
        })

    def test_create_platform_endpoint(self):
        self.set_http_response(status_code=200)
        endpoint_attributes = {'Enabled': False}

        self.service_connection.create_platform_endpoint(
            platform_application_arn='arn:myapp',
            token='abcde12345',
            custom_user_data='john',
            attributes=endpoint_attributes)

        self._assert_params_ignoring_common({
            'Action': 'CreatePlatformEndpoint',
            'PlatformApplicationArn': 'arn:myapp',
            'Token': 'abcde12345',
            'CustomUserData': 'john',
            'Attributes.entry.1.key': 'Enabled',
            'Attributes.entry.1.value': False,
        })

    def test_set_endpoint_attributes(self):
        self.set_http_response(status_code=200)
        endpoint_attributes = {'CustomUserData': 'john', 'Enabled': False}

        self.service_connection.set_endpoint_attributes(
            endpoint_arn='arn:myendpoint', attributes=endpoint_attributes)

        self._assert_params_ignoring_common({
            'Action': 'SetEndpointAttributes',
            'EndpointArn': 'arn:myendpoint',
            'Attributes.entry.1.key': 'CustomUserData',
            'Attributes.entry.1.value': 'john',
            'Attributes.entry.2.key': 'Enabled',
            'Attributes.entry.2.value': False,
        })

    # -- publish: argument validation and payload variants -----------------

    def test_message_is_required(self):
        self.set_http_response(status_code=200)

        # Calling publish without a message must raise TypeError.
        self.assertRaises(TypeError, self.service_connection.publish,
                          topic='topic', subject='subject')

    def test_publish_with_json(self):
        self.set_http_response(status_code=200)
        payload = {
            'default': 'Ignored.',
            'GCM': {
                'data': 'goes here',
            },
        }

        self.service_connection.publish(
            message=json.dumps(payload),
            message_structure='json',
            subject='subject',
            target_arn='target_arn')

        # 'Message' is left out of the parameter comparison and compared
        # after JSON decoding instead.
        self._assert_params_ignoring_common({
            'Action': 'Publish',
            'TargetArn': 'target_arn',
            'Subject': 'subject',
            'MessageStructure': 'json',
        }, also_ignore=['Message'])
        sent_message = json.loads(self.actual_request.params["Message"])
        self.assertDictEqual(sent_message, payload)

    def test_publish_with_utf8_message(self):
        self.set_http_response(status_code=200)
        # The same UTF-8 encoded byte string serves as subject and message.
        encoded = u'We \u2665 utf-8'.encode('utf-8')

        self.service_connection.publish('topic', encoded, encoded)

        self._assert_params_ignoring_common({
            'Action': 'Publish',
            'TopicArn': 'topic',
            'Subject': encoded,
            'Message': encoded,
        })

    def test_publish_with_attributes(self):
        self.set_http_response(status_code=200)
        # sort_keys makes the serialized text deterministic so that it can
        # be compared verbatim below.
        body = json.dumps(
            {'default': 'Ignored.', 'GCM': {'data': 'goes here'}},
            sort_keys=True)
        attributes = {
            'name1': {'data_type': 'Number', 'string_value': '42'},
            'name2': {'data_type': 'String', 'string_value': 'Bob'},
        }

        self.service_connection.publish(
            message=body,
            message_structure='json',
            subject='subject',
            target_arn='target_arn',
            message_attributes=attributes)

        self._assert_params_ignoring_common({
            'Action': 'Publish',
            'TargetArn': 'target_arn',
            'Subject': 'subject',
            'Message': '{"GCM": {"data": "goes here"}, "default": "Ignored."}',
            'MessageStructure': 'json',
            'MessageAttributes.entry.1.Name': 'name1',
            'MessageAttributes.entry.1.Value.DataType': 'Number',
            'MessageAttributes.entry.1.Value.StringValue': '42',
            'MessageAttributes.entry.2.Name': 'name2',
            'MessageAttributes.entry.2.Value.DataType': 'String',
            'MessageAttributes.entry.2.Value.StringValue': 'Bob',
        })
278
279
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
282